Опирается на правила: R-KFK-CONS-1R-KFK-CONS-6 и R-KFK-CONS-X1R-KFK-CONS-X4 из Kafka Style Guide → раздел 2. Consumer.

Важно знать

  • Уникальный groupId формата <service>-<purpose>. Один group = одна логическая роль.
  • autoCommit: false обязателен. kafkajs по умолчанию делает autoCommit: true — offset уходит до завершения обработки, crash = потеря.
  • commitOffsets после успешной обработки. Offset коммитится как String(Number(message.offset) + 1) — следующий к чтению.
  • Listener idempotent — at-least-once delivery, duplicate-detection на стороне consumer (R-KFK-IDEM-*).
  • fromBeginning: true для critical-consumer'ов — аналог auto.offset.reset: earliest.
  • partitionsConsumedConcurrently ≤ числа партиций — иначе лишние slot'ы простаивают.
  • await heartbeat() в долгих обработчиках — у kafkajs нет max.poll.interval.ms, аналог — heartbeat + sessionTimeout/rebalanceTimeout.
  • HTTP к внешней системе без CB/bulkheadeachMessage зависает, Kafka rebalance, дубликаты.

Consumer — точка, где сервис принимает факт из внешнего мира. Главные ошибки здесь — потерянное событие (offset ушёл раньше обработки) и rebalance loop (consumer не успевает ответить на heartbeat, Kafka забирает partition).

NestJS не использует Kafka Microservices Transport для бизнес-consumer'ов — он не даёт ручного управления offset'ами. Kafkajs используется напрямую через DI-провайдер.

groupId per logical purpose

R-KFK-CONS-1: формат <service>-<purpose>.

// billing.module.ts
const consumer = kafka.consumer({ groupId: 'billing-order-confirmed' });

Каждый consumer-group имеет независимый offset и независимый rebalance. Если в Billing-сервисе два consumer'а — один слушает orders.confirmed, другой payment.refund.requested — им нужны разные groupId:

const orderConsumer = kafka.consumer({ groupId: 'billing-order-confirmed' });
const refundConsumer = kafka.consumer({ groupId: 'billing-refund-requested' });

Один общий groupId: 'billing-service' для всех — Kafka распределяет partition'ы между instance'ами группы без понимания, какой consumer за что отвечает. При rebalance одного нарушается работа всех.

autoCommit: false и commitOffsets

R-KFK-CONS-2: ручное управление offset'ом.

// OrderConfirmedConsumer — провайдер в BillingModule
@Injectable()
export class OrderConfirmedConsumer implements OnApplicationBootstrap, OnApplicationShutdown {
  private consumer: Consumer;

  constructor(
    private readonly kafka: KafkaClient,
    private readonly handler: ConfirmOrderPaymentHandler,
  ) {}

  async onApplicationBootstrap(): Promise<void> {
    this.consumer = this.kafka.consumer({ groupId: 'billing-order-confirmed' });
    await this.consumer.connect();
    await this.consumer.subscribe({ topic: 'orders.confirmed', fromBeginning: true });
    await this.consumer.run({
      autoCommit: false,
      partitionsConsumedConcurrently: 3,
      eachMessage: async ({ topic, partition, message, heartbeat }) => {
        const event = orderConfirmedSchema.parse(
          JSON.parse(message.value!.toString()),
        );
        await this.handler.handle(event);
        await this.consumer.commitOffsets([
          { topic, partition, offset: String(Number(message.offset) + 1) },
        ]);
      },
    });
  }

  async onApplicationShutdown(): Promise<void> {
    await this.consumer.disconnect();
  }
}

Логика: commitOffsets вызывается после handler.handle(event). Если handler бросает исключение до commit — следующий poll получит то же сообщение снова. Это at-least-once: ни одно событие не теряется, дубликаты обрабатываются через Idempotent consumer.

Offset коммитится как Number(message.offset) + 1 — это следующая позиция к чтению, а не текущая. Это особенность kafkajs: commitOffsets принимает offset следующего сообщения, не последнего обработанного.

Idempotent listener

R-KFK-CONS-3: сообщение может прийти 2+ раз — это норма at-least-once.

Сценарии дубликатов:

  • Consumer rebalance до commitOffsets.
  • Pod restart после обработки, но до commit (например, OOM).
  • Replay из DLQ.
  • Новый consumer-group читает топик с fromBeginning: true.

Защита — processed_event таблица в PG, проверка eventId перед обработкой, запись и бизнес-результат в одной транзакции. Подробнее — Idempotent consumer.

// handler: idempotency через orIgnore insert
async handle(event: OrderConfirmedEvent): Promise<void> {
  await this.dataSource.transaction(async (manager) => {
    const inserted = await manager
      .createQueryBuilder()
      .insert()
      .into(ProcessedEventEntity)
      .values({ eventId: event.eventId })
      .orIgnore()
      .execute();
    if (!inserted.identifiers.length) return;
    await this.billingRepository.createInvoice(manager, {
      orderId: event.aggregateId,
      customerId: event.customerId,
      totalAmount: event.totalAmount,
    });
  });
}

fromBeginning: true

R-KFK-CONS-4: для critical-consumer'ов.

await consumer.subscribe({ topic: 'orders.confirmed', fromBeginning: true });

Аналог auto.offset.reset: earliest в Java/Spring Kafka.

