← назад к разделу

В обычном приложении один и тот же код читает и пишет данные. Сначала всё просто, но потом возникает напряжение: запрос на запись требует одних индексов, запрос на чтение — других. JOIN для записи и JOIN для чтения начинают мешать друг другу. Таблица растёт, и ни одна операция не работает оптимально.

CQRS решает это разделением: данные хранятся в форме, удобной для записи (агрегаты, нормализованная схема), а для чтения создаётся отдельное представление — read-model. Это заранее подготовленная, денормализованная проекция: один SELECT без JOIN возвращает готовый объект для UI или API.

В Go контракт читающей стороны выражается через интерфейс <X>ViewRepository; реализацию под него генерирует sqlc.

Почему данные для чтения выглядят иначе

Возьмём заказ. В write-схеме он разложен по нескольким таблицам:

order(id, customer_id, status)
order_item(order_id, qty, price)
customer(id, name, email_hash)

Чтобы показать карточку заказа пользователю, нужен JOIN трёх таблиц. На тысячах запросов в секунду это ощутимо.

Read-model складывает всё это в одну таблицу заранее:

order_summary(
    order_id,
    customer_name,      ← уже вставлено из customer, join не нужен
    customer_email_hash,
    status,
    item_count,         ← посчитано из order_item заранее
    total_amount        ← тоже посчитано
)

Теперь SELECT * FROM order_summary WHERE order_id = $1 — и готово. Никаких join'ов.

Цена — данные не всегда актуальны мгновенно: между записью в write-side и обновлением read-model проходит короткое время (обычно 100мс–1с). Это называется итоговой согласованностью (eventual consistency). В большинстве случаев это приемлемо.

Где хранить read-model

Хранилище выбирают под паттерн чтения, а не «одно универсальное».

Денормализованная PG-таблица — почти всегда первый вариант. Работает со стандартной реляционной базой, которая уже есть в проекте. Хорошо для табличных запросов с пагинацией, фильтром и сортировкой.

-- migration: создаём read-таблицу для заказов
CREATE TABLE order_summary (
    order_id       uuid PRIMARY KEY,
    customer_id    uuid NOT NULL,
    customer_name  text NOT NULL,
    status         text NOT NULL,
    item_count     int  NOT NULL,
    total_amount   bigint NOT NULL,
    currency       text NOT NULL,
    created_at     timestamptz NOT NULL,
    confirmed_at   timestamptz,
    updated_at     timestamptz NOT NULL,
    version        bigint NOT NULL DEFAULT 0
);
CREATE INDEX ix_os_customer    ON order_summary (customer_id, created_at DESC);
CREATE INDEX ix_os_status_date ON order_summary (status, created_at DESC);

Поле version нужно для того, чтобы обработка повторного события не затёрла более свежие данные.

PG materialized view — когда нужна тяжёлая агрегация, которую пересчитывать на лету слишком дорого. Например, выручка по продуктам за последний месяц. Обновляется командой REFRESH MATERIALIZED VIEW CONCURRENTLY — либо по расписанию, либо по событию.

CREATE MATERIALIZED VIEW product_revenue_daily AS
SELECT
    p.product_id,
    p.name,
    DATE(oi.created_at) AS day,
    SUM(oi.quantity * oi.unit_price) AS revenue,
    COUNT(DISTINCT o.id) AS order_count
FROM order_item oi
JOIN product p ON p.product_id = oi.product_id
JOIN "order" o ON o.id = oi.order_id
WHERE o.status IN ('CONFIRMED', 'SHIPPED', 'DELIVERED')
GROUP BY p.product_id, p.name, DATE(oi.created_at);

CREATE UNIQUE INDEX ux_prd_pk ON product_revenue_daily (product_id, day);

Redis — когда latency критична. Подходит для данных, которые читаются на каждый запрос по ключу: например, текущий тарифный план клиента. Важное отличие от обычного кеша: read-model в Redis — основной источник ответа, а не резервный при промахе.

