Опирается на правила:
R-CQRS-SYNC-1…R-CQRS-SYNC-5иR-CQRS-SYNC-X1…R-CQRS-SYNC-X3из CQRS Style Guide → раздел 5. Синхронизация через события.
Важно знать
- Outbox-запись и изменение агрегата идут в одной
pgx.Tx— атомарность гарантирована; Kafka-недоступность не теряет событие.- Relay — отдельная горутина,
SELECT … FOR UPDATE SKIP LOCKED LIMIT Nиз outbox;segmentio/kafka-goproducer.- Idempotent consumer обязателен (
R-CQRS-SYNC-2): at-least-once Kafka означает дубли. Защита:processed_event-таблица или idempotentUPDATE … WHERE version < $new.- При первом запуске или потере read-model — batch-rebuild из write-store, не ожидание событий из Kafka.
- Eventual consistency декларируется в OpenAPI (заголовок или
description); клиент не должен угадывать задержку.- Read-your-writes при необходимости: version-токен от command, polling или отдельный endpoint без CQRS-split.
- Синхронный
INSERTвorder_summaryвнутри command-транзакции запрещён (R-CQRS-SYNC-X1).- PG-триггеры как sync-механизм запрещены (
R-CQRS-SYNC-X2).- Payload события — явный
struct, неsqlc-тип write-схемы (R-CQRS-SYNC-X3).
Сердце CQRS — не разделение моделей, а способ их связать. Когда write и read физически разнесены, нужен надёжный механизм передачи изменений. Outbox + Kafka — стандартная связка; детали ниже — правильное применение в Go-стеке.
Outbox: атомарность с агрегатом
R-CQRS-SYNC-1. Outbox-адаптер пишет событие в ту же pgx.Tx, что и изменение агрегата в command-handler:
// adapters/out/outbox/order_outbox_repository.go
package outbox
import (
"context"
"encoding/json"
"fmt"
"github.com/jackc/pgx/v5"
)
type OrderOutboxRepository struct{}
func (r *OrderOutboxRepository) Enqueue(ctx context.Context, tx pgx.Tx, evt OrderConfirmedEvent) error {
payload, err := json.Marshal(evt)
if err != nil {
return fmt.Errorf("marshal OrderConfirmed: %w", err)
}
_, err = tx.Exec(ctx,
`INSERT INTO outbox (event_type, payload, created_at)
VALUES ($1, $2, now())`,
"order.confirmed", payload,
)
return err
}
Схема outbox-таблицы:
CREATE TABLE outbox (
id bigserial PRIMARY KEY,
event_type text NOT NULL,
payload jsonb NOT NULL,
created_at timestamptz NOT NULL DEFAULT now(),
published boolean NOT NULL DEFAULT false
);
Последовательность в command-handler:
pgx.Tx.Begin()
UPDATE orders SET status = 'confirmed' WHERE id = $1
INSERT INTO outbox (event_type, payload) VALUES ('order.confirmed', $2)
pgx.Tx.Commit()
Пока строка в outbox — событие дойдёт до Kafka (relay ретраит). Kafka-сбой не теряет событие; rollback транзакции откатывает и агрегат, и outbox-запись атомарно.
Relay-горутина
// adapters/out/outbox/relay.go
package outbox
import (
"context"
"encoding/json"
"time"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/segmentio/kafka-go"
"log/slog"
)
type Relay struct {
db *pgxpool.Pool
writer *kafka.Writer
interval time.Duration
}
func (r *Relay) Run(ctx context.Context) {
ticker := time.NewTicker(r.interval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if err := r.publishBatch(ctx); err != nil {
slog.ErrorContext(ctx, "outbox relay error", "err", err)
}
}
}
}
func (r *Relay) publishBatch(ctx context.Context) error {
tx, err := r.db.Begin(ctx)
if err != nil {
return err
}
defer tx.Rollback(ctx)
rows, err := tx.Query(ctx,
`SELECT id, event_type, payload FROM outbox
WHERE published = false
ORDER BY id
FOR UPDATE SKIP LOCKED
LIMIT 100`,
)
if err != nil {
return err
}
var msgs []kafka.Message
var ids []int64
for rows.Next() {
var id int64
var evtType string
var payload []byte
if err := rows.Scan(&id, &evtType, &payload); err != nil {
return err
}
msgs = append(msgs, kafka.Message{Topic: evtType, Value: payload})
ids = append(ids, id)
}
rows.Close()
if len(msgs) == 0 {
return nil
}
if err := r.writer.WriteMessages(ctx, msgs...); err != nil {
return err
}
_, err = tx.Exec(ctx,
`UPDATE outbox SET published = true WHERE id = ANY($1)`, ids,
)
if err != nil {
return err
}
return tx.Commit(ctx)
}
Event payload — независимый struct
R-CQRS-SYNC-X3. Payload события — отдельный struct, не sqlc-тип write-схемы. Любой ALTER TABLE orders не должен ломать consumer:
// core/order/event/order_confirmed.go
package event
import "time"
type OrderConfirmedEvent struct {
EventID string `json:"event_id"`
OrderID string `json:"order_id"`
CustomerID string `json:"customer_id"`
TotalAmount int64 `json:"total_amount"`
ConfirmedAt time.Time `json:"confirmed_at"`
AggregateVersion int64 `json:"aggregate_version"`
}
Payload не содержит db.Order или любой другой sqlc-генерированный тип. Отдельный struct версионируется явно (OrderConfirmedEventV2) без поломки существующих consumers.
Idempotent consumer
R-CQRS-SYNC-2. At-least-once гарантия Kafka: одно сообщение может прийти дважды.
Вариант 1 — processed_event-таблица
CREATE TABLE processed_event (
event_id text PRIMARY KEY,
consumer text NOT NULL,
processed_at timestamptz NOT NULL DEFAULT now()
);
// adapters/in/kafka/order_summary_consumer.go
package kafka
func (c *OrderSummaryConsumer) handle(ctx context.Context, msg kafka.Message) error {
var evt event.OrderConfirmedEvent
if err := json.Unmarshal(msg.Value, &evt); err != nil {
return fmt.Errorf("unmarshal OrderConfirmed: %w", err)
}
tx, err := c.db.BeginTx(ctx, pgx.TxOptions{})
if err != nil {
return fmt.Errorf("begin tx: %w", err)
}
defer tx.Rollback(ctx)
var alreadyProcessed bool
_ = tx.QueryRow(ctx,
`SELECT EXISTS(SELECT 1 FROM processed_event WHERE event_id = $1 AND consumer = $2)`,
evt.EventID, "order-summary-projector",
).Scan(&alreadyProcessed)
if alreadyProcessed {
slog.DebugContext(ctx, "duplicate event skipped", "event_id", evt.EventID)
return nil
}
if err := c.summaries.Upsert(ctx, tx, toSummary(evt)); err != nil {
return fmt.Errorf("upsert order summary: %w", err)
}
_, err = tx.Exec(ctx,
`INSERT INTO processed_event (event_id, consumer) VALUES ($1, $2)`,
evt.EventID, "order-summary-projector",
)
if err != nil {
return fmt.Errorf("mark processed: %w", err)
}
return tx.Commit(ctx)
}
Вариант 2 — idempotent UPDATE по version
Подходит когда события одного агрегата идут строго в порядке (product.updated по product_id на одной партиции Kafka):
// adapters/in/kafka/product_summary_consumer.go
func (c *ProductSummaryConsumer) handle(ctx context.Context, msg kafka.Message) error {
var evt event.ProductUpdatedEvent
if err := json.Unmarshal(msg.Value, &evt); err != nil {
return fmt.Errorf("unmarshal ProductUpdated: %w", err)
}
tx, err := c.db.BeginTx(ctx, pgx.TxOptions{})
if err != nil {
return fmt.Errorf("begin tx: %w", err)
}
defer tx.Rollback(ctx)
tag, err := tx.Exec(ctx,
`UPDATE product_summary
SET name = $1, price = $2, version = $3, updated_at = now()
WHERE product_id = $4 AND version < $3`,
evt.Name, evt.Price, evt.AggregateVersion, evt.ProductID,
)
if err != nil {
return fmt.Errorf("update product_summary: %w", err)
}
if tag.RowsAffected() == 0 {
slog.DebugContext(ctx, "stale or duplicate event skipped",
"product_id", evt.ProductID,
"version", evt.AggregateVersion,
)
}
return tx.Commit(ctx)
}
SQL WHERE version < $new делает повторную доставку безопасной: устаревшее событие не перезапишет более свежее состояние.
Bootstrap и disaster recovery
R-CQRS-SYNC-3. При первом запуске нового read-store или после потери — batch-rebuild из write-store, не ожидание событий из Kafka:
// cmd/rebuild-summaries/main.go
package main
import (
"context"
"log/slog"
"os"
"github.com/jackc/pgx/v5/pgxpool"
)
func main() {
ctx := context.Background()
db, _ := pgxpool.New(ctx, os.Getenv("DATABASE_URL"))
defer db.Close()
if err := rebuildOrderSummaries(ctx, db); err != nil {
slog.ErrorContext(ctx, "rebuild failed", "err", err)
os.Exit(1)
}
slog.InfoContext(ctx, "rebuild complete")
}
func rebuildOrderSummaries(ctx context.Context, db *pgxpool.Pool) error {
var offset int
for {
rows, err := db.Query(ctx,
`SELECT id, customer_id, customer_name, total_amount, status, item_count, created_at
FROM orders
ORDER BY id
LIMIT 500 OFFSET $1`,
offset,
)
if err != nil {
return err
}
var summaries []OrderSummaryDTO
for rows.Next() {
var s OrderSummaryDTO
if err := rows.Scan(
&s.OrderID, &s.CustomerID, &s.CustomerName,
&s.TotalAmount, &s.Status, &s.ItemCount, &s.CreatedAt,
); err != nil {
return err
}
summaries = append(summaries, s)
}
rows.Close()
if len(summaries) == 0 {
return nil
}
if err := upsertBatch(ctx, db, summaries); err != nil {
return err
}
offset += len(summaries)
slog.InfoContext(ctx, "batch rebuilt", "offset", offset)
}
}
Rebuild — отдельная CLI-команда (cmd/rebuild-summaries), не часть основного сервера. Запускается вручную или как initContainer при деплое нового read-store.
Eventual consistency в OpenAPI (code-first)
R-CQRS-SYNC-4. В Go (code-first API через chi) eventual consistency декларируется в комментарии handler-а или через X-Data-Freshness-заголовок в ответе:
// edge/handler/order_summary_handler.go
// Handle GET /orders/{id}/summary
//
// Возвращает read-проекцию заказа.
// X-Data-Freshness: eventual — обновляется асинхронно через Kafka, задержка ≤ 2s.
// Для immediate consistency используйте GET /orders/{id} (из write-store).
func (h *OrderSummaryHandler) Handle(w http.ResponseWriter, r *http.Request) {
orderID := chi.URLParam(r, "id")
summary, err := h.handler.Handle(r.Context(), query.GetOrderSummary{OrderID: orderID})
if err != nil {
httperr.Write(w, err)
return
}
w.Header().Set("X-Data-Freshness", "eventual")
render.JSON(w, r, summary)
}
Клиент видит заголовок и знает: прямо после POST /orders GET /orders/{id}/summary может вернуть 404 или предыдущее состояние. Это архитектурное свойство, не баг.
Read-your-writes
R-CQRS-SYNC-5. Три варианта, в порядке возрастания инвазивности:
Version-токен от command
Command возвращает AggregateVersion. UI ждёт, пока query-handler не вернёт summary с version ≥ N:
// command-handler возвращает version
type ConfirmOrderResult struct {
OrderID string
AggregateVersion int64
}
// query содержит минимальный version
type GetOrderSummary struct {
OrderID string
MinVersion int64
}
// query-handler с polling
func (h *GetOrderSummaryHandler) Handle(ctx context.Context, q query.GetOrderSummary) (view.OrderSummaryDTO, error) {
deadline := time.Now().Add(3 * time.Second)
for time.Now().Before(deadline) {
summary, err := h.views.SummaryByID(ctx, q.OrderID)
if err != nil {
return view.OrderSummaryDTO{}, err
}
if summary.Version >= q.MinVersion {
return summary, nil
}
time.Sleep(100 * time.Millisecond)
}
return view.OrderSummaryDTO{}, apperr.New(apperr.Timeout, "read-model not yet updated")
}
Подходит для критичных сценариев с явной гарантией; p99 latency query растёт до minVersion-задержки.
Два endpoint-а с явным trade-off
Самое простое решение: GET из write-store для immediate consistency, GET из read-model для оптимизированного query:
GET /orders/{id} — из write-store, immediate consistency
GET /orders/{id}/summary — из read-model, eventual consistency, ≤ 2s
Клиент выбирает сам. В большинстве сценариев это правильное архитектурное решение — два маршрута явно показывают trade-off.
Сборка consumer через chi-сервер
// adapters/in/kafka/consumer_runner.go
package kafka
import (
"context"
"encoding/json"
"log/slog"
"github.com/segmentio/kafka-go"
)
type OrderSummaryConsumerRunner struct {
reader *kafka.Reader
consumer *OrderSummaryConsumer
}
func (r *OrderSummaryConsumerRunner) Run(ctx context.Context) {
for {
msg, err := r.reader.FetchMessage(ctx)
if err != nil {
if ctx.Err() != nil {
return
}
slog.ErrorContext(ctx, "kafka fetch error", "err", err)
continue
}
if err := r.consumer.handle(ctx, msg); err != nil {
slog.ErrorContext(ctx, "consumer error, skipping", "err", err, "offset", msg.Offset)
}
_ = r.reader.CommitMessages(ctx, msg)
}
}
Consumer-goroutine запускается рядом с HTTP-сервером в main.go; graceful shutdown через context.WithCancel.
Что запрещено
| Антипаттерн | Правило | Что взамен |
|---|---|---|
tx.Exec("INSERT INTO order_summary …") внутри command-транзакции | R-CQRS-SYNC-X1 | Outbox-запись в той же pgx.Tx, relay публикует в Kafka, consumer обновляет read-model |
PG-триггер AFTER UPDATE ON orders → UPDATE order_summary | R-CQRS-SYNC-X2 | Явный Go-consumer на Kafka-событие: трассируем, тестируем, масштабируем |
payload = db.Order{…} — sqlc-тип write-схемы как event payload | R-CQRS-SYNC-X3 | Отдельный OrderConfirmedEvent struct с EventID и AggregateVersion |
Consumer без дедупа (processed_event или version-guard) | R-CQRS-SYNC-2 | processed_event-таблица или UPDATE … WHERE version < $new |
Eventual consistency не задекларирована (X-Data-Freshness / OpenAPI description) | R-CQRS-SYNC-4 | Явный заголовок или описание в OpenAPI-спецификации |
| Ожидание Kafka-событий при bootstrap (пустой read-store) | R-CQRS-SYNC-3 | cmd/rebuild-* — CLI-команда, batch-rebuild из write-store |
Куда дальше
- Command side — как outbox-событие регистрируется в write-handler через
pgx.Tx. - Query side — read-only транзакция pgx и
<X>ViewRepository. - Read-model — где хранить проекцию: PG-таблица, Redis, ElasticSearch.
- Уровень и эволюция — Tier 2 → 3 → 4: от маркеров к event-driven read-model.
- Когда CQRS оправдан — когда lightweight достаточно, когда нужен outbox+Kafka.