← назад к разделу

В CQRS write-сторона и read-сторона живут отдельно. Запрос на чтение идёт не в таблицу orders, а в специальную проекцию order_summary — оптимизированную под конкретный экран. Но как изменения из write попадают в read? Кто и когда обновляет проекцию?

Если обновлять order_summary прямо внутри той же транзакции, что меняет orders, — мы возвращаемся к монолитной модели. Если делать это после коммита — рискуем потерять обновление при сбое.

Решение — outbox-паттерн: событие пишется в ту же транзакцию, что и изменение агрегата, а доставка в Kafka (и дальше в read-model) происходит асинхронно.

Outbox: одна транзакция для агрегата и события

Суть проблемы: вы подтвердили заказ и хотите опубликовать событие order.confirmed в Kafka. Если сначала сохранить заказ, потом отправить событие — между этими двумя шагами может случиться сбой. Заказ сохранён, событие потеряно, read-model не обновилась.

Outbox решает это просто: событие сохраняется в таблицу outbox внутри той же pgx.Tx, что и изменение заказа. Либо оба изменения прошли, либо откатились оба.

// adapters/out/outbox/order_outbox_repository.go
package outbox

import (
    "context"
    "encoding/json"
    "fmt"

    "github.com/jackc/pgx/v5"
)

type OrderOutboxRepository struct{}

func (r *OrderOutboxRepository) Enqueue(ctx context.Context, tx pgx.Tx, evt OrderConfirmedEvent) error {
    payload, err := json.Marshal(evt)
    if err != nil {
        return fmt.Errorf("marshal OrderConfirmed: %w", err)
    }
    _, err = tx.Exec(ctx,
        `INSERT INTO outbox (event_type, payload, created_at)
         VALUES ($1, $2, now())`,
        "order.confirmed", payload,
    )
    return err
}

Схема таблицы outbox:

CREATE TABLE outbox (
    id          bigserial PRIMARY KEY,
    event_type  text        NOT NULL,
    payload     jsonb       NOT NULL,
    created_at  timestamptz NOT NULL DEFAULT now(),
    published   boolean     NOT NULL DEFAULT false
);

В command-handler последовательность такая:

pgx.Tx.Begin()
  UPDATE orders SET status = 'confirmed' WHERE id = $1
  INSERT INTO outbox (event_type, payload) VALUES ('order.confirmed', $2)
pgx.Tx.Commit()

Пока строка в outbox — событие никуда не денется. Если Kafka временно недоступна, relay просто подождёт и попробует снова.

Relay-горутина: из outbox в Kafka

Отдельная горутина периодически берёт непубликованные записи из outbox и отправляет их в Kafka. FOR UPDATE SKIP LOCKED позволяет запускать несколько relay-инстансов параллельно без конфликтов.

// adapters/out/outbox/relay.go
package outbox

import (
    "context"
    "time"

    "github.com/jackc/pgx/v5/pgxpool"
    "github.com/segmentio/kafka-go"
    "log/slog"
)

type Relay struct {
    db       *pgxpool.Pool
    writer   *kafka.Writer
    interval time.Duration
}

func (r *Relay) Run(ctx context.Context) {
    ticker := time.NewTicker(r.interval)
    defer ticker.Stop()
    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            if err := r.publishBatch(ctx); err != nil {
                slog.ErrorContext(ctx, "outbox relay error", "err", err)
            }
        }
    }
}

func (r *Relay) publishBatch(ctx context.Context) error {
    tx, err := r.db.Begin(ctx)
    if err != nil {
        return err
    }
    defer tx.Rollback(ctx)

    rows, err := tx.Query(ctx,
        `SELECT id, event_type, payload FROM outbox
         WHERE published = false
         ORDER BY id
         FOR UPDATE SKIP LOCKED
         LIMIT 100`,
    )
    if err != nil {
        return err
    }
    var msgs []kafka.Message
    var ids []int64
    for rows.Next() {
        var id int64
        var evtType string
        var payload []byte
        if err := rows.Scan(&id, &evtType, &payload); err != nil {
            return err
        }
        msgs = append(msgs, kafka.Message{Topic: evtType, Value: payload})
        ids = append(ids, id)
    }
    rows.Close()
    if err := rows.Err(); err != nil {
        return err
    }
    if len(msgs) == 0 {
        return nil
    }
    if err := r.writer.WriteMessages(ctx, msgs...); err != nil {
        return err
    }
    _, err = tx.Exec(ctx,
        `UPDATE outbox SET published = true WHERE id = ANY($1)`, ids,
    )
    if err != nil {
        return err
    }
    return tx.Commit(ctx)
}

Payload события — отдельный struct

Распространённая ошибка — положить в payload события тот же тип, который генерирует sqlc из схемы базы: db.Order, db.Product. Проблема в том, что любое изменение таблицы (ALTER TABLE orders ADD COLUMN ...) автоматически изменит тип события. Consumer, который читает старую версию события, сломается.

Правильно — отдельный struct специально для события:

// core/order/event/order_confirmed.go
package event

