Опирается на правила:
R-CQRS-RM-1…R-CQRS-RM-4иR-CQRS-RM-X1…R-CQRS-RM-X3из CQRS Style Guide → раздел 4. Read-model.
Важно знать
- Read-model — данные в форме, удобной для чтения: денормализованные, pre-aggregated, без join'ов на горячем пути.
- Хранилище выбирается под паттерн чтения: PG-таблица, Redis, ElasticSearch — не одно «универсальное».
- Schema read-model независима от write-схемы: один атрибут может присутствовать в нескольких read-DTO под разными именами.
- Читается через
<X>ViewRepository— raw select черезDataSource.query(), без транзакции и без блокировок (R-TYPEORM-QRY-4).- Обновляется через события (outbox → Kafka → consumer), не синхронно в write-транзакции (
R-CQRS-RM-3).- Read-model восстановима из write-side: при потере — rebuild-сервис проходит по агрегатам и заново строит проекцию (
R-CQRS-RM-4).- Source of truth — write-side агрегаты. Read-model — производная.
- Никакой бизнес-логики в read-model (CHECK-инварианты, триггеры). Никакого bidirectional sync (
R-CQRS-RM-X1,R-CQRS-RM-X3).
Read-model — денормализованное представление, в котором данные уже сложены так, как их хочет конечный потребитель: один SELECT без join'ов возвращает готовый объект для UI / API. Цена — eventual consistency и дополнительная инфраструктура; выгода — порядки разницы в latency и пропускной способности. Статья раскрывает раздел 4 CQRS Style Guide в идиомах NestJS / TypeScript.
Где хранить read-model
R-CQRS-RM-1: выбор хранилища — функция паттерна чтения, не предпочтений команды.
| Паттерн чтения | Хранилище | Почему |
|---|---|---|
| Tabular query с пагинацией, фильтром, сортировкой | Денормализованная PG-таблица | Реляционка хорошо умеет такой workload, sync через outbox |
| Тяжёлые aggregations (GROUP BY миллионов строк) | PG materialized view | Pre-computed, refresh по расписанию или событию |
| Key-lookup hot-keys (по ID, по короткому ключу) | Redis | Sub-millisecond latency, горизонтальное масштабирование |
| Full-text search, multi-field фильтры с relevance | ElasticSearch / OpenSearch | Inverted index, ranking, faceted search |
PG-таблица — дефолт
Денормализованная таблица в той же СУБД — почти всегда первый шаг. Никакой новой инфраструктуры, синхронизация через outbox.
CREATE TABLE order_summary (
order_id BIGINT PRIMARY KEY,
customer_id BIGINT NOT NULL,
customer_name TEXT NOT NULL,
customer_email TEXT NOT NULL,
status TEXT NOT NULL,
item_count INTEGER NOT NULL,
total_amount NUMERIC(19,4) NOT NULL,
currency TEXT NOT NULL,
created_at TIMESTAMPTZ NOT NULL,
confirmed_at TIMESTAMPTZ,
shipped_at TIMESTAMPTZ,
updated_at TIMESTAMPTZ NOT NULL,
version BIGINT NOT NULL DEFAULT 0
);
CREATE INDEX ix_os_customer ON order_summary (customer_id, created_at DESC);
CREATE INDEX ix_os_status_date ON order_summary (status, created_at DESC);
Поле version — для idempotent UPDATE в consumer'е (см. Sync через события).
PG materialized view — для тяжёлых aggregations
Сводка вроде «оборот по продуктам за месяц» пересчитывается редко, хранится заранее:
CREATE MATERIALIZED VIEW product_revenue_daily AS
SELECT
p.product_id,
p.name,
DATE(oi.created_at) AS day,
SUM(oi.quantity * oi.unit_price) AS revenue,
COUNT(DISTINCT o.id) AS order_count
FROM order_item oi
JOIN product p ON p.product_id = oi.product_id
JOIN "order" o ON o.id = oi.order_id
WHERE o.status IN ('CONFIRMED', 'SHIPPED', 'DELIVERED')
GROUP BY p.product_id, p.name, DATE(oi.created_at);
CREATE UNIQUE INDEX ux_prd_pk ON product_revenue_daily (product_id, day);
Refresh — REFRESH MATERIALIZED VIEW CONCURRENTLY по расписанию (cron job в NestJS через @nestjs/schedule) или по событию OrderConfirmed.
Redis — для hot-key lookup
Проекция Customer → ActiveSubscriptionPlan — читается на каждый запрос:
// adapters/out/persistence/redis-subscription-view.repository.ts
@Injectable()
export class RedisSubscriptionViewRepository implements SubscriptionViewRepository {
constructor(@Inject(REDIS_CLIENT) private readonly redis: Redis) {}
async findByCustomer(customerId: CustomerId): Promise<SubscriptionPlan | null> {
const raw = await this.redis.get(`customer:${customerId.value}:plan`);
return raw ? JSON.parse(raw) as SubscriptionPlan : null;
}
async upsert(customerId: CustomerId, plan: SubscriptionPlan): Promise<void> {
await this.redis.set(
`customer:${customerId.value}:plan`,
JSON.stringify(plan),
'EX', 3600,
);
}
}
Consumer на SubscriptionUpdated вызывает upsert. Разница с обычным кешем: read-model в Redis — источник ответа, не fallback.
ElasticSearch — для full-text search
Поиск по описаниям Product, фильтры по 20+ атрибутам с relevance:
// adapters/out/search/elasticsearch-product-view.repository.ts
@Injectable()
export class ElasticsearchProductViewRepository implements ProductSearchRepository {
constructor(@Inject(ES_CLIENT) private readonly es: Client) {}
async search(params: ProductSearchQuery): Promise<ProductSearchResult[]> {
const { hits } = await this.es.search({
index: 'products',
query: {
bool: {
must: [{ match: { name: params.q } }],
filter: [
...(params.minRating ? [{ range: { rating: { gte: params.minRating } } }] : []),
...(params.inStock ? [{ term: { in_stock: true } }] : []),
],
},
},
});
return hits.hits.map(h => toProductSearchResult(h._source));
}
}
Consumer'ы на ProductCreated, ProductPriceChanged, StockUpdated обновляют документ в индексе.
Schema независимая от write-стороны
R-CQRS-RM-2: read-схема и read-DTO продиктованы потребителем, не агрегатом.
write-схема: read-схема (order_summary):
order(id, customer_id, status) order_summary(
order_item(order_id, qty, price) order_id,
customer(id, name, email) customer_name, ← денормализовано из customer
customer_email, ← денормализовано из customer
status,
item_count ← pre-computed из order_item
)
В TypeScript read-DTO — readonly-класс или Object.freeze():
// core/order/port/out/view/order-summary.view.ts
export class OrderSummaryView {
constructor(
readonly orderId: string,
readonly customerId: string,
readonly customerName: string,
readonly status: string,
readonly itemCount: number,
readonly totalAmount: string,
readonly currency: string,
readonly createdAt: Date,
readonly confirmedAt: Date | null,
) {}
}
export function toOrderSummaryView(row: Record<string, unknown>): OrderSummaryView {
return new OrderSummaryView(
String(row['order_id']),
String(row['customer_id']),
String(row['customer_name']),
String(row['status']),
Number(row['item_count']),
String(row['total_amount']),
String(row['currency']),
new Date(row['created_at'] as string),
row['confirmed_at'] ? new Date(row['confirmed_at'] as string) : null,
);
}
ViewRepository — raw select без транзакции
R-CQRS-QRY-2, R-CQRS-TIER-3: read-side использует отдельный <X>ViewRepository, который читает через DataSource.query() без транзакции, без relations, без lock (R-TYPEORM-QRY-4).
// core/order/port/out/order-view.repository.ts
export interface OrderViewRepository {
summary(orderId: string): Promise<OrderSummaryView | null>;
listByCustomer(customerId: string, limit: number, offset: number): Promise<OrderSummaryView[]>;
}
export const ORDER_VIEW_REPOSITORY = Symbol('ORDER_VIEW_REPOSITORY');
// adapters/out/persistence/typeorm-order-view.repository.ts
@Injectable()
export class TypeOrmOrderViewRepository implements OrderViewRepository {
constructor(private readonly dataSource: DataSource) {}
async summary(orderId: string): Promise<OrderSummaryView | null> {
const rows = await this.dataSource.query(
`SELECT order_id, customer_id, customer_name, customer_email,
status, item_count, total_amount, currency,
created_at, confirmed_at
FROM order_summary
WHERE order_id = $1`,
[orderId],
);
return rows[0] ? toOrderSummaryView(rows[0]) : null;
}
async listByCustomer(customerId: string, limit: number, offset: number): Promise<OrderSummaryView[]> {
const rows = await this.dataSource.query(
`SELECT order_id, customer_id, customer_name, customer_email,
status, item_count, total_amount, currency,
created_at, confirmed_at
FROM order_summary
WHERE customer_id = $1
ORDER BY created_at DESC
LIMIT $2 OFFSET $3`,
[customerId, limit, offset],
);
return rows.map(toOrderSummaryView);
}
}
Query-handler инжектирует ORDER_VIEW_REPOSITORY, не вызывает доменных методов агрегата:
// core/order/usecase/get-order-summary.handler.ts
@Injectable()
export class GetOrderSummaryHandler implements Handler<GetOrderSummary, OrderSummaryView | null> {
constructor(
@Inject(ORDER_VIEW_REPOSITORY) private readonly orderView: OrderViewRepository,
) {}
async execute(query: GetOrderSummary): Promise<OrderSummaryView | null> {
return this.orderView.summary(query.orderId);
}
}
Обновление через события — eventual consistency
R-CQRS-RM-3: read-model обновляется только через outbox → Kafka → consumer. Никогда — в write-транзакции.
1. command-handler сохраняет Order, записывает OrderConfirmed → outbox-таблица (одна транзакция)
2. outbox-relay (SKIP LOCKED loop) публикует событие в Kafka
3. read-side consumer ловит OrderConfirmed
4. UPDATE order_summary SET status = 'CONFIRMED', confirmed_at = $1, version = version + 1
WHERE order_id = $2 AND version < $3
Latency в стационарном режиме — 100ms–1s. Это архитектурно ожидаемо: UI должен понимать eventual consistency.
Почему не synchronous UPDATE в той же транзакции:
- Read-model теряет decoupling.
ALTER TABLE order_summaryблокирует write-транзакции. - При rollback write-агрегата
order_summaryуже могла измениться (если consumer внешний), и это расхождение не откатить. - Cross-DB synchronous sync невозможен без 2PC, который запрещён.
Подробно — в Sync через события.
Read-model восстановима из write-side
R-CQRS-RM-4: для каждой read-model должен существовать скрипт rebuild'а, который проходит по write-side агрегатам и заново строит проекцию.
// core/order/service/order-summary-rebuilder.ts
@Injectable()
export class OrderSummaryRebuilder {
constructor(
@Inject(ORDER_REPOSITORY) private readonly orders: OrderRepository,
@Inject(ORDER_VIEW_REPOSITORY) private readonly orderView: OrderViewRepository,
) {}
async rebuildAll(): Promise<void> {
let lastId = 0n;
const batchSize = 500;
while (true) {
const batch = await this.orders.findAllAfter(lastId, batchSize);
if (batch.length === 0) break;
const rows = batch.map(order => this.toSummaryRow(order));
await this.orderView.upsertBatch(rows);
lastId = batch[batch.length - 1].id.value;
}
}
private toSummaryRow(order: Order): OrderSummaryUpsert {
return {
orderId: order.id.value.toString(),
customerId: order.customerId.value.toString(),
customerName: order.snapshot().customerName,
customerEmail: order.snapshot().customerEmail,
status: order.status,
itemCount: order.items.length,
totalAmount: order.totalAmount.amount.toFixed(4),
currency: order.totalAmount.currency,
createdAt: order.createdAt,
confirmedAt: order.confirmedAt ?? null,
updatedAt: new Date(),
version: 0n,
};
}
}
Rebuild-сервис применяется в трёх сценариях:
- Disaster recovery. Read-model потеряна (отказ Redis cluster, drop в ElasticSearch, миграция).
- Bootstrap нового read-store. Добавляется ElasticSearch — он пустой, надо загнать existing-данные.
- Структурная миграция read-схемы. Новое поле в
order_summary— для старых записей rebuild дозаполнит.
Без rebuild-скрипта read-model становится первичным хранилищем — что нарушает R-CQRS-RM-X2.
Что запрещено
| Антипаттерн | Правило | Что взамен |
|---|---|---|
| CHECK-constraint бизнес-инварианта в read-таблице | R-CQRS-RM-X1 | Инвариант в агрегате; read-model — только проекция |
| Read-model как единственный источник правды (невосстановимая) | R-CQRS-RM-X2 | Source of truth — write-side; rebuild-скрипт обязателен |
| Bidirectional sync: read-handler пишет в write-side | R-CQRS-RM-X3 | Одно направление: write → events → read |
| Синхронный INSERT в read-таблицу внутри write-транзакции | R-CQRS-SYNC-X1 | Outbox + Kafka + consumer |
Грузить агрегат через основной <X>Repository и маппить в read-DTO | R-CQRS-QRY-X2 | Отдельный <X>ViewRepository с raw select |
| Event payload = TypeORM-Entity write-схемы (schema-coupled) | R-CQRS-SYNC-X3 | Версионированный event-contract, независимый от схемы БД |
Маркеры Query<R> без enforcement (handler пишет, использует транзакцию) | R-CQRS-TIER-X1 | Query-handler: только read, без транзакции, без write |
Куда дальше
- CQRS → раздел 4. Read-model — нормативные формулировки
R-CQRS-RM-*. - Sync через события — как outbox + Kafka доставляет события до read-model.
- Query side — как query-handler читает из read-model через
<X>ViewRepository. - Command side — что command-handler возвращает и почему не read-DTO.
- Уровень и эволюция — когда переходить от lightweight к event-driven read-model.
- Когда CQRS оправдан — критерии для полного CQRS vs lightweight маркеров.