Опирается на правила:
R-CQRS-SYNC-1…R-CQRS-SYNC-5иR-CQRS-SYNC-X1…R-CQRS-SYNC-X3из CQRS контракта → раздел 5. Синхронизация через события.
Важно знать
- Sync write → read идёт через outbox + Kafka, не synchronously. Outbox-строка записывается в той же транзакции, что и изменение агрегата.
- Idempotent consumer обязателен: read-model UPDATE может прийти дважды. Защита —
processed_eventтаблица или idempotent UPDATE поversion.- Bootstrap-rebuild при первом запуске или потере read-model:
OnApplicationBootstrapпроходит по агрегатам батчами, не ждём пока придут события за прошедшие дни.- Eventual consistency декларируется в API.
@ApiOperation({ description })у read-projection endpoint явно пишет о задержке.- Read-your-writes при необходимости — sticky session на gateway, polling после commit или отдельный endpoint прямо из write-store.
- Никакого synchronous INSERT/UPDATE read-model в command-handler. Decoupling сразу теряется.
- Никаких PG-триггеров для sync — невидимая магия, ломается на bulk, не масштабируется cross-DB.
- Никаких schema-coupled events: payload — отдельный класс, не TypeORM-entity write-схемы.
Сердце CQRS — это не разделение моделей, а способ их связать. Если write и read физически разнесены, нужен надёжный механизм передачи изменений. Outbox + Kafka — стандартная связка; всё остальное в этой статье — детали её правильного применения в NestJS/TypeScript. Раскрытие раздела 5 CQRS-контракта.
Outbox-pattern: атомарность с агрегатом
R-CQRS-SYNC-1: write-handler регистрирует событие через order.registerEvent(...) внутри доменного метода; на orders.save(order) событие записывается в outbox-таблицу той же БД, в той же транзакции, что и изменение order.
COMMIT того, что изменилось:
1. UPDATE order SET status = 'CONFIRMED' WHERE id = $1
2. INSERT INTO outbox (event_type, payload, aggregate_id)
VALUES ('OrderConfirmed', '{...}', $1)
3. (атомарно — либо обе строки, либо ни одной)
Агрегат регистрирует событие внутри своего метода:
// core/order/domain/order.aggregate.ts
export class Order extends AggregateRoot {
confirm(now: Date): void {
if (this.status !== OrderStatus.NEW) {
throw new OrderAlreadyConfirmedError(this.id, this.status);
}
this.status = OrderStatus.CONFIRMED;
this.confirmedAt = now;
this.registerEvent(new OrderConfirmed(this.id, this.status, now));
}
}
Репозиторий при save записывает зарегистрированные события в outbox:
// adapters/out/persistence/typeorm-order.repository.ts
@Injectable()
export class TypeOrmOrderRepository implements OrderRepository {
constructor(
private readonly dataSource: DataSource,
@Inject(OUTBOX_WRITER) private readonly outbox: OutboxWriter,
) {}
async save(order: Order): Promise<void> {
const manager = this.dataSource.createEntityManager();
await manager.save(OrderOrmEntity, toOrmEntity(order));
for (const event of order.getUncommittedEvents()) {
await this.outbox.write(manager, event);
}
order.commit();
}
}
После commit:
- Outbox-relay — отдельный
@Cron-планировщик, который делаетSELECT ... FOR UPDATE SKIP LOCKED LIMIT 100изoutbox, публикует в Kafka, помечает как опубликованное. - Producer idempotent. Kafka гарантирует at-least-once с дедупом на уровне partition.
Зачем outbox, почему нельзя «после commit пушнуть в Kafka напрямую»:
- Commit прошёл, Kafka недоступна — событие потеряно, read-model рассинхронизируется.
- Commit failed, Kafka уже получила — phantom event, в БД нет соответствующего state.
- Outbox решает обе проблемы: пока строка в outbox — relay будет ретраить до успешной публикации.
Idempotent consumer
R-CQRS-SYNC-2: consumer read-стороны обязан быть идемпотентным. At-least-once в Kafka означает: одно сообщение может прийти дважды. Без защиты read-model становится несогласованной.
Два варианта защиты:
processed_event таблица
CREATE TABLE processed_event (
event_id UUID PRIMARY KEY,
consumer TEXT NOT NULL,
processed_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
// adapters/in/kafka/order-summary.projector.ts
@Injectable()
export class OrderSummaryProjector {
constructor(
private readonly dataSource: DataSource,
@Inject(ORDER_SUMMARY_REPOSITORY) private readonly summaries: OrderSummaryRepository,
@Inject(PROCESSED_EVENT_REPOSITORY) private readonly processedEvents: ProcessedEventRepository,
@Inject(TX_RUNNER) private readonly tx: TransactionRunner,
) {}
@EventPattern('order.events')
async onOrderConfirmed(@Payload() event: OrderConfirmedDto): Promise<void> {
await this.tx.run(async () => {
const isDuplicate = await this.processedEvents.exists(event.eventId, 'order-summary-projector');
if (isDuplicate) return;
await this.processedEvents.markProcessed(event.eventId, 'order-summary-projector');
await this.summaries.updateStatus(event.orderId, OrderStatus.CONFIRMED, event.confirmedAt);
});
}
}
Плюс: точная гарантия дедупа. Минус: отдельная таблица под нагрузкой записи.
Idempotent UPDATE по version
В read-таблице есть колонка version (или last_event_seq). UPDATE применяется только если событие новее текущей версии:
// adapters/in/kafka/order-summary.projector.ts
@EventPattern('order.events')
async onOrderConfirmed(@Payload() event: OrderConfirmedDto): Promise<void> {
const updated = await this.summaries.updateStatusIfNewer(
event.orderId,
OrderStatus.CONFIRMED,
event.confirmedAt,
event.aggregateVersion,
);
if (updated === 0) {
// событие устаревшее или дубликат — пропускаем
}
}
UPDATE order_summary
SET status = $1, confirmed_at = $2, version = $3, updated_at = NOW()
WHERE order_id = $4 AND version < $3
Плюс: не нужна отдельная таблица. Минус: требует aggregateVersion в каждом событии; подходит когда события одного агрегата строго упорядочены (одна партиция Kafka по aggregate_id).
Bootstrap и disaster recovery
R-CQRS-SYNC-3: при первом запуске нового read-store или после его потери — не ждём пока события придут из Kafka. Запускаем batch-rebuild.
// adapters/in/lifecycle/order-summary-bootstrap.ts
@Injectable()
export class OrderSummaryBootstrap implements OnApplicationBootstrap {
constructor(
@Inject(ORDER_SUMMARY_REPOSITORY) private readonly summaries: OrderSummaryRepository,
@Inject(ORDER_REPOSITORY) private readonly orders: OrderRepository,
) {}
async onApplicationBootstrap(): Promise<void> {
const isEmpty = await this.summaries.isEmpty();
if (!isEmpty) return;
this.logger.log('order_summary is empty — running bootstrap rebuild');
await this.rebuildAll();
}
private async rebuildAll(): Promise<void> {
let lastId = 0n;
while (true) {
const batch = await this.orders.findAllAfter(lastId, 1000);
if (batch.length === 0) break;
await this.summaries.upsertBatch(batch.map(o => toSummary(o)));
lastId = batch[batch.length - 1].id.value;
}
}
}
Не путать с обычной работой: bootstrap — разовый при создании read-store, не при каждом старте. Триггеры:
- Новый read-store (например, добавили Redis-кэш проекции — он пустой).
- Disaster recovery (read-store потерян, восстанавливаем из write-side).
- Структурная миграция (добавили колонку в
order_summary, нужно дозаполнить).
Тот же скрипт пригодится для ручного rebuild по одной записи, если consumer пропустил событие из-за бага.
Eventual consistency декларируется в API
R-CQRS-SYNC-4: если endpoint обслуживает read-model — это должно быть видно в OpenAPI. Клиент не должен догадываться о природе задержки.
// adapters/in/http/order.controller.ts
@Get(':id/summary')
@ApiOperation({
summary: 'Get order summary (read-projection)',
description:
'Возвращает read-проекцию заказа.\n\n' +
'Возможна задержка до 1 секунды между write-операцией и обновлением проекции.\n\n' +
'Для немедленной согласованности (например, сразу после POST /orders) ' +
'используйте GET /orders/:id (полный агрегат из write-store).',
})
@ApiOkResponse({ type: OrderSummaryDto })
async getSummary(@Param('id', ParseUUIDPipe) id: string): Promise<OrderSummaryDto> {
const result = await this.handler.execute(new GetOrderSummary(OrderId.of(id)));
if (!result) throw new NotFoundException();
return result;
}
Зачем:
- Клиент знает, что после
POST /ordersGET /orders/:id/summaryможет вернуть404или старое состояние. - Если нужна immediate consistency — он использует другой endpoint.
- В тестах и troubleshooting eventual consistency — задокументированное архитектурное свойство, не баг.
Read-your-writes — три механизма
R-CQRS-SYNC-5: иногда нужно гарантировать, что клиент после своего write сразу увидит этот write в read-проекции. Три варианта, в порядке возрастания инвазивности:
1. Sticky session на gateway
Запросы одного клиента приходят на тот же pod, который только что обработал его write. Если consumer в том же процессе обновил in-memory проекцию синхронно после commit — клиент увидит свежие данные.
Не работает cross-pod, cross-service. Хрупко.
2. Polling после commit
// core/order/command/create-order.handler.ts
async execute(cmd: CreateOrder): Promise<OrderId> {
const orderId = await this.tx.run(async () => {
const order = Order.create(cmd.customerId, cmd.items);
await this.orders.save(order);
return order.id;
});
// ждём появления в read-model до 2 секунд
await this.pollUntilVisible(orderId, 2000);
return orderId;
}
private async pollUntilVisible(id: OrderId, timeoutMs: number): Promise<void> {
const deadline = Date.now() + timeoutMs;
while (Date.now() < deadline) {
const found = await this.summaries.exists(id);
if (found) return;
await sleep(50);
}
this.logger.warn('read-model not visible after write within timeout', { id });
}
Минус: p99 latency POST /orders становится 2 секунды. Не подходит для частых write-операций.
3. Отдельный endpoint из write-store
Самое простое: для сценариев с требованием immediate consistency — отдельный endpoint, читающий прямо из write-store через агрегат.
@Get(':id')
@ApiOperation({ summary: 'Get order (immediate consistency, from write-store)' })
async getOrder(@Param('id', ParseUUIDPipe) id: string): Promise<OrderDto> { ... }
@Get(':id/summary')
@ApiOperation({ summary: 'Get order summary (eventual consistency, from read-projection)' })
async getSummary(@Param('id', ParseUUIDPipe) id: string): Promise<OrderSummaryDto> { ... }
В большинстве случаев — правильное решение: два endpoint-а явно показывают trade-off клиенту.
Пример сквозного потока: SberPay → read-model
1. POST /payments/charge
→ ChargeAccountHandler.execute(cmd)
→ account.charge(amount, key) // регистрирует AccountCharged в агрегате
→ accountRepository.save(account) // INSERT в outbox в той же TX
2. OutboxRelay (@Cron every 200ms)
→ SELECT ... FOR UPDATE SKIP LOCKED LIMIT 100
→ публикует AccountCharged в Kafka topic payment.events
3. AccountBalanceSummaryProjector
→ @EventPattern('payment.events')
→ idempotent UPDATE account_balance_summary WHERE account_id = $1 AND version < $2
4. GET /accounts/:id/balance-summary
→ читает из account_balance_summary
→ possible lag ~200ms–1s
Цепочка не меняется от домена: Order, Product, Customer — везде один и тот же шаблон.
Что запрещено
| Антипаттерн | Правило | Что взамен |
|---|---|---|
| Synchronous UPDATE read-model в command-handler | R-CQRS-SYNC-X1 | Outbox + Kafka + consumer |
| PG-trigger sync write-таблицы → read-таблицы | R-CQRS-SYNC-X2 | Явный @EventPattern в NestJS |
| Event-payload — TypeORM entity write-схемы | R-CQRS-SYNC-X3 | Независимый DTO-класс с версионированием |
| Consumer без idempotency-защиты | R-CQRS-SYNC-2 | processed_event или idempotent UPDATE по version |
| Eventual consistency не задекларирована в API | R-CQRS-SYNC-4 | @ApiOperation({ description }) с явным описанием |
| Ожидание событий из Kafka при пустой read-model | R-CQRS-SYNC-3 | OnApplicationBootstrap batch-rebuild из write-store |
Куда дальше
- Command side — как outbox-событие регистрируется в агрегате и записывается при
save. - Query side — read-handler-ы с
Query<R>иViewRepository. - Read-model — где и в каком виде хранится проекция, как обеспечить восстановимость.
- Уровень и эволюция — lightweight vs full CQRS, эволюция по уровням.
- Когда CQRS оправдан — когда вводить CQRS, а когда нет.