Опирается на правила: R-CQRS-SYNC-1R-CQRS-SYNC-5 и R-CQRS-SYNC-X1R-CQRS-SYNC-X3 из CQRS Style Guide → раздел 5. Синхронизация через события.

Важно знать

  • Outbox-запись и изменение агрегата идут в одной pgx.Tx — атомарность гарантирована; Kafka-недоступность не теряет событие.
  • Relay — отдельная горутина, SELECT … FOR UPDATE SKIP LOCKED LIMIT N из outbox; segmentio/kafka-go producer.
  • Idempotent consumer обязателен (R-CQRS-SYNC-2): at-least-once Kafka означает дубли. Защита: processed_event-таблица или idempotent UPDATE … WHERE version < $new.
  • При первом запуске или потере read-model — batch-rebuild из write-store, не ожидание событий из Kafka.
  • Eventual consistency декларируется в OpenAPI (заголовок или description); клиент не должен угадывать задержку.
  • Read-your-writes при необходимости: version-токен от command, polling или отдельный endpoint без CQRS-split.
  • Синхронный INSERT в order_summary внутри command-транзакции запрещён (R-CQRS-SYNC-X1).
  • PG-триггеры как sync-механизм запрещены (R-CQRS-SYNC-X2).
  • Payload события — явный struct, не sqlc-тип write-схемы (R-CQRS-SYNC-X3).

Сердце CQRS — не разделение моделей, а способ их связать. Когда write и read физически разнесены, нужен надёжный механизм передачи изменений. Outbox + Kafka — стандартная связка; детали ниже — правильное применение в Go-стеке.

Outbox: атомарность с агрегатом

R-CQRS-SYNC-1. Outbox-адаптер пишет событие в ту же pgx.Tx, что и изменение агрегата в command-handler:

// adapters/out/outbox/order_outbox_repository.go
package outbox

import (
    "context"
    "encoding/json"
    "fmt"

    "github.com/jackc/pgx/v5"
)

type OrderOutboxRepository struct{}

func (r *OrderOutboxRepository) Enqueue(ctx context.Context, tx pgx.Tx, evt OrderConfirmedEvent) error {
    payload, err := json.Marshal(evt)
    if err != nil {
        return fmt.Errorf("marshal OrderConfirmed: %w", err)
    }
    _, err = tx.Exec(ctx,
        `INSERT INTO outbox (event_type, payload, created_at)
         VALUES ($1, $2, now())`,
        "order.confirmed", payload,
    )
    return err
}

Схема outbox-таблицы:

CREATE TABLE outbox (
    id          bigserial PRIMARY KEY,
    event_type  text        NOT NULL,
    payload     jsonb       NOT NULL,
    created_at  timestamptz NOT NULL DEFAULT now(),
    published   boolean     NOT NULL DEFAULT false
);

Последовательность в command-handler:

pgx.Tx.Begin()
  UPDATE orders SET status = 'confirmed' WHERE id = $1
  INSERT INTO outbox (event_type, payload) VALUES ('order.confirmed', $2)
pgx.Tx.Commit()

Пока строка в outbox — событие дойдёт до Kafka (relay ретраит). Kafka-сбой не теряет событие; rollback транзакции откатывает и агрегат, и outbox-запись атомарно.

Relay-горутина

// adapters/out/outbox/relay.go
package outbox

import (
    "context"
    "encoding/json"
    "time"

    "github.com/jackc/pgx/v5/pgxpool"
    "github.com/segmentio/kafka-go"
    "log/slog"
)

type Relay struct {
    db       *pgxpool.Pool
    writer   *kafka.Writer
    interval time.Duration
}

func (r *Relay) Run(ctx context.Context) {
    ticker := time.NewTicker(r.interval)
    defer ticker.Stop()
    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            if err := r.publishBatch(ctx); err != nil {
                slog.ErrorContext(ctx, "outbox relay error", "err", err)
            }
        }
    }
}