import "time"

type OrderConfirmedEvent struct {
    EventID          string    `json:"event_id"`
    OrderID          string    `json:"order_id"`
    CustomerID       string    `json:"customer_id"`
    TotalAmount      int64     `json:"total_amount"`
    ConfirmedAt      time.Time `json:"confirmed_at"`
    AggregateVersion int64     `json:"aggregate_version"`
}

Если потребуется изменить структуру события — создаём OrderConfirmedEventV2 и публикуем параллельно. Существующие consumers продолжают работать.

Idempotent consumer: защита от дублей

Kafka гарантирует доставку «хотя бы раз» (at-least-once). Это значит, что одно и то же сообщение может прийти consumer-у дважды — например, если consumer упал после обработки, но до коммита offset-а.

Без защиты от дублей read-model получит двойное обновление. Есть два подхода.

Таблица обработанных событий

Перед обновлением read-model проверяем, не обрабатывали ли мы уже это событие. Проверка и обновление — в одной транзакции:

func (c *OrderSummaryConsumer) handle(ctx context.Context, msg kafka.Message) error {
    var evt event.OrderConfirmedEvent
    if err := json.Unmarshal(msg.Value, &evt); err != nil {
        return fmt.Errorf("unmarshal OrderConfirmed: %w", err)
    }

    tx, err := c.db.BeginTx(ctx, pgx.TxOptions{})
    if err != nil {
        return fmt.Errorf("begin tx: %w", err)
    }
    defer tx.Rollback(ctx)

    var alreadyProcessed bool
    _ = tx.QueryRow(ctx,
        `SELECT EXISTS(SELECT 1 FROM processed_event WHERE event_id = $1 AND consumer = $2)`,
        evt.EventID, "order-summary-projector",
    ).Scan(&alreadyProcessed)
    if alreadyProcessed {
        slog.DebugContext(ctx, "duplicate event skipped", "event_id", evt.EventID)
        return nil
    }

    if err := c.summaries.Upsert(ctx, tx, toSummary(evt)); err != nil {
        return fmt.Errorf("upsert order summary: %w", err)
    }
    _, err = tx.Exec(ctx,
        `INSERT INTO processed_event (event_id, consumer) VALUES ($1, $2)`,
        evt.EventID, "order-summary-projector",
    )
    if err != nil {
        return fmt.Errorf("mark processed: %w", err)
    }
    return tx.Commit(ctx)
}

Схема таблицы:

CREATE TABLE processed_event (
    event_id     text        PRIMARY KEY,
    consumer     text        NOT NULL,
    processed_at timestamptz NOT NULL DEFAULT now()
);

Версионный UPDATE

Если события одного агрегата идут строго по порядку (одна партиция Kafka на product_id), можно обойтись без отдельной таблицы. Добавляем в read-model колонку version и обновляем только если событие свежее:

func (c *ProductSummaryConsumer) handle(ctx context.Context, msg kafka.Message) error {
    var evt event.ProductUpdatedEvent
    if err := json.Unmarshal(msg.Value, &evt); err != nil {
        return fmt.Errorf("unmarshal ProductUpdated: %w", err)
    }

    tx, err := c.db.BeginTx(ctx, pgx.TxOptions{})
    if err != nil {
        return fmt.Errorf("begin tx: %w", err)
    }
    defer tx.Rollback(ctx)

    tag, err := tx.Exec(ctx,
        `UPDATE product_summary
         SET name = $1, price = $2, version = $3, updated_at = now()
         WHERE product_id = $4 AND version < $3`,
        evt.Name, evt.Price, evt.AggregateVersion, evt.ProductID,
    )
    if err != nil {
        return fmt.Errorf("update product_summary: %w", err)
    }
    if tag.RowsAffected() == 0 {
        slog.DebugContext(ctx, "stale or duplicate event skipped",
            "product_id", evt.ProductID,
            "version", evt.AggregateVersion,
        )
    }
    return tx.Commit(ctx)
}

Условие WHERE version < $new делает повторную доставку безопасной: устаревшее событие просто не перезапишет более свежее состояние.

Восстановление read-model с нуля

Если read-model потеряна (удалили базу, добавили новый тип проекции), ждать событий из Kafka бесполезно — старые события уже могут быть удалены по retention. Нужно пакетное восстановление из write-store.

Это отдельная CLI-команда, а не часть основного сервера:

// cmd/rebuild-summaries/main.go
func rebuildOrderSummaries(ctx context.Context, db *pgxpool.Pool) error {
    var lastID string
    for {
        rows, err := db.Query(ctx,
            `SELECT id, customer_id, customer_name, total_amount, status, item_count, created_at
             FROM orders
             WHERE id > $1
             ORDER BY id
             LIMIT 500`,
            lastID,
        )
        if err != nil {
            return err
        }
        var summaries []OrderSummaryDTO
        for rows.Next() {
            var s OrderSummaryDTO
            if err := rows.Scan(
                &s.OrderID, &s.CustomerID, &s.CustomerName,
                &s.TotalAmount, &s.Status, &s.ItemCount, &s.CreatedAt,
            ); err != nil {
                return err
            }
            summaries = append(summaries, s)
        }
        rows.Close()
        if err := rows.Err(); err != nil {
            return err
        }
        if len(summaries) == 0 {
            return nil
        }
        if err := upsertBatch(ctx, db, summaries); err != nil {
            return err
        }
        lastID = summaries[len(summaries)-1].OrderID
        slog.InfoContext(ctx, "batch rebuilt", "last_id", lastID)
    }
}

