Опирается на правила: R-DIST-OBX-1R-DIST-OBX-3 и R-DIST-OBX-X1R-DIST-OBX-X2 из Distributed Patterns — раздел 5. Outbox + Inbox.

Важно знать

  • Outbox решает «DB commit + message publish атомарно»: одна DataSource.transaction пишет доменную запись и INSERT в outbox_event.
  • NestJS не даёт декларативных транзакций @Transactional — весь outbox-код внутри явного dataSource.transaction(async (m) => { ... }).
  • Outbox-relay — отдельный @Injectable с setInterval или @Cron: выбирает unpublished батч через FOR UPDATE SKIP LOCKED QueryBuilder, публикует через kafkajs producer.send, обновляет published_at в той же транзакции.
  • После публикации строка остаётся в таблице (audit + rebuild); удалять через retention-job, не сразу.
  • Inbox — обратная сторона: consumer пишет сообщение в inbox_event с processed = false, отдельный handler обрабатывает в DataSource.transaction. Используется только для critical-потоков (финансы, высокий burst).
  • Дефолт вместо inbox — processed_event dedup: INSERT ... ON CONFLICT DO NOTHING (TypeORM .orIgnore()) в той же транзакции, что и обработка события.
  • Single source of truth — PG сервиса. kafkajs-producer — транспорт, не источник правды.
  • Запрет: producer.send() из command-handler напрямую и TypeORM subscriber afterInsert/afterUpdate как замена outbox.

Outbox — фундаментальный паттерн UCP. Все события из write-handler-ов идут через outbox без исключений. Это даёт at-least-once гарантию доставки без двухфазного коммита.

Проблема «commit + publish»

Что хочется:

// src/order/handlers/create-order.handler.ts
@Injectable()
export class CreateOrderHandler {
  async handle(command: CreateOrderCommand): Promise<Order> {
    const order = await this.orderRepository.save(Order.create(command));
    await this.producer.send({
      topic: 'order.events',
      messages: [{ key: order.id, value: JSON.stringify(new OrderCreatedEvent(order)) }],
    });
    return order;
  }
}

Что не работает:

  • DB commit прошёл, producer.send упал → событие потеряно, downstream не знает об Order.
  • producer.send прошёл, DB commit упал (constraint violation, deadlock) → событие в Kafka, заказа в PG нет → downstream обрабатывает фантом.
  • Network partition между этими двумя — гарантий никаких.

Distributed transaction между PG и Kafka не существует: kafkajs не поддерживает XA, TypeORM не координирует Kafka-producer. Outbox решает это через локальную DataSource.transaction.

Outbox pattern

R-DIST-OBX-1: outbox для исходящих событий обязателен.

Схема outbox-таблицы

CREATE TABLE outbox_event (
    id              bigint GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
    event_id        uuid        NOT NULL UNIQUE,
    aggregate_type  text        NOT NULL,
    aggregate_id    text        NOT NULL,
    event_type      text        NOT NULL,
    payload         jsonb       NOT NULL,
    topic           text        NOT NULL,
    partition_key   text        NOT NULL,
    created_at      timestamptz NOT NULL DEFAULT now(),
    published_at    timestamptz
);
CREATE INDEX ix_outbox_event_unpublished ON outbox_event(id) WHERE published_at IS NULL;

Partial index WHERE published_at IS NULL — relay сканирует только unpublished строки. После публикации строка исчезает из «горячего» индекса, но остаётся в таблице для audit.

TypeORM entity

// src/outbox/outbox-event.entity.ts
@Entity('outbox_event')
export class OutboxEventEntity {
  @PrimaryGeneratedColumn({ type: 'bigint' })
  id: string;

  @Column({ type: 'uuid', unique: true })
  eventId: string;

  @Column()
  aggregateType: string;

  @Column()
  aggregateId: string;

  @Column()
  eventType: string;

  @Column({ type: 'jsonb' })
  payload: object;

  @Column()
  topic: string;

  @Column()
  partitionKey: string;

  @CreateDateColumn({ type: 'timestamptz' })
  createdAt: Date;

  @Column({ type: 'timestamptz', nullable: true })
  publishedAt: Date | null;
}

OutboxWriter

// src/outbox/outbox-writer.ts
@Injectable()
export class OutboxWriter {
  write(
    manager: EntityManager,
    opts: { eventId?: string; aggregateType: string; aggregateId: string; eventType: string; payload: object; topic: string; partitionKey: string },
  ): Promise<InsertResult> {
    return manager.getRepository(OutboxEventEntity).insert({
      eventId: opts.eventId ?? randomUUID(),
      aggregateType: opts.aggregateType,
      aggregateId: opts.aggregateId,
      eventType: opts.eventType,
      payload: opts.payload,
      topic: opts.topic,
      partitionKey: opts.partitionKey,
      publishedAt: null,
    });
  }
}