type CustomerPlanCache struct {
    rdb *redis.Client
}

func (c *CustomerPlanCache) Get(ctx context.Context, customerID string) (CustomerPlanDTO, error) {
    raw, err := c.rdb.Get(ctx, fmt.Sprintf("customer:%s:plan", customerID)).Bytes()
    if err != nil {
        return CustomerPlanDTO{}, fmt.Errorf("get plan %s: %w", customerID, err)
    }
    var plan CustomerPlanDTO
    if err := json.Unmarshal(raw, &plan); err != nil {
        return CustomerPlanDTO{}, fmt.Errorf("unmarshal plan: %w", err)
    }
    return plan, nil
}

ElasticSearch — для полнотекстового поиска и сложных фильтров с релевантностью. Когда нужны поиск по нескольким полям сразу, ранжирование результатов или фасетная фильтрация.

Интерфейс ViewRepository

Read-side в Go описывается отдельным интерфейсом — независимо от write-side OrderRepository. Это важно: разные интерфейсы можно развивать независимо, тестировать независимо, заменять реализацию независимо.

// core/order/port/view/order_view_repository.go
type OrderViewRepository interface {
    SummaryByID(ctx context.Context, orderID string) (viewdto.OrderSummaryDTO, error)
    ListByCustomer(ctx context.Context, customerID string, page Pagination) ([]viewdto.OrderSummaryDTO, error)
}
// core/order/dto/view/order_summary.go
type OrderSummaryDTO struct {
    OrderID      string
    CustomerID   string
    CustomerName string     // денормализовано из customer
    Status       string
    ItemCount    int
    TotalAmount  int64
    Currency     string
    CreatedAt    time.Time
    ConfirmedAt  *time.Time
}

DTO структурирован под нужды UI или API — не повторяет агрегат один в один. sqlc генерирует реализацию OrderViewRepository прямо из SQL-запросов.

Query-handler читает только через ViewRepository

Обработчик запроса (query-handler) работает в read-only транзакции. pgx.TxOptions{AccessMode: pgx.ReadOnly} — это не просто пометка: pgx упадёт с ошибкой, если в такой транзакции попытаться что-то записать.

func (h *GetOrderSummaryHandler) Handle(ctx context.Context, q query.GetOrderSummary) (view.OrderSummaryDTO, error) {
    tx, err := h.db.BeginTx(ctx, pgx.TxOptions{AccessMode: pgx.ReadOnly})
    if err != nil {
        return view.OrderSummaryDTO{}, fmt.Errorf("begin read tx: %w", err)
    }
    defer tx.Rollback(ctx)

    summary, err := h.views.SummaryByID(ctx, q.OrderID)
    if err != nil {
        return view.OrderSummaryDTO{}, fmt.Errorf("read order summary %s: %w", q.OrderID, err)
    }
    return summary, nil
}

Обновление через события, а не синхронно

Частая ошибка — обновлять read-model прямо внутри write-транзакции:

// так делать нельзя
tx.Exec(ctx, "UPDATE order SET status = $1 ...", status)
tx.Exec(ctx, "UPDATE order_summary SET status = $1 ...", status) // привязка write к read

Проблемы сразу несколько:

  • Откат write-транзакции не откатит уже применённое изменение в read-DB, если они в разных базах.
  • ALTER TABLE order_summary начинает блокировать write-транзакции — два хранилища переплетаются.
  • При разных базах синхронный sync требует двухфазной фиксации, что крайне сложно и нередко запрещено в архитектуре.

Правильный путь — через события. Command-handler сохраняет изменение и кладёт событие в таблицу outbox в одной транзакции. Отдельный relay читает outbox и публикует в Kafka. Consumer на read-side получает событие и обновляет read-model.

