Опирается на правила:
R-KFK-PROD-1…R-KFK-PROD-4иR-KFK-PROD-X1…R-KFK-PROD-X4из Kafka Style Guide → раздел 1. Producer.
Важно знать
idempotent: trueвсегда. В kafkajs этоkafka.producer({ idempotent: true, maxInFlightRequests: 5 })— требуетacks: -1. Exactly-once на уровне partition.acks: -1— единственно допустимое значение для бизнес-событий.acks: 0иacks: 1запрещены.- Partition key обязателен для всех бизнес-событий. Дефолтный ключ — aggregate id.
- JSON-сериализация (
JSON.stringify) по умолчанию. Avro/Protobuf через@kafkajs/confluent-schema-registry— только для bandwidth-чувствительных топиков.producer.sendнапрямую из use case handler для domain events — запрещён. События идут через outbox.- Kafka не XA — нельзя в одной транзакции с PG. Только outbox.
maxInFlightRequests: 5приidempotent: true— kafkajs кидает ошибку при превышении.
Kafka producer — точка, где сервис «публикует факт» во внешний мир. В kafkajs producer создаётся явно: kafka.producer(options), и его жизненный цикл управляется вручную. UCP формулирует правила так, чтобы producer был exactly-once на partition и атомичен с DB через outbox.
idempotent: true — всегда
R-KFK-PROD-1: один флаг включает три гарантии сразу.
import { Kafka } from 'kafkajs';
const kafka = new Kafka({
clientId: 'order-service',
brokers: [process.env.KAFKA_BROKERS!],
});
const producer = kafka.producer({
idempotent: true,
maxInFlightRequests: 5,
});
Что kafkajs делает с idempotent: true:
acks: -1— broker подтверждает только после репликации наmin.insync.replicasfollowers.maxInFlightRequests: 5— гарантирует ordering между retry (kafkajs кидает ошибку, если задать больше вместе сidempotent: true).- Broker дедуплицирует записи по
(producerId, sequenceNumber)на partition — retry со стороны producer не создаёт дубликат в Kafka.
Без idempotent: true retry на стороне producer создаёт дубликаты: broker получает одно сообщение дважды, downstream consumer видит оба.
Partition key — обязателен
R-KFK-PROD-2: key определяет, на какой partition уйдёт сообщение.
В NestJS producer оборачивается в провайдер. Например, для агрегата Order:
@Injectable()
export class OrderEventPublisher {
constructor(private readonly producer: KafkaProducerService) {}
async publishConfirmed(order: Order): Promise<void> {
await this.producer.send({
topic: 'order-service.order.confirmed',
messages: [
{
key: String(order.id),
value: JSON.stringify(this.toEvent(order)),
},
],
});
}
private toEvent(order: Order): OrderConfirmedEvent {
return {
eventId: order.confirmedEventId,
eventType: 'order.confirmed.v1',
occurredAt: order.confirmedAt.toISOString(),
aggregateType: 'Order',
aggregateId: String(order.id),
customerId: String(order.customerId),
totalAmount: order.totalAmount,
};
}
}
Дефолтный ключ — String(order.id) (aggregate id). Все события одного order.id уходят на один partition; внутри partition kafkajs сохраняет порядок вставки.
Без ключа — messages: [{ value: ... }] — round-robin между partitions. Сценарий поломки:
OrderCreated(orderId=7)уходит на partition 0.OrderConfirmed(orderId=7)уходит на partition 3.- Consumer для partition 3 обрабатывает
OrderConfirmedдо того, как consumer для partition 0 добрался доOrderCreated. - Downstream получает подтверждение для несуществующего заказа.
JSON по умолчанию
R-KFK-PROD-3: JSON.stringify как дефолтный способ сериализации.
await producer.send({
topic: 'product-service.product.price-updated',
acks: -1,
messages: [
{
key: String(product.id),
value: JSON.stringify(event),
},
],
});
JSON прост в отладке (kafkajs console consumer показывает читаемый payload), не требует Schema Registry. Для high-throughput топиков (миллиарды событий в сутки) — Avro/Protobuf через @kafkajs/confluent-schema-registry. Это отдельная инфра; в UCP-сервисах не дефолт.
Не producer.send из use case handler
R-KFK-PROD-4: domain events публикуются через outbox, не прямым вызовом producer из handler.
// НЕ ТАК — Kafka и DB не атомарны
@Injectable()
export class ConfirmOrderHandler {
constructor(
private readonly orders: OrderRepository,
private readonly producer: KafkaProducerService,
private readonly dataSource: DataSource,
) {}
async handle(command: ConfirmOrderCommand): Promise<Order> {
return this.dataSource.transaction(async (manager) => {
const order = await manager.findOneByOrFail(OrderEntity, { id: command.orderId });
order.confirm();
await manager.save(order);
// ОШИБКА: при rollback DB событие уже отправлено
await this.producer.send({
topic: 'order-service.order.confirmed',
messages: [{ key: String(order.id), value: JSON.stringify(event) }],
});
return toDomain(order);
});
}
}
Сценарии поломки:
producer.sendпрошёл,saveупал с deadlock → событие опубликовано, в БД заказ не подтверждён. Расхождение состояний.saveпрошёл,sendбросил сетевую ошибку → заказ подтверждён в БД, downstream не знает об этом.
Корректно — через outbox:
async handle(command: ConfirmOrderCommand): Promise<Order> {
return this.dataSource.transaction(async (manager) => {
const order = await manager.findOneByOrFail(OrderEntity, { id: command.orderId });
order.confirm();
await manager.save(order);
// запись в outbox — в той же транзакции, что save
await manager.insert(OutboxEventEntity, {
aggregateType: 'Order',
aggregateId: String(order.id),
eventType: 'order.confirmed.v1',
topic: 'order-service.order.confirmed',
partitionKey: String(order.id),
payload: JSON.stringify(OrderConfirmedEvent.from(toDomain(order))),
});
return toDomain(order);
});
}
Запись в outbox_event идёт в одной DB-транзакции с save. Атомарность гарантирует PG. Отдельный outbox-relay (@Interval) читает unpublished и публикует через kafkajs. Подробнее — Outbox publishing.
Допустимый прямой producer.send:
- Технические audit-events (в дополнение к
audit_logтаблице). - Метрики и health-сигналы.
- Команды другим сервисам без транзакционного контекста (например запрос на отчёт из admin-инструмента).
DI-провайдер producer в NestJS
KafkaProducerService — синглтон, управляет connect/disconnect через lifecycle-хуки:
@Injectable()
export class KafkaProducerService implements OnApplicationBootstrap, OnApplicationShutdown {
private readonly producer: Producer;
constructor(private readonly kafka: Kafka) {
this.producer = kafka.producer({ idempotent: true, maxInFlightRequests: 5 });
}
async onApplicationBootstrap(): Promise<void> {
await this.producer.connect();
}
async onApplicationShutdown(): Promise<void> {
await this.producer.disconnect();
}
async send(record: ProducerRecord): Promise<RecordMetadata[]> {
return this.producer.send({ acks: -1, ...record });
}
}
send всегда проставляет acks: -1 — вызывающий код не может случайно передать acks: 0 или acks: 1. Kafka-инстанс регистрируется в модуле из KafkaConfig (zod/class-validator, R-KFK-CFG-1).
Что запрещено
| Антипаттерн | Правило | Что взамен |
|---|---|---|
idempotent: false | R-KFK-PROD-X1 | idempotent: true всегда |
acks: 0 или acks: 1 | R-KFK-PROD-X2 | acks: -1 |
Send без key для бизнес-событий | R-KFK-PROD-X3 | aggregate id как key |
producer.send из транзакции с DB-операцией | R-KFK-PROD-X4 | outbox pattern |
maxInFlightRequests > 5 с idempotent: true | R-KFK-PROD-1 | ≤ 5 (kafkajs выбросит ошибку иначе) |
| Агрегат целиком в payload | R-KFK-PROD-3 | объект с явными полями (см. Event design) |
producer.disconnect в catch без graceful shutdown | R-KFK-PROD-1 | OnApplicationShutdown hook |
Куда дальше
- Outbox publishing — как send через outbox-relay с
@IntervalиFOR UPDATE SKIP LOCKED. - Event design — payload format,
eventId, версионированиеeventType. - Конфигурация —
KafkaConfig, zod-валидация, fail-fast на отсутствующий топик. - Consumer —
autoCommit: false,commitOffsets,heartbeat(). - Idempotent consumer —
processed_event, dedup в одной транзакции. - Retry topic + DLQ — retry-топики с
x-attempt, DLQ, monitoring. - Observability —
prom-client,traceparentв headers, consumer lag alert. - Security — TLS, SASL/SCRAM, ACL per-сервис.