func (r *Relay) publishBatch(ctx context.Context) error {
    tx, err := r.db.Begin(ctx)
    if err != nil {
        return err
    }
    defer tx.Rollback(ctx)

    rows, err := tx.Query(ctx,
        `SELECT id, event_type, payload FROM outbox
         WHERE published = false
         ORDER BY id
         FOR UPDATE SKIP LOCKED
         LIMIT 100`,
    )
    if err != nil {
        return err
    }
    var msgs []kafka.Message
    var ids []int64
    for rows.Next() {
        var id int64
        var evtType string
        var payload []byte
        if err := rows.Scan(&id, &evtType, &payload); err != nil {
            return err
        }
        msgs = append(msgs, kafka.Message{Topic: evtType, Value: payload})
        ids = append(ids, id)
    }
    rows.Close()
    if len(msgs) == 0 {
        return nil
    }
    if err := r.writer.WriteMessages(ctx, msgs...); err != nil {
        return err
    }
    _, err = tx.Exec(ctx,
        `UPDATE outbox SET published = true WHERE id = ANY($1)`, ids,
    )
    if err != nil {
        return err
    }
    return tx.Commit(ctx)
}

Event payload — независимый struct

R-CQRS-SYNC-X3. Payload события — отдельный struct, не sqlc-тип write-схемы. Любой ALTER TABLE orders не должен ломать consumer:

// core/order/event/order_confirmed.go
package event

import "time"

type OrderConfirmedEvent struct {
    EventID          string    `json:"event_id"`
    OrderID          string    `json:"order_id"`
    CustomerID       string    `json:"customer_id"`
    TotalAmount      int64     `json:"total_amount"`
    ConfirmedAt      time.Time `json:"confirmed_at"`
    AggregateVersion int64     `json:"aggregate_version"`
}

Payload не содержит db.Order или любой другой sqlc-генерированный тип. Отдельный struct версионируется явно (OrderConfirmedEventV2) без поломки существующих consumers.

Idempotent consumer

R-CQRS-SYNC-2. At-least-once гарантия Kafka: одно сообщение может прийти дважды.

Вариант 1 — processed_event-таблица

CREATE TABLE processed_event (
    event_id     text        PRIMARY KEY,
    consumer     text        NOT NULL,
    processed_at timestamptz NOT NULL DEFAULT now()
);
// adapters/in/kafka/order_summary_consumer.go
package kafka

func (c *OrderSummaryConsumer) handle(ctx context.Context, msg kafka.Message) error {
    var evt event.OrderConfirmedEvent
    if err := json.Unmarshal(msg.Value, &evt); err != nil {
        return fmt.Errorf("unmarshal OrderConfirmed: %w", err)
    }

    tx, err := c.db.BeginTx(ctx, pgx.TxOptions{})
    if err != nil {
        return fmt.Errorf("begin tx: %w", err)
    }
    defer tx.Rollback(ctx)

    var alreadyProcessed bool
    _ = tx.QueryRow(ctx,
        `SELECT EXISTS(SELECT 1 FROM processed_event WHERE event_id = $1 AND consumer = $2)`,
        evt.EventID, "order-summary-projector",
    ).Scan(&alreadyProcessed)
    if alreadyProcessed {
        slog.DebugContext(ctx, "duplicate event skipped", "event_id", evt.EventID)
        return nil
    }

    if err := c.summaries.Upsert(ctx, tx, toSummary(evt)); err != nil {
        return fmt.Errorf("upsert order summary: %w", err)
    }
    _, err = tx.Exec(ctx,
        `INSERT INTO processed_event (event_id, consumer) VALUES ($1, $2)`,
        evt.EventID, "order-summary-projector",
    )
    if err != nil {
        return fmt.Errorf("mark processed: %w", err)
    }
    return tx.Commit(ctx)
}

Вариант 2 — idempotent UPDATE по version

Подходит когда события одного агрегата идут строго в порядке (product.updated по product_id на одной партиции Kafka):

