Опирается на правила: R-CQRS-RM-1R-CQRS-RM-4 и R-CQRS-RM-X1R-CQRS-RM-X3 из CQRS Style Guide → раздел 4. Read-model.

Важно знать

  • Read-model — данные в форме, удобной для чтения: денормализованные, pre-aggregated, индексированные под конкретный паттерн доступа.
  • Хранилище выбирается под нагрузку: PG-таблица, materialized view, Redis (go-redis/v9), ElasticSearch. Не одно «универсальное».
  • Schema read-model независима от write-схемы. Один атрибут может появиться в нескольких read-DTO под разными именами и в разных формах.
  • Обновляется через события, не синхронно в write-транзакции. pgx.TxOptions{AccessMode: pgx.ReadOnly} — только для чтения; запись идёт через outbox.
  • Read-model восстановима из write-side: отдельный rebuild-скрипт (cmd/rebuild/main.go) проходит по агрегатам и заново строит проекцию.
  • Read-model — проекция, не источник правды. Source of truth — write-side агрегаты в PG.
  • Никакой бизнес-логики в read-model (CHECK-инварианты бизнес-правил, PG-триггеры). Логика — в агрегатах.
  • Никакого bidirectional sync. Поток: write → events → read. Обратно — никогда.

Read-model — денормализованное представление, в котором данные уже сложены так, как их хочет потребитель: один SELECT без join'ов возвращает готовый объект для UI / API. Цена — eventual consistency и дополнительная инфраструктура; выгода — порядки разницы в latency и пропускной способности. В Go контракт читающей стороны выражается через <X>ViewRepository-интерфейс с read-DTO; sqlc генерирует реализацию.

Где хранить read-model

R-CQRS-RM-1: выбор хранилища — функция от паттерна чтения.

Паттерн чтенияХранилищеПочему
Tabular query с пагинацией, фильтром, сортировкойДенормализованная PG-таблицаРеляционка хорошо умеет такой workload, transactional sync через outbox
Тяжёлые aggregations (GROUP BY миллионов строк)PG materialized viewPre-computed, REFRESH CONCURRENTLY по расписанию или по событию
Key-lookup hot-keys (по ID, по короткому ключу)RedisSub-millisecond latency, go-redis/v9
Full-text search, multi-field фильтры с relevanceElasticSearch / OpenSearchInverted index, ranking, faceted search

PG-таблица — дефолт

Денормализованная таблица в той же СУБД — почти всегда первый шаг. Никакой новой инфраструктуры, sync через outbox + local consumer.

-- adapters/out/persistence/migration/V3__order_summary.sql
CREATE TABLE order_summary (
    order_id       uuid PRIMARY KEY,
    customer_id    uuid NOT NULL,
    customer_name  text NOT NULL,
    status         text NOT NULL,
    item_count     int  NOT NULL,
    total_amount   bigint NOT NULL,
    currency       text NOT NULL,
    created_at     timestamptz NOT NULL,
    confirmed_at   timestamptz,
    updated_at     timestamptz NOT NULL,
    version        bigint NOT NULL DEFAULT 0
);
CREATE INDEX ix_os_customer    ON order_summary (customer_id, created_at DESC);
CREATE INDEX ix_os_status_date ON order_summary (status, created_at DESC);

Поле version — для idempotent upsert в consumer'е (см. Sync через события).

OrderViewRepository — интерфейс порта

R-CQRS-TIER-3: на Уровне 3 появляется отдельный OrderViewRepository-интерфейс; write-сторона использует OrderRepository. sqlc генерирует реализации под каждый из них независимо.

// core/order/port/view/order_view_repository.go
package view

import (
    "context"
    viewdto "core/order/dto/view"
)

type OrderViewRepository interface {
    SummaryByID(ctx context.Context, orderID string) (viewdto.OrderSummaryDTO, error)
    ListByCustomer(ctx context.Context, customerID string, page Pagination) ([]viewdto.OrderSummaryDTO, error)
}
// core/order/dto/view/order_summary.go
package view

import "time"

type OrderSummaryDTO struct {
    OrderID      string
    CustomerID   string
    CustomerName string
    Status       string
    ItemCount    int
    TotalAmount  int64
    Currency     string
    CreatedAt    time.Time
    ConfirmedAt  *time.Time
}

Read-DTO структурирован под UI/API — не повторяет агрегат. Поле CustomerName денормализовано из customer: в read-запросе join не нужен.

Query-handler читает через ViewRepository

// core/order/handler/get_order_summary_handler.go
package handler

import (
    "context"
    "fmt"

    "github.com/jackc/pgx/v5"
    "github.com/jackc/pgx/v5/pgxpool"

    "core/order/dto/view"
    "core/order/port"
    "core/order/query"
)

