Опирается на правила: R-SHUT-KFK-1R-SHUT-KFK-4 и R-SHUT-KFK-X1 из Graceful Shutdown Style Guide → раздел 3. Kafka shutdown.

Важно знать

  • consumer.disconnect() в beforeApplicationShutdown завершает текущий eachMessage/eachBatch и коммитит offset — без явного вызова kafkajs не ждёт.
  • Таймаут ~20s обязателен поверх disconnect через Promise.race — без него SIGKILL придёт раньше завершения текущего batch.
  • Handler не запускает долгий cascade — chain HTTP с retry не уложится в таймаут; cascade — в async-flow или outbox.
  • Явная commit-семантика: resolveOffset() + commitOffsetsIfNecessary() после обработки записи, не до.
  • producer.disconnect() (flush + close) — в beforeApplicationShutdown после consumer; pending batch не потеряются.
  • Commit до обработки (resolveOffset() до await handler(message)) — offset закоммичен, сообщение потеряно при падении.
  • Fire-and-forget handler без await внутри eachBatch — kafkajs считает batch завершённым мгновенно, offset коммитится до реальной обработки.

Kafka shutdown — место, где данные теряются незаметно. Если consumer убит до commit offset — при следующем запуске придут те же сообщения (replay). Если producer убит до flush — отправленные, но не сброшенные batch исчезают. UCP-контракт: дожать batch целиком, явный flush producer, consumer — replay-safe через idempotency.

Consumer disconnect в beforeApplicationShutdown

R-SHUT-KFK-1: consumer дожидается текущего eachMessage/eachBatch и коммитит offset.

@Injectable()
export class OrderConsumer implements BeforeApplicationShutdown {
  private readonly consumer: Consumer;

  constructor(private readonly kafka: Kafka) {
    this.consumer = kafka.consumer({ groupId: 'order-service-confirmations' });
  }

  async onModuleInit(): Promise<void> {
    await this.consumer.connect();
    await this.consumer.subscribe({ topic: 'orders.confirmed', fromBeginning: false });
    await this.consumer.run({
      eachBatch: async ({ batch, resolveOffset, commitOffsetsIfNecessary, isRunning }) => {
        for (const message of batch.messages) {
          if (!isRunning()) break;
          await this.handle(message);
          resolveOffset(message.offset);
          await commitOffsetsIfNecessary();
        }
      },
    });
  }

  async beforeApplicationShutdown(): Promise<void> {
    await Promise.race([
      this.consumer.disconnect(),
      new Promise<void>((resolve) => setTimeout(resolve, 20_000).unref()),
    ]);
  }
}

consumer.disconnect() отправляет LeaveGroup, ждёт завершения активного eachBatch и закрывает соединения. Promise.race с таймаутом 20s — защита от зависшего handler: если batch не завершился за 20s, disconnect завершается принудительно, Nest продолжает shutdown.

Nest вызывает beforeApplicationShutdown до закрытия HTTP-сервера — Kafka-часть дренируется параллельно с HTTP drain. Суммарный budget: 20s Kafka + 25s HTTP ≤ 60s (R-SHUT-OBS-1).

Handler без долгого cascade

R-SHUT-KFK-2: listener-метод не запускает chain HTTP-вызовов с retry.

Опасная схема — PaymentService вызывает внешние системы прямо из handler:

eachBatch: async ({ batch, resolveOffset }) => {
  for (const message of batch.messages) {
    const event = JSON.parse(message.value!.toString());
    await this.paymentClient.charge(event.orderId, event.amount);
    await this.notificationClient.notify(event.customerId, 'charged');
    resolveOffset(message.offset);
  }
},

Худший случай — 50+ секунд на одно сообщение. Promise.race сработает за 20s, disconnect завершится принудительно, offset не закоммитится, при перезапуске — replay. Если paymentClient.charge уже прошёл — без idempotency-ключа будет двойное списание.

Корректно — handler только фиксирует намерение в outbox:

eachBatch: async ({ batch, resolveOffset, commitOffsetsIfNecessary }) => {
  for (const message of batch.messages) {
    const event: OrderConfirmedEvent = JSON.parse(message.value!.toString());
    await this.db.transaction(async (tx) => {
      const seen = await tx.query(
        'INSERT INTO processed_events(event_id, consumer_group) VALUES ($1, $2) ON CONFLICT DO NOTHING RETURNING id',
        [event.eventId, 'order-service-confirmations'],
      );
      if (seen.rowCount === 0) return;

      await tx.query(
        'INSERT INTO outbox(aggregate_id, type, payload) VALUES ($1, $2, $3)',
        [event.orderId, 'ChargePaymentRequested', JSON.stringify(event)],
      );
    });
    resolveOffset(message.offset);
    await commitOffsetsIfNecessary();
  }
},