Когда срабатывает: новая consumer-group без сохранённого offset или offset устарел (вышел за retention).

  • fromBeginning: false (дефолт kafkajs) — начать с самого нового. Пропускаем все сообщения до старта consumer'а. Для денег/orders — катастрофа: новый deployment или первый запуск сервиса пропустит все события в retention-окне.
  • fromBeginning: true — начать с самого старого retained-сообщения. Прочитаем всё. Для critical-consumer'ов — обязательно.

Для analytics или metric-aggregation, где старые данные не нужны — fromBeginning: false приемлемо. Но это явное исключение.

partitionsConsumedConcurrently ≤ partitions

R-KFK-CONS-5: параллелизм внутри одного pod'а.

await consumer.run({
  autoCommit: false,
  partitionsConsumedConcurrently: 3,
  eachMessage: async ({ topic, partition, message, heartbeat }) => {
    // ...
  },
});

partitionsConsumedConcurrently — сколько eachMessage-обработчиков запускается параллельно внутри одного consumer instance. Правило: partitionsConsumedConcurrently × pods ≤ partitions.

Если у топика products.updated 9 партиций, 3 pod'а — partitionsConsumedConcurrently: 3 на каждом pod: итого 9 параллельных обработчиков = 9 партиций. Больше не имеет смысла — у Kafka нет партиции для лишнего слота.

heartbeat() в долгих обработчиках

R-KFK-CONS-6 (адаптация для kafkajs): у kafkajs нет max.poll.interval.ms, аналог — sessionTimeout и rebalanceTimeout + явный await heartbeat() внутри долгой обработки.

const consumer = kafka.consumer({
  groupId: 'billing-order-confirmed',
  sessionTimeout: 30000,
  rebalanceTimeout: 60000,
});

await consumer.run({
  autoCommit: false,
  eachMessage: async ({ topic, partition, message, heartbeat }) => {
    const event = orderConfirmedSchema.parse(
      JSON.parse(message.value!.toString()),
    );
    await heartbeat();
    await this.externalSettlementAdapter.settle(event);
    await heartbeat();
    await this.consumer.commitOffsets([
      { topic, partition, offset: String(Number(message.offset) + 1) },
    ]);
  },
});

Если eachMessage занимает больше sessionTimeout без вызова heartbeat() — Kafka считает consumer dead и rebalance-ит partition. Следующий consumer получает те же сообщения → дубликаты.

await heartbeat() — встроенный механизм kafkajs. Вызываем до и после долгой внешней операции. Это не polling — это уведомление брокера «мы живы, продолжаем обрабатывать».

Валидация payload через zod

R-KFK-CFG-3, R-KFK-EVT-4 (адаптация для node): на границе consumer payload валидируется через статический реестр zod-схем, не «угадываем тип по строке из payload».

// core/order/domain/event/order-confirmed.schema.ts
import { z } from 'zod';

export const orderConfirmedSchema = z.object({
  eventId: z.string().uuid(),
  eventType: z.literal('order.confirmed.v1'),
  occurredAt: z.string().datetime(),
  aggregateType: z.literal('Order'),
  aggregateId: z.string().uuid(),
  customerId: z.string().uuid(),
  totalAmount: z.number().positive(),
  currency: z.string().length(3),
});

export type OrderConfirmedEvent = z.infer<typeof orderConfirmedSchema>;
// в eachMessage
const event = orderConfirmedSchema.parse(
  JSON.parse(message.value!.toString()),
);
// Если payload не соответствует схеме — ZodError → обработчик catch → DLQ

Poison-message (несовместимая схема) — не retry, сразу DLQ. Retry имеет смысл только для transient failures (сеть, DB timeout), не для контрактных ошибок.

Что запрещено

АнтипаттернПравилоЧто взамен
autoCommit: true (дефолт kafkajs)R-KFK-CONS-X1autoCommit: false + commitOffsets после обработки
await sleep(N) > 1s в eachMessage без heartbeat()R-KFK-CONS-X2retry topic с consumer.pause()
groupId отсутствует или общий для разных consumer'овR-KFK-CONS-X3<service>-<purpose> per-роль
HTTP к внешней системе без CB/bulkhead в eachMessageR-KFK-CONS-X4cockatiel CircuitBreaker + Bulkhead
fromBeginning: false для critical-consumer'овR-KFK-CONS-4fromBeginning: true
partitionsConsumedConcurrently > числа партицийR-KFK-CONS-5≤ partitions
Нет await heartbeat() в долгой обработкеR-KFK-CONS-6heartbeat() до/после долгой операции
catch (e) { logger.error(e); } + commit без DLQR-KFK-CONS-2publish в retry/DLQ, commit после

Куда дальше

  • Конфигурация — KafkaClient, zod-валидируемый конфиг, fail-fast на старте.
  • Event design — eventId UUID v7, eventType.v1, zod-схемы событий.
  • Idempotent consumer — processed_event, orIgnore insert, dedup в транзакции.
  • Observability — consumer lag через prom-client, traceparent в headers.
  • Outbox publishing — domain-события через outbox, не прямой producer.send.
  • Producer — idempotent: true, acks: -1, partition key = aggregate id.
  • Retry topic + DLQ — consumer.pause(), x-attempt header, non-blocking retry.
  • Security — TLS/SASL, ACL per-сервис, PII в restricted topics.