Опирается на правила: R-SQLC-BULK-1, R-SQLC-BULK-2, R-SQLC-BULK-3, R-SQLC-BULK-X1, R-SQLC-BULK-X2 из sqlc Style Guide → раздел 8. Bulk-операции.

Важно знать

  • Цикл for _, item := range items { q.InsertOrderItem(...) } — N round-trips для N строк; при 1000 строках разница с CopyFrom — порядок.
  • pgx.CopyFrom не парсирует SQL per-row: данные передаются бинарным протоколом COPY, скорость в 5–10× выше одиночных INSERT.
  • Порог выбора: >1000 строк — pgx.CopyFrom; умеренный объём (десятки–сотни) — sqlc batchexec.
  • pool.CopyFrom (не q.CopyFrom) — метод *pgxpool.Pool, поэтому pool хранится в репозитории наряду с *db.Queries.
  • Ошибки из batch (Batch*-методов sqlc) читаются через br.Close() — не через возврат метода Queue*.
  • Конкатенация SQL-строк с VALUES в Go-коде запрещена даже для bulk: SQL-инъекция и отсутствие параметризации.
  • Bulk-операции внутри транзакции работают штатно: pgx.CopyFrom принимает pgx.Tx, Batch* — через WithTx.

Bulk-вставка — частая задача в import-пайплайнах, batch-обработке событий и инициализации данных. Инструмент выбирается по объёму: протокол COPY (pgx.CopyFrom) выигрывает на больших пакетах, sqlc batchexec — на умеренных, когда нужна параметризованная SQL-семантика с возвратом результатов.

pgx.CopyFrom — протокол COPY для больших объёмов

R-SQLC-BULK-1 и R-SQLC-BULK-2: для вставки >500 строк предпочтителен pgx.CopyFrom. Вместо N SQL-запросов драйвер открывает один COPY-поток, передаёт данные бинарным протоколом и закрывает его. PostgreSQL принимает пакет целиком, без разбора отдельных INSERT-планов.

// adapters/out/persistence/postgres_order_repository.go

func (r *PostgresOrderRepository) BulkInsertItems(ctx context.Context, items []order.Item) error {
    rows := make([][]any, len(items))
    for i, item := range items {
        rows[i] = []any{item.ID, item.OrderID, item.ProductID, item.Quantity, item.Price}
    }
    _, err := r.pool.CopyFrom(
        ctx,
        pgx.Identifier{"order_items"},
        []string{"id", "order_id", "product_id", "quantity", "price"},
        pgx.CopyFromRows(rows),
    )
    if err != nil {
        return fmt.Errorf("bulk insert order items: %w", err)
    }
    return nil
}

pgx.Identifier — экранированный идентификатор таблицы: {"order_items"}"order_items", {"public", "order_items"}"public"."order_items". Строки в слайс []string передаются в том же порядке, что и колонки в [][]any.

Если нужен CopyFrom внутри транзакции, передаём pgx.Tx напрямую — pgx.Tx тоже реализует интерфейс с CopyFrom:

// core/order/handler/import_order_items_handler.go

func (h *ImportOrderItemsHandler) Handle(ctx context.Context, cmd ImportOrderItemsCommand) error {
    tx, err := h.pool.Begin(ctx)
    if err != nil {
        return fmt.Errorf("begin tx: %w", err)
    }
    defer tx.Rollback(ctx)

    repo := h.repo.WithTx(tx)
    if err := repo.BulkInsertItemsTx(ctx, tx, cmd.Items); err != nil {
        return fmt.Errorf("bulk insert: %w", err)
    }
    return tx.Commit(ctx)
}
// adapters/out/persistence/postgres_order_repository.go

func (r *PostgresOrderRepository) BulkInsertItemsTx(ctx context.Context, tx pgx.Tx, items []order.Item) error {
    rows := make([][]any, len(items))
    for i, item := range items {
        rows[i] = []any{item.ID, item.OrderID, item.ProductID, item.Quantity, item.Price}
    }
    _, err := tx.CopyFrom(
        ctx,
        pgx.Identifier{"order_items"},
        []string{"id", "order_id", "product_id", "quantity", "price"},
        pgx.CopyFromRows(rows),
    )
    if err != nil {
        return fmt.Errorf("bulk insert order items tx: %w", err)
    }
    return nil
}

Кастомный CopyFromSource для доменных объектов

pgx.CopyFromRows принимает [][]any — это простейший вариант. Для большего контроля над памятью (стриминг из источника без предварительного создания слайса) реализуем pgx.CopyFromSource:

// adapters/out/persistence/product_copy_source.go

type productCopySource struct {
    products []*product.Product
    idx      int
}

func (s *productCopySource) Next() bool {
    s.idx++
    return s.idx <= len(s.products)
}

func (s *productCopySource) Values() ([]any, error) {
    p := s.products[s.idx-1]
    return []any{p.ID, p.SKU, p.Name, p.Price, p.StockCount}, nil
}

func (s *productCopySource) Err() error { return nil }
func (r *PostgresProductRepository) BulkInsert(ctx context.Context, products []*product.Product) error {
    _, err := r.pool.CopyFrom(
        ctx,
        pgx.Identifier{"products"},
        []string{"id", "sku", "name", "price", "stock_count"},
        &productCopySource{products: products},
    )
    if err != nil {
        return fmt.Errorf("bulk insert products: %w", err)
    }
    return nil
}

