Опирается на правила: R-KFK-OBX-1R-KFK-OBX-4 и R-KFK-OBX-X1R-KFK-OBX-X3 из Kafka Style Guide → раздел 3. Outbox publishing.

Важно знать

  • Domain events публикуются через outbox-relay, не напрямую producer.send из handler-а.
  • Запись в outbox_event идёт в той же DataSource.transaction, что бизнес-write. Либо обе commit, либо обе rollback.
  • Outbox-relay — отдельный @Injectable с @Interval (@nestjs/schedule), читает unpublished с FOR UPDATE SKIP LOCKED, публикует через kafkajs, проставляет published_at.
  • Topic name derives от eventType/aggregateType: <service>.<aggregate-type>.<event-name>.
  • Relay в batch (10–50 events) — снижает overhead DB-poll и Kafka-roundtrip.
  • Partial index WHERE published_at IS NULL обязателен — без него full scan при каждом poll.
  • Подписка «after commit» без outbox — потеря событий при падении между commit и publish.
  • producer.send из той же транзакции с DB — нельзя: Kafka не XA, rollback БД не откатит публикацию.

Outbox publishing — фундаментальный паттерн UCP. Все domain events публикуются через него. Это даёт at-least-once доставку с атомарностью «commit DB + публикация» через локальную транзакцию в PostgreSQL. Теория — Distributed → outbox + inbox.

Entity и DDL

R-KFK-OBX-X3: схема с partial-индексом.

// src/outbox/outbox-event.entity.ts
@Entity('outbox_event')
export class OutboxEventEntity {
  @PrimaryGeneratedColumn({ type: 'bigint' })
  id: string;

  @Column({ name: 'event_id', type: 'uuid', unique: true })
  eventId: string;

  @Column({ name: 'aggregate_type' })
  aggregateType: string;

  @Column({ name: 'aggregate_id' })
  aggregateId: string;

  @Column({ name: 'event_type' })
  eventType: string;

  @Column({ type: 'jsonb' })
  payload: Record<string, unknown>;

  @Column()
  topic: string;

  @Column({ name: 'partition_key' })
  partitionKey: string;

  @CreateDateColumn({ name: 'created_at', type: 'timestamptz' })
  createdAt: Date;

  @Column({ name: 'published_at', type: 'timestamptz', nullable: true })
  publishedAt: Date | null;
}
CREATE TABLE outbox_event (
    id             bigint GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
    event_id       uuid         NOT NULL UNIQUE,
    aggregate_type text         NOT NULL,
    aggregate_id   text         NOT NULL,
    event_type     text         NOT NULL,
    payload        jsonb        NOT NULL,
    topic          text         NOT NULL,
    partition_key  text         NOT NULL,
    created_at     timestamptz  NOT NULL DEFAULT now(),
    published_at   timestamptz
);
CREATE INDEX ix_outbox_event_unpublished
    ON outbox_event(id)
    WHERE published_at IS NULL;

WHERE published_at IS NULL — partial index покрывает только «тёплые» строки, которые ещё не опубликованы. Таблица растёт без bound, а relay сканирует только нужное.

Запись в outbox из handler

R-KFK-OBX-1: write-handler пишет в outbox_event в той же транзакции.

// src/order/application/confirm-order.handler.ts
@Injectable()
export class ConfirmOrderHandler {
  constructor(private readonly dataSource: DataSource) {}

  async handle(command: ConfirmOrderCommand): Promise<void> {
    await this.dataSource.transaction(async (manager) => {
      const orderRepo = manager.getRepository(OrderEntity);
      const outboxRepo = manager.getRepository(OutboxEventEntity);

      const order = await orderRepo.findOne({
        where: { id: command.orderId },
        lock: { mode: 'pessimistic_write' },
      });
      if (!order) throw new OrderNotFoundError(command.orderId);

      order.status = 'CONFIRMED';
      order.confirmedAt = new Date();
      await orderRepo.save(order);

      await outboxRepo.save(
        outboxRepo.create({
          eventId: uuidv7(),
          aggregateType: 'Order',
          aggregateId: order.id,
          eventType: 'order.confirmed.v1',
          payload: {
            eventId: uuidv7(),
            eventType: 'OrderConfirmed.v1',
            occurredAt: new Date().toISOString(),
            aggregateType: 'Order',
            aggregateId: order.id,
            customerId: order.customerId,
            totalAmount: order.totalAmount,
          },
          topic: 'order-service.order.confirmed',
          partitionKey: order.id,
        }),
      );
    });
  }
}

Атомарность гарантирует PostgreSQL: либо обе записи commit, либо обе rollback. Никакой XA с Kafka не нужен.

Outbox-relay

R-KFK-OBX-2: отдельный @Injectable с @Interval.

// src/outbox/outbox-relay.service.ts
@Injectable()
export class OutboxRelayService {
  private readonly logger = new Logger(OutboxRelayService.name);

  constructor(
    private readonly dataSource: DataSource,
    private readonly producer: Producer,
  ) {}

