---
title: "Kafka Consumer в NestJS — autoCommit: false, groupId, partitionsConsumedConcurrently"
nav_title: "Consumer"
excerpt: "Kafka consumer в NestJS/kafkajs: groupId per-роль, autoCommit: false + commitOffsets после обработки, fromBeginning для critical-топиков, heartbeat() против rebalance."
keywords: "Kafka consumer NestJS, kafkajs autoCommit false, groupId naming, fromBeginning true, partitionsConsumedConcurrently, heartbeat kafkajs, R-KFK-CONS, commitOffsets"
focus_keyword: "Kafka Consumer NestJS kafkajs autoCommit"
tags: ["kafka", "consumer", "nestjs", "typescript", "node", "kafkajs"]
---

# Kafka Consumer в NestJS — autoCommit: false, groupId, partitionsConsumedConcurrently

> **Опирается на правила:** `R-KFK-CONS-1` … `R-KFK-CONS-6` и `R-KFK-CONS-X1` … `R-KFK-CONS-X4` из Kafka Style Guide → [раздел 2. Consumer](/standards/backend/kafka/#2-consumer).

> **Важно знать**
> - **Уникальный `groupId`** формата `<service>-<purpose>`. Один group = одна логическая роль.
> - **`autoCommit: false` обязателен.** kafkajs по умолчанию делает `autoCommit: true` — offset уходит до завершения обработки, crash = потеря.
> - **`commitOffsets` после успешной обработки.** Offset коммитится как `String(Number(message.offset) + 1)` — следующий к чтению.
> - **Listener idempotent** — at-least-once delivery, duplicate-detection на стороне consumer (`R-KFK-IDEM-*`).
> - **`fromBeginning: true`** для critical-consumer'ов — аналог `auto.offset.reset: earliest`.
> - **`partitionsConsumedConcurrently`** ≤ числа партиций — иначе лишние slot'ы простаивают.
> - **`await heartbeat()`** в долгих обработчиках — у kafkajs нет `max.poll.interval.ms`, аналог — `heartbeat` + `sessionTimeout`/`rebalanceTimeout`.
> - **HTTP к внешней системе без CB/bulkhead** — `eachMessage` зависает, Kafka rebalance, дубликаты.

Consumer — точка, где сервис **принимает** факт из внешнего мира. Главные ошибки здесь — потерянное событие (offset ушёл раньше обработки) и rebalance loop (consumer не успевает ответить на heartbeat, Kafka забирает partition).

NestJS не использует Kafka Microservices Transport для бизнес-consumer'ов — он не даёт ручного управления offset'ами. Kafkajs используется напрямую через DI-провайдер.

## groupId per logical purpose

`R-KFK-CONS-1`: формат `<service>-<purpose>`.

```ts
// billing.module.ts
const consumer = kafka.consumer({ groupId: 'billing-order-confirmed' });
```

Каждый consumer-group имеет независимый offset и независимый rebalance. Если в Billing-сервисе два consumer'а — один слушает `orders.confirmed`, другой `payment.refund.requested` — им нужны разные `groupId`:

```ts
const orderConsumer = kafka.consumer({ groupId: 'billing-order-confirmed' });
const refundConsumer = kafka.consumer({ groupId: 'billing-refund-requested' });
```

Один общий `groupId: 'billing-service'` для всех — Kafka распределяет partition'ы между instance'ами группы без понимания, какой consumer за что отвечает. При rebalance одного нарушается работа всех.

## autoCommit: false и commitOffsets

`R-KFK-CONS-2`: ручное управление offset'ом.

```ts
// OrderConfirmedConsumer — провайдер в BillingModule
@Injectable()
export class OrderConfirmedConsumer implements OnApplicationBootstrap, OnApplicationShutdown {
  private consumer: Consumer;

  constructor(
    private readonly kafka: KafkaClient,
    private readonly handler: ConfirmOrderPaymentHandler,
  ) {}

  async onApplicationBootstrap(): Promise<void> {
    this.consumer = this.kafka.consumer({ groupId: 'billing-order-confirmed' });
    await this.consumer.connect();
    await this.consumer.subscribe({ topic: 'orders.confirmed', fromBeginning: true });
    await this.consumer.run({
      autoCommit: false,
      partitionsConsumedConcurrently: 3,
      eachMessage: async ({ topic, partition, message, heartbeat }) => {
        const event = orderConfirmedSchema.parse(
          JSON.parse(message.value!.toString()),
        );
        await this.handler.handle(event);
        await this.consumer.commitOffsets([
          { topic, partition, offset: String(Number(message.offset) + 1) },
        ]);
      },
    });
  }

  async onApplicationShutdown(): Promise<void> {
    await this.consumer.disconnect();
  }
}
```

Логика: `commitOffsets` вызывается **после** `handler.handle(event)`. Если handler бросает исключение до commit — следующий poll получит то же сообщение снова. Это at-least-once: ни одно событие не теряется, дубликаты обрабатываются через [Idempotent consumer](node/idempotent-consumer.md).

Offset коммитится как `Number(message.offset) + 1` — это следующая позиция к чтению, а не текущая. Это особенность kafkajs: `commitOffsets` принимает offset следующего сообщения, не последнего обработанного.

## Idempotent listener

`R-KFK-CONS-3`: сообщение может прийти 2+ раз — это норма at-least-once.

Сценарии дубликатов:
- Consumer rebalance до `commitOffsets`.
- Pod restart после обработки, но до commit (например, OOM).
- Replay из DLQ.
- Новый consumer-group читает топик с `fromBeginning: true`.

Защита — `processed_event` таблица в PG, проверка `eventId` перед обработкой, запись и бизнес-результат в одной транзакции. Подробнее — [Idempotent consumer](node/idempotent-consumer.md).

```ts
// handler: idempotency через orIgnore insert
async handle(event: OrderConfirmedEvent): Promise<void> {
  await this.dataSource.transaction(async (manager) => {
    const inserted = await manager
      .createQueryBuilder()
      .insert()
      .into(ProcessedEventEntity)
      .values({ eventId: event.eventId })
      .orIgnore()
      .execute();
    if (!inserted.identifiers.length) return;
    await this.billingRepository.createInvoice(manager, {
      orderId: event.aggregateId,
      customerId: event.customerId,
      totalAmount: event.totalAmount,
    });
  });
}
```

## fromBeginning: true

`R-KFK-CONS-4`: для critical-consumer'ов.

```ts
await consumer.subscribe({ topic: 'orders.confirmed', fromBeginning: true });
```

Аналог `auto.offset.reset: earliest` в Java/Spring Kafka.

Когда срабатывает: новая consumer-group без сохранённого offset или offset устарел (вышел за retention).

- **`fromBeginning: false`** (дефолт kafkajs) — начать с самого нового. Пропускаем все сообщения до старта consumer'а. Для денег/orders — катастрофа: новый deployment или первый запуск сервиса пропустит все события в retention-окне.
- **`fromBeginning: true`** — начать с самого старого retained-сообщения. Прочитаем всё. Для critical-consumer'ов — обязательно.

Для analytics или metric-aggregation, где старые данные не нужны — `fromBeginning: false` приемлемо. Но это явное исключение.

## partitionsConsumedConcurrently ≤ partitions

`R-KFK-CONS-5`: параллелизм внутри одного pod'а.

```ts
await consumer.run({
  autoCommit: false,
  partitionsConsumedConcurrently: 3,
  eachMessage: async ({ topic, partition, message, heartbeat }) => {
    // ...
  },
});
```

`partitionsConsumedConcurrently` — сколько `eachMessage`-обработчиков запускается параллельно внутри одного consumer instance. Правило: `partitionsConsumedConcurrently × pods ≤ partitions`.

Если у топика `products.updated` 9 партиций, 3 pod'а — `partitionsConsumedConcurrently: 3` на каждом pod: итого 9 параллельных обработчиков = 9 партиций. Больше не имеет смысла — у Kafka нет партиции для лишнего слота.

## heartbeat() в долгих обработчиках

`R-KFK-CONS-6` (адаптация для kafkajs): у kafkajs нет `max.poll.interval.ms`, аналог — `sessionTimeout` и `rebalanceTimeout` + явный `await heartbeat()` внутри долгой обработки.

```ts
const consumer = kafka.consumer({
  groupId: 'billing-order-confirmed',
  sessionTimeout: 30000,
  rebalanceTimeout: 60000,
});

await consumer.run({
  autoCommit: false,
  eachMessage: async ({ topic, partition, message, heartbeat }) => {
    const event = orderConfirmedSchema.parse(
      JSON.parse(message.value!.toString()),
    );
    await heartbeat();
    await this.externalSettlementAdapter.settle(event);
    await heartbeat();
    await this.consumer.commitOffsets([
      { topic, partition, offset: String(Number(message.offset) + 1) },
    ]);
  },
});
```

Если `eachMessage` занимает больше `sessionTimeout` без вызова `heartbeat()` — Kafka считает consumer dead и rebalance-ит partition. Следующий consumer получает те же сообщения → дубликаты.

`await heartbeat()` — встроенный механизм kafkajs. Вызываем до и после долгой внешней операции. Это не polling — это уведомление брокера «мы живы, продолжаем обрабатывать».

## Валидация payload через zod

`R-KFK-CFG-3`, `R-KFK-EVT-4` (адаптация для node): на границе consumer payload валидируется через статический реестр zod-схем, не «угадываем тип по строке из payload».

```ts
// core/order/domain/event/order-confirmed.schema.ts
import { z } from 'zod';

export const orderConfirmedSchema = z.object({
  eventId: z.string().uuid(),
  eventType: z.literal('order.confirmed.v1'),
  occurredAt: z.string().datetime(),
  aggregateType: z.literal('Order'),
  aggregateId: z.string().uuid(),
  customerId: z.string().uuid(),
  totalAmount: z.number().positive(),
  currency: z.string().length(3),
});

export type OrderConfirmedEvent = z.infer<typeof orderConfirmedSchema>;
```

```ts
// в eachMessage
const event = orderConfirmedSchema.parse(
  JSON.parse(message.value!.toString()),
);
// Если payload не соответствует схеме — ZodError → обработчик catch → DLQ
```

Poison-message (несовместимая схема) — не retry, сразу DLQ. Retry имеет смысл только для transient failures (сеть, DB timeout), не для контрактных ошибок.

## Что запрещено

| Антипаттерн | Правило | Что взамен |
|---|---|---|
| `autoCommit: true` (дефолт kafkajs) | `R-KFK-CONS-X1` | `autoCommit: false` + `commitOffsets` после обработки |
| `await sleep(N)` > 1s в `eachMessage` без `heartbeat()` | `R-KFK-CONS-X2` | [retry topic](node/retry-and-dlq.md) с `consumer.pause()` |
| `groupId` отсутствует или общий для разных consumer'ов | `R-KFK-CONS-X3` | `<service>-<purpose>` per-роль |
| HTTP к внешней системе без CB/bulkhead в `eachMessage` | `R-KFK-CONS-X4` | cockatiel CircuitBreaker + Bulkhead |
| `fromBeginning: false` для critical-consumer'ов | `R-KFK-CONS-4` | `fromBeginning: true` |
| `partitionsConsumedConcurrently` > числа партиций | `R-KFK-CONS-5` | ≤ partitions |
| Нет `await heartbeat()` в долгой обработке | `R-KFK-CONS-6` | `heartbeat()` до/после долгой операции |
| `catch (e) { logger.error(e); }` + commit без DLQ | `R-KFK-CONS-2` | publish в retry/DLQ, commit после |

## Куда дальше

- [Конфигурация](node/configuration.md) — `KafkaClient`, zod-валидируемый конфиг, fail-fast на старте.
- [Event design](node/event-design.md) — `eventId` UUID v7, `eventType.v1`, zod-схемы событий.
- [Idempotent consumer](node/idempotent-consumer.md) — `processed_event`, `orIgnore` insert, dedup в транзакции.
- [Observability](node/observability.md) — consumer lag через `prom-client`, `traceparent` в headers.
- [Outbox publishing](node/outbox-publishing.md) — domain-события через outbox, не прямой `producer.send`.
- [Producer](node/producer.md) — `idempotent: true`, `acks: -1`, partition key = aggregate id.
- [Retry topic + DLQ](node/retry-and-dlq.md) — `consumer.pause()`, `x-attempt` header, non-blocking retry.
- [Security](node/security.md) — TLS/SASL, ACL per-сервис, PII в restricted topics.