CopyFromSource обрабатывает строки по одной — полезно, когда входной поток большой и создавать [][]any заранее расточительно.

sqlc batchexec — параметризованные batch-запросы

R-SQLC-BULK-3: для умеренного объёма (десятки–сотни строк) sqlc генерирует Batch*-методы из аннотации :batchexec. Они сохраняют SQL-семантику (параметры $1/$2, проверки ограничений per-row) и возвращают ошибки через br.Close().

-- db/queries/customers.sql

-- name: UpsertCustomer :batchexec
INSERT INTO customers (id, name, email, tier, updated_at)
VALUES ($1, $2, $3, $4, $5)
ON CONFLICT (id) DO UPDATE
    SET name       = EXCLUDED.name,
        email      = EXCLUDED.email,
        tier       = EXCLUDED.tier,
        updated_at = EXCLUDED.updated_at;

sqlc генерирует:

// db/batch.go (сгенерированный код, не редактировать)

type UpsertCustomerBatchResults struct { /* ... */ }

func (q *Queries) UpsertCustomer(ctx context.Context, arg []UpsertCustomerParams) *UpsertCustomerBatchResults

Репозиторий вызывает его так:

// adapters/out/persistence/postgres_customer_repository.go

func (r *PostgresCustomerRepository) BulkUpsert(ctx context.Context, customers []*customer.Customer) error {
    params := make([]db.UpsertCustomerParams, len(customers))
    for i, c := range customers {
        params[i] = db.UpsertCustomerParams{
            ID:        c.ID,
            Name:      c.Name,
            Email:     c.Email,
            Tier:      string(c.Tier),
            UpdatedAt: c.UpdatedAt,
        }
    }

    br := r.q.UpsertCustomer(ctx, params)
    if err := br.Close(); err != nil {
        return fmt.Errorf("bulk upsert customers: %w", err)
    }
    return nil
}

br.Close() — единственная точка, где batch сбрасывается на сервер и где возвращается первая встреченная ошибка. Если нужен результат каждой строки (:batchone / :batchmany), используем br.QueryRow или br.Query в цикле перед br.Close().

Ошибки per-row в batch

Когда batch содержит разнородные данные и нужно знать, какая именно строка упала:

func (r *PostgresCustomerRepository) BulkUpsertWithRowErrors(
    ctx context.Context,
    customers []*customer.Customer,
) ([]error, error) {
    params := make([]db.UpsertCustomerParams, len(customers))
    for i, c := range customers {
        params[i] = db.UpsertCustomerParams{
            ID:        c.ID,
            Name:      c.Name,
            Email:     c.Email,
            Tier:      string(c.Tier),
            UpdatedAt: c.UpdatedAt,
        }
    }

    br := r.q.UpsertCustomer(ctx, params)
    rowErrs := make([]error, len(customers))
    br.Exec(func(i int, err error) {
        if err == nil {
            return
        }
        var pgErr *pgconn.PgError
        if errors.As(err, &pgErr) && pgErr.Code == "23505" {
            rowErrs[i] = &customer.AlreadyExistsError{ID: customers[i].ID}
            return
        }
        rowErrs[i] = fmt.Errorf("batch row %d: %w", i, err)
    })
    if err := br.Close(); err != nil {
        return nil, fmt.Errorf("close batch: %w", err)
    }
    return rowErrs, nil
}

Этот вариант нужен редко — в большинстве случаев первая ошибка валидна как сигнал отказа всей операции. Partial-success обычно оформляется иначе — через pre-validation или idempotent upsert.

Выбор между CopyFrom и batchexec

Критерийpgx.CopyFromsqlc batchexec
Объём>500–1000 строкдесятки–сотни строк
Скорость5–10× быстреестандартная
SQL-логиканет WHERE/ON CONFLICTполный SQL
Возврат per-rowнетесть (:batchone)
Ошибки per-rowвесь batch откатываетсядоступны через итерацию

CopyFrom в транзакции ведёт себя как обычный DML: при ошибке весь batch откатится вместе с транзакцией. batchexec без транзакции — каждый запрос коммитится отдельно, partial success возможен.

Что запрещено

АнтипаттернПравилоЧто взамен
for _, item := range items { q.InsertOrderItem(ctx, ...) }R-SQLC-BULK-X1pgx.CopyFrom или sqlc batchexec
Конкатенация "INSERT INTO ... VALUES " + строки в Go-кодеR-SQLC-BULK-X2Параметризованные запросы через $1/$2 в .sql-файлах
pgx.CopyFrom с неэкранированным именем таблицы строкойR-SQLC-BULK-X2pgx.Identifier{"order_items"}
Игнорирование br.Close() после Batch*R-SQLC-BULK-X1if err := br.Close(); err != nil { ... }

Куда дальше

  • Транзакции в Go — pgx.Tx на handler'е — как CopyFrom и batchexec вписываются в границу транзакции Handler.
  • Repository pattern в Go — sqlc + pgx/v5 — почему pool хранится рядом с *db.Queries и нужен именно для bulk-операций.
  • Маппинг sqlc ↔ domain в Go — как формировать []db.UpsertCustomerParams из доменных объектов без бизнес-логики в репозитории.
  • PostgreSQL: массовая запись — COPY vs INSERT, autovacuum и fill factor при bulk-паттернах.