← назад к разделу

Kafka-сообщения теряются в двух случаях: если consumer убит до того, как успел зафиксировать (закоммитить) обработанное сообщение, или если producer убит до того, как успел отправить накопленные данные на брокер. NestJS не защищает от этого автоматически — нужно явно управлять завершением.

Почему простого выключения недостаточно

Когда приложение получает сигнал завершения (SIGTERM), оно начинает останавливаться. Если в этот момент consumer обрабатывает очередную порцию сообщений, есть два сценария:

  • consumer убивается немедленно — offset не закоммичен, при следующем запуске те же сообщения придут снова (повтор);
  • consumer ждёт, пока текущая порция дообработается, затем коммитит offset и только потом закрывается — данные сохранены.

Второй сценарий называется корректным завершением. Чтобы он случился, нужно явно вызвать consumer.disconnect() в хуке beforeApplicationShutdown.

Как остановить consumer

В kafkajs метод consumer.disconnect() делает три вещи: отправляет брокеру команду LeaveGroup, ждёт завершения текущего eachMessage или eachBatch, закрывает соединения. Именно потому offset успевает закоммититься до закрытия.

@Injectable()
export class OrderConsumer implements BeforeApplicationShutdown {
  private readonly consumer: Consumer;

  constructor(private readonly kafka: Kafka) {
    this.consumer = kafka.consumer({ groupId: 'order-service-confirmations' });
  }

  async onModuleInit(): Promise<void> {
    await this.consumer.connect();
    await this.consumer.subscribe({ topic: 'orders.confirmed', fromBeginning: false });
    await this.consumer.run({
      eachBatch: async ({ batch, resolveOffset, commitOffsetsIfNecessary, isRunning }) => {
        for (const message of batch.messages) {
          if (!isRunning()) break;
          await this.handle(message);
          resolveOffset(message.offset);
          await commitOffsetsIfNecessary();
        }
      },
    });
  }

  async beforeApplicationShutdown(): Promise<void> {
    await Promise.race([
      this.consumer.disconnect(),
      new Promise<void>((resolve) => setTimeout(resolve, 20_000).unref()),
    ]);
  }
}

Зачем Promise.race с таймаутом 20 секунд? Если handler завис — например, ждёт ответа от внешнего сервиса, — disconnect() тоже зависнет. Без таймаута процесс будет ждать бесконечно, пока система не отправит SIGKILL. Таймаут 20 секунд — разумный предел: если за это время handler не завершился, disconnect завершается принудительно, NestJS продолжает выключаться.

Флаг isRunning() в цикле позволяет остановить обработку порции досрочно при получении сигнала завершения, не дожидаясь последнего сообщения в пачке.

Когда коммитить offset — частая ошибка

В режиме eachBatch kafkajs не коммитит автоматически — это ваша ответственность. Порядок важен: сначала обработка, потом resolveOffset() и commitOffsetsIfNecessary().

Частая ошибка — поставить resolveOffset() до обработки:

// Неправильно
resolveOffset(message.offset); // offset помечен обработанным
await this.handle(message);    // если здесь упадёт — сообщение потеряно

Если handler бросит исключение после того, как offset уже зафиксирован, сообщение считается обработанным — и при перезапуске не придёт снова. Данные пропали незаметно.

Правильный порядок:

await this.handle(message);
resolveOffset(message.offset);
await commitOffsetsIfNecessary();

В режиме eachMessage kafkajs по умолчанию коммитит сам (autoCommit: true). Для явного контроля:

await this.consumer.run({
  autoCommit: false,
  eachMessage: async ({ topic, partition, message }) => {
    await this.customerService.applyUpdate(
      JSON.parse(message.value!.toString())
    );
    await this.consumer.commitOffsets([{
      topic,
      partition,
      offset: (Number(message.offset) + 1).toString(),
    }]);
  },
});

Почему тяжёлую работу нельзя делать прямо в handler

Если handler вызывает внешние сервисы с повторными попытками, суммарное время может превысить 20 секунд. Тогда таймаут сработает, disconnect завершится принудительно, offset не закоммитится — при перезапуске придут те же сообщения. Если внешний вызов уже прошёл — без защиты от повторов получится задвоение.

Вместо прямых вызовов handler записывает намерение в базу данных (паттерн outbox), а реальная работа выполняется отдельным процессом:

eachBatch: async ({ batch, resolveOffset, commitOffsetsIfNecessary }) => {
  for (const message of batch.messages) {
    const event: OrderConfirmedEvent = JSON.parse(message.value!.toString());
    await this.db.transaction(async (tx) => {
      const seen = await tx.query(
        'INSERT INTO processed_events(event_id, consumer_group) VALUES ($1, $2) ON CONFLICT DO NOTHING RETURNING id',
        [event.eventId, 'order-service-confirmations'],
      );
      if (seen.rowCount === 0) return;

      await tx.query(
        'INSERT INTO outbox(aggregate_id, type, payload) VALUES ($1, $2, $3)',
        [event.orderId, 'ChargePaymentRequested', JSON.stringify(event)],
      );
    });
    resolveOffset(message.offset);
    await commitOffsetsIfNecessary();
  }
},

Транзакция в БД завершается за миллисекунды — handler всегда укладывается в таймаут. Запись processed_events защищает от повторов: если то же сообщение придёт ещё раз, ON CONFLICT DO NOTHING пропустит его.

Как остановить producer

Producer накапливает сообщения в памяти и отправляет их пакетами. Если процесс завершился до отправки, эти сообщения пропадут. Явный вызов producer.disconnect() сначала сбрасывает все накопленные записи на брокер, затем закрывает соединения.

@Injectable()
export class OrderEventProducer implements BeforeApplicationShutdown {
  private readonly producer: Producer;

  constructor(private readonly kafka: Kafka) {
    this.producer = kafka.producer({ idempotent: true });
  }

  async onModuleInit(): Promise<void> {
    await this.producer.connect();
  }

  async send(topic: string, messages: Message[]): Promise<void> {
    await this.producer.send({ topic, messages });
  }

  async beforeApplicationShutdown(): Promise<void> {
    await Promise.race([
      this.producer.disconnect(),
      new Promise<void>((resolve) => setTimeout(resolve, 15_000).unref()),
    ]);
  }
}

Опция idempotent: true защищает от дублей при автоматических повторах: Kafka дедуплицирует записи по порядковому номеру. Для критичных операций всё равно надёжнее outbox — disconnect() может не успеть при резком завершении.

Коротко

  • consumer.disconnect() в beforeApplicationShutdown дожидается завершения текущего eachBatch и коммитит offset — без него kafkajs не ждёт.
  • Таймаут 20 секунд через Promise.race обязателен — без него зависший handler заблокирует выключение навсегда.
  • resolveOffset() и commitOffsetsIfNecessary() вызываются только после успешной обработки сообщения, никак не до.
  • Тяжёлые внешние вызовы из handler не уложатся в таймаут — выносите их в outbox и отдельный обработчик.
  • producer.disconnect() сбрасывает накопленные записи на брокер перед закрытием; без вызова pending-сообщения теряются.

Что почитать дальше

  • Бюджеты и observability — как Kafka вписывается в общий 60-секундный бюджет завершения
  • HTTP drain — server.close() и как он работает параллельно с Kafka disconnect
  • Фоновые задачи и outbox — outbox-relay с проверкой флага draining
  • Kubernetes — terminationGracePeriodSeconds, preStop, readiness-зонды