---
title: "Kafka Producer — idempotence, acks=all, partition key (Node/NestJS)"
nav_title: "Producer"
excerpt: "Kafka producer в NestJS/kafkajs: idempotent: true даёт acks=all и exactly-once на partition; key = aggregate id обязателен для ordering; domain-события только через outbox."
keywords: "Kafka producer NestJS, kafkajs idempotent, acks all, partition key aggregate id, producer.send outbox, R-KFK-PROD, exactly-once partition, TypeScript kafkajs"
focus_keyword: "Kafka Producer NestJS kafkajs idempotent"
tags:
  - kafka
  - node
  - nestjs
  - producer
  - outbox
  - backend
---

# Kafka Producer — idempotence, acks=all, partition key (Node/NestJS)

> **Опирается на правила:** `R-KFK-PROD-1` … `R-KFK-PROD-4` и `R-KFK-PROD-X1` … `R-KFK-PROD-X4` из Kafka Style Guide → [раздел 1. Producer](/standards/backend/kafka/#1-producer).

> **Важно знать**
> - **`idempotent: true` всегда.** В kafkajs это `kafka.producer({ idempotent: true, maxInFlightRequests: 5 })` — требует `acks: -1`. Exactly-once на уровне partition.
> - **`acks: -1`** — единственно допустимое значение для бизнес-событий. `acks: 0` и `acks: 1` запрещены.
> - **Partition key обязателен** для всех бизнес-событий. Дефолтный ключ — **aggregate id**.
> - **JSON-сериализация** (`JSON.stringify`) по умолчанию. Avro/Protobuf через `@kafkajs/confluent-schema-registry` — только для bandwidth-чувствительных топиков.
> - **`producer.send` напрямую из use case handler** для domain events — **запрещён**. События идут через outbox.
> - **Kafka не XA** — нельзя в одной транзакции с PG. Только outbox.
> - **`maxInFlightRequests: 5`** при `idempotent: true` — kafkajs кидает ошибку при превышении.

Kafka producer — точка, где сервис «публикует факт» во внешний мир. В kafkajs producer создаётся явно: `kafka.producer(options)`, и его жизненный цикл управляется вручную. UCP формулирует правила так, чтобы producer был **exactly-once** на partition и **атомичен** с DB через outbox.

## `idempotent: true` — всегда

`R-KFK-PROD-1`: один флаг включает три гарантии сразу.

```ts
import { Kafka } from 'kafkajs';

const kafka = new Kafka({
  clientId: 'order-service',
  brokers: [process.env.KAFKA_BROKERS!],
});

const producer = kafka.producer({
  idempotent: true,
  maxInFlightRequests: 5,
});
```

Что kafkajs делает с `idempotent: true`:
- **`acks: -1`** — broker подтверждает только после репликации на `min.insync.replicas` followers.
- **`maxInFlightRequests: 5`** — гарантирует ordering между retry (kafkajs кидает ошибку, если задать больше вместе с `idempotent: true`).
- Broker дедуплицирует записи по `(producerId, sequenceNumber)` на partition — retry со стороны producer не создаёт дубликат в Kafka.

Без `idempotent: true` retry на стороне producer создаёт дубликаты: broker получает одно сообщение дважды, downstream consumer видит оба.

## Partition key — обязателен

`R-KFK-PROD-2`: key определяет, на какой partition уйдёт сообщение.

В NestJS producer оборачивается в провайдер. Например, для агрегата `Order`:

```ts
@Injectable()
export class OrderEventPublisher {
  constructor(private readonly producer: KafkaProducerService) {}

  async publishConfirmed(order: Order): Promise<void> {
    await this.producer.send({
      topic: 'order-service.order.confirmed',
      messages: [
        {
          key: String(order.id),
          value: JSON.stringify(this.toEvent(order)),
        },
      ],
    });
  }

  private toEvent(order: Order): OrderConfirmedEvent {
    return {
      eventId: order.confirmedEventId,
      eventType: 'order.confirmed.v1',
      occurredAt: order.confirmedAt.toISOString(),
      aggregateType: 'Order',
      aggregateId: String(order.id),
      customerId: String(order.customerId),
      totalAmount: order.totalAmount,
    };
  }
}
```

Дефолтный ключ — `String(order.id)` (aggregate id). Все события одного `order.id` уходят на один partition; внутри partition kafkajs сохраняет порядок вставки.

Без ключа — `messages: [{ value: ... }]` — round-robin между partitions. Сценарий поломки:
1. `OrderCreated(orderId=7)` уходит на partition 0.
2. `OrderConfirmed(orderId=7)` уходит на partition 3.
3. Consumer для partition 3 обрабатывает `OrderConfirmed` **до** того, как consumer для partition 0 добрался до `OrderCreated`.
4. Downstream получает подтверждение для несуществующего заказа.

## JSON по умолчанию

`R-KFK-PROD-3`: `JSON.stringify` как дефолтный способ сериализации.

```ts
await producer.send({
  topic: 'product-service.product.price-updated',
  acks: -1,
  messages: [
    {
      key: String(product.id),
      value: JSON.stringify(event),
    },
  ],
});
```

JSON прост в отладке (`kafkajs` console consumer показывает читаемый payload), не требует Schema Registry. Для high-throughput топиков (миллиарды событий в сутки) — Avro/Protobuf через `@kafkajs/confluent-schema-registry`. Это отдельная инфра; в UCP-сервисах не дефолт.

## Не `producer.send` из use case handler

`R-KFK-PROD-4`: domain events публикуются через outbox, не прямым вызовом producer из handler.

```ts
// НЕ ТАК — Kafka и DB не атомарны
@Injectable()
export class ConfirmOrderHandler {
  constructor(
    private readonly orders: OrderRepository,
    private readonly producer: KafkaProducerService,
    private readonly dataSource: DataSource,
  ) {}

  async handle(command: ConfirmOrderCommand): Promise<Order> {
    return this.dataSource.transaction(async (manager) => {
      const order = await manager.findOneByOrFail(OrderEntity, { id: command.orderId });
      order.confirm();
      await manager.save(order);

      // ОШИБКА: при rollback DB событие уже отправлено
      await this.producer.send({
        topic: 'order-service.order.confirmed',
        messages: [{ key: String(order.id), value: JSON.stringify(event) }],
      });

      return toDomain(order);
    });
  }
}
```

Сценарии поломки:
1. `producer.send` прошёл, `save` упал с deadlock → событие опубликовано, в БД заказ не подтверждён. Расхождение состояний.
2. `save` прошёл, `send` бросил сетевую ошибку → заказ подтверждён в БД, downstream не знает об этом.

Корректно — через outbox:

```ts
async handle(command: ConfirmOrderCommand): Promise<Order> {
  return this.dataSource.transaction(async (manager) => {
    const order = await manager.findOneByOrFail(OrderEntity, { id: command.orderId });
    order.confirm();
    await manager.save(order);

    // запись в outbox — в той же транзакции, что save
    await manager.insert(OutboxEventEntity, {
      aggregateType: 'Order',
      aggregateId: String(order.id),
      eventType: 'order.confirmed.v1',
      topic: 'order-service.order.confirmed',
      partitionKey: String(order.id),
      payload: JSON.stringify(OrderConfirmedEvent.from(toDomain(order))),
    });

    return toDomain(order);
  });
}
```

Запись в `outbox_event` идёт в одной DB-транзакции с `save`. Атомарность гарантирует PG. Отдельный outbox-relay (`@Interval`) читает unpublished и публикует через kafkajs. Подробнее — [Outbox publishing](node/outbox-publishing.md).

Допустимый прямой `producer.send`:
- Технические audit-events (в дополнение к `audit_log` таблице).
- Метрики и health-сигналы.
- Команды другим сервисам без транзакционного контекста (например запрос на отчёт из admin-инструмента).

## DI-провайдер producer в NestJS

`KafkaProducerService` — синглтон, управляет `connect`/`disconnect` через lifecycle-хуки:

```ts
@Injectable()
export class KafkaProducerService implements OnApplicationBootstrap, OnApplicationShutdown {
  private readonly producer: Producer;

  constructor(private readonly kafka: Kafka) {
    this.producer = kafka.producer({ idempotent: true, maxInFlightRequests: 5 });
  }

  async onApplicationBootstrap(): Promise<void> {
    await this.producer.connect();
  }

  async onApplicationShutdown(): Promise<void> {
    await this.producer.disconnect();
  }

  async send(record: ProducerRecord): Promise<RecordMetadata[]> {
    return this.producer.send({ acks: -1, ...record });
  }
}
```

`send` всегда проставляет `acks: -1` — вызывающий код не может случайно передать `acks: 0` или `acks: 1`. `Kafka`-инстанс регистрируется в модуле из `KafkaConfig` (zod/class-validator, `R-KFK-CFG-1`).

## Что запрещено

| Антипаттерн | Правило | Что взамен |
|---|---|---|
| `idempotent: false` | `R-KFK-PROD-X1` | `idempotent: true` всегда |
| `acks: 0` или `acks: 1` | `R-KFK-PROD-X2` | `acks: -1` |
| Send без `key` для бизнес-событий | `R-KFK-PROD-X3` | aggregate id как key |
| `producer.send` из транзакции с DB-операцией | `R-KFK-PROD-X4` | outbox pattern |
| `maxInFlightRequests > 5` с `idempotent: true` | `R-KFK-PROD-1` | ≤ 5 (kafkajs выбросит ошибку иначе) |
| Агрегат целиком в payload | `R-KFK-PROD-3` | объект с явными полями (см. [Event design](node/event-design.md)) |
| `producer.disconnect` в catch без graceful shutdown | `R-KFK-PROD-1` | `OnApplicationShutdown` hook |

## Куда дальше

- [Outbox publishing](node/outbox-publishing.md) — как send через outbox-relay с `@Interval` и `FOR UPDATE SKIP LOCKED`.
- [Event design](node/event-design.md) — payload format, `eventId`, версионирование `eventType`.
- [Конфигурация](node/configuration.md) — `KafkaConfig`, zod-валидация, fail-fast на отсутствующий топик.
- [Consumer](node/consumer.md) — `autoCommit: false`, `commitOffsets`, `heartbeat()`.
- [Idempotent consumer](node/idempotent-consumer.md) — `processed_event`, dedup в одной транзакции.
- [Retry topic + DLQ](node/retry-and-dlq.md) — retry-топики с `x-attempt`, DLQ, monitoring.
- [Observability](node/observability.md) — `prom-client`, `traceparent` в headers, consumer lag alert.
- [Security](node/security.md) — TLS, SASL/SCRAM, ACL per-сервис.
