Опирается на правила:
R-SQLC-BULK-1,R-SQLC-BULK-2,R-SQLC-BULK-3,R-SQLC-BULK-X1,R-SQLC-BULK-X2из sqlc Style Guide → раздел 8. Bulk-операции.
Важно знать
- Цикл
for _, item := range items { q.InsertOrderItem(...) }— N round-trips для N строк; при 1000 строках разница сCopyFrom— порядок.pgx.CopyFromне парсирует SQL per-row: данные передаются бинарным протоколом COPY, скорость в 5–10× выше одиночных INSERT.- Порог выбора: >1000 строк —
pgx.CopyFrom; умеренный объём (десятки–сотни) —sqlc batchexec.pool.CopyFrom(неq.CopyFrom) — метод*pgxpool.Pool, поэтомуpoolхранится в репозитории наряду с*db.Queries.- Ошибки из batch (
Batch*-методов sqlc) читаются черезbr.Close()— не через возврат методаQueue*.- Конкатенация SQL-строк с VALUES в Go-коде запрещена даже для bulk: SQL-инъекция и отсутствие параметризации.
- Bulk-операции внутри транзакции работают штатно:
pgx.CopyFromпринимаетpgx.Tx,Batch*— черезWithTx.
Bulk-вставка — частая задача в import-пайплайнах, batch-обработке событий и инициализации данных. Инструмент выбирается по объёму: протокол COPY (pgx.CopyFrom) выигрывает на больших пакетах, sqlc batchexec — на умеренных, когда нужна параметризованная SQL-семантика с возвратом результатов.
pgx.CopyFrom — протокол COPY для больших объёмов
R-SQLC-BULK-1 и R-SQLC-BULK-2: для вставки >500 строк предпочтителен pgx.CopyFrom. Вместо N SQL-запросов драйвер открывает один COPY-поток, передаёт данные бинарным протоколом и закрывает его. PostgreSQL принимает пакет целиком, без разбора отдельных INSERT-планов.
// adapters/out/persistence/postgres_order_repository.go
func (r *PostgresOrderRepository) BulkInsertItems(ctx context.Context, items []order.Item) error {
rows := make([][]any, len(items))
for i, item := range items {
rows[i] = []any{item.ID, item.OrderID, item.ProductID, item.Quantity, item.Price}
}
_, err := r.pool.CopyFrom(
ctx,
pgx.Identifier{"order_items"},
[]string{"id", "order_id", "product_id", "quantity", "price"},
pgx.CopyFromRows(rows),
)
if err != nil {
return fmt.Errorf("bulk insert order items: %w", err)
}
return nil
}
pgx.Identifier — экранированный идентификатор таблицы: {"order_items"} → "order_items", {"public", "order_items"} → "public"."order_items". Строки в слайс []string передаются в том же порядке, что и колонки в [][]any.
Если нужен CopyFrom внутри транзакции, передаём pgx.Tx напрямую — pgx.Tx тоже реализует интерфейс с CopyFrom:
// core/order/handler/import_order_items_handler.go
func (h *ImportOrderItemsHandler) Handle(ctx context.Context, cmd ImportOrderItemsCommand) error {
tx, err := h.pool.Begin(ctx)
if err != nil {
return fmt.Errorf("begin tx: %w", err)
}
defer tx.Rollback(ctx)
repo := h.repo.WithTx(tx)
if err := repo.BulkInsertItemsTx(ctx, tx, cmd.Items); err != nil {
return fmt.Errorf("bulk insert: %w", err)
}
return tx.Commit(ctx)
}
// adapters/out/persistence/postgres_order_repository.go
func (r *PostgresOrderRepository) BulkInsertItemsTx(ctx context.Context, tx pgx.Tx, items []order.Item) error {
rows := make([][]any, len(items))
for i, item := range items {
rows[i] = []any{item.ID, item.OrderID, item.ProductID, item.Quantity, item.Price}
}
_, err := tx.CopyFrom(
ctx,
pgx.Identifier{"order_items"},
[]string{"id", "order_id", "product_id", "quantity", "price"},
pgx.CopyFromRows(rows),
)
if err != nil {
return fmt.Errorf("bulk insert order items tx: %w", err)
}
return nil
}
Кастомный CopyFromSource для доменных объектов
pgx.CopyFromRows принимает [][]any — это простейший вариант. Для большего контроля над памятью (стриминг из источника без предварительного создания слайса) реализуем pgx.CopyFromSource:
// adapters/out/persistence/product_copy_source.go
type productCopySource struct {
products []*product.Product
idx int
}
func (s *productCopySource) Next() bool {
s.idx++
return s.idx <= len(s.products)
}
func (s *productCopySource) Values() ([]any, error) {
p := s.products[s.idx-1]
return []any{p.ID, p.SKU, p.Name, p.Price, p.StockCount}, nil
}
func (s *productCopySource) Err() error { return nil }
func (r *PostgresProductRepository) BulkInsert(ctx context.Context, products []*product.Product) error {
_, err := r.pool.CopyFrom(
ctx,
pgx.Identifier{"products"},
[]string{"id", "sku", "name", "price", "stock_count"},
&productCopySource{products: products},
)
if err != nil {
return fmt.Errorf("bulk insert products: %w", err)
}
return nil
}
CopyFromSource обрабатывает строки по одной — полезно, когда входной поток большой и создавать [][]any заранее расточительно.
sqlc batchexec — параметризованные batch-запросы
R-SQLC-BULK-3: для умеренного объёма (десятки–сотни строк) sqlc генерирует Batch*-методы из аннотации :batchexec. Они сохраняют SQL-семантику (параметры $1/$2, проверки ограничений per-row) и возвращают ошибки через br.Close().
-- db/queries/customers.sql
-- name: UpsertCustomer :batchexec
INSERT INTO customers (id, name, email, tier, updated_at)
VALUES ($1, $2, $3, $4, $5)
ON CONFLICT (id) DO UPDATE
SET name = EXCLUDED.name,
email = EXCLUDED.email,
tier = EXCLUDED.tier,
updated_at = EXCLUDED.updated_at;
sqlc генерирует:
// db/batch.go (сгенерированный код, не редактировать)
type UpsertCustomerBatchResults struct { /* ... */ }
func (q *Queries) UpsertCustomer(ctx context.Context, arg []UpsertCustomerParams) *UpsertCustomerBatchResults
Репозиторий вызывает его так:
// adapters/out/persistence/postgres_customer_repository.go
func (r *PostgresCustomerRepository) BulkUpsert(ctx context.Context, customers []*customer.Customer) error {
params := make([]db.UpsertCustomerParams, len(customers))
for i, c := range customers {
params[i] = db.UpsertCustomerParams{
ID: c.ID,
Name: c.Name,
Email: c.Email,
Tier: string(c.Tier),
UpdatedAt: c.UpdatedAt,
}
}
br := r.q.UpsertCustomer(ctx, params)
if err := br.Close(); err != nil {
return fmt.Errorf("bulk upsert customers: %w", err)
}
return nil
}
br.Close() — единственная точка, где batch сбрасывается на сервер и где возвращается первая встреченная ошибка. Если нужен результат каждой строки (:batchone / :batchmany), используем br.QueryRow или br.Query в цикле перед br.Close().
Ошибки per-row в batch
Когда batch содержит разнородные данные и нужно знать, какая именно строка упала:
func (r *PostgresCustomerRepository) BulkUpsertWithRowErrors(
ctx context.Context,
customers []*customer.Customer,
) ([]error, error) {
params := make([]db.UpsertCustomerParams, len(customers))
for i, c := range customers {
params[i] = db.UpsertCustomerParams{
ID: c.ID,
Name: c.Name,
Email: c.Email,
Tier: string(c.Tier),
UpdatedAt: c.UpdatedAt,
}
}
br := r.q.UpsertCustomer(ctx, params)
rowErrs := make([]error, len(customers))
br.Exec(func(i int, err error) {
if err == nil {
return
}
var pgErr *pgconn.PgError
if errors.As(err, &pgErr) && pgErr.Code == "23505" {
rowErrs[i] = &customer.AlreadyExistsError{ID: customers[i].ID}
return
}
rowErrs[i] = fmt.Errorf("batch row %d: %w", i, err)
})
if err := br.Close(); err != nil {
return nil, fmt.Errorf("close batch: %w", err)
}
return rowErrs, nil
}
Этот вариант нужен редко — в большинстве случаев первая ошибка валидна как сигнал отказа всей операции. Partial-success обычно оформляется иначе — через pre-validation или idempotent upsert.
Выбор между CopyFrom и batchexec
| Критерий | pgx.CopyFrom | sqlc batchexec |
|---|---|---|
| Объём | >500–1000 строк | десятки–сотни строк |
| Скорость | 5–10× быстрее | стандартная |
| SQL-логика | нет WHERE/ON CONFLICT | полный SQL |
| Возврат per-row | нет | есть (:batchone) |
| Ошибки per-row | весь batch откатывается | доступны через итерацию |
CopyFrom в транзакции ведёт себя как обычный DML: при ошибке весь batch откатится вместе с транзакцией. batchexec без транзакции — каждый запрос коммитится отдельно, partial success возможен.
Что запрещено
| Антипаттерн | Правило | Что взамен |
|---|---|---|
for _, item := range items { q.InsertOrderItem(ctx, ...) } | R-SQLC-BULK-X1 | pgx.CopyFrom или sqlc batchexec |
Конкатенация "INSERT INTO ... VALUES " + строки в Go-коде | R-SQLC-BULK-X2 | Параметризованные запросы через $1/$2 в .sql-файлах |
pgx.CopyFrom с неэкранированным именем таблицы строкой | R-SQLC-BULK-X2 | pgx.Identifier{"order_items"} |
Игнорирование br.Close() после Batch* | R-SQLC-BULK-X1 | if err := br.Close(); err != nil { ... } |
Куда дальше
- Транзакции в Go — pgx.Tx на handler'е — как
CopyFromиbatchexecвписываются в границу транзакции Handler. - Repository pattern в Go — sqlc + pgx/v5 — почему
poolхранится рядом с*db.Queriesи нужен именно для bulk-операций. - Маппинг sqlc ↔ domain в Go — как формировать
[]db.UpsertCustomerParamsиз доменных объектов без бизнес-логики в репозитории. - PostgreSQL: массовая запись — COPY vs INSERT, autovacuum и fill factor при bulk-паттернах.