Запускается вручную или как initContainer при деплое нового read-store.

Eventual consistency и read-your-writes

Read-model обновляется асинхронно — между записью в write-store и появлением данных в read-model есть задержка. Это нормально и называется eventual consistency (согласованность в конечном счёте).

Важно обозначить это явно в API. В Go-сервисе удобно добавить заголовок:

func (h *OrderSummaryHandler) Handle(w http.ResponseWriter, r *http.Request) {
    orderID := chi.URLParam(r, "id")
    summary, err := h.handler.Handle(r.Context(), query.GetOrderSummary{OrderID: orderID})
    if err != nil {
        httperr.Write(w, r, err)
        return
    }
    w.Header().Set("X-Data-Freshness", "eventual")
    render.JSON(w, r, summary)
}

Клиент видит заголовок и знает: сразу после POST /orders запрос GET /orders/{id}/summary может вернуть предыдущее состояние. Это архитектурное свойство, не ошибка.

Read-your-writes

Иногда пользователь должен сразу увидеть свои изменения — например, после подтверждения заказа редиректим на страницу заказа. Два практичных варианта:

Два endpoint-а с явным выбором. Самое простое решение:

GET /orders/{id}          — из write-store, данные всегда актуальны
GET /orders/{id}/summary  — из read-model, eventual consistency, ≤ 2s

Клиент выбирает нужный endpoint в зависимости от сценария.

Version-токен. Command возвращает версию агрегата. UI передаёт её в query и ждёт, пока read-model не догонит:

type ConfirmOrderResult struct {
    OrderID          string
    AggregateVersion int64
}

type GetOrderSummary struct {
    OrderID    string
    MinVersion int64
}

Query-handler делает polling с таймаутом:

func (h *GetOrderSummaryHandler) Handle(ctx context.Context, q query.GetOrderSummary) (view.OrderSummaryDTO, error) {
    deadline := time.Now().Add(3 * time.Second)
    for time.Now().Before(deadline) {
        summary, err := h.views.SummaryByID(ctx, q.OrderID)
        if err != nil {
            return view.OrderSummaryDTO{}, err
        }
        if summary.Version >= q.MinVersion {
            return summary, nil
        }
        time.Sleep(100 * time.Millisecond)
    }
    return view.OrderSummaryDTO{}, &ReadModelNotReadyError{OrderID: q.OrderID, MinVersion: q.MinVersion}
}

Этот вариант сложнее, но даёт явную гарантию для критичных сценариев.

Частые ошибки

Обновлять read-model внутри command-транзакции. tx.Exec("INSERT INTO order_summary …") в том же pgx.Tx — это не CQRS, это снова единая модель. Вынесите обновление в consumer через outbox+Kafka.

PG-триггер вместо consumer. AFTER UPDATE ON orders → UPDATE order_summary работает, но такой триггер не трассируется, не тестируется как отдельный компонент, не масштабируется. Consumer на Kafka-событии делает то же самое явно.

Payload — это sqlc-тип. payload = db.Order{…} значит, что схема базы стала публичным контрактом. Один ALTER TABLE — и все consumers требуют обновления. Всегда используйте отдельный event struct.

Consumer без защиты от дублей. Kafka доставляет хотя бы раз. Без processed_event-таблицы или version-guard read-model будет получать двойные обновления.

Восстановление read-model через ожидание Kafka. При пустом read-store нельзя ждать событий: старые события уже удалены. Нужно пакетное восстановление из write-store.

Коротко

  • Outbox-запись и изменение агрегата идут в одной pgx.Tx — или оба прошли, или откатились оба.
  • Relay-горутина читает непубликованные строки из outbox через FOR UPDATE SKIP LOCKED и отправляет в Kafka.
  • Payload события — отдельный struct с EventID и AggregateVersion, не sqlc-тип из write-схемы.
  • Kafka at-least-once означает дубли: consumer должен быть idempotent — через processed_event-таблицу или UPDATE … WHERE version < $new.
  • Eventual consistency — декларируйте явно через заголовок X-Data-Freshness: eventual.
  • При потере read-model — пакетное восстановление из write-store, не ожидание событий из Kafka.
  • Read-your-writes: проще всего — два endpoint-а с разной consistency-гарантией.

Что почитать дальше

  • Command side в CQRS на Go — как outbox-событие регистрируется в command-handler.
  • Query side в CQRS на Go — read-only транзакция и view-репозиторий.
  • Когда CQRS оправдан — от lightweight до event-driven read-model.