Опирается на правила: R-KFK-PROD-1R-KFK-PROD-4 и R-KFK-PROD-X1R-KFK-PROD-X4 из Kafka Style Guide → раздел 1. Producer.

Важно знать

  • idempotent: true всегда. В kafkajs это kafka.producer({ idempotent: true, maxInFlightRequests: 5 }) — требует acks: -1. Exactly-once на уровне partition.
  • acks: -1 — единственно допустимое значение для бизнес-событий. acks: 0 и acks: 1 запрещены.
  • Partition key обязателен для всех бизнес-событий. Дефолтный ключ — aggregate id.
  • JSON-сериализация (JSON.stringify) по умолчанию. Avro/Protobuf через @kafkajs/confluent-schema-registry — только для bandwidth-чувствительных топиков.
  • producer.send напрямую из use case handler для domain events — запрещён. События идут через outbox.
  • Kafka не XA — нельзя в одной транзакции с PG. Только outbox.
  • maxInFlightRequests: 5 при idempotent: true — kafkajs кидает ошибку при превышении.

Kafka producer — точка, где сервис «публикует факт» во внешний мир. В kafkajs producer создаётся явно: kafka.producer(options), и его жизненный цикл управляется вручную. UCP формулирует правила так, чтобы producer был exactly-once на partition и атомичен с DB через outbox.

idempotent: true — всегда

R-KFK-PROD-1: один флаг включает три гарантии сразу.

import { Kafka } from 'kafkajs';

const kafka = new Kafka({
  clientId: 'order-service',
  brokers: [process.env.KAFKA_BROKERS!],
});

const producer = kafka.producer({
  idempotent: true,
  maxInFlightRequests: 5,
});

Что kafkajs делает с idempotent: true:

  • acks: -1 — broker подтверждает только после репликации на min.insync.replicas followers.
  • maxInFlightRequests: 5 — гарантирует ordering между retry (kafkajs кидает ошибку, если задать больше вместе с idempotent: true).
  • Broker дедуплицирует записи по (producerId, sequenceNumber) на partition — retry со стороны producer не создаёт дубликат в Kafka.

Без idempotent: true retry на стороне producer создаёт дубликаты: broker получает одно сообщение дважды, downstream consumer видит оба.

Partition key — обязателен

R-KFK-PROD-2: key определяет, на какой partition уйдёт сообщение.

В NestJS producer оборачивается в провайдер. Например, для агрегата Order:

@Injectable()
export class OrderEventPublisher {
  constructor(private readonly producer: KafkaProducerService) {}

  async publishConfirmed(order: Order): Promise<void> {
    await this.producer.send({
      topic: 'order-service.order.confirmed',
      messages: [
        {
          key: String(order.id),
          value: JSON.stringify(this.toEvent(order)),
        },
      ],
    });
  }

  private toEvent(order: Order): OrderConfirmedEvent {
    return {
      eventId: order.confirmedEventId,
      eventType: 'order.confirmed.v1',
      occurredAt: order.confirmedAt.toISOString(),
      aggregateType: 'Order',
      aggregateId: String(order.id),
      customerId: String(order.customerId),
      totalAmount: order.totalAmount,
    };
  }
}

Дефолтный ключ — String(order.id) (aggregate id). Все события одного order.id уходят на один partition; внутри partition kafkajs сохраняет порядок вставки.

Без ключа — messages: [{ value: ... }] — round-robin между partitions. Сценарий поломки:

  1. OrderCreated(orderId=7) уходит на partition 0.
  2. OrderConfirmed(orderId=7) уходит на partition 3.
  3. Consumer для partition 3 обрабатывает OrderConfirmed до того, как consumer для partition 0 добрался до OrderCreated.
  4. Downstream получает подтверждение для несуществующего заказа.

JSON по умолчанию

R-KFK-PROD-3: JSON.stringify как дефолтный способ сериализации.

await producer.send({
  topic: 'product-service.product.price-updated',
  acks: -1,
  messages: [
    {
      key: String(product.id),
      value: JSON.stringify(event),
    },
  ],
});

JSON прост в отладке (kafkajs console consumer показывает читаемый payload), не требует Schema Registry. Для high-throughput топиков (миллиарды событий в сутки) — Avro/Protobuf через @kafkajs/confluent-schema-registry. Это отдельная инфра; в UCP-сервисах не дефолт.

Не producer.send из use case handler

R-KFK-PROD-4: domain events публикуются через outbox, не прямым вызовом producer из handler.