// adapters/in/kafka/product_summary_consumer.go
func (c *ProductSummaryConsumer) handle(ctx context.Context, msg kafka.Message) error {
    var evt event.ProductUpdatedEvent
    if err := json.Unmarshal(msg.Value, &evt); err != nil {
        return fmt.Errorf("unmarshal ProductUpdated: %w", err)
    }

    tx, err := c.db.BeginTx(ctx, pgx.TxOptions{})
    if err != nil {
        return fmt.Errorf("begin tx: %w", err)
    }
    defer tx.Rollback(ctx)

    tag, err := tx.Exec(ctx,
        `UPDATE product_summary
         SET name = $1, price = $2, version = $3, updated_at = now()
         WHERE product_id = $4 AND version < $3`,
        evt.Name, evt.Price, evt.AggregateVersion, evt.ProductID,
    )
    if err != nil {
        return fmt.Errorf("update product_summary: %w", err)
    }
    if tag.RowsAffected() == 0 {
        slog.DebugContext(ctx, "stale or duplicate event skipped",
            "product_id", evt.ProductID,
            "version", evt.AggregateVersion,
        )
    }
    return tx.Commit(ctx)
}

SQL WHERE version < $new делает повторную доставку безопасной: устаревшее событие не перезапишет более свежее состояние.

Bootstrap и disaster recovery

R-CQRS-SYNC-3. При первом запуске нового read-store или после потери — batch-rebuild из write-store, не ожидание событий из Kafka:

// cmd/rebuild-summaries/main.go
package main

import (
    "context"
    "log/slog"
    "os"

    "github.com/jackc/pgx/v5/pgxpool"
)

func main() {
    ctx := context.Background()
    db, _ := pgxpool.New(ctx, os.Getenv("DATABASE_URL"))
    defer db.Close()

    if err := rebuildOrderSummaries(ctx, db); err != nil {
        slog.ErrorContext(ctx, "rebuild failed", "err", err)
        os.Exit(1)
    }
    slog.InfoContext(ctx, "rebuild complete")
}

func rebuildOrderSummaries(ctx context.Context, db *pgxpool.Pool) error {
    var offset int
    for {
        rows, err := db.Query(ctx,
            `SELECT id, customer_id, customer_name, total_amount, status, item_count, created_at
             FROM orders
             ORDER BY id
             LIMIT 500 OFFSET $1`,
            offset,
        )
        if err != nil {
            return err
        }
        var summaries []OrderSummaryDTO
        for rows.Next() {
            var s OrderSummaryDTO
            if err := rows.Scan(
                &s.OrderID, &s.CustomerID, &s.CustomerName,
                &s.TotalAmount, &s.Status, &s.ItemCount, &s.CreatedAt,
            ); err != nil {
                return err
            }
            summaries = append(summaries, s)
        }
        rows.Close()
        if len(summaries) == 0 {
            return nil
        }
        if err := upsertBatch(ctx, db, summaries); err != nil {
            return err
        }
        offset += len(summaries)
        slog.InfoContext(ctx, "batch rebuilt", "offset", offset)
    }
}

Rebuild — отдельная CLI-команда (cmd/rebuild-summaries), не часть основного сервера. Запускается вручную или как initContainer при деплое нового read-store.

Eventual consistency в OpenAPI (code-first)

R-CQRS-SYNC-4. В Go (code-first API через chi) eventual consistency декларируется в комментарии handler-а или через X-Data-Freshness-заголовок в ответе:

// edge/handler/order_summary_handler.go

// Handle GET /orders/{id}/summary
//
// Возвращает read-проекцию заказа.
// X-Data-Freshness: eventual — обновляется асинхронно через Kafka, задержка ≤ 2s.
// Для immediate consistency используйте GET /orders/{id} (из write-store).
func (h *OrderSummaryHandler) Handle(w http.ResponseWriter, r *http.Request) {
    orderID := chi.URLParam(r, "id")
    summary, err := h.handler.Handle(r.Context(), query.GetOrderSummary{OrderID: orderID})
    if err != nil {
        httperr.Write(w, err)
        return
    }
    w.Header().Set("X-Data-Freshness", "eventual")
    render.JSON(w, r, summary)
}

Клиент видит заголовок и знает: прямо после POST /orders GET /orders/{id}/summary может вернуть 404 или предыдущее состояние. Это архитектурное свойство, не баг.

Read-your-writes

R-CQRS-SYNC-5. Три варианта, в порядке возрастания инвазивности:

Version-токен от command