OutboxWriter принимает EntityManager (уже открытая транзакция) — не открывает новую. Вся ответственность за транзакцию на стороне вызывающего handler-а.

Write-handler

// src/order/handlers/create-order.handler.ts
@Injectable()
export class CreateOrderHandler {
  constructor(
    private readonly dataSource: DataSource,
    private readonly outbox: OutboxWriter,
  ) {}

  async handle(command: CreateOrderCommand): Promise<Order> {
    return this.dataSource.transaction(async (m) => {
      const order = m.getRepository(OrderEntity).create({
        id: randomUUID(),
        customerId: command.customerId,
        amount: command.amount,
        status: 'pending',
        createdAt: new Date(),
      });
      await m.save(order);

      await this.outbox.write(m, {
        aggregateType: 'Order',
        aggregateId: order.id,
        eventType: 'OrderCreated',
        payload: {
          orderId: order.id,
          customerId: order.customerId,
          amount: order.amount,
        },
        topic: 'order.events',
        partitionKey: order.id,
      });

      return order;
    });
  }
}

INSERT order и INSERT outbox_event — одна DataSource.transaction. Либо оба commit, либо оба откат. Нет состояния «заказ есть, событие потеряно».

Outbox-relay

// src/outbox/outbox-relay.service.ts
@Injectable()
export class OutboxRelayService implements OnModuleInit, OnModuleDestroy {
  private timer: NodeJS.Timeout | null = null;

  constructor(
    private readonly dataSource: DataSource,
    private readonly kafka: Kafka,
  ) {}

  onModuleInit(): void {
    this.timer = setInterval(() => this.publish().catch(noop), 500);
  }

  onModuleDestroy(): void {
    if (this.timer) clearInterval(this.timer);
  }

  private async publish(): Promise<void> {
    await this.dataSource.transaction(async (m) => {
      const batch = await m
        .getRepository(OutboxEventEntity)
        .createQueryBuilder('e')
        .where('e.publishedAt IS NULL')
        .orderBy('e.id')
        .limit(100)
        .setLock('pessimistic_write')
        .setOnLocked('skip_locked')
        .getMany();

      if (batch.length === 0) return;

      const producer = this.kafka.producer({ idempotent: true });
      await producer.connect();

      try {
        await producer.send({
          topic: batch[0].topic,
          messages: batch.map((row) => ({
            key: row.partitionKey,
            value: JSON.stringify(row.payload),
            headers: { eventId: row.eventId, eventType: row.eventType },
          })),
        });
      } finally {
        await producer.disconnect();
      }

      await m.update(
        OutboxEventEntity,
        batch.map((r) => r.id),
        { publishedAt: new Date() },
      );
    });
  }
}

setLock('pessimistic_write').setOnLocked('skip_locked') — TypeORM-обёртка над FOR UPDATE SKIP LOCKED. Несколько инстансов relay-я могут работать параллельно: каждый захватывает свою порцию, не блокируя других.

Если relay упал между producer.send и m.update — следующий цикл возьмёт те же строки и опубликует снова. At-least-once: именно поэтому consumer обязан быть идемпотентным.

Single source of truth

R-DIST-OBX-3: PG сервиса — единственный источник правды. kafkajs producer — транспорт.

Что это даёт:

  • Потеря Kafka-данных (retention истёк, broker недоступен) — outbox_event продолжает накапливать; после восстановления relay публикует пропущенное.
  • Rebuild read-projection — скрипт перечитывает outbox_event и публикует события заново; consumer обрабатывает идемпотентно.
  • Audit — полная история событий, которые сервис когда-либо порождал.

Inbox pattern

R-DIST-OBX-2: inbox — обратная сторона outbox. Consumer пишет полученное сообщение в inbox_event с processed = false, отдельный handler обрабатывает unprocessed-строки в DataSource.transaction.

CREATE TABLE inbox_event (
    event_id     uuid        PRIMARY KEY,
    topic        text        NOT NULL,
    received_at  timestamptz NOT NULL DEFAULT now(),
    payload      jsonb       NOT NULL,
    processed    boolean     NOT NULL DEFAULT false,
    processed_at timestamptz
);
CREATE INDEX ix_inbox_event_unprocessed ON inbox_event(received_at) WHERE NOT processed;
// src/payment/inbox/payment-inbox-consumer.service.ts
@Injectable()
export class PaymentInboxConsumerService implements OnModuleInit {
  constructor(
    private readonly kafka: Kafka,
    private readonly dataSource: DataSource,
  ) {}

  async onModuleInit(): Promise<void> {
    const consumer = this.kafka.consumer({ groupId: 'payment-inbox-consumer' });
    await consumer.connect();
    await consumer.subscribe({ topic: 'payment.events' });

    await consumer.run({
      eachMessage: async ({ message }) => {
        const eventId = message.headers?.eventId?.toString() ?? randomUUID();
        await this.dataSource
          .getRepository(InboxEventEntity)
          .createQueryBuilder()
          .insert()
          .values({ eventId, topic: 'payment.events', payload: JSON.parse(message.value!.toString()) })
          .orIgnore()
          .execute();
      },
    });
  }
}
// src/payment/inbox/payment-inbox-processor.service.ts
@Injectable()
export class PaymentInboxProcessorService implements OnModuleInit, OnModuleDestroy {
  private timer: NodeJS.Timeout | null = null;