type GetOrderSummaryHandler struct {
    views port.OrderViewRepository
    db    *pgxpool.Pool
}

func (h *GetOrderSummaryHandler) Handle(ctx context.Context, q query.GetOrderSummary) (view.OrderSummaryDTO, error) {
    tx, err := h.db.BeginTx(ctx, pgx.TxOptions{AccessMode: pgx.ReadOnly})
    if err != nil {
        return view.OrderSummaryDTO{}, fmt.Errorf("begin read tx: %w", err)
    }
    defer tx.Rollback(ctx)

    summary, err := h.views.SummaryByID(ctx, q.OrderID)
    if err != nil {
        return view.OrderSummaryDTO{}, fmt.Errorf("read order summary %s: %w", q.OrderID, err)
    }
    return summary, nil
}

pgx.TxOptions{AccessMode: pgx.ReadOnly} — pgx упадёт на попытке записи в read-only транзакции. Это enforcement: маркеры без механизма — карго-культ (R-CQRS-TIER-X1).

PG materialized view — для тяжёлых aggregations

Когда нужна сводка «оборот по продуктам за последний месяц» и пересчитывать её каждый раз слишком дорого:

CREATE MATERIALIZED VIEW product_revenue_daily AS
SELECT
    p.product_id,
    p.name,
    DATE(oi.created_at) AS day,
    SUM(oi.quantity * oi.unit_price) AS revenue,
    COUNT(DISTINCT o.id) AS order_count
FROM order_item oi
JOIN product p ON p.product_id = oi.product_id
JOIN "order" o ON o.id = oi.order_id
WHERE o.status IN ('CONFIRMED', 'SHIPPED', 'DELIVERED')
GROUP BY p.product_id, p.name, DATE(oi.created_at);

CREATE UNIQUE INDEX ux_prd_pk ON product_revenue_daily (product_id, day);

Refresh через REFRESH MATERIALIZED VIEW CONCURRENTLY по расписанию (cron-job в Go) или по событию OrderConfirmed.

Redis — для hot-key lookup

Проекция Customer → ActiveSubscriptionPlan читается на каждый запрос; latency критична.

// adapters/out/cache/customer_plan_cache.go
package cache

import (
    "context"
    "encoding/json"
    "fmt"
    "time"

    "github.com/redis/go-redis/v9"
)

type CustomerPlanCache struct {
    rdb *redis.Client
}

func (c *CustomerPlanCache) Set(ctx context.Context, customerID string, plan CustomerPlanDTO) error {
    payload, err := json.Marshal(plan)
    if err != nil {
        return fmt.Errorf("marshal plan: %w", err)
    }
    return c.rdb.Set(ctx, fmt.Sprintf("customer:%s:plan", customerID), payload, time.Hour).Err()
}

func (c *CustomerPlanCache) Get(ctx context.Context, customerID string) (CustomerPlanDTO, error) {
    raw, err := c.rdb.Get(ctx, fmt.Sprintf("customer:%s:plan", customerID)).Bytes()
    if err != nil {
        return CustomerPlanDTO{}, fmt.Errorf("get plan %s: %w", customerID, err)
    }
    var plan CustomerPlanDTO
    if err := json.Unmarshal(raw, &plan); err != nil {
        return CustomerPlanDTO{}, fmt.Errorf("unmarshal plan: %w", err)
    }
    return plan, nil
}

Consumer на SubscriptionUpdated вызывает Set. Разница с обычным кешем: read-model в Redis — источник ответа, а не fallback при промахе кеша.

Schema независимая от write-стороны

R-CQRS-RM-2: read-схема может и должна отличаться от write-схемы. Денормализация — её главный инструмент.

write-схема:                          read-схема (order_summary):
  order(id, customer_id, status)        order_summary(
  order_item(order_id, qty, price)         order_id,
  customer(id, name, email_hash)           customer_name,   ← из customer, без join
                                           customer_email_hash,
                                           status,
                                           item_count,      ← pre-computed из order_item
                                           total_amount     ← pre-computed
                                       )

Что выигрывается:

  • GET без JOIN. SELECT * FROM order_summary WHERE order_id = $1 вместо трёх join'ов.
  • Индексы под read-запросы. На write-таблице свои индексы (под FOR UPDATE), на read-таблице свои.
  • sqlc-маппинг один-в-один. sqlc generate дерёт order_summary и выдаёт OrderSummary — прямо DTO.

Цена: обновление customer_name должно приходить не только в customer, но и в order_summary через consumer. Это плата за быстрое чтение, и она обычно стоит того.

Обновление через события — eventual consistency

