Опирается на правила: R-CQRS-SYNC-1R-CQRS-SYNC-5 и R-CQRS-SYNC-X1R-CQRS-SYNC-X3 из CQRS контракта → раздел 5. Синхронизация через события.

Важно знать

  • Sync write → read идёт через outbox + Kafka, не synchronously. Outbox-строка записывается в той же транзакции, что и изменение агрегата.
  • Idempotent consumer обязателен: read-model UPDATE может прийти дважды. Защита — processed_event таблица или idempotent UPDATE по version.
  • Bootstrap-rebuild при первом запуске или потере read-model: OnApplicationBootstrap проходит по агрегатам батчами, не ждём пока придут события за прошедшие дни.
  • Eventual consistency декларируется в API. @ApiOperation({ description }) у read-projection endpoint явно пишет о задержке.
  • Read-your-writes при необходимости — sticky session на gateway, polling после commit или отдельный endpoint прямо из write-store.
  • Никакого synchronous INSERT/UPDATE read-model в command-handler. Decoupling сразу теряется.
  • Никаких PG-триггеров для sync — невидимая магия, ломается на bulk, не масштабируется cross-DB.
  • Никаких schema-coupled events: payload — отдельный класс, не TypeORM-entity write-схемы.

Сердце CQRS — это не разделение моделей, а способ их связать. Если write и read физически разнесены, нужен надёжный механизм передачи изменений. Outbox + Kafka — стандартная связка; всё остальное в этой статье — детали её правильного применения в NestJS/TypeScript. Раскрытие раздела 5 CQRS-контракта.

Outbox-pattern: атомарность с агрегатом

R-CQRS-SYNC-1: write-handler регистрирует событие через order.registerEvent(...) внутри доменного метода; на orders.save(order) событие записывается в outbox-таблицу той же БД, в той же транзакции, что и изменение order.

COMMIT того, что изменилось:
  1. UPDATE order SET status = 'CONFIRMED' WHERE id = $1
  2. INSERT INTO outbox (event_type, payload, aggregate_id)
       VALUES ('OrderConfirmed', '{...}', $1)
  3. (атомарно — либо обе строки, либо ни одной)

Агрегат регистрирует событие внутри своего метода:

// core/order/domain/order.aggregate.ts
export class Order extends AggregateRoot {
  confirm(now: Date): void {
    if (this.status !== OrderStatus.NEW) {
      throw new OrderAlreadyConfirmedError(this.id, this.status);
    }
    this.status = OrderStatus.CONFIRMED;
    this.confirmedAt = now;
    this.registerEvent(new OrderConfirmed(this.id, this.status, now));
  }
}

Репозиторий при save записывает зарегистрированные события в outbox:

// adapters/out/persistence/typeorm-order.repository.ts
@Injectable()
export class TypeOrmOrderRepository implements OrderRepository {
  constructor(
    private readonly dataSource: DataSource,
    @Inject(OUTBOX_WRITER) private readonly outbox: OutboxWriter,
  ) {}

  async save(order: Order): Promise<void> {
    const manager = this.dataSource.createEntityManager();
    await manager.save(OrderOrmEntity, toOrmEntity(order));
    for (const event of order.getUncommittedEvents()) {
      await this.outbox.write(manager, event);
    }
    order.commit();
  }
}

После commit:

  • Outbox-relay — отдельный @Cron-планировщик, который делает SELECT ... FOR UPDATE SKIP LOCKED LIMIT 100 из outbox, публикует в Kafka, помечает как опубликованное.
  • Producer idempotent. Kafka гарантирует at-least-once с дедупом на уровне partition.

Зачем outbox, почему нельзя «после commit пушнуть в Kafka напрямую»:

  • Commit прошёл, Kafka недоступна — событие потеряно, read-model рассинхронизируется.
  • Commit failed, Kafka уже получила — phantom event, в БД нет соответствующего state.
  • Outbox решает обе проблемы: пока строка в outbox — relay будет ретраить до успешной публикации.

Idempotent consumer

R-CQRS-SYNC-2: consumer read-стороны обязан быть идемпотентным. At-least-once в Kafka означает: одно сообщение может прийти дважды. Без защиты read-model становится несогласованной.

Два варианта защиты:

processed_event таблица