command-handler → сохраняет Order + кладёт OrderConfirmed в outbox (одна pgx.Tx)
outbox-relay → публикует событие в Kafka
read-side consumer → получает OrderConfirmed → UPDATE order_summary
// запись события в outbox — в той же транзакции, что и изменение агрегата
func (r *OrderOutboxRepository) Enqueue(ctx context.Context, tx pgx.Tx, evt OrderConfirmedEvent) error {
    payload, err := json.Marshal(evt)
    if err != nil {
        return fmt.Errorf("marshal event: %w", err)
    }
    _, err = tx.Exec(ctx,
        `INSERT INTO outbox (event_type, payload, created_at) VALUES ($1, $2, now())`,
        "order.confirmed", payload,
    )
    return err
}

Задержка между записью и обновлением read-model — обычно 100мс–1с. При проблемах с Kafka может быть больше. Это нужно декларировать на уровне API, чтобы UI не удивлялся.

Подробнее про доставку событий и idempotent consumer — в статье Sync через события.

Как восстановить read-model с нуля

Read-model — это проекция. Источник правды — write-side. Это значит, что read-model всегда можно восстановить, пройдясь по агрегатам и заново построив проекцию.

Для этого нужен отдельный скрипт (обычно cmd/rebuild/main.go), не связанный с основным HTTP-сервером.

func rebuildOrderSummaries(ctx context.Context, db *pgxpool.Pool, summaries OrderSummaryRepository) error {
    const batchSize = 500
    var lastID string

    for {
        orders, err := loadOrdersBatch(ctx, db, lastID, batchSize)
        if err != nil {
            return fmt.Errorf("load batch after %s: %w", lastID, err)
        }
        if len(orders) == 0 {
            break
        }
        for _, o := range orders {
            if err := summaries.Upsert(ctx, nil, toSummary(o)); err != nil {
                return fmt.Errorf("upsert summary for order %s: %w", o.ID, err)
            }
        }
        lastID = orders[len(orders)-1].ID
        slog.Info("rebuild progress", "last_id", lastID, "batch", len(orders))
    }
    return nil
}

Когда это нужно:

  • Восстановление после сбоя. Redis-кластер упал, read-таблица случайно удалена, произошла миграция в другое хранилище.
  • Подключение нового хранилища. Добавили ElasticSearch — он пустой; нужно загрузить существующие данные.
  • Изменение схемы read-model. Добавили поле в order_summary — старые записи его не имеют; rebuild дополнит.

Если такого скрипта нет, read-model де-факто становится источником правды — а это нарушает саму идею CQRS.

Частые ошибки

Бизнес-логика в read-таблице. CHECK-ограничения с бизнес-инвариантами ставят не на read-таблицу, а в агрегат. Read-model — только проекция данных, логики в ней нет.

Обратный поток данных. Поток всегда односторонний: write → события → read. Никогда не читают из read-model, чтобы принять бизнес-решение в write-side.

Единый репозиторий для read и write. При event-driven read-model нужен отдельный OrderViewRepository. Один OrderRepository для обеих сторон ломает разделение.

Коротко

  • Read-model — денормализованная проекция данных, оптимизированная под конкретный паттерн чтения. Один SELECT без JOIN.
  • Хранилище выбирают под задачу: PG-таблица для табличных запросов, materialized view для тяжёлых агрегаций, Redis для быстрого поиска по ключу, ElasticSearch для полнотекстового поиска.
  • Схема read-model независима от write-схемы. Денормализация — её главный инструмент.
  • Обновление — только через события (outbox + Kafka). Синхронный UPDATE в write-транзакции нарушает разделение и создаёт проблемы при откате.
  • Read-model всегда восстановима из write-side. Для этого нужен отдельный rebuild-скрипт.
  • Источник правды — write-side. Read-model — производная от него.

Что почитать дальше

  • Sync через события — как outbox + kafka-go доставляет события до read-model, idempotent consumer.
  • Query side — как query-handler читает из read-model через pgx.ReadOnly.
  • Command side — как command-handler пишет в outbox в той же pgx-транзакции.
  • Уровень и эволюция CQRS — когда переходить к event-driven read-model.