Опирается на правила:
R-KFK-CONS-1…R-KFK-CONS-6иR-KFK-CONS-X1…R-KFK-CONS-X4из Kafka Style Guide → раздел 2. Consumer.
Важно знать
- Уникальный
groupIdформата<service>-<purpose>. Один group = одна логическая роль.autoCommit: falseобязателен. kafkajs по умолчанию делаетautoCommit: true— offset уходит до завершения обработки, crash = потеря.commitOffsetsпосле успешной обработки. Offset коммитится какString(Number(message.offset) + 1)— следующий к чтению.- Listener idempotent — at-least-once delivery, duplicate-detection на стороне consumer (
R-KFK-IDEM-*).fromBeginning: trueдля critical-consumer'ов — аналогauto.offset.reset: earliest.partitionsConsumedConcurrently≤ числа партиций — иначе лишние slot'ы простаивают.await heartbeat()в долгих обработчиках — у kafkajs нетmax.poll.interval.ms, аналог —heartbeat+sessionTimeout/rebalanceTimeout.- HTTP к внешней системе без CB/bulkhead —
eachMessageзависает, Kafka rebalance, дубликаты.
Consumer — точка, где сервис принимает факт из внешнего мира. Главные ошибки здесь — потерянное событие (offset ушёл раньше обработки) и rebalance loop (consumer не успевает ответить на heartbeat, Kafka забирает partition).
NestJS не использует Kafka Microservices Transport для бизнес-consumer'ов — он не даёт ручного управления offset'ами. Kafkajs используется напрямую через DI-провайдер.
groupId per logical purpose
R-KFK-CONS-1: формат <service>-<purpose>.
// billing.module.ts
const consumer = kafka.consumer({ groupId: 'billing-order-confirmed' });
Каждый consumer-group имеет независимый offset и независимый rebalance. Если в Billing-сервисе два consumer'а — один слушает orders.confirmed, другой payment.refund.requested — им нужны разные groupId:
const orderConsumer = kafka.consumer({ groupId: 'billing-order-confirmed' });
const refundConsumer = kafka.consumer({ groupId: 'billing-refund-requested' });
Один общий groupId: 'billing-service' для всех — Kafka распределяет partition'ы между instance'ами группы без понимания, какой consumer за что отвечает. При rebalance одного нарушается работа всех.
autoCommit: false и commitOffsets
R-KFK-CONS-2: ручное управление offset'ом.
// OrderConfirmedConsumer — провайдер в BillingModule
@Injectable()
export class OrderConfirmedConsumer implements OnApplicationBootstrap, OnApplicationShutdown {
private consumer: Consumer;
constructor(
private readonly kafka: KafkaClient,
private readonly handler: ConfirmOrderPaymentHandler,
) {}
async onApplicationBootstrap(): Promise<void> {
this.consumer = this.kafka.consumer({ groupId: 'billing-order-confirmed' });
await this.consumer.connect();
await this.consumer.subscribe({ topic: 'orders.confirmed', fromBeginning: true });
await this.consumer.run({
autoCommit: false,
partitionsConsumedConcurrently: 3,
eachMessage: async ({ topic, partition, message, heartbeat }) => {
const event = orderConfirmedSchema.parse(
JSON.parse(message.value!.toString()),
);
await this.handler.handle(event);
await this.consumer.commitOffsets([
{ topic, partition, offset: String(Number(message.offset) + 1) },
]);
},
});
}
async onApplicationShutdown(): Promise<void> {
await this.consumer.disconnect();
}
}
Логика: commitOffsets вызывается после handler.handle(event). Если handler бросает исключение до commit — следующий poll получит то же сообщение снова. Это at-least-once: ни одно событие не теряется, дубликаты обрабатываются через Idempotent consumer.
Offset коммитится как Number(message.offset) + 1 — это следующая позиция к чтению, а не текущая. Это особенность kafkajs: commitOffsets принимает offset следующего сообщения, не последнего обработанного.
Idempotent listener
R-KFK-CONS-3: сообщение может прийти 2+ раз — это норма at-least-once.
Сценарии дубликатов:
- Consumer rebalance до
commitOffsets. - Pod restart после обработки, но до commit (например, OOM).
- Replay из DLQ.
- Новый consumer-group читает топик с
fromBeginning: true.
Защита — processed_event таблица в PG, проверка eventId перед обработкой, запись и бизнес-результат в одной транзакции. Подробнее — Idempotent consumer.
// handler: idempotency через orIgnore insert
async handle(event: OrderConfirmedEvent): Promise<void> {
await this.dataSource.transaction(async (manager) => {
const inserted = await manager
.createQueryBuilder()
.insert()
.into(ProcessedEventEntity)
.values({ eventId: event.eventId })
.orIgnore()
.execute();
if (!inserted.identifiers.length) return;
await this.billingRepository.createInvoice(manager, {
orderId: event.aggregateId,
customerId: event.customerId,
totalAmount: event.totalAmount,
});
});
}
fromBeginning: true
R-KFK-CONS-4: для critical-consumer'ов.
await consumer.subscribe({ topic: 'orders.confirmed', fromBeginning: true });
Аналог auto.offset.reset: earliest в Java/Spring Kafka.
Когда срабатывает: новая consumer-group без сохранённого offset или offset устарел (вышел за retention).
fromBeginning: false(дефолт kafkajs) — начать с самого нового. Пропускаем все сообщения до старта consumer'а. Для денег/orders — катастрофа: новый deployment или первый запуск сервиса пропустит все события в retention-окне.fromBeginning: true— начать с самого старого retained-сообщения. Прочитаем всё. Для critical-consumer'ов — обязательно.
Для analytics или metric-aggregation, где старые данные не нужны — fromBeginning: false приемлемо. Но это явное исключение.
partitionsConsumedConcurrently ≤ partitions
R-KFK-CONS-5: параллелизм внутри одного pod'а.
await consumer.run({
autoCommit: false,
partitionsConsumedConcurrently: 3,
eachMessage: async ({ topic, partition, message, heartbeat }) => {
// ...
},
});
partitionsConsumedConcurrently — сколько eachMessage-обработчиков запускается параллельно внутри одного consumer instance. Правило: partitionsConsumedConcurrently × pods ≤ partitions.
Если у топика products.updated 9 партиций, 3 pod'а — partitionsConsumedConcurrently: 3 на каждом pod: итого 9 параллельных обработчиков = 9 партиций. Больше не имеет смысла — у Kafka нет партиции для лишнего слота.
heartbeat() в долгих обработчиках
R-KFK-CONS-6 (адаптация для kafkajs): у kafkajs нет max.poll.interval.ms, аналог — sessionTimeout и rebalanceTimeout + явный await heartbeat() внутри долгой обработки.
const consumer = kafka.consumer({
groupId: 'billing-order-confirmed',
sessionTimeout: 30000,
rebalanceTimeout: 60000,
});
await consumer.run({
autoCommit: false,
eachMessage: async ({ topic, partition, message, heartbeat }) => {
const event = orderConfirmedSchema.parse(
JSON.parse(message.value!.toString()),
);
await heartbeat();
await this.externalSettlementAdapter.settle(event);
await heartbeat();
await this.consumer.commitOffsets([
{ topic, partition, offset: String(Number(message.offset) + 1) },
]);
},
});
Если eachMessage занимает больше sessionTimeout без вызова heartbeat() — Kafka считает consumer dead и rebalance-ит partition. Следующий consumer получает те же сообщения → дубликаты.
await heartbeat() — встроенный механизм kafkajs. Вызываем до и после долгой внешней операции. Это не polling — это уведомление брокера «мы живы, продолжаем обрабатывать».
Валидация payload через zod
R-KFK-CFG-3, R-KFK-EVT-4 (адаптация для node): на границе consumer payload валидируется через статический реестр zod-схем, не «угадываем тип по строке из payload».
// core/order/domain/event/order-confirmed.schema.ts
import { z } from 'zod';
export const orderConfirmedSchema = z.object({
eventId: z.string().uuid(),
eventType: z.literal('order.confirmed.v1'),
occurredAt: z.string().datetime(),
aggregateType: z.literal('Order'),
aggregateId: z.string().uuid(),
customerId: z.string().uuid(),
totalAmount: z.number().positive(),
currency: z.string().length(3),
});
export type OrderConfirmedEvent = z.infer<typeof orderConfirmedSchema>;
// в eachMessage
const event = orderConfirmedSchema.parse(
JSON.parse(message.value!.toString()),
);
// Если payload не соответствует схеме — ZodError → обработчик catch → DLQ
Poison-message (несовместимая схема) — не retry, сразу DLQ. Retry имеет смысл только для transient failures (сеть, DB timeout), не для контрактных ошибок.
Что запрещено
| Антипаттерн | Правило | Что взамен |
|---|---|---|
autoCommit: true (дефолт kafkajs) | R-KFK-CONS-X1 | autoCommit: false + commitOffsets после обработки |
await sleep(N) > 1s в eachMessage без heartbeat() | R-KFK-CONS-X2 | retry topic с consumer.pause() |
groupId отсутствует или общий для разных consumer'ов | R-KFK-CONS-X3 | <service>-<purpose> per-роль |
HTTP к внешней системе без CB/bulkhead в eachMessage | R-KFK-CONS-X4 | cockatiel CircuitBreaker + Bulkhead |
fromBeginning: false для critical-consumer'ов | R-KFK-CONS-4 | fromBeginning: true |
partitionsConsumedConcurrently > числа партиций | R-KFK-CONS-5 | ≤ partitions |
Нет await heartbeat() в долгой обработке | R-KFK-CONS-6 | heartbeat() до/после долгой операции |
catch (e) { logger.error(e); } + commit без DLQ | R-KFK-CONS-2 | publish в retry/DLQ, commit после |
Куда дальше
- Конфигурация —
KafkaClient, zod-валидируемый конфиг, fail-fast на старте. - Event design —
eventIdUUID v7,eventType.v1, zod-схемы событий. - Idempotent consumer —
processed_event,orIgnoreinsert, dedup в транзакции. - Observability — consumer lag через
prom-client,traceparentв headers. - Outbox publishing — domain-события через outbox, не прямой
producer.send. - Producer —
idempotent: true,acks: -1, partition key = aggregate id. - Retry topic + DLQ —
consumer.pause(),x-attemptheader, non-blocking retry. - Security — TLS/SASL, ACL per-сервис, PII в restricted topics.