  @Interval(500)
  async publish(): Promise<void> {
    await this.dataSource.transaction(async (manager) => {
      const events = await manager
        .getRepository(OutboxEventEntity)
        .createQueryBuilder('e')
        .setLock('pessimistic_write')
        .setOnLocked('skip_locked')
        .where('e.publishedAt IS NULL')
        .orderBy('e.id', 'ASC')
        .take(50)
        .getMany();

      if (!events.length) return;

      const byTopic = Map.groupBy(events, (e) => e.topic);

      for (const [topic, batch] of byTopic) {
        await this.producer.send({
          topic,
          acks: -1,
          messages: batch.map((e) => ({
            key: e.partitionKey,
            value: JSON.stringify(e.payload),
            headers: { 'x-event-id': e.eventId, 'x-event-type': e.eventType },
          })),
        });
      }

      await manager.update(
        OutboxEventEntity,
        events.map((e) => e.id),
        { publishedAt: new Date() },
      );
    });
  }
}

FOR UPDATE SKIP LOCKED (setLock('pessimistic_write').setOnLocked('skip_locked')) — несколько pod-ов relay могут работать параллельно, каждый берёт свою порцию без блокировок. Горизонтальное масштабирование без координации.

@Interval(500) даёт типичную задержку публикации ~500 мс. Для критичных flow — 100, для аналитики — 5000 вполне нормально.

Инициализация producer

R-KFK-PROD-1, R-KFK-OBX-2: producer создаётся идемпотентным и регистрируется как провайдер.

// src/kafka/kafka-producer.provider.ts
export const KafkaProducerProvider: FactoryProvider<Producer> = {
  provide: KAFKA_PRODUCER,
  useFactory: async (config: KafkaConfig): Promise<Producer> => {
    const kafka = new Kafka({
      clientId: config.clientId,
      brokers: config.brokers,
      ssl: config.ssl,
    });
    const producer = kafka.producer({ idempotent: true, maxInFlightRequests: 5 });
    await producer.connect();
    return producer;
  },
  inject: [KafkaConfig],
};
// src/outbox/outbox.module.ts
@Module({
  imports: [TypeOrmModule.forFeature([OutboxEventEntity]), ScheduleModule],
  providers: [KafkaProducerProvider, OutboxRelayService],
})
export class OutboxModule {}

Topic naming

R-KFK-OBX-3: convention <service>.<aggregate-type>.<event-name>.

СервисTopic
order-serviceorder-service.order.created
order-serviceorder-service.order.confirmed
order-serviceorder-service.order.cancelled
payment-servicepayment-service.payment.charged
payment-servicepayment-service.payment.refunded
product-serviceproduct-service.product.published
customer-servicecustomer-service.customer.registered

Альтернатива — один topic на aggregate:

product-service.product  →  { eventType: "ProductPublished.v1", ... }
                         →  { eventType: "ProductPriceChanged.v1", ... }

Удобно для consumer, которому нужны «все события по продукту»: один подписчик ловит всё, фильтрует по eventType. Цена — нельзя ack событие ProductPriceChanged, не коммитя весь топик.

Batch-обработка

R-KFK-OBX-4: relay читает 10–50 events за раз.

.take(50)

Почему не по одному:

  • DB-poll overhead — каждый запрос ~1–2 мс даже с indexed scan.
  • Kafka roundtripproducer.send ждёт ACK, ~5–20 мс.
  • При 100 events/s по одному — relay постоянно занят, задержка большая.

С batch 50 — relay поднимает 50 events одним запросом, шлёт через kafkajs, published_at ставит одним update. Throughput x10–20.

Что запрещено

АнтипаттернПравилоЧто взамен
producer.send из той же DataSource.transaction с DB-операциейR-KFK-OBX-X1outbox в той же транзакции, relay публикует отдельно
Подписка на «after commit» / queryRunner.release() + sendR-KFK-OBX-X2outbox-relay с @Interval
Outbox без колонки published_atR-KFK-OBX-X3published_at timestamptz nullable + partial index
Partial index отсутствуетR-KFK-OBX-X3WHERE published_at IS NULL — обязателен
Relay без setLock('pessimistic_write').setOnLocked('skip_locked')R-KFK-OBX-2SKIP LOCKED для параллельных pod-ов
Relay по одному событиюR-KFK-OBX-4batch .take(50)
producer.send без acks: -1R-KFK-PROD-X2acks: -1 (all) обязателен
Outbox без event_id UNIQUER-KFK-OBX-1UNIQUE constraint защищает от двойной записи

Куда дальше

  • Конфигурация — сборка KafkaConfig, fail-fast на отсутствующий topic.
  • Producer — почему нельзя producer.send напрямую из handler.
  • Idempotent consumer — receiver side at-least-once.
  • Event design — формат payload, который пишется в outbox_event.
  • Observability — метрики relay: lag, batch size, publish errors.
  • Consumer — manual commit offset после успешной обработки.
  • Retry topic + DLQ — что делать, если relay не может опубликовать.
  • Security — TLS/SASL для kafkajs producer.