  constructor(
    private readonly dataSource: DataSource,
    private readonly handler: PaymentEventHandler,
  ) {}

  onModuleInit(): void {
    this.timer = setInterval(() => this.process().catch(noop), 200);
  }

  onModuleDestroy(): void {
    if (this.timer) clearInterval(this.timer);
  }

  private async process(): Promise<void> {
    const batch = await this.dataSource
      .getRepository(InboxEventEntity)
      .createQueryBuilder('e')
      .where('e.processed = false')
      .orderBy('e.receivedAt')
      .limit(50)
      .setLock('pessimistic_write')
      .setOnLocked('skip_locked')
      .getMany();

    for (const row of batch) {
      await this.dataSource.transaction(async (m) => {
        await this.handler.handle(m, row.payload);
        await m.update(InboxEventEntity, { eventId: row.eventId }, {
          processed: true,
          processedAt: new Date(),
        });
      });
    }
  }
}

Когда использовать inbox

  • Высокий burst — Kafka даёт резкий пик 10k msg/s, обработка тяжёлая (5 ms × 10k = 50 секунд). Inbox развязывает приём и обработку: consumer принимает быстро, processor работает в своём темпе.
  • Critical financial flows — нужно гарантировать, что ни одно сообщение не потеряется между приёмом и обработкой даже при крэше.

В большинстве случаев inbox избыточен. Достаточно processed_event dedup с .orIgnore() в той же транзакции, что и доменная обработка:

// src/product/handlers/product-reserved.handler.ts
@Injectable()
export class ProductReservedHandler {
  constructor(private readonly dataSource: DataSource) {}

  async handle(event: ProductReservedEvent): Promise<void> {
    await this.dataSource.transaction(async (m) => {
      const inserted = await m
        .getRepository(ProcessedEventEntity)
        .createQueryBuilder()
        .insert()
        .values({ eventId: event.eventId, processedAt: new Date() })
        .orIgnore()
        .execute();

      if (inserted.raw.affectedRows === 0) {
        return;
      }

      await m
        .createQueryBuilder()
        .update(ProductEntity)
        .set({ reservedQuantity: () => 'reserved_quantity + :qty', updatedAt: new Date() })
        .where('id = :id', { id: event.productId })
        .setParameter('qty', event.quantity)
        .execute();
    });
  }
}

.orIgnore() — TypeORM-обёртка над INSERT ... ON CONFLICT DO NOTHING. UNIQUE-constraint на event_id в processed_event гарантирует деdup под race: если два consumer-а получили одно сообщение, только один запишет запись; второй получит affectedRows === 0 и выйдет.

КритерийТолько processed_eventInbox + processor
Сложностьнизкаясредняя
Burst handlingограничен concurrency consumer-аразвязка приёма и обработки
Recovery после крэшаre-consume из Kafkaотдельная обработка inbox
Когда применятьдефолтfinancial / high burst

Что запрещено

АнтипаттернПравилоЧто взамен
producer.send() напрямую из command-handlerR-DIST-OBX-X1OutboxWriter.write(m, ...) в той же DataSource.transaction
TypeORM subscriber afterInsert / afterUpdate для отправки в KafkaR-DIST-OBX-X2outbox-relay с FOR UPDATE SKIP LOCKED
Relay без setOnLocked('skip_locked')R-DIST-OBX-1.setLock('pessimistic_write').setOnLocked('skip_locked') при нескольких инстансах
Outbox без partial-index по unpublishedR-DIST-OBX-1WHERE published_at IS NULL
Kafka как source of truthR-DIST-OBX-3PG — SoT, kafkajs — транспорт
Inbox для каждого consumer-а по умолчаниюR-DIST-OBX-2processed_event dedup через .orIgnore() — дефолт; inbox только для financial / high burst
DELETE published строк из outbox сразу после relayR-DIST-OBX-3хранить для audit/rebuild, чистить retention-job-ом по расписанию

Куда дальше

  • Idempotency — receiver обязан быть идемпотентным при at-least-once; processed_event + .orIgnore().
  • Saga — оркестрация vs хореография — saga-state-таблица и outbox работают вместе; команды orchestrator-а публикуются через outbox.
  • Eventual consistency — outbox — главный механизм EC; bounded-staleness SLO.
  • Compensation — compensation-команды публикуются через outbox в той же DataSource.transaction.
  • Distributed transactions — что не делать — почему multi-DataSource-commit-цепочка не замена outbox.
  • Когда нужны распределённые паттерны — outbox актуален только при cross-service событиях.