R-CQRS-RM-3: read-model не обновляется в write-транзакции. Только через outbox + segmentio/kafka-go + consumer.

1. command-handler меняет Order → сохраняет → enqueue OrderConfirmed в outbox (та же pgx.Tx)
2. outbox-relay (SKIP LOCKED loop) публикует событие в Kafka
3. read-side consumer (OrderSummaryConsumer) ловит OrderConfirmed
4. UPDATE order_summary SET status = 'CONFIRMED', confirmed_at = $1 WHERE order_id = $2

Latency — 100ms–1s в стационарном режиме. При деградации Kafka может быть больше — архитектурно ожидаемо. UI декларирует eventual (см. Sync через события).

// adapters/out/outbox/order_outbox_repository.go
package outbox

import (
    "context"
    "encoding/json"
    "fmt"

    "github.com/jackc/pgx/v5"
)

type OrderOutboxRepository struct{}

func (r *OrderOutboxRepository) Enqueue(ctx context.Context, tx pgx.Tx, evt OrderConfirmedEvent) error {
    payload, err := json.Marshal(evt)
    if err != nil {
        return fmt.Errorf("marshal event: %w", err)
    }
    _, err = tx.Exec(ctx,
        `INSERT INTO outbox (event_type, payload, created_at) VALUES ($1, $2, now())`,
        "order.confirmed", payload,
    )
    return err
}

Почему не synchronous UPDATE в той же транзакции:

  • Read-model теряет decoupling. ALTER TABLE order_summary блокирует write-транзакции.
  • Cross-DB synchronous sync невозможен без 2PC, который запрещён (см. Distributed Patterns).
  • Rollback order в write-side не откатывает уже применённый UPDATE в read-DB.

Подробно — в Sync через события.

Read-model восстановима из write-side

R-CQRS-RM-4: для read-model должен существовать rebuild-скрипт, который проходит по агрегатам write-side и заново строит проекцию.

// cmd/rebuild/main.go — отдельная CLI-команда, не часть основного HTTP-сервера
package main

import (
    "context"
    "fmt"
    "log/slog"

    "github.com/jackc/pgx/v5/pgxpool"
)

func rebuildOrderSummaries(ctx context.Context, db *pgxpool.Pool, summaries OrderSummaryRepository) error {
    const batchSize = 500
    var lastID string

    for {
        orders, err := loadOrdersBatch(ctx, db, lastID, batchSize)
        if err != nil {
            return fmt.Errorf("load batch after %s: %w", lastID, err)
        }
        if len(orders) == 0 {
            break
        }
        for _, o := range orders {
            if err := summaries.Upsert(ctx, nil, toSummary(o)); err != nil {
                return fmt.Errorf("upsert summary for order %s: %w", o.ID, err)
            }
        }
        lastID = orders[len(orders)-1].ID
        slog.Info("rebuild progress", "last_id", lastID, "batch", len(orders))
    }
    return nil
}

Rebuild пригождается в трёх сценариях:

  1. Disaster recovery. Read-model потеряна — отказ Redis-кластера, случайный drop, миграция в другое хранилище.
  2. Bootstrap нового read-store. Решили добавить ElasticSearch — он пустой; нужно загнать в него существующие данные.
  3. Структурная миграция read-схемы. Добавили поле в order_summary — для старых записей оно пустое; rebuild дозаполнит.

Без rebuild-скрипта read-model де-факто становится первичным хранилищем — нарушение R-CQRS-RM-X2.

Что запрещено

АнтипаттернПравилоЧто взамен
CHECK-инвариант бизнес-правила в read-таблицеR-CQRS-RM-X1Инвариант в агрегате; read-model — только проекция
Read-model невосстановима из write-sideR-CQRS-RM-X2Source of truth — write-side; rebuild-скрипт обязателен
Bidirectional sync (read → write)R-CQRS-RM-X3Одно направление: write → events → read
Synchronous INSERT в order_summary внутри command-транзакцииR-CQRS-SYNC-X1outbox + kafka-go + consumer
Payload события = sqlc-структура write-схемы (db.Order)R-CQRS-SYNC-X3Явный event-struct с версионированием
Единый OrderRepository для read и write при event-driven read-modelR-CQRS-TIER-X2Отдельный OrderViewRepository-интерфейс

Куда дальше

  • Sync через события — как outbox + kafka-go доставляет события до read-model, idempotent consumer.
  • Query side — как query-handler читает из read-model через pgx.ReadOnly.
  • Command side — как command-handler пишет в outbox в той же pgx-транзакции.
  • Уровень и эволюция — когда переходить к event-driven read-model.
  • Когда CQRS оправдан — стоимость full CQRS против lightweight на маркерах.