Транзакция в БД — одна, завершается за < 50ms. Реальный HTTP charge — в отдельном outbox-relay worker. Listener на shutdown всегда укладывается в таймаут.

Явная commit-семантика

R-SHUT-KFK-3: resolveOffset() после обработки, commitOffsetsIfNecessary() — для группировки commit в batch.

kafkajs при eachBatch не коммитит автоматически — ответственность на коде:

eachBatch: async ({ batch, resolveOffset, commitOffsetsIfNecessary, isRunning, heartbeat }) => {
  for (const message of batch.messages) {
    if (!isRunning()) break;

    const event: ProductStockUpdatedEvent = JSON.parse(message.value!.toString());
    await this.inventoryService.applyStockUpdate(event);

    resolveOffset(message.offset);
    await commitOffsetsIfNecessary();
    await heartbeat();
  }
},

resolveOffset() до await handler() — антипаттерн: offset помечен обработанным, если handler бросит исключение или получит SIGTERM в этот момент — сообщение потеряно.

При eachMessage (одиночный режим) kafkajs берёт commit-управление на себя — autoCommit: true по умолчанию. Для явного контроля:

await this.consumer.run({
  autoCommit: false,
  eachMessage: async ({ topic, partition, message, heartbeat }) => {
    const event: CustomerUpdatedEvent = JSON.parse(message.value!.toString());
    await this.customerService.applyUpdate(event);
    await this.consumer.commitOffsets([{
      topic,
      partition,
      offset: (Number(message.offset) + 1).toString(),
    }]);
    await heartbeat();
  },
});

Producer disconnect

R-SHUT-KFK-4: producer.disconnect() в beforeApplicationShutdown — flush pending batch и закрытие соединений.

@Injectable()
export class OrderEventProducer implements BeforeApplicationShutdown {
  private readonly producer: Producer;

  constructor(private readonly kafka: Kafka) {
    this.producer = kafka.producer({ idempotent: true });
  }

  async onModuleInit(): Promise<void> {
    await this.producer.connect();
  }

  async send(topic: string, messages: Message[]): Promise<void> {
    await this.producer.send({ topic, messages });
  }

  async beforeApplicationShutdown(): Promise<void> {
    await Promise.race([
      this.producer.disconnect(),
      new Promise<void>((resolve) => setTimeout(resolve, 15_000).unref()),
    ]);
  }
}

producer.disconnect() сначала флашит все ожидающие записи на broker, затем закрывает соединения. Без этого вызова pending records в памяти теряются при завершении процесса.

idempotent: true — защита от дублей при ретраях: kafka-сторона дедупликует по sequence number. Для денежных операций outbox-паттерн надёжнее — producer flush может не успеть при SIGKILL.

Что запрещено

АнтипаттернПравилоЧто взамен
resolveOffset() до await handler()R-SHUT-KFK-X1resolveOffset() после успешной обработки
eachBatch без isRunning() проверкиR-SHUT-KFK-1if (!isRunning()) break перед обработкой каждого message
Fire-and-forget handler: void handler(msg) без awaitR-SHUT-KFK-X1await handler(msg) — kafkajs ждёт завершения
Chain HTTP с retry внутри handler (cascade > 20s)R-SHUT-KFK-2outbox + отдельный relay worker
autoCommit: true без явного контроляR-SHUT-KFK-3autoCommit: false + commitOffsets() после обработки
consumer.disconnect() без таймаутаR-SHUT-KFK-1Promise.race с setTimeout(20_000).unref()
producer.disconnect() без вызова на shutdownR-SHUT-KFK-4явный disconnect в beforeApplicationShutdown
Consumer и producer в одном модуле без lifecycleR-SHUT-KFK-1отдельные injectable с BeforeApplicationShutdown

Куда дальше

  • Бюджеты и observability — Kafka в общем 60s бюджете shutdown
  • БД и persistence — pool.end() в onApplicationShutdown после Kafka
  • HTTP drain — server.close() + closeIdleConnections() параллельно с Kafka
  • Идемпотентность in-flight — replay-safety для consumer и producer
  • Runtime-конфигурация NestJS — enableShutdownHooks, readiness→503, force-deadline
  • Kubernetes — terminationGracePeriodSeconds: 60, preStop, probes
  • Фоновые задачи и outbox — outbox-relay с проверкой draining-флага