Опирается на правила:
R-CQRS-RM-1…R-CQRS-RM-4иR-CQRS-RM-X1…R-CQRS-RM-X3из CQRS Style Guide → раздел 4. Read-model.
Важно знать
- Read-model — данные в форме, удобной для чтения: денормализованные, pre-aggregated, индексированные под конкретный паттерн доступа.
- Хранилище выбирается под нагрузку: PG-таблица, materialized view, Redis (
go-redis/v9), ElasticSearch. Не одно «универсальное».- Schema read-model независима от write-схемы. Один атрибут может появиться в нескольких read-DTO под разными именами и в разных формах.
- Обновляется через события, не синхронно в write-транзакции.
pgx.TxOptions{AccessMode: pgx.ReadOnly}— только для чтения; запись идёт через outbox.- Read-model восстановима из write-side: отдельный rebuild-скрипт (
cmd/rebuild/main.go) проходит по агрегатам и заново строит проекцию.- Read-model — проекция, не источник правды. Source of truth — write-side агрегаты в PG.
- Никакой бизнес-логики в read-model (CHECK-инварианты бизнес-правил, PG-триггеры). Логика — в агрегатах.
- Никакого bidirectional sync. Поток: write → events → read. Обратно — никогда.
Read-model — денормализованное представление, в котором данные уже сложены так, как их хочет потребитель: один SELECT без join'ов возвращает готовый объект для UI / API. Цена — eventual consistency и дополнительная инфраструктура; выгода — порядки разницы в latency и пропускной способности. В Go контракт читающей стороны выражается через <X>ViewRepository-интерфейс с read-DTO; sqlc генерирует реализацию.
Где хранить read-model
R-CQRS-RM-1: выбор хранилища — функция от паттерна чтения.
| Паттерн чтения | Хранилище | Почему |
|---|---|---|
| Tabular query с пагинацией, фильтром, сортировкой | Денормализованная PG-таблица | Реляционка хорошо умеет такой workload, transactional sync через outbox |
| Тяжёлые aggregations (GROUP BY миллионов строк) | PG materialized view | Pre-computed, REFRESH CONCURRENTLY по расписанию или по событию |
| Key-lookup hot-keys (по ID, по короткому ключу) | Redis | Sub-millisecond latency, go-redis/v9 |
| Full-text search, multi-field фильтры с relevance | ElasticSearch / OpenSearch | Inverted index, ranking, faceted search |
PG-таблица — дефолт
Денормализованная таблица в той же СУБД — почти всегда первый шаг. Никакой новой инфраструктуры, sync через outbox + local consumer.
-- adapters/out/persistence/migration/V3__order_summary.sql
CREATE TABLE order_summary (
order_id uuid PRIMARY KEY,
customer_id uuid NOT NULL,
customer_name text NOT NULL,
status text NOT NULL,
item_count int NOT NULL,
total_amount bigint NOT NULL,
currency text NOT NULL,
created_at timestamptz NOT NULL,
confirmed_at timestamptz,
updated_at timestamptz NOT NULL,
version bigint NOT NULL DEFAULT 0
);
CREATE INDEX ix_os_customer ON order_summary (customer_id, created_at DESC);
CREATE INDEX ix_os_status_date ON order_summary (status, created_at DESC);
Поле version — для idempotent upsert в consumer'е (см. Sync через события).
OrderViewRepository — интерфейс порта
R-CQRS-TIER-3: на Уровне 3 появляется отдельный OrderViewRepository-интерфейс; write-сторона использует OrderRepository. sqlc генерирует реализации под каждый из них независимо.
// core/order/port/view/order_view_repository.go
package view
import (
"context"
viewdto "core/order/dto/view"
)
type OrderViewRepository interface {
SummaryByID(ctx context.Context, orderID string) (viewdto.OrderSummaryDTO, error)
ListByCustomer(ctx context.Context, customerID string, page Pagination) ([]viewdto.OrderSummaryDTO, error)
}
// core/order/dto/view/order_summary.go
package view
import "time"
type OrderSummaryDTO struct {
OrderID string
CustomerID string
CustomerName string
Status string
ItemCount int
TotalAmount int64
Currency string
CreatedAt time.Time
ConfirmedAt *time.Time
}
Read-DTO структурирован под UI/API — не повторяет агрегат. Поле CustomerName денормализовано из customer: в read-запросе join не нужен.
Query-handler читает через ViewRepository
// core/order/handler/get_order_summary_handler.go
package handler
import (
"context"
"fmt"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
"core/order/dto/view"
"core/order/port"
"core/order/query"
)
type GetOrderSummaryHandler struct {
views port.OrderViewRepository
db *pgxpool.Pool
}
func (h *GetOrderSummaryHandler) Handle(ctx context.Context, q query.GetOrderSummary) (view.OrderSummaryDTO, error) {
tx, err := h.db.BeginTx(ctx, pgx.TxOptions{AccessMode: pgx.ReadOnly})
if err != nil {
return view.OrderSummaryDTO{}, fmt.Errorf("begin read tx: %w", err)
}
defer tx.Rollback(ctx)
summary, err := h.views.SummaryByID(ctx, q.OrderID)
if err != nil {
return view.OrderSummaryDTO{}, fmt.Errorf("read order summary %s: %w", q.OrderID, err)
}
return summary, nil
}
pgx.TxOptions{AccessMode: pgx.ReadOnly} — pgx упадёт на попытке записи в read-only транзакции. Это enforcement: маркеры без механизма — карго-культ (R-CQRS-TIER-X1).
PG materialized view — для тяжёлых aggregations
Когда нужна сводка «оборот по продуктам за последний месяц» и пересчитывать её каждый раз слишком дорого:
CREATE MATERIALIZED VIEW product_revenue_daily AS
SELECT
p.product_id,
p.name,
DATE(oi.created_at) AS day,
SUM(oi.quantity * oi.unit_price) AS revenue,
COUNT(DISTINCT o.id) AS order_count
FROM order_item oi
JOIN product p ON p.product_id = oi.product_id
JOIN "order" o ON o.id = oi.order_id
WHERE o.status IN ('CONFIRMED', 'SHIPPED', 'DELIVERED')
GROUP BY p.product_id, p.name, DATE(oi.created_at);
CREATE UNIQUE INDEX ux_prd_pk ON product_revenue_daily (product_id, day);
Refresh через REFRESH MATERIALIZED VIEW CONCURRENTLY по расписанию (cron-job в Go) или по событию OrderConfirmed.
Redis — для hot-key lookup
Проекция Customer → ActiveSubscriptionPlan читается на каждый запрос; latency критична.
// adapters/out/cache/customer_plan_cache.go
package cache
import (
"context"
"encoding/json"
"fmt"
"time"
"github.com/redis/go-redis/v9"
)
type CustomerPlanCache struct {
rdb *redis.Client
}
func (c *CustomerPlanCache) Set(ctx context.Context, customerID string, plan CustomerPlanDTO) error {
payload, err := json.Marshal(plan)
if err != nil {
return fmt.Errorf("marshal plan: %w", err)
}
return c.rdb.Set(ctx, fmt.Sprintf("customer:%s:plan", customerID), payload, time.Hour).Err()
}
func (c *CustomerPlanCache) Get(ctx context.Context, customerID string) (CustomerPlanDTO, error) {
raw, err := c.rdb.Get(ctx, fmt.Sprintf("customer:%s:plan", customerID)).Bytes()
if err != nil {
return CustomerPlanDTO{}, fmt.Errorf("get plan %s: %w", customerID, err)
}
var plan CustomerPlanDTO
if err := json.Unmarshal(raw, &plan); err != nil {
return CustomerPlanDTO{}, fmt.Errorf("unmarshal plan: %w", err)
}
return plan, nil
}
Consumer на SubscriptionUpdated вызывает Set. Разница с обычным кешем: read-model в Redis — источник ответа, а не fallback при промахе кеша.
Schema независимая от write-стороны
R-CQRS-RM-2: read-схема может и должна отличаться от write-схемы. Денормализация — её главный инструмент.
write-схема: read-схема (order_summary):
order(id, customer_id, status) order_summary(
order_item(order_id, qty, price) order_id,
customer(id, name, email_hash) customer_name, ← из customer, без join
customer_email_hash,
status,
item_count, ← pre-computed из order_item
total_amount ← pre-computed
)
Что выигрывается:
- GET без JOIN.
SELECT * FROM order_summary WHERE order_id = $1вместо трёх join'ов. - Индексы под read-запросы. На write-таблице свои индексы (под
FOR UPDATE), на read-таблице свои. - sqlc-маппинг один-в-один.
sqlc generateдерётorder_summaryи выдаётOrderSummary— прямо DTO.
Цена: обновление customer_name должно приходить не только в customer, но и в order_summary через consumer. Это плата за быстрое чтение, и она обычно стоит того.
Обновление через события — eventual consistency
R-CQRS-RM-3: read-model не обновляется в write-транзакции. Только через outbox + segmentio/kafka-go + consumer.
1. command-handler меняет Order → сохраняет → enqueue OrderConfirmed в outbox (та же pgx.Tx)
2. outbox-relay (SKIP LOCKED loop) публикует событие в Kafka
3. read-side consumer (OrderSummaryConsumer) ловит OrderConfirmed
4. UPDATE order_summary SET status = 'CONFIRMED', confirmed_at = $1 WHERE order_id = $2
Latency — 100ms–1s в стационарном режиме. При деградации Kafka может быть больше — архитектурно ожидаемо. UI декларирует eventual (см. Sync через события).
// adapters/out/outbox/order_outbox_repository.go
package outbox
import (
"context"
"encoding/json"
"fmt"
"github.com/jackc/pgx/v5"
)
type OrderOutboxRepository struct{}
func (r *OrderOutboxRepository) Enqueue(ctx context.Context, tx pgx.Tx, evt OrderConfirmedEvent) error {
payload, err := json.Marshal(evt)
if err != nil {
return fmt.Errorf("marshal event: %w", err)
}
_, err = tx.Exec(ctx,
`INSERT INTO outbox (event_type, payload, created_at) VALUES ($1, $2, now())`,
"order.confirmed", payload,
)
return err
}
Почему не synchronous UPDATE в той же транзакции:
- Read-model теряет decoupling.
ALTER TABLE order_summaryблокирует write-транзакции. - Cross-DB synchronous sync невозможен без 2PC, который запрещён (см. Distributed Patterns).
- Rollback
orderв write-side не откатывает уже применённый UPDATE в read-DB.
Подробно — в Sync через события.
Read-model восстановима из write-side
R-CQRS-RM-4: для read-model должен существовать rebuild-скрипт, который проходит по агрегатам write-side и заново строит проекцию.
// cmd/rebuild/main.go — отдельная CLI-команда, не часть основного HTTP-сервера
package main
import (
"context"
"fmt"
"log/slog"
"github.com/jackc/pgx/v5/pgxpool"
)
func rebuildOrderSummaries(ctx context.Context, db *pgxpool.Pool, summaries OrderSummaryRepository) error {
const batchSize = 500
var lastID string
for {
orders, err := loadOrdersBatch(ctx, db, lastID, batchSize)
if err != nil {
return fmt.Errorf("load batch after %s: %w", lastID, err)
}
if len(orders) == 0 {
break
}
for _, o := range orders {
if err := summaries.Upsert(ctx, nil, toSummary(o)); err != nil {
return fmt.Errorf("upsert summary for order %s: %w", o.ID, err)
}
}
lastID = orders[len(orders)-1].ID
slog.Info("rebuild progress", "last_id", lastID, "batch", len(orders))
}
return nil
}
Rebuild пригождается в трёх сценариях:
- Disaster recovery. Read-model потеряна — отказ Redis-кластера, случайный drop, миграция в другое хранилище.
- Bootstrap нового read-store. Решили добавить ElasticSearch — он пустой; нужно загнать в него существующие данные.
- Структурная миграция read-схемы. Добавили поле в
order_summary— для старых записей оно пустое; rebuild дозаполнит.
Без rebuild-скрипта read-model де-факто становится первичным хранилищем — нарушение R-CQRS-RM-X2.
Что запрещено
| Антипаттерн | Правило | Что взамен |
|---|---|---|
| CHECK-инвариант бизнес-правила в read-таблице | R-CQRS-RM-X1 | Инвариант в агрегате; read-model — только проекция |
| Read-model невосстановима из write-side | R-CQRS-RM-X2 | Source of truth — write-side; rebuild-скрипт обязателен |
| Bidirectional sync (read → write) | R-CQRS-RM-X3 | Одно направление: write → events → read |
Synchronous INSERT в order_summary внутри command-транзакции | R-CQRS-SYNC-X1 | outbox + kafka-go + consumer |
Payload события = sqlc-структура write-схемы (db.Order) | R-CQRS-SYNC-X3 | Явный event-struct с версионированием |
Единый OrderRepository для read и write при event-driven read-model | R-CQRS-TIER-X2 | Отдельный OrderViewRepository-интерфейс |
Куда дальше
- Sync через события — как outbox + kafka-go доставляет события до read-model, idempotent consumer.
- Query side — как query-handler читает из read-model через
pgx.ReadOnly. - Command side — как command-handler пишет в outbox в той же pgx-транзакции.
- Уровень и эволюция — когда переходить к event-driven read-model.
- Когда CQRS оправдан — стоимость full CQRS против lightweight на маркерах.