В CQRS write-side и read-side разделены физически: одна база принимает команды, другая отвечает на запросы. Но разделить — только полдела. Нужно, чтобы изменения из write-side надёжно попадали в read-side. Это и есть задача синхронизации.
Самый популярный способ — события через брокер сообщений. Статья разбирает этот механизм на практике: как не потерять событие, как справляться с дублями и что делать, когда read-model нужно восстановить с нуля.
Почему нельзя просто отправить событие после сохранения
Первый импульс выглядит логично: сохранили агрегат → отправили событие в Kafka. Что может пойти не так?
Всё. Сохранение в БД прошло, а Kafka в этот момент недоступна — событие потеряно, read-model навсегда отстала. Или наоборот: Kafka получила событие, но транзакция в БД откатилась — в read-side появились данные, которых нет в write-side.
Оба сценария ломают согласованность между сторонами.
Outbox: атомарность с агрегатом
Решение — outbox-паттерн. Вместо прямой отправки в Kafka событие записывается в отдельную таблицу (outbox) в той же базе данных и в той же транзакции, что и изменение агрегата. Либо оба изменения применяются, либо ни одно.
COMMIT транзакции:
1. UPDATE orders SET status = 'CONFIRMED' WHERE id = $1
2. INSERT INTO outbox (event_type, payload, aggregate_id)
VALUES ('OrderConfirmed', '{...}', $1)
→ либо обе строки, либо ни одной
Агрегат регистрирует событие внутри своего метода:
// order.aggregate.ts
export class Order extends AggregateRoot {
confirm(now: Date): void {
if (this.status !== OrderStatus.NEW) {
throw new OrderAlreadyConfirmedError(this.id, this.status);
}
this.status = OrderStatus.CONFIRMED;
this.confirmedAt = now;
this.registerEvent(new OrderConfirmed(this.id, this.status, now));
}
}
Репозиторий при save записывает зарегистрированные события в outbox-таблицу:
// typeorm-order.repository.ts
@Injectable()
export class TypeOrmOrderRepository implements OrderRepository {
constructor(
private readonly dataSource: DataSource,
@Inject(OUTBOX_WRITER) private readonly outbox: OutboxWriter,
) {}
async save(order: Order): Promise<void> {
const manager = this.dataSource.createEntityManager();
await manager.save(OrderOrmEntity, toOrmEntity(order));
for (const event of order.getUncommittedEvents()) {
await this.outbox.write(manager, event);
}
order.commit();
}
}
Отдельный компонент — outbox-relay — периодически читает из outbox и публикует сообщения в Kafka. Пока публикация не прошла, строка остаётся в таблице, и relay продолжает попытки.
// outbox-relay — упрощённая логика:
// SELECT ... FOR UPDATE SKIP LOCKED LIMIT 100
// → публикуем в Kafka
// → помечаем как опубликованное
Idempotent consumer: защита от дублей
Kafka гарантирует доставку «хотя бы раз» (at-least-once). Это значит, что одно сообщение может прийти потребителю дважды: например, consumer обработал его, но упал до коммита offset'а.
Без защиты повторная обработка сломает read-model. Поэтому consumer должен быть идемпотентным — обрабатывать дублирующееся сообщение безопасно.
Два способа:
Таблица обработанных событий
Перед обновлением read-model проверяем, не обрабатывали ли мы уже это событие:
// order-summary.projector.ts
@Injectable()
export class OrderSummaryProjector {
@EventPattern('order.events')
async onOrderConfirmed(@Payload() event: OrderConfirmedDto): Promise<void> {
await this.tx.run(async () => {
const isDuplicate = await this.processedEvents.exists(
event.eventId,
'order-summary-projector',
);
if (isDuplicate) return;
await this.processedEvents.markProcessed(event.eventId, 'order-summary-projector');
await this.summaries.updateStatus(event.orderId, OrderStatus.CONFIRMED, event.confirmedAt);
});
}
}
Структура таблицы:
CREATE TABLE processed_event (
event_id UUID PRIMARY KEY,
consumer TEXT NOT NULL,
processed_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
Плюс: точная гарантия. Минус: лишняя запись при каждом событии.
Условный UPDATE по версии
Если в каждом событии есть aggregateVersion, можно использовать его как защиту:
@EventPattern('order.events')
async onOrderConfirmed(@Payload() event: OrderConfirmedDto): Promise<void> {
await this.summaries.updateStatusIfNewer(
event.orderId,
OrderStatus.CONFIRMED,
event.confirmedAt,
event.aggregateVersion,
);
}
UPDATE order_summary
SET status = $1, confirmed_at = $2, version = $3, updated_at = NOW()
WHERE order_id = $4 AND version < $3
Если событие уже применено или пришло старое — ничего не произойдёт. Плюс: не нужна отдельная таблица. Минус: нужен aggregateVersion в событии и порядок доставки по партиции Kafka.
Восстановление read-model с нуля
Иногда read-model нужно пересобрать: при первом запуске нового хранилища, после сбоя, при структурной миграции. Ждать, пока события придут из Kafka — не вариант: события могут быть старыми или их retention уже истёк.
Правильный подход — пакетное восстановление из write-store при старте приложения:
// order-summary-bootstrap.ts
@Injectable()
export class OrderSummaryBootstrap implements OnApplicationBootstrap {
private readonly logger = new Logger(OrderSummaryBootstrap.name);
constructor(
@Inject(ORDER_SUMMARY_REPOSITORY) private readonly summaries: OrderSummaryRepository,
@Inject(ORDER_REPOSITORY) private readonly orders: OrderRepository,
) {}
async onApplicationBootstrap(): Promise<void> {
const isEmpty = await this.summaries.isEmpty();
if (!isEmpty) return;
this.logger.log('order_summary пустая — запускаем восстановление');
await this.rebuildAll();
}
private async rebuildAll(): Promise<void> {
let lastId = 0n;
while (true) {
const batch = await this.orders.findAllAfter(lastId, 1000);
if (batch.length === 0) break;
await this.summaries.upsertBatch(batch.map(o => toSummary(o)));
lastId = batch[batch.length - 1].id.value;
}
}
}
Логика проверяет пустоту read-store и только тогда запускает обход. При следующих стартах, когда данные уже есть, инициализация пропускается.
Eventual consistency и API
Когда write и read разделены через брокер, между ними всегда есть небольшая задержка — обычно доли секунды. Клиент должен об этом знать, иначе он будет считать задержку ошибкой.
Описывайте это в OpenAPI-аннотации endpoint'а, который читает из проекции:
@Get(':id/summary')
@ApiOperation({
summary: 'Get order summary (read-projection)',
description:
'Возвращает read-проекцию заказа.\n\n' +
'Возможна задержка до 1 секунды между write-операцией и обновлением проекции.\n\n' +
'Для немедленной согласованности используйте GET /orders/:id — он читает из write-store.',
})
@ApiOkResponse({ type: OrderSummaryDto })
async getSummary(@Param('id', ParseUUIDPipe) id: string): Promise<OrderSummaryDto> {
const result = await this.handler.execute(new GetOrderSummary(OrderId.of(id)));
if (!result) throw new NotFoundException();
return result;
}
Явная документация помогает в тестах и при разборе проблем: задержка — намеренное архитектурное свойство системы, а не баг.
Read-your-writes: когда клиент хочет сразу увидеть своё изменение
Иногда требование жёстче: клиент отправил команду и тут же хочет увидеть результат в read-проекции. Три варианта, от простого к сложному:
Два endpoint'а. Самое чистое решение: один endpoint читает из write-store (немедленная согласованность), другой — из read-проекции (с возможной задержкой). Клиент выбирает нужный по ситуации.
@Get(':id')
@ApiOperation({ summary: 'Get order (из write-store, немедленная согласованность)' })
async getOrder(@Param('id', ParseUUIDPipe) id: string): Promise<OrderDto> { ... }
@Get(':id/summary')
@ApiOperation({ summary: 'Get order summary (из read-проекции, с возможной задержкой)' })
async getSummary(@Param('id', ParseUUIDPipe) id: string): Promise<OrderSummaryDto> { ... }
Ожидание в обработчике. После commit'а транзакции обработчик опрашивает read-store до появления записи или до таймаута. Подходит только когда read-your-writes действительно нужен и нагрузка невысока — p99 latency команды увеличивается на время ожидания.
Sticky routing на шлюзе. Запросы одного клиента направляются на один pod. Работает только если consumer в том же процессе обновляет проекцию синхронно, и ломается при масштабировании на несколько узлов.
В большинстве случаев подходит первый вариант.
Частые ошибки
Обновление read-model прямо в обработчике команды. Выглядит как удобный ярлык, но уничтожает смысл разделения: handler знает о read-стороне, они снова связаны.
PG-триггеры для синхронизации таблиц. Невидимы при просмотре кода, ломаются на массовых операциях, не работают если read-model в другой базе данных.
Событие — это TypeORM-сущность write-схемы. Любое изменение структуры write-стороны сразу ломает consumer. Событие — отдельный класс с собственным версионированием.
Как выглядит полный поток
1. POST /orders
→ CreateOrderHandler
→ order.create(...) // регистрирует OrderCreated в агрегате
→ orderRepository.save() // INSERT в outbox в той же транзакции
2. OutboxRelay (каждые ~200ms)
→ SELECT ... FOR UPDATE SKIP LOCKED LIMIT 100
→ публикует OrderCreated в Kafka topic order.events
3. OrderSummaryProjector
→ @EventPattern('order.events')
→ idempotent UPDATE order_summary
4. GET /orders/:id/summary
→ читает из order_summary
→ возможная задержка ~200ms–1s
Этот шаблон одинаков для любого агрегата в системе.
Коротко
- Прямая отправка в Kafka после сохранения ненадёжна: либо потеряете событие, либо получите «фантомное». Outbox решает обе проблемы — событие записывается в ту же транзакцию, что и изменение агрегата.
- Consumer должен быть идемпотентным: Kafka доставляет «хотя бы раз». Защита — либо таблица обработанных событий, либо условный UPDATE по версии.
- При пустой read-model не ждите Kafka: запускайте пакетное восстановление из write-store через
OnApplicationBootstrap. - Eventual consistency — задокументированное свойство, не баг. Укажите это в OpenAPI-описании endpoint'а.
- Если нужна немедленная согласованность — заведите отдельный endpoint, читающий из write-store.
Что почитать дальше
- Command side в NestJS — как outbox-событие регистрируется в агрегате и записывается при
save. - Query side в NestJS — read-handler'ы и ViewRepository.
- Когда CQRS оправдан — когда вводить CQRS, а когда нет.