// НЕ ТАК — Kafka и DB не атомарны
@Injectable()
export class ConfirmOrderHandler {
  constructor(
    private readonly orders: OrderRepository,
    private readonly producer: KafkaProducerService,
    private readonly dataSource: DataSource,
  ) {}

  async handle(command: ConfirmOrderCommand): Promise<Order> {
    return this.dataSource.transaction(async (manager) => {
      const order = await manager.findOneByOrFail(OrderEntity, { id: command.orderId });
      order.confirm();
      await manager.save(order);

      // ОШИБКА: при rollback DB событие уже отправлено
      await this.producer.send({
        topic: 'order-service.order.confirmed',
        messages: [{ key: String(order.id), value: JSON.stringify(event) }],
      });

      return toDomain(order);
    });
  }
}

Сценарии поломки:

  1. producer.send прошёл, save упал с deadlock → событие опубликовано, в БД заказ не подтверждён. Расхождение состояний.
  2. save прошёл, send бросил сетевую ошибку → заказ подтверждён в БД, downstream не знает об этом.

Корректно — через outbox:

async handle(command: ConfirmOrderCommand): Promise<Order> {
  return this.dataSource.transaction(async (manager) => {
    const order = await manager.findOneByOrFail(OrderEntity, { id: command.orderId });
    order.confirm();
    await manager.save(order);

    // запись в outbox — в той же транзакции, что save
    await manager.insert(OutboxEventEntity, {
      aggregateType: 'Order',
      aggregateId: String(order.id),
      eventType: 'order.confirmed.v1',
      topic: 'order-service.order.confirmed',
      partitionKey: String(order.id),
      payload: JSON.stringify(OrderConfirmedEvent.from(toDomain(order))),
    });

    return toDomain(order);
  });
}

Запись в outbox_event идёт в одной DB-транзакции с save. Атомарность гарантирует PG. Отдельный outbox-relay (@Interval) читает unpublished и публикует через kafkajs. Подробнее — Outbox publishing.

Допустимый прямой producer.send:

  • Технические audit-events (в дополнение к audit_log таблице).
  • Метрики и health-сигналы.
  • Команды другим сервисам без транзакционного контекста (например запрос на отчёт из admin-инструмента).

DI-провайдер producer в NestJS

KafkaProducerService — синглтон, управляет connect/disconnect через lifecycle-хуки:

@Injectable()
export class KafkaProducerService implements OnApplicationBootstrap, OnApplicationShutdown {
  private readonly producer: Producer;

  constructor(private readonly kafka: Kafka) {
    this.producer = kafka.producer({ idempotent: true, maxInFlightRequests: 5 });
  }

  async onApplicationBootstrap(): Promise<void> {
    await this.producer.connect();
  }

  async onApplicationShutdown(): Promise<void> {
    await this.producer.disconnect();
  }

  async send(record: ProducerRecord): Promise<RecordMetadata[]> {
    return this.producer.send({ acks: -1, ...record });
  }
}

send всегда проставляет acks: -1 — вызывающий код не может случайно передать acks: 0 или acks: 1. Kafka-инстанс регистрируется в модуле из KafkaConfig (zod/class-validator, R-KFK-CFG-1).

Что запрещено

АнтипаттернПравилоЧто взамен
idempotent: falseR-KFK-PROD-X1idempotent: true всегда
acks: 0 или acks: 1R-KFK-PROD-X2acks: -1
Send без key для бизнес-событийR-KFK-PROD-X3aggregate id как key
producer.send из транзакции с DB-операциейR-KFK-PROD-X4outbox pattern
maxInFlightRequests > 5 с idempotent: trueR-KFK-PROD-1≤ 5 (kafkajs выбросит ошибку иначе)
Агрегат целиком в payloadR-KFK-PROD-3объект с явными полями (см. Event design)
producer.disconnect в catch без graceful shutdownR-KFK-PROD-1OnApplicationShutdown hook

Куда дальше

  • Outbox publishing — как send через outbox-relay с @Interval и FOR UPDATE SKIP LOCKED.
  • Event design — payload format, eventId, версионирование eventType.
  • Конфигурация — KafkaConfig, zod-валидация, fail-fast на отсутствующий топик.
  • Consumer — autoCommit: false, commitOffsets, heartbeat().
  • Idempotent consumer — processed_event, dedup в одной транзакции.
  • Retry topic + DLQ — retry-топики с x-attempt, DLQ, monitoring.
  • Observability — prom-client, traceparent в headers, consumer lag alert.
  • Security — TLS, SASL/SCRAM, ACL per-сервис.