В CQRS write-сторона и read-сторона живут отдельно. Запрос на чтение идёт не в таблицу orders, а в специальную проекцию order_summary — оптимизированную под конкретный экран. Но как изменения из write попадают в read? Кто и когда обновляет проекцию?
Если обновлять order_summary прямо внутри той же транзакции, что меняет orders, — мы возвращаемся к монолитной модели. Если делать это после коммита — рискуем потерять обновление при сбое.
Решение — outbox-паттерн: событие пишется в ту же транзакцию, что и изменение агрегата, а доставка в Kafka (и дальше в read-model) происходит асинхронно.
Outbox: одна транзакция для агрегата и события
Суть проблемы: вы подтвердили заказ и хотите опубликовать событие order.confirmed в Kafka. Если сначала сохранить заказ, потом отправить событие — между этими двумя шагами может случиться сбой. Заказ сохранён, событие потеряно, read-model не обновилась.
Outbox решает это просто: событие сохраняется в таблицу outbox внутри той же pgx.Tx, что и изменение заказа. Либо оба изменения прошли, либо откатились оба.
// adapters/out/outbox/order_outbox_repository.go
package outbox
import (
"context"
"encoding/json"
"fmt"
"github.com/jackc/pgx/v5"
)
type OrderOutboxRepository struct{}
func (r *OrderOutboxRepository) Enqueue(ctx context.Context, tx pgx.Tx, evt OrderConfirmedEvent) error {
payload, err := json.Marshal(evt)
if err != nil {
return fmt.Errorf("marshal OrderConfirmed: %w", err)
}
_, err = tx.Exec(ctx,
`INSERT INTO outbox (event_type, payload, created_at)
VALUES ($1, $2, now())`,
"order.confirmed", payload,
)
return err
}
Схема таблицы outbox:
CREATE TABLE outbox (
id bigserial PRIMARY KEY,
event_type text NOT NULL,
payload jsonb NOT NULL,
created_at timestamptz NOT NULL DEFAULT now(),
published boolean NOT NULL DEFAULT false
);
В command-handler последовательность такая:
pgx.Tx.Begin()
UPDATE orders SET status = 'confirmed' WHERE id = $1
INSERT INTO outbox (event_type, payload) VALUES ('order.confirmed', $2)
pgx.Tx.Commit()
Пока строка в outbox — событие никуда не денется. Если Kafka временно недоступна, relay просто подождёт и попробует снова.
Relay-горутина: из outbox в Kafka
Отдельная горутина периодически берёт непубликованные записи из outbox и отправляет их в Kafka. FOR UPDATE SKIP LOCKED позволяет запускать несколько relay-инстансов параллельно без конфликтов.
// adapters/out/outbox/relay.go
package outbox
import (
"context"
"time"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/segmentio/kafka-go"
"log/slog"
)
type Relay struct {
db *pgxpool.Pool
writer *kafka.Writer
interval time.Duration
}
func (r *Relay) Run(ctx context.Context) {
ticker := time.NewTicker(r.interval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if err := r.publishBatch(ctx); err != nil {
slog.ErrorContext(ctx, "outbox relay error", "err", err)
}
}
}
}
func (r *Relay) publishBatch(ctx context.Context) error {
tx, err := r.db.Begin(ctx)
if err != nil {
return err
}
defer tx.Rollback(ctx)
rows, err := tx.Query(ctx,
`SELECT id, event_type, payload FROM outbox
WHERE published = false
ORDER BY id
FOR UPDATE SKIP LOCKED
LIMIT 100`,
)
if err != nil {
return err
}
var msgs []kafka.Message
var ids []int64
for rows.Next() {
var id int64
var evtType string
var payload []byte
if err := rows.Scan(&id, &evtType, &payload); err != nil {
return err
}
msgs = append(msgs, kafka.Message{Topic: evtType, Value: payload})
ids = append(ids, id)
}
rows.Close()
if err := rows.Err(); err != nil {
return err
}
if len(msgs) == 0 {
return nil
}
if err := r.writer.WriteMessages(ctx, msgs...); err != nil {
return err
}
_, err = tx.Exec(ctx,
`UPDATE outbox SET published = true WHERE id = ANY($1)`, ids,
)
if err != nil {
return err
}
return tx.Commit(ctx)
}
Payload события — отдельный struct
Распространённая ошибка — положить в payload события тот же тип, который генерирует sqlc из схемы базы: db.Order, db.Product. Проблема в том, что любое изменение таблицы (ALTER TABLE orders ADD COLUMN ...) автоматически изменит тип события. Consumer, который читает старую версию события, сломается.
Правильно — отдельный struct специально для события:
// core/order/event/order_confirmed.go
package event
import "time"
type OrderConfirmedEvent struct {
EventID string `json:"event_id"`
OrderID string `json:"order_id"`
CustomerID string `json:"customer_id"`
TotalAmount int64 `json:"total_amount"`
ConfirmedAt time.Time `json:"confirmed_at"`
AggregateVersion int64 `json:"aggregate_version"`
}
Если потребуется изменить структуру события — создаём OrderConfirmedEventV2 и публикуем параллельно. Существующие consumers продолжают работать.
Idempotent consumer: защита от дублей
Kafka гарантирует доставку «хотя бы раз» (at-least-once). Это значит, что одно и то же сообщение может прийти consumer-у дважды — например, если consumer упал после обработки, но до коммита offset-а.
Без защиты от дублей read-model получит двойное обновление. Есть два подхода.
Таблица обработанных событий
Перед обновлением read-model проверяем, не обрабатывали ли мы уже это событие. Проверка и обновление — в одной транзакции:
func (c *OrderSummaryConsumer) handle(ctx context.Context, msg kafka.Message) error {
var evt event.OrderConfirmedEvent
if err := json.Unmarshal(msg.Value, &evt); err != nil {
return fmt.Errorf("unmarshal OrderConfirmed: %w", err)
}
tx, err := c.db.BeginTx(ctx, pgx.TxOptions{})
if err != nil {
return fmt.Errorf("begin tx: %w", err)
}
defer tx.Rollback(ctx)
var alreadyProcessed bool
_ = tx.QueryRow(ctx,
`SELECT EXISTS(SELECT 1 FROM processed_event WHERE event_id = $1 AND consumer = $2)`,
evt.EventID, "order-summary-projector",
).Scan(&alreadyProcessed)
if alreadyProcessed {
slog.DebugContext(ctx, "duplicate event skipped", "event_id", evt.EventID)
return nil
}
if err := c.summaries.Upsert(ctx, tx, toSummary(evt)); err != nil {
return fmt.Errorf("upsert order summary: %w", err)
}
_, err = tx.Exec(ctx,
`INSERT INTO processed_event (event_id, consumer) VALUES ($1, $2)`,
evt.EventID, "order-summary-projector",
)
if err != nil {
return fmt.Errorf("mark processed: %w", err)
}
return tx.Commit(ctx)
}
Схема таблицы:
CREATE TABLE processed_event (
event_id text PRIMARY KEY,
consumer text NOT NULL,
processed_at timestamptz NOT NULL DEFAULT now()
);
Версионный UPDATE
Если события одного агрегата идут строго по порядку (одна партиция Kafka на product_id), можно обойтись без отдельной таблицы. Добавляем в read-model колонку version и обновляем только если событие свежее:
func (c *ProductSummaryConsumer) handle(ctx context.Context, msg kafka.Message) error {
var evt event.ProductUpdatedEvent
if err := json.Unmarshal(msg.Value, &evt); err != nil {
return fmt.Errorf("unmarshal ProductUpdated: %w", err)
}
tx, err := c.db.BeginTx(ctx, pgx.TxOptions{})
if err != nil {
return fmt.Errorf("begin tx: %w", err)
}
defer tx.Rollback(ctx)
tag, err := tx.Exec(ctx,
`UPDATE product_summary
SET name = $1, price = $2, version = $3, updated_at = now()
WHERE product_id = $4 AND version < $3`,
evt.Name, evt.Price, evt.AggregateVersion, evt.ProductID,
)
if err != nil {
return fmt.Errorf("update product_summary: %w", err)
}
if tag.RowsAffected() == 0 {
slog.DebugContext(ctx, "stale or duplicate event skipped",
"product_id", evt.ProductID,
"version", evt.AggregateVersion,
)
}
return tx.Commit(ctx)
}
Условие WHERE version < $new делает повторную доставку безопасной: устаревшее событие просто не перезапишет более свежее состояние.
Восстановление read-model с нуля
Если read-model потеряна (удалили базу, добавили новый тип проекции), ждать событий из Kafka бесполезно — старые события уже могут быть удалены по retention. Нужно пакетное восстановление из write-store.
Это отдельная CLI-команда, а не часть основного сервера:
// cmd/rebuild-summaries/main.go
func rebuildOrderSummaries(ctx context.Context, db *pgxpool.Pool) error {
var lastID string
for {
rows, err := db.Query(ctx,
`SELECT id, customer_id, customer_name, total_amount, status, item_count, created_at
FROM orders
WHERE id > $1
ORDER BY id
LIMIT 500`,
lastID,
)
if err != nil {
return err
}
var summaries []OrderSummaryDTO
for rows.Next() {
var s OrderSummaryDTO
if err := rows.Scan(
&s.OrderID, &s.CustomerID, &s.CustomerName,
&s.TotalAmount, &s.Status, &s.ItemCount, &s.CreatedAt,
); err != nil {
return err
}
summaries = append(summaries, s)
}
rows.Close()
if err := rows.Err(); err != nil {
return err
}
if len(summaries) == 0 {
return nil
}
if err := upsertBatch(ctx, db, summaries); err != nil {
return err
}
lastID = summaries[len(summaries)-1].OrderID
slog.InfoContext(ctx, "batch rebuilt", "last_id", lastID)
}
}
Запускается вручную или как initContainer при деплое нового read-store.
Eventual consistency и read-your-writes
Read-model обновляется асинхронно — между записью в write-store и появлением данных в read-model есть задержка. Это нормально и называется eventual consistency (согласованность в конечном счёте).
Важно обозначить это явно в API. В Go-сервисе удобно добавить заголовок:
func (h *OrderSummaryHandler) Handle(w http.ResponseWriter, r *http.Request) {
orderID := chi.URLParam(r, "id")
summary, err := h.handler.Handle(r.Context(), query.GetOrderSummary{OrderID: orderID})
if err != nil {
httperr.Write(w, r, err)
return
}
w.Header().Set("X-Data-Freshness", "eventual")
render.JSON(w, r, summary)
}
Клиент видит заголовок и знает: сразу после POST /orders запрос GET /orders/{id}/summary может вернуть предыдущее состояние. Это архитектурное свойство, не ошибка.
Read-your-writes
Иногда пользователь должен сразу увидеть свои изменения — например, после подтверждения заказа редиректим на страницу заказа. Два практичных варианта:
Два endpoint-а с явным выбором. Самое простое решение:
GET /orders/{id} — из write-store, данные всегда актуальны
GET /orders/{id}/summary — из read-model, eventual consistency, ≤ 2s
Клиент выбирает нужный endpoint в зависимости от сценария.
Version-токен. Command возвращает версию агрегата. UI передаёт её в query и ждёт, пока read-model не догонит:
type ConfirmOrderResult struct {
OrderID string
AggregateVersion int64
}
type GetOrderSummary struct {
OrderID string
MinVersion int64
}
Query-handler делает polling с таймаутом:
func (h *GetOrderSummaryHandler) Handle(ctx context.Context, q query.GetOrderSummary) (view.OrderSummaryDTO, error) {
deadline := time.Now().Add(3 * time.Second)
for time.Now().Before(deadline) {
summary, err := h.views.SummaryByID(ctx, q.OrderID)
if err != nil {
return view.OrderSummaryDTO{}, err
}
if summary.Version >= q.MinVersion {
return summary, nil
}
time.Sleep(100 * time.Millisecond)
}
return view.OrderSummaryDTO{}, &ReadModelNotReadyError{OrderID: q.OrderID, MinVersion: q.MinVersion}
}
Этот вариант сложнее, но даёт явную гарантию для критичных сценариев.
Частые ошибки
Обновлять read-model внутри command-транзакции. tx.Exec("INSERT INTO order_summary …") в том же pgx.Tx — это не CQRS, это снова единая модель. Вынесите обновление в consumer через outbox+Kafka.
PG-триггер вместо consumer. AFTER UPDATE ON orders → UPDATE order_summary работает, но такой триггер не трассируется, не тестируется как отдельный компонент, не масштабируется. Consumer на Kafka-событии делает то же самое явно.
Payload — это sqlc-тип. payload = db.Order{…} значит, что схема базы стала публичным контрактом. Один ALTER TABLE — и все consumers требуют обновления. Всегда используйте отдельный event struct.
Consumer без защиты от дублей. Kafka доставляет хотя бы раз. Без processed_event-таблицы или version-guard read-model будет получать двойные обновления.
Восстановление read-model через ожидание Kafka. При пустом read-store нельзя ждать событий: старые события уже удалены. Нужно пакетное восстановление из write-store.
Коротко
- Outbox-запись и изменение агрегата идут в одной
pgx.Tx— или оба прошли, или откатились оба. - Relay-горутина читает непубликованные строки из outbox через
FOR UPDATE SKIP LOCKEDи отправляет в Kafka. - Payload события — отдельный
structсEventIDиAggregateVersion, не sqlc-тип из write-схемы. - Kafka at-least-once означает дубли: consumer должен быть idempotent — через
processed_event-таблицу илиUPDATE … WHERE version < $new. - Eventual consistency — декларируйте явно через заголовок
X-Data-Freshness: eventual. - При потере read-model — пакетное восстановление из write-store, не ожидание событий из Kafka.
- Read-your-writes: проще всего — два endpoint-а с разной consistency-гарантией.
Что почитать дальше
- Command side в CQRS на Go — как outbox-событие регистрируется в command-handler.
- Query side в CQRS на Go — read-only транзакция и view-репозиторий.
- Когда CQRS оправдан — от lightweight до event-driven read-model.