CREATE TABLE processed_event (
    event_id     UUID PRIMARY KEY,
    consumer     TEXT NOT NULL,
    processed_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
// adapters/in/kafka/order-summary.projector.ts
@Injectable()
export class OrderSummaryProjector {
  constructor(
    private readonly dataSource: DataSource,
    @Inject(ORDER_SUMMARY_REPOSITORY) private readonly summaries: OrderSummaryRepository,
    @Inject(PROCESSED_EVENT_REPOSITORY) private readonly processedEvents: ProcessedEventRepository,
    @Inject(TX_RUNNER) private readonly tx: TransactionRunner,
  ) {}

  @EventPattern('order.events')
  async onOrderConfirmed(@Payload() event: OrderConfirmedDto): Promise<void> {
    await this.tx.run(async () => {
      const isDuplicate = await this.processedEvents.exists(event.eventId, 'order-summary-projector');
      if (isDuplicate) return;

      await this.processedEvents.markProcessed(event.eventId, 'order-summary-projector');
      await this.summaries.updateStatus(event.orderId, OrderStatus.CONFIRMED, event.confirmedAt);
    });
  }
}

Плюс: точная гарантия дедупа. Минус: отдельная таблица под нагрузкой записи.

Idempotent UPDATE по version

В read-таблице есть колонка version (или last_event_seq). UPDATE применяется только если событие новее текущей версии:

// adapters/in/kafka/order-summary.projector.ts
@EventPattern('order.events')
async onOrderConfirmed(@Payload() event: OrderConfirmedDto): Promise<void> {
  const updated = await this.summaries.updateStatusIfNewer(
    event.orderId,
    OrderStatus.CONFIRMED,
    event.confirmedAt,
    event.aggregateVersion,
  );
  if (updated === 0) {
    // событие устаревшее или дубликат — пропускаем
  }
}
UPDATE order_summary
SET    status = $1, confirmed_at = $2, version = $3, updated_at = NOW()
WHERE  order_id = $4 AND version < $3

Плюс: не нужна отдельная таблица. Минус: требует aggregateVersion в каждом событии; подходит когда события одного агрегата строго упорядочены (одна партиция Kafka по aggregate_id).

Bootstrap и disaster recovery

R-CQRS-SYNC-3: при первом запуске нового read-store или после его потери — не ждём пока события придут из Kafka. Запускаем batch-rebuild.

// adapters/in/lifecycle/order-summary-bootstrap.ts
@Injectable()
export class OrderSummaryBootstrap implements OnApplicationBootstrap {
  constructor(
    @Inject(ORDER_SUMMARY_REPOSITORY) private readonly summaries: OrderSummaryRepository,
    @Inject(ORDER_REPOSITORY)         private readonly orders: OrderRepository,
  ) {}

  async onApplicationBootstrap(): Promise<void> {
    const isEmpty = await this.summaries.isEmpty();
    if (!isEmpty) return;

    this.logger.log('order_summary is empty — running bootstrap rebuild');
    await this.rebuildAll();
  }

  private async rebuildAll(): Promise<void> {
    let lastId = 0n;
    while (true) {
      const batch = await this.orders.findAllAfter(lastId, 1000);
      if (batch.length === 0) break;
      await this.summaries.upsertBatch(batch.map(o => toSummary(o)));
      lastId = batch[batch.length - 1].id.value;
    }
  }
}

Не путать с обычной работой: bootstrap — разовый при создании read-store, не при каждом старте. Триггеры:

  • Новый read-store (например, добавили Redis-кэш проекции — он пустой).
  • Disaster recovery (read-store потерян, восстанавливаем из write-side).
  • Структурная миграция (добавили колонку в order_summary, нужно дозаполнить).

Тот же скрипт пригодится для ручного rebuild по одной записи, если consumer пропустил событие из-за бага.

Eventual consistency декларируется в API

R-CQRS-SYNC-4: если endpoint обслуживает read-model — это должно быть видно в OpenAPI. Клиент не должен догадываться о природе задержки.

// adapters/in/http/order.controller.ts
@Get(':id/summary')
@ApiOperation({
  summary: 'Get order summary (read-projection)',
  description:
    'Возвращает read-проекцию заказа.\n\n' +
    'Возможна задержка до 1 секунды между write-операцией и обновлением проекции.\n\n' +
    'Для немедленной согласованности (например, сразу после POST /orders) ' +
    'используйте GET /orders/:id (полный агрегат из write-store).',
})
@ApiOkResponse({ type: OrderSummaryDto })
async getSummary(@Param('id', ParseUUIDPipe) id: string): Promise<OrderSummaryDto> {
  const result = await this.handler.execute(new GetOrderSummary(OrderId.of(id)));
  if (!result) throw new NotFoundException();
  return result;
}

Зачем:

  • Клиент знает, что после POST /orders GET /orders/:id/summary может вернуть 404 или старое состояние.
  • Если нужна immediate consistency — он использует другой endpoint.
  • В тестах и troubleshooting eventual consistency — задокументированное архитектурное свойство, не баг.

Read-your-writes — три механизма

R-CQRS-SYNC-5: иногда нужно гарантировать, что клиент после своего write сразу увидит этот write в read-проекции. Три варианта, в порядке возрастания инвазивности:

1. Sticky session на gateway

Запросы одного клиента приходят на тот же pod, который только что обработал его write. Если consumer в том же процессе обновил in-memory проекцию синхронно после commit — клиент увидит свежие данные.

Не работает cross-pod, cross-service. Хрупко.

2. Polling после commit

// core/order/command/create-order.handler.ts
async execute(cmd: CreateOrder): Promise<OrderId> {
  const orderId = await this.tx.run(async () => {
    const order = Order.create(cmd.customerId, cmd.items);
    await this.orders.save(order);
    return order.id;
  });

  // ждём появления в read-model до 2 секунд
  await this.pollUntilVisible(orderId, 2000);
  return orderId;
}

private async pollUntilVisible(id: OrderId, timeoutMs: number): Promise<void> {
  const deadline = Date.now() + timeoutMs;
  while (Date.now() < deadline) {
    const found = await this.summaries.exists(id);
    if (found) return;
    await sleep(50);
  }
  this.logger.warn('read-model not visible after write within timeout', { id });
}

Минус: p99 latency POST /orders становится 2 секунды. Не подходит для частых write-операций.

3. Отдельный endpoint из write-store

Самое простое: для сценариев с требованием immediate consistency — отдельный endpoint, читающий прямо из write-store через агрегат.

@Get(':id')
@ApiOperation({ summary: 'Get order (immediate consistency, from write-store)' })
async getOrder(@Param('id', ParseUUIDPipe) id: string): Promise<OrderDto> { ... }

@Get(':id/summary')
@ApiOperation({ summary: 'Get order summary (eventual consistency, from read-projection)' })
async getSummary(@Param('id', ParseUUIDPipe) id: string): Promise<OrderSummaryDto> { ... }

В большинстве случаев — правильное решение: два endpoint-а явно показывают trade-off клиенту.

Пример сквозного потока: SberPay → read-model

1. POST /payments/charge
   → ChargeAccountHandler.execute(cmd)
   → account.charge(amount, key)      // регистрирует AccountCharged в агрегате
   → accountRepository.save(account) // INSERT в outbox в той же TX

2. OutboxRelay (@Cron every 200ms)
   → SELECT ... FOR UPDATE SKIP LOCKED LIMIT 100
   → публикует AccountCharged в Kafka topic payment.events

3. AccountBalanceSummaryProjector
   → @EventPattern('payment.events')
   → idempotent UPDATE account_balance_summary WHERE account_id = $1 AND version < $2

4. GET /accounts/:id/balance-summary
   → читает из account_balance_summary
   → possible lag ~200ms–1s

Цепочка не меняется от домена: Order, Product, Customer — везде один и тот же шаблон.

Что запрещено

АнтипаттернПравилоЧто взамен
Synchronous UPDATE read-model в command-handlerR-CQRS-SYNC-X1Outbox + Kafka + consumer
PG-trigger sync write-таблицы → read-таблицыR-CQRS-SYNC-X2Явный @EventPattern в NestJS
Event-payload — TypeORM entity write-схемыR-CQRS-SYNC-X3Независимый DTO-класс с версионированием
Consumer без idempotency-защитыR-CQRS-SYNC-2processed_event или idempotent UPDATE по version
Eventual consistency не задекларирована в APIR-CQRS-SYNC-4@ApiOperation({ description }) с явным описанием
Ожидание событий из Kafka при пустой read-modelR-CQRS-SYNC-3OnApplicationBootstrap batch-rebuild из write-store

Куда дальше

  • Command side — как outbox-событие регистрируется в агрегате и записывается при save.
  • Query side — read-handler-ы с Query<R> и ViewRepository.
  • Read-model — где и в каком виде хранится проекция, как обеспечить восстановимость.
  • Уровень и эволюция — lightweight vs full CQRS, эволюция по уровням.
  • Когда CQRS оправдан — когда вводить CQRS, а когда нет.