В обычном приложении один и тот же код читает и пишет данные. Сначала всё просто, но потом возникает напряжение: запрос на запись требует одних индексов, запрос на чтение — других. JOIN для записи и JOIN для чтения начинают мешать друг другу. Таблица растёт, и ни одна операция не работает оптимально.
CQRS решает это разделением: данные хранятся в форме, удобной для записи (агрегаты, нормализованная схема), а для чтения создаётся отдельное представление — read-model. Это заранее подготовленная, денормализованная проекция: один SELECT без JOIN возвращает готовый объект для UI или API.
В Go контракт читающей стороны выражается через интерфейс <X>ViewRepository; реализацию под него генерирует sqlc.
Почему данные для чтения выглядят иначе
Возьмём заказ. В write-схеме он разложен по нескольким таблицам:
order(id, customer_id, status)
order_item(order_id, qty, price)
customer(id, name, email_hash)
Чтобы показать карточку заказа пользователю, нужен JOIN трёх таблиц. На тысячах запросов в секунду это ощутимо.
Read-model складывает всё это в одну таблицу заранее:
order_summary(
order_id,
customer_name, ← уже вставлено из customer, join не нужен
customer_email_hash,
status,
item_count, ← посчитано из order_item заранее
total_amount ← тоже посчитано
)
Теперь SELECT * FROM order_summary WHERE order_id = $1 — и готово. Никаких join'ов.
Цена — данные не всегда актуальны мгновенно: между записью в write-side и обновлением read-model проходит короткое время (обычно 100мс–1с). Это называется итоговой согласованностью (eventual consistency). В большинстве случаев это приемлемо.
Где хранить read-model
Хранилище выбирают под паттерн чтения, а не «одно универсальное».
Денормализованная PG-таблица — почти всегда первый вариант. Работает со стандартной реляционной базой, которая уже есть в проекте. Хорошо для табличных запросов с пагинацией, фильтром и сортировкой.
-- migration: создаём read-таблицу для заказов
CREATE TABLE order_summary (
order_id uuid PRIMARY KEY,
customer_id uuid NOT NULL,
customer_name text NOT NULL,
status text NOT NULL,
item_count int NOT NULL,
total_amount bigint NOT NULL,
currency text NOT NULL,
created_at timestamptz NOT NULL,
confirmed_at timestamptz,
updated_at timestamptz NOT NULL,
version bigint NOT NULL DEFAULT 0
);
CREATE INDEX ix_os_customer ON order_summary (customer_id, created_at DESC);
CREATE INDEX ix_os_status_date ON order_summary (status, created_at DESC);
Поле version нужно для того, чтобы обработка повторного события не затёрла более свежие данные.
PG materialized view — когда нужна тяжёлая агрегация, которую пересчитывать на лету слишком дорого. Например, выручка по продуктам за последний месяц. Обновляется командой REFRESH MATERIALIZED VIEW CONCURRENTLY — либо по расписанию, либо по событию.
CREATE MATERIALIZED VIEW product_revenue_daily AS
SELECT
p.product_id,
p.name,
DATE(oi.created_at) AS day,
SUM(oi.quantity * oi.unit_price) AS revenue,
COUNT(DISTINCT o.id) AS order_count
FROM order_item oi
JOIN product p ON p.product_id = oi.product_id
JOIN "order" o ON o.id = oi.order_id
WHERE o.status IN ('CONFIRMED', 'SHIPPED', 'DELIVERED')
GROUP BY p.product_id, p.name, DATE(oi.created_at);
CREATE UNIQUE INDEX ux_prd_pk ON product_revenue_daily (product_id, day);
Redis — когда latency критична. Подходит для данных, которые читаются на каждый запрос по ключу: например, текущий тарифный план клиента. Важное отличие от обычного кеша: read-model в Redis — основной источник ответа, а не резервный при промахе.
type CustomerPlanCache struct {
rdb *redis.Client
}
func (c *CustomerPlanCache) Get(ctx context.Context, customerID string) (CustomerPlanDTO, error) {
raw, err := c.rdb.Get(ctx, fmt.Sprintf("customer:%s:plan", customerID)).Bytes()
if err != nil {
return CustomerPlanDTO{}, fmt.Errorf("get plan %s: %w", customerID, err)
}
var plan CustomerPlanDTO
if err := json.Unmarshal(raw, &plan); err != nil {
return CustomerPlanDTO{}, fmt.Errorf("unmarshal plan: %w", err)
}
return plan, nil
}
ElasticSearch — для полнотекстового поиска и сложных фильтров с релевантностью. Когда нужны поиск по нескольким полям сразу, ранжирование результатов или фасетная фильтрация.
Интерфейс ViewRepository
Read-side в Go описывается отдельным интерфейсом — независимо от write-side OrderRepository. Это важно: разные интерфейсы можно развивать независимо, тестировать независимо, заменять реализацию независимо.
// core/order/port/view/order_view_repository.go
type OrderViewRepository interface {
SummaryByID(ctx context.Context, orderID string) (viewdto.OrderSummaryDTO, error)
ListByCustomer(ctx context.Context, customerID string, page Pagination) ([]viewdto.OrderSummaryDTO, error)
}
// core/order/dto/view/order_summary.go
type OrderSummaryDTO struct {
OrderID string
CustomerID string
CustomerName string // денормализовано из customer
Status string
ItemCount int
TotalAmount int64
Currency string
CreatedAt time.Time
ConfirmedAt *time.Time
}
DTO структурирован под нужды UI или API — не повторяет агрегат один в один. sqlc генерирует реализацию OrderViewRepository прямо из SQL-запросов.
Query-handler читает только через ViewRepository
Обработчик запроса (query-handler) работает в read-only транзакции. pgx.TxOptions{AccessMode: pgx.ReadOnly} — это не просто пометка: pgx упадёт с ошибкой, если в такой транзакции попытаться что-то записать.
func (h *GetOrderSummaryHandler) Handle(ctx context.Context, q query.GetOrderSummary) (view.OrderSummaryDTO, error) {
tx, err := h.db.BeginTx(ctx, pgx.TxOptions{AccessMode: pgx.ReadOnly})
if err != nil {
return view.OrderSummaryDTO{}, fmt.Errorf("begin read tx: %w", err)
}
defer tx.Rollback(ctx)
summary, err := h.views.SummaryByID(ctx, q.OrderID)
if err != nil {
return view.OrderSummaryDTO{}, fmt.Errorf("read order summary %s: %w", q.OrderID, err)
}
return summary, nil
}
Обновление через события, а не синхронно
Частая ошибка — обновлять read-model прямо внутри write-транзакции:
// так делать нельзя
tx.Exec(ctx, "UPDATE order SET status = $1 ...", status)
tx.Exec(ctx, "UPDATE order_summary SET status = $1 ...", status) // привязка write к read
Проблемы сразу несколько:
- Откат write-транзакции не откатит уже применённое изменение в read-DB, если они в разных базах.
ALTER TABLE order_summaryначинает блокировать write-транзакции — два хранилища переплетаются.- При разных базах синхронный sync требует двухфазной фиксации, что крайне сложно и нередко запрещено в архитектуре.
Правильный путь — через события. Command-handler сохраняет изменение и кладёт событие в таблицу outbox в одной транзакции. Отдельный relay читает outbox и публикует в Kafka. Consumer на read-side получает событие и обновляет read-model.
command-handler → сохраняет Order + кладёт OrderConfirmed в outbox (одна pgx.Tx)
outbox-relay → публикует событие в Kafka
read-side consumer → получает OrderConfirmed → UPDATE order_summary
// запись события в outbox — в той же транзакции, что и изменение агрегата
func (r *OrderOutboxRepository) Enqueue(ctx context.Context, tx pgx.Tx, evt OrderConfirmedEvent) error {
payload, err := json.Marshal(evt)
if err != nil {
return fmt.Errorf("marshal event: %w", err)
}
_, err = tx.Exec(ctx,
`INSERT INTO outbox (event_type, payload, created_at) VALUES ($1, $2, now())`,
"order.confirmed", payload,
)
return err
}
Задержка между записью и обновлением read-model — обычно 100мс–1с. При проблемах с Kafka может быть больше. Это нужно декларировать на уровне API, чтобы UI не удивлялся.
Подробнее про доставку событий и idempotent consumer — в статье Sync через события.
Как восстановить read-model с нуля
Read-model — это проекция. Источник правды — write-side. Это значит, что read-model всегда можно восстановить, пройдясь по агрегатам и заново построив проекцию.
Для этого нужен отдельный скрипт (обычно cmd/rebuild/main.go), не связанный с основным HTTP-сервером.
func rebuildOrderSummaries(ctx context.Context, db *pgxpool.Pool, summaries OrderSummaryRepository) error {
const batchSize = 500
var lastID string
for {
orders, err := loadOrdersBatch(ctx, db, lastID, batchSize)
if err != nil {
return fmt.Errorf("load batch after %s: %w", lastID, err)
}
if len(orders) == 0 {
break
}
for _, o := range orders {
if err := summaries.Upsert(ctx, nil, toSummary(o)); err != nil {
return fmt.Errorf("upsert summary for order %s: %w", o.ID, err)
}
}
lastID = orders[len(orders)-1].ID
slog.Info("rebuild progress", "last_id", lastID, "batch", len(orders))
}
return nil
}
Когда это нужно:
- Восстановление после сбоя. Redis-кластер упал, read-таблица случайно удалена, произошла миграция в другое хранилище.
- Подключение нового хранилища. Добавили ElasticSearch — он пустой; нужно загрузить существующие данные.
- Изменение схемы read-model. Добавили поле в
order_summary— старые записи его не имеют; rebuild дополнит.
Если такого скрипта нет, read-model де-факто становится источником правды — а это нарушает саму идею CQRS.
Частые ошибки
Бизнес-логика в read-таблице. CHECK-ограничения с бизнес-инвариантами ставят не на read-таблицу, а в агрегат. Read-model — только проекция данных, логики в ней нет.
Обратный поток данных. Поток всегда односторонний: write → события → read. Никогда не читают из read-model, чтобы принять бизнес-решение в write-side.
Единый репозиторий для read и write. При event-driven read-model нужен отдельный OrderViewRepository. Один OrderRepository для обеих сторон ломает разделение.
Коротко
- Read-model — денормализованная проекция данных, оптимизированная под конкретный паттерн чтения. Один SELECT без JOIN.
- Хранилище выбирают под задачу: PG-таблица для табличных запросов, materialized view для тяжёлых агрегаций, Redis для быстрого поиска по ключу, ElasticSearch для полнотекстового поиска.
- Схема read-model независима от write-схемы. Денормализация — её главный инструмент.
- Обновление — только через события (outbox + Kafka). Синхронный UPDATE в write-транзакции нарушает разделение и создаёт проблемы при откате.
- Read-model всегда восстановима из write-side. Для этого нужен отдельный rebuild-скрипт.
- Источник правды — write-side. Read-model — производная от него.
Что почитать дальше
- Sync через события — как outbox + kafka-go доставляет события до read-model, idempotent consumer.
- Query side — как query-handler читает из read-model через pgx.ReadOnly.
- Command side — как command-handler пишет в outbox в той же pgx-транзакции.
- Уровень и эволюция CQRS — когда переходить к event-driven read-model.