Kafka-сообщения теряются в двух случаях: если consumer убит до того, как успел зафиксировать (закоммитить) обработанное сообщение, или если producer убит до того, как успел отправить накопленные данные на брокер. NestJS не защищает от этого автоматически — нужно явно управлять завершением.
Почему простого выключения недостаточно
Когда приложение получает сигнал завершения (SIGTERM), оно начинает останавливаться. Если в этот момент consumer обрабатывает очередную порцию сообщений, есть два сценария:
- consumer убивается немедленно — offset не закоммичен, при следующем запуске те же сообщения придут снова (повтор);
- consumer ждёт, пока текущая порция дообработается, затем коммитит offset и только потом закрывается — данные сохранены.
Второй сценарий называется корректным завершением. Чтобы он случился, нужно явно вызвать consumer.disconnect() в хуке beforeApplicationShutdown.
Как остановить consumer
В kafkajs метод consumer.disconnect() делает три вещи: отправляет брокеру команду LeaveGroup, ждёт завершения текущего eachMessage или eachBatch, закрывает соединения. Именно потому offset успевает закоммититься до закрытия.
@Injectable()
export class OrderConsumer implements BeforeApplicationShutdown {
private readonly consumer: Consumer;
constructor(private readonly kafka: Kafka) {
this.consumer = kafka.consumer({ groupId: 'order-service-confirmations' });
}
async onModuleInit(): Promise<void> {
await this.consumer.connect();
await this.consumer.subscribe({ topic: 'orders.confirmed', fromBeginning: false });
await this.consumer.run({
eachBatch: async ({ batch, resolveOffset, commitOffsetsIfNecessary, isRunning }) => {
for (const message of batch.messages) {
if (!isRunning()) break;
await this.handle(message);
resolveOffset(message.offset);
await commitOffsetsIfNecessary();
}
},
});
}
async beforeApplicationShutdown(): Promise<void> {
await Promise.race([
this.consumer.disconnect(),
new Promise<void>((resolve) => setTimeout(resolve, 20_000).unref()),
]);
}
}
Зачем Promise.race с таймаутом 20 секунд? Если handler завис — например, ждёт ответа от внешнего сервиса, — disconnect() тоже зависнет. Без таймаута процесс будет ждать бесконечно, пока система не отправит SIGKILL. Таймаут 20 секунд — разумный предел: если за это время handler не завершился, disconnect завершается принудительно, NestJS продолжает выключаться.
Флаг isRunning() в цикле позволяет остановить обработку порции досрочно при получении сигнала завершения, не дожидаясь последнего сообщения в пачке.
Когда коммитить offset — частая ошибка
В режиме eachBatch kafkajs не коммитит автоматически — это ваша ответственность. Порядок важен: сначала обработка, потом resolveOffset() и commitOffsetsIfNecessary().
Частая ошибка — поставить resolveOffset() до обработки:
// Неправильно
resolveOffset(message.offset); // offset помечен обработанным
await this.handle(message); // если здесь упадёт — сообщение потеряно
Если handler бросит исключение после того, как offset уже зафиксирован, сообщение считается обработанным — и при перезапуске не придёт снова. Данные пропали незаметно.
Правильный порядок:
await this.handle(message);
resolveOffset(message.offset);
await commitOffsetsIfNecessary();
В режиме eachMessage kafkajs по умолчанию коммитит сам (autoCommit: true). Для явного контроля:
await this.consumer.run({
autoCommit: false,
eachMessage: async ({ topic, partition, message }) => {
await this.customerService.applyUpdate(
JSON.parse(message.value!.toString())
);
await this.consumer.commitOffsets([{
topic,
partition,
offset: (Number(message.offset) + 1).toString(),
}]);
},
});
Почему тяжёлую работу нельзя делать прямо в handler
Если handler вызывает внешние сервисы с повторными попытками, суммарное время может превысить 20 секунд. Тогда таймаут сработает, disconnect завершится принудительно, offset не закоммитится — при перезапуске придут те же сообщения. Если внешний вызов уже прошёл — без защиты от повторов получится задвоение.
Вместо прямых вызовов handler записывает намерение в базу данных (паттерн outbox), а реальная работа выполняется отдельным процессом:
eachBatch: async ({ batch, resolveOffset, commitOffsetsIfNecessary }) => {
for (const message of batch.messages) {
const event: OrderConfirmedEvent = JSON.parse(message.value!.toString());
await this.db.transaction(async (tx) => {
const seen = await tx.query(
'INSERT INTO processed_events(event_id, consumer_group) VALUES ($1, $2) ON CONFLICT DO NOTHING RETURNING id',
[event.eventId, 'order-service-confirmations'],
);
if (seen.rowCount === 0) return;
await tx.query(
'INSERT INTO outbox(aggregate_id, type, payload) VALUES ($1, $2, $3)',
[event.orderId, 'ChargePaymentRequested', JSON.stringify(event)],
);
});
resolveOffset(message.offset);
await commitOffsetsIfNecessary();
}
},
Транзакция в БД завершается за миллисекунды — handler всегда укладывается в таймаут. Запись processed_events защищает от повторов: если то же сообщение придёт ещё раз, ON CONFLICT DO NOTHING пропустит его.
Как остановить producer
Producer накапливает сообщения в памяти и отправляет их пакетами. Если процесс завершился до отправки, эти сообщения пропадут. Явный вызов producer.disconnect() сначала сбрасывает все накопленные записи на брокер, затем закрывает соединения.
@Injectable()
export class OrderEventProducer implements BeforeApplicationShutdown {
private readonly producer: Producer;
constructor(private readonly kafka: Kafka) {
this.producer = kafka.producer({ idempotent: true });
}
async onModuleInit(): Promise<void> {
await this.producer.connect();
}
async send(topic: string, messages: Message[]): Promise<void> {
await this.producer.send({ topic, messages });
}
async beforeApplicationShutdown(): Promise<void> {
await Promise.race([
this.producer.disconnect(),
new Promise<void>((resolve) => setTimeout(resolve, 15_000).unref()),
]);
}
}
Опция idempotent: true защищает от дублей при автоматических повторах: Kafka дедуплицирует записи по порядковому номеру. Для критичных операций всё равно надёжнее outbox — disconnect() может не успеть при резком завершении.
Коротко
consumer.disconnect()вbeforeApplicationShutdownдожидается завершения текущегоeachBatchи коммитит offset — без него kafkajs не ждёт.- Таймаут 20 секунд через
Promise.raceобязателен — без него зависший handler заблокирует выключение навсегда. resolveOffset()иcommitOffsetsIfNecessary()вызываются только после успешной обработки сообщения, никак не до.- Тяжёлые внешние вызовы из handler не уложатся в таймаут — выносите их в outbox и отдельный обработчик.
producer.disconnect()сбрасывает накопленные записи на брокер перед закрытием; без вызова pending-сообщения теряются.
Что почитать дальше
- Бюджеты и observability — как Kafka вписывается в общий 60-секундный бюджет завершения
- HTTP drain —
server.close()и как он работает параллельно с Kafka disconnect - Фоновые задачи и outbox — outbox-relay с проверкой флага draining
- Kubernetes —
terminationGracePeriodSeconds, preStop, readiness-зонды