Command возвращает AggregateVersion. UI ждёт, пока query-handler не вернёт summary с version ≥ N:

// command-handler возвращает version
type ConfirmOrderResult struct {
    OrderID          string
    AggregateVersion int64
}

// query содержит минимальный version
type GetOrderSummary struct {
    OrderID    string
    MinVersion int64
}
// query-handler с polling
func (h *GetOrderSummaryHandler) Handle(ctx context.Context, q query.GetOrderSummary) (view.OrderSummaryDTO, error) {
    deadline := time.Now().Add(3 * time.Second)
    for time.Now().Before(deadline) {
        summary, err := h.views.SummaryByID(ctx, q.OrderID)
        if err != nil {
            return view.OrderSummaryDTO{}, err
        }
        if summary.Version >= q.MinVersion {
            return summary, nil
        }
        time.Sleep(100 * time.Millisecond)
    }
    return view.OrderSummaryDTO{}, apperr.New(apperr.Timeout, "read-model not yet updated")
}

Подходит для критичных сценариев с явной гарантией; p99 latency query растёт до minVersion-задержки.

Два endpoint-а с явным trade-off

Самое простое решение: GET из write-store для immediate consistency, GET из read-model для оптимизированного query:

GET /orders/{id}          — из write-store, immediate consistency
GET /orders/{id}/summary  — из read-model, eventual consistency, ≤ 2s

Клиент выбирает сам. В большинстве сценариев это правильное архитектурное решение — два маршрута явно показывают trade-off.

Сборка consumer через chi-сервер

// adapters/in/kafka/consumer_runner.go
package kafka

import (
    "context"
    "encoding/json"
    "log/slog"

    "github.com/segmentio/kafka-go"
)

type OrderSummaryConsumerRunner struct {
    reader   *kafka.Reader
    consumer *OrderSummaryConsumer
}

func (r *OrderSummaryConsumerRunner) Run(ctx context.Context) {
    for {
        msg, err := r.reader.FetchMessage(ctx)
        if err != nil {
            if ctx.Err() != nil {
                return
            }
            slog.ErrorContext(ctx, "kafka fetch error", "err", err)
            continue
        }
        if err := r.consumer.handle(ctx, msg); err != nil {
            slog.ErrorContext(ctx, "consumer error, skipping", "err", err, "offset", msg.Offset)
        }
        _ = r.reader.CommitMessages(ctx, msg)
    }
}

Consumer-goroutine запускается рядом с HTTP-сервером в main.go; graceful shutdown через context.WithCancel.

Что запрещено

АнтипаттернПравилоЧто взамен
tx.Exec("INSERT INTO order_summary …") внутри command-транзакцииR-CQRS-SYNC-X1Outbox-запись в той же pgx.Tx, relay публикует в Kafka, consumer обновляет read-model
PG-триггер AFTER UPDATE ON orders → UPDATE order_summaryR-CQRS-SYNC-X2Явный Go-consumer на Kafka-событие: трассируем, тестируем, масштабируем
payload = db.Order{…} — sqlc-тип write-схемы как event payloadR-CQRS-SYNC-X3Отдельный OrderConfirmedEvent struct с EventID и AggregateVersion
Consumer без дедупа (processed_event или version-guard)R-CQRS-SYNC-2processed_event-таблица или UPDATE … WHERE version < $new
Eventual consistency не задекларирована (X-Data-Freshness / OpenAPI description)R-CQRS-SYNC-4Явный заголовок или описание в OpenAPI-спецификации
Ожидание Kafka-событий при bootstrap (пустой read-store)R-CQRS-SYNC-3cmd/rebuild-* — CLI-команда, batch-rebuild из write-store

Куда дальше

  • Command side — как outbox-событие регистрируется в write-handler через pgx.Tx.
  • Query side — read-only транзакция pgx и <X>ViewRepository.
  • Read-model — где хранить проекцию: PG-таблица, Redis, ElasticSearch.
  • Уровень и эволюция — Tier 2 → 3 → 4: от маркеров к event-driven read-model.
  • Когда CQRS оправдан — когда lightweight достаточно, когда нужен outbox+Kafka.