---
title: "Outbox publishing (Node/NestJS) — атомарная публикация Kafka-событий"
nav_title: "Outbox publishing"
excerpt: "Outbox pattern в NestJS: запись в outbox_event в той же TypeORM-транзакции, relay с @Interval и FOR UPDATE SKIP LOCKED, batch-публикация через kafkajs."
keywords: "Kafka outbox pattern NestJS, outbox-relay TypeORM, FOR UPDATE SKIP LOCKED NestJS, kafkajs producer send, R-KFK-OBX, outbox_event partial index, published_at, NestJS schedule Interval"
focus_keyword: "Kafka outbox pattern NestJS TypeORM"
tags: ["kafka", "outbox", "nestjs", "typescript", "kafkajs", "typeorm"]
---

# Outbox publishing (Node/NestJS) — атомарная публикация Kafka-событий

> **Опирается на правила:** `R-KFK-OBX-1` … `R-KFK-OBX-4` и `R-KFK-OBX-X1` … `R-KFK-OBX-X3` из Kafka Style Guide → [раздел 3. Outbox publishing](/standards/backend/kafka/#3-outbox-publishing).

> **Важно знать**
> - **Domain events** публикуются через outbox-relay, **не** напрямую `producer.send` из handler-а.
> - **Запись в `outbox_event`** идёт в той же `DataSource.transaction`, что бизнес-write. Либо обе commit, либо обе rollback.
> - **Outbox-relay** — отдельный `@Injectable` с `@Interval` (`@nestjs/schedule`), читает unpublished с `FOR UPDATE SKIP LOCKED`, публикует через kafkajs, проставляет `published_at`.
> - **Topic name** derives от `eventType`/`aggregateType`: `<service>.<aggregate-type>.<event-name>`.
> - **Relay в batch** (10–50 events) — снижает overhead DB-poll и Kafka-roundtrip.
> - **Partial index `WHERE published_at IS NULL`** обязателен — без него full scan при каждом poll.
> - **Подписка «after commit»** без outbox — потеря событий при падении между commit и publish.
> - **`producer.send` из той же транзакции с DB** — нельзя: Kafka не XA, rollback БД не откатит публикацию.

Outbox publishing — фундаментальный паттерн UCP. Все domain events публикуются через него. Это даёт **at-least-once** доставку с атомарностью «commit DB + публикация» через локальную транзакцию в PostgreSQL. Теория — [Distributed → outbox + inbox](/standards/backend/distributed-patterns/node/outbox-inbox/).

## Entity и DDL

`R-KFK-OBX-X3`: схема с partial-индексом.

```ts
// src/outbox/outbox-event.entity.ts
@Entity('outbox_event')
export class OutboxEventEntity {
  @PrimaryGeneratedColumn({ type: 'bigint' })
  id: string;

  @Column({ name: 'event_id', type: 'uuid', unique: true })
  eventId: string;

  @Column({ name: 'aggregate_type' })
  aggregateType: string;

  @Column({ name: 'aggregate_id' })
  aggregateId: string;

  @Column({ name: 'event_type' })
  eventType: string;

  @Column({ type: 'jsonb' })
  payload: Record<string, unknown>;

  @Column()
  topic: string;

  @Column({ name: 'partition_key' })
  partitionKey: string;

  @CreateDateColumn({ name: 'created_at', type: 'timestamptz' })
  createdAt: Date;

  @Column({ name: 'published_at', type: 'timestamptz', nullable: true })
  publishedAt: Date | null;
}
```

```sql
CREATE TABLE outbox_event (
    id             bigint GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
    event_id       uuid         NOT NULL UNIQUE,
    aggregate_type text         NOT NULL,
    aggregate_id   text         NOT NULL,
    event_type     text         NOT NULL,
    payload        jsonb        NOT NULL,
    topic          text         NOT NULL,
    partition_key  text         NOT NULL,
    created_at     timestamptz  NOT NULL DEFAULT now(),
    published_at   timestamptz
);
CREATE INDEX ix_outbox_event_unpublished
    ON outbox_event(id)
    WHERE published_at IS NULL;
```

`WHERE published_at IS NULL` — partial index покрывает только «тёплые» строки, которые ещё не опубликованы. Таблица растёт без bound, а relay сканирует только нужное.

## Запись в outbox из handler

`R-KFK-OBX-1`: write-handler пишет в `outbox_event` в той же транзакции.

```ts
// src/order/application/confirm-order.handler.ts
@Injectable()
export class ConfirmOrderHandler {
  constructor(private readonly dataSource: DataSource) {}

  async handle(command: ConfirmOrderCommand): Promise<void> {
    await this.dataSource.transaction(async (manager) => {
      const orderRepo = manager.getRepository(OrderEntity);
      const outboxRepo = manager.getRepository(OutboxEventEntity);

      const order = await orderRepo.findOne({
        where: { id: command.orderId },
        lock: { mode: 'pessimistic_write' },
      });
      if (!order) throw new OrderNotFoundError(command.orderId);

      order.status = 'CONFIRMED';
      order.confirmedAt = new Date();
      await orderRepo.save(order);

      await outboxRepo.save(
        outboxRepo.create({
          eventId: uuidv7(),
          aggregateType: 'Order',
          aggregateId: order.id,
          eventType: 'order.confirmed.v1',
          payload: {
            eventId: uuidv7(),
            eventType: 'OrderConfirmed.v1',
            occurredAt: new Date().toISOString(),
            aggregateType: 'Order',
            aggregateId: order.id,
            customerId: order.customerId,
            totalAmount: order.totalAmount,
          },
          topic: 'order-service.order.confirmed',
          partitionKey: order.id,
        }),
      );
    });
  }
}
```

Атомарность гарантирует PostgreSQL: либо обе записи commit, либо обе rollback. Никакой XA с Kafka не нужен.

## Outbox-relay

`R-KFK-OBX-2`: отдельный `@Injectable` с `@Interval`.

```ts
// src/outbox/outbox-relay.service.ts
@Injectable()
export class OutboxRelayService {
  private readonly logger = new Logger(OutboxRelayService.name);

  constructor(
    private readonly dataSource: DataSource,
    private readonly producer: Producer,
  ) {}

  @Interval(500)
  async publish(): Promise<void> {
    await this.dataSource.transaction(async (manager) => {
      const events = await manager
        .getRepository(OutboxEventEntity)
        .createQueryBuilder('e')
        .setLock('pessimistic_write')
        .setOnLocked('skip_locked')
        .where('e.publishedAt IS NULL')
        .orderBy('e.id', 'ASC')
        .take(50)
        .getMany();

      if (!events.length) return;

      const byTopic = Map.groupBy(events, (e) => e.topic);

      for (const [topic, batch] of byTopic) {
        await this.producer.send({
          topic,
          acks: -1,
          messages: batch.map((e) => ({
            key: e.partitionKey,
            value: JSON.stringify(e.payload),
            headers: { 'x-event-id': e.eventId, 'x-event-type': e.eventType },
          })),
        });
      }

      await manager.update(
        OutboxEventEntity,
        events.map((e) => e.id),
        { publishedAt: new Date() },
      );
    });
  }
}
```

`FOR UPDATE SKIP LOCKED` (`setLock('pessimistic_write').setOnLocked('skip_locked')`) — несколько pod-ов relay могут работать параллельно, каждый берёт свою порцию без блокировок. Горизонтальное масштабирование без координации.

`@Interval(500)` даёт типичную задержку публикации ~500 мс. Для критичных flow — `100`, для аналитики — `5000` вполне нормально.

## Инициализация producer

`R-KFK-PROD-1`, `R-KFK-OBX-2`: producer создаётся идемпотентным и регистрируется как провайдер.

```ts
// src/kafka/kafka-producer.provider.ts
export const KafkaProducerProvider: FactoryProvider<Producer> = {
  provide: KAFKA_PRODUCER,
  useFactory: async (config: KafkaConfig): Promise<Producer> => {
    const kafka = new Kafka({
      clientId: config.clientId,
      brokers: config.brokers,
      ssl: config.ssl,
    });
    const producer = kafka.producer({ idempotent: true, maxInFlightRequests: 5 });
    await producer.connect();
    return producer;
  },
  inject: [KafkaConfig],
};
```

```ts
// src/outbox/outbox.module.ts
@Module({
  imports: [TypeOrmModule.forFeature([OutboxEventEntity]), ScheduleModule],
  providers: [KafkaProducerProvider, OutboxRelayService],
})
export class OutboxModule {}
```

## Topic naming

`R-KFK-OBX-3`: convention `<service>.<aggregate-type>.<event-name>`.

| Сервис | Topic |
|---|---|
| order-service | `order-service.order.created` |
| order-service | `order-service.order.confirmed` |
| order-service | `order-service.order.cancelled` |
| payment-service | `payment-service.payment.charged` |
| payment-service | `payment-service.payment.refunded` |
| product-service | `product-service.product.published` |
| customer-service | `customer-service.customer.registered` |

Альтернатива — один topic на aggregate:

```
product-service.product  →  { eventType: "ProductPublished.v1", ... }
                         →  { eventType: "ProductPriceChanged.v1", ... }
```

Удобно для consumer, которому нужны «все события по продукту»: один подписчик ловит всё, фильтрует по `eventType`. Цена — нельзя ack событие `ProductPriceChanged`, не коммитя весь топик.

## Batch-обработка

`R-KFK-OBX-4`: relay читает 10–50 events за раз.

```ts
.take(50)
```

Почему не по одному:
- **DB-poll overhead** — каждый запрос ~1–2 мс даже с indexed scan.
- **Kafka roundtrip** — `producer.send` ждёт ACK, ~5–20 мс.
- При 100 events/s по одному — relay постоянно занят, задержка большая.

С batch 50 — relay поднимает 50 events одним запросом, шлёт через kafkajs, `published_at` ставит одним `update`. Throughput x10–20.

## Что запрещено

| Антипаттерн | Правило | Что взамен |
|---|---|---|
| `producer.send` из той же `DataSource.transaction` с DB-операцией | `R-KFK-OBX-X1` | outbox в той же транзакции, relay публикует отдельно |
| Подписка на «after commit» / `queryRunner.release()` + send | `R-KFK-OBX-X2` | outbox-relay с `@Interval` |
| Outbox без колонки `published_at` | `R-KFK-OBX-X3` | `published_at timestamptz nullable` + partial index |
| Partial index отсутствует | `R-KFK-OBX-X3` | `WHERE published_at IS NULL` — обязателен |
| Relay без `setLock('pessimistic_write').setOnLocked('skip_locked')` | `R-KFK-OBX-2` | SKIP LOCKED для параллельных pod-ов |
| Relay по одному событию | `R-KFK-OBX-4` | batch `.take(50)` |
| `producer.send` без `acks: -1` | `R-KFK-PROD-X2` | `acks: -1` (all) обязателен |
| Outbox без `event_id UNIQUE` | `R-KFK-OBX-1` | UNIQUE constraint защищает от двойной записи |

## Куда дальше

- [Конфигурация](/standards/backend/kafka/node/configuration/) — сборка `KafkaConfig`, fail-fast на отсутствующий topic.
- [Producer](/standards/backend/kafka/node/producer/) — почему нельзя `producer.send` напрямую из handler.
- [Idempotent consumer](/standards/backend/kafka/node/idempotent-consumer/) — receiver side at-least-once.
- [Event design](/standards/backend/kafka/node/event-design/) — формат payload, который пишется в `outbox_event`.
- [Observability](/standards/backend/kafka/node/observability/) — метрики relay: lag, batch size, publish errors.
- [Consumer](/standards/backend/kafka/node/consumer/) — manual commit offset после успешной обработки.
- [Retry topic + DLQ](/standards/backend/kafka/node/retry-and-dlq/) — что делать, если relay не может опубликовать.
- [Security](/standards/backend/kafka/node/security/) — TLS/SASL для kafkajs producer.
