← назад к разделу

В CQRS write-side и read-side разделены физически: одна база принимает команды, другая отвечает на запросы. Но разделить — только полдела. Нужно, чтобы изменения из write-side надёжно попадали в read-side. Это и есть задача синхронизации.

Самый популярный способ — события через брокер сообщений. Статья разбирает этот механизм на практике: как не потерять событие, как справляться с дублями и что делать, когда read-model нужно восстановить с нуля.

Почему нельзя просто отправить событие после сохранения

Первый импульс выглядит логично: сохранили агрегат → отправили событие в Kafka. Что может пойти не так?

Всё. Сохранение в БД прошло, а Kafka в этот момент недоступна — событие потеряно, read-model навсегда отстала. Или наоборот: Kafka получила событие, но транзакция в БД откатилась — в read-side появились данные, которых нет в write-side.

Оба сценария ломают согласованность между сторонами.

Outbox: атомарность с агрегатом

Решение — outbox-паттерн. Вместо прямой отправки в Kafka событие записывается в отдельную таблицу (outbox) в той же базе данных и в той же транзакции, что и изменение агрегата. Либо оба изменения применяются, либо ни одно.

COMMIT транзакции:
  1. UPDATE orders SET status = 'CONFIRMED' WHERE id = $1
  2. INSERT INTO outbox (event_type, payload, aggregate_id)
       VALUES ('OrderConfirmed', '{...}', $1)
  → либо обе строки, либо ни одной

Агрегат регистрирует событие внутри своего метода:

// order.aggregate.ts
export class Order extends AggregateRoot {
  confirm(now: Date): void {
    if (this.status !== OrderStatus.NEW) {
      throw new OrderAlreadyConfirmedError(this.id, this.status);
    }
    this.status = OrderStatus.CONFIRMED;
    this.confirmedAt = now;
    this.registerEvent(new OrderConfirmed(this.id, this.status, now));
  }
}

Репозиторий при save записывает зарегистрированные события в outbox-таблицу:

// typeorm-order.repository.ts
@Injectable()
export class TypeOrmOrderRepository implements OrderRepository {
  constructor(
    private readonly dataSource: DataSource,
    @Inject(OUTBOX_WRITER) private readonly outbox: OutboxWriter,
  ) {}

  async save(order: Order): Promise<void> {
    const manager = this.dataSource.createEntityManager();
    await manager.save(OrderOrmEntity, toOrmEntity(order));
    for (const event of order.getUncommittedEvents()) {
      await this.outbox.write(manager, event);
    }
    order.commit();
  }
}

Отдельный компонент — outbox-relay — периодически читает из outbox и публикует сообщения в Kafka. Пока публикация не прошла, строка остаётся в таблице, и relay продолжает попытки.

// outbox-relay — упрощённая логика:
// SELECT ... FOR UPDATE SKIP LOCKED LIMIT 100
// → публикуем в Kafka
// → помечаем как опубликованное

Idempotent consumer: защита от дублей

Kafka гарантирует доставку «хотя бы раз» (at-least-once). Это значит, что одно сообщение может прийти потребителю дважды: например, consumer обработал его, но упал до коммита offset'а.

Без защиты повторная обработка сломает read-model. Поэтому consumer должен быть идемпотентным — обрабатывать дублирующееся сообщение безопасно.

Два способа:

Таблица обработанных событий

Перед обновлением read-model проверяем, не обрабатывали ли мы уже это событие:

// order-summary.projector.ts
@Injectable()
export class OrderSummaryProjector {
  @EventPattern('order.events')
  async onOrderConfirmed(@Payload() event: OrderConfirmedDto): Promise<void> {
    await this.tx.run(async () => {
      const isDuplicate = await this.processedEvents.exists(
        event.eventId,
        'order-summary-projector',
      );
      if (isDuplicate) return;

      await this.processedEvents.markProcessed(event.eventId, 'order-summary-projector');
      await this.summaries.updateStatus(event.orderId, OrderStatus.CONFIRMED, event.confirmedAt);
    });
  }
}

Структура таблицы:

CREATE TABLE processed_event (
    event_id     UUID PRIMARY KEY,
    consumer     TEXT NOT NULL,
    processed_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

Плюс: точная гарантия. Минус: лишняя запись при каждом событии.

Условный UPDATE по версии

Если в каждом событии есть aggregateVersion, можно использовать его как защиту:

@EventPattern('order.events')
async onOrderConfirmed(@Payload() event: OrderConfirmedDto): Promise<void> {
  await this.summaries.updateStatusIfNewer(
    event.orderId,
    OrderStatus.CONFIRMED,
    event.confirmedAt,
    event.aggregateVersion,
  );
}
UPDATE order_summary
SET    status = $1, confirmed_at = $2, version = $3, updated_at = NOW()
WHERE  order_id = $4 AND version < $3

Если событие уже применено или пришло старое — ничего не произойдёт. Плюс: не нужна отдельная таблица. Минус: нужен aggregateVersion в событии и порядок доставки по партиции Kafka.

Восстановление read-model с нуля

Иногда read-model нужно пересобрать: при первом запуске нового хранилища, после сбоя, при структурной миграции. Ждать, пока события придут из Kafka — не вариант: события могут быть старыми или их retention уже истёк.

Правильный подход — пакетное восстановление из write-store при старте приложения:

// order-summary-bootstrap.ts
@Injectable()
export class OrderSummaryBootstrap implements OnApplicationBootstrap {
  private readonly logger = new Logger(OrderSummaryBootstrap.name);

  constructor(
    @Inject(ORDER_SUMMARY_REPOSITORY) private readonly summaries: OrderSummaryRepository,
    @Inject(ORDER_REPOSITORY)         private readonly orders: OrderRepository,
  ) {}

  async onApplicationBootstrap(): Promise<void> {
    const isEmpty = await this.summaries.isEmpty();
    if (!isEmpty) return;

    this.logger.log('order_summary пустая — запускаем восстановление');
    await this.rebuildAll();
  }

  private async rebuildAll(): Promise<void> {
    let lastId = 0n;
    while (true) {
      const batch = await this.orders.findAllAfter(lastId, 1000);
      if (batch.length === 0) break;
      await this.summaries.upsertBatch(batch.map(o => toSummary(o)));
      lastId = batch[batch.length - 1].id.value;
    }
  }
}

Логика проверяет пустоту read-store и только тогда запускает обход. При следующих стартах, когда данные уже есть, инициализация пропускается.

Eventual consistency и API

Когда write и read разделены через брокер, между ними всегда есть небольшая задержка — обычно доли секунды. Клиент должен об этом знать, иначе он будет считать задержку ошибкой.

Описывайте это в OpenAPI-аннотации endpoint'а, который читает из проекции:

@Get(':id/summary')
@ApiOperation({
  summary: 'Get order summary (read-projection)',
  description:
    'Возвращает read-проекцию заказа.\n\n' +
    'Возможна задержка до 1 секунды между write-операцией и обновлением проекции.\n\n' +
    'Для немедленной согласованности используйте GET /orders/:id — он читает из write-store.',
})
@ApiOkResponse({ type: OrderSummaryDto })
async getSummary(@Param('id', ParseUUIDPipe) id: string): Promise<OrderSummaryDto> {
  const result = await this.handler.execute(new GetOrderSummary(OrderId.of(id)));
  if (!result) throw new NotFoundException();
  return result;
}

Явная документация помогает в тестах и при разборе проблем: задержка — намеренное архитектурное свойство системы, а не баг.

Read-your-writes: когда клиент хочет сразу увидеть своё изменение

Иногда требование жёстче: клиент отправил команду и тут же хочет увидеть результат в read-проекции. Три варианта, от простого к сложному:

Два endpoint'а. Самое чистое решение: один endpoint читает из write-store (немедленная согласованность), другой — из read-проекции (с возможной задержкой). Клиент выбирает нужный по ситуации.

@Get(':id')
@ApiOperation({ summary: 'Get order (из write-store, немедленная согласованность)' })
async getOrder(@Param('id', ParseUUIDPipe) id: string): Promise<OrderDto> { ... }

@Get(':id/summary')
@ApiOperation({ summary: 'Get order summary (из read-проекции, с возможной задержкой)' })
async getSummary(@Param('id', ParseUUIDPipe) id: string): Promise<OrderSummaryDto> { ... }

Ожидание в обработчике. После commit'а транзакции обработчик опрашивает read-store до появления записи или до таймаута. Подходит только когда read-your-writes действительно нужен и нагрузка невысока — p99 latency команды увеличивается на время ожидания.

Sticky routing на шлюзе. Запросы одного клиента направляются на один pod. Работает только если consumer в том же процессе обновляет проекцию синхронно, и ломается при масштабировании на несколько узлов.

В большинстве случаев подходит первый вариант.

Частые ошибки

Обновление read-model прямо в обработчике команды. Выглядит как удобный ярлык, но уничтожает смысл разделения: handler знает о read-стороне, они снова связаны.

PG-триггеры для синхронизации таблиц. Невидимы при просмотре кода, ломаются на массовых операциях, не работают если read-model в другой базе данных.

Событие — это TypeORM-сущность write-схемы. Любое изменение структуры write-стороны сразу ломает consumer. Событие — отдельный класс с собственным версионированием.

Как выглядит полный поток

1. POST /orders
   → CreateOrderHandler
   → order.create(...)       // регистрирует OrderCreated в агрегате
   → orderRepository.save()  // INSERT в outbox в той же транзакции

2. OutboxRelay (каждые ~200ms)
   → SELECT ... FOR UPDATE SKIP LOCKED LIMIT 100
   → публикует OrderCreated в Kafka topic order.events

3. OrderSummaryProjector
   → @EventPattern('order.events')
   → idempotent UPDATE order_summary

4. GET /orders/:id/summary
   → читает из order_summary
   → возможная задержка ~200ms–1s

Этот шаблон одинаков для любого агрегата в системе.

Коротко

  • Прямая отправка в Kafka после сохранения ненадёжна: либо потеряете событие, либо получите «фантомное». Outbox решает обе проблемы — событие записывается в ту же транзакцию, что и изменение агрегата.
  • Consumer должен быть идемпотентным: Kafka доставляет «хотя бы раз». Защита — либо таблица обработанных событий, либо условный UPDATE по версии.
  • При пустой read-model не ждите Kafka: запускайте пакетное восстановление из write-store через OnApplicationBootstrap.
  • Eventual consistency — задокументированное свойство, не баг. Укажите это в OpenAPI-описании endpoint'а.
  • Если нужна немедленная согласованность — заведите отдельный endpoint, читающий из write-store.

Что почитать дальше