Опирается на правила:
R-SHUT-KFK-1…R-SHUT-KFK-4иR-SHUT-KFK-X1из Graceful Shutdown Style Guide → раздел 3. Kafka shutdown.
Важно знать
consumer.disconnect()вbeforeApplicationShutdownзавершает текущийeachMessage/eachBatchи коммитит offset — без явного вызова kafkajs не ждёт.- Таймаут ~20s обязателен поверх disconnect через
Promise.race— без него SIGKILL придёт раньше завершения текущего batch.- Handler не запускает долгий cascade — chain HTTP с retry не уложится в таймаут; cascade — в async-flow или outbox.
- Явная commit-семантика:
resolveOffset()+commitOffsetsIfNecessary()после обработки записи, не до.producer.disconnect()(flush + close) — вbeforeApplicationShutdownпосле consumer; pending batch не потеряются.- Commit до обработки (
resolveOffset()доawait handler(message)) — offset закоммичен, сообщение потеряно при падении.- Fire-and-forget handler без
awaitвнутриeachBatch— kafkajs считает batch завершённым мгновенно, offset коммитится до реальной обработки.
Kafka shutdown — место, где данные теряются незаметно. Если consumer убит до commit offset — при следующем запуске придут те же сообщения (replay). Если producer убит до flush — отправленные, но не сброшенные batch исчезают. UCP-контракт: дожать batch целиком, явный flush producer, consumer — replay-safe через idempotency.
Consumer disconnect в beforeApplicationShutdown
R-SHUT-KFK-1: consumer дожидается текущего eachMessage/eachBatch и коммитит offset.
@Injectable()
export class OrderConsumer implements BeforeApplicationShutdown {
private readonly consumer: Consumer;
constructor(private readonly kafka: Kafka) {
this.consumer = kafka.consumer({ groupId: 'order-service-confirmations' });
}
async onModuleInit(): Promise<void> {
await this.consumer.connect();
await this.consumer.subscribe({ topic: 'orders.confirmed', fromBeginning: false });
await this.consumer.run({
eachBatch: async ({ batch, resolveOffset, commitOffsetsIfNecessary, isRunning }) => {
for (const message of batch.messages) {
if (!isRunning()) break;
await this.handle(message);
resolveOffset(message.offset);
await commitOffsetsIfNecessary();
}
},
});
}
async beforeApplicationShutdown(): Promise<void> {
await Promise.race([
this.consumer.disconnect(),
new Promise<void>((resolve) => setTimeout(resolve, 20_000).unref()),
]);
}
}
consumer.disconnect() отправляет LeaveGroup, ждёт завершения активного eachBatch и закрывает соединения. Promise.race с таймаутом 20s — защита от зависшего handler: если batch не завершился за 20s, disconnect завершается принудительно, Nest продолжает shutdown.
Nest вызывает beforeApplicationShutdown до закрытия HTTP-сервера — Kafka-часть дренируется параллельно с HTTP drain. Суммарный budget: 20s Kafka + 25s HTTP ≤ 60s (R-SHUT-OBS-1).
Handler без долгого cascade
R-SHUT-KFK-2: listener-метод не запускает chain HTTP-вызовов с retry.
Опасная схема — PaymentService вызывает внешние системы прямо из handler:
eachBatch: async ({ batch, resolveOffset }) => {
for (const message of batch.messages) {
const event = JSON.parse(message.value!.toString());
await this.paymentClient.charge(event.orderId, event.amount);
await this.notificationClient.notify(event.customerId, 'charged');
resolveOffset(message.offset);
}
},
Худший случай — 50+ секунд на одно сообщение. Promise.race сработает за 20s, disconnect завершится принудительно, offset не закоммитится, при перезапуске — replay. Если paymentClient.charge уже прошёл — без idempotency-ключа будет двойное списание.
Корректно — handler только фиксирует намерение в outbox:
eachBatch: async ({ batch, resolveOffset, commitOffsetsIfNecessary }) => {
for (const message of batch.messages) {
const event: OrderConfirmedEvent = JSON.parse(message.value!.toString());
await this.db.transaction(async (tx) => {
const seen = await tx.query(
'INSERT INTO processed_events(event_id, consumer_group) VALUES ($1, $2) ON CONFLICT DO NOTHING RETURNING id',
[event.eventId, 'order-service-confirmations'],
);
if (seen.rowCount === 0) return;
await tx.query(
'INSERT INTO outbox(aggregate_id, type, payload) VALUES ($1, $2, $3)',
[event.orderId, 'ChargePaymentRequested', JSON.stringify(event)],
);
});
resolveOffset(message.offset);
await commitOffsetsIfNecessary();
}
},
Транзакция в БД — одна, завершается за < 50ms. Реальный HTTP charge — в отдельном outbox-relay worker. Listener на shutdown всегда укладывается в таймаут.
Явная commit-семантика
R-SHUT-KFK-3: resolveOffset() после обработки, commitOffsetsIfNecessary() — для группировки commit в batch.
kafkajs при eachBatch не коммитит автоматически — ответственность на коде:
eachBatch: async ({ batch, resolveOffset, commitOffsetsIfNecessary, isRunning, heartbeat }) => {
for (const message of batch.messages) {
if (!isRunning()) break;
const event: ProductStockUpdatedEvent = JSON.parse(message.value!.toString());
await this.inventoryService.applyStockUpdate(event);
resolveOffset(message.offset);
await commitOffsetsIfNecessary();
await heartbeat();
}
},
resolveOffset() до await handler() — антипаттерн: offset помечен обработанным, если handler бросит исключение или получит SIGTERM в этот момент — сообщение потеряно.
При eachMessage (одиночный режим) kafkajs берёт commit-управление на себя — autoCommit: true по умолчанию. Для явного контроля:
await this.consumer.run({
autoCommit: false,
eachMessage: async ({ topic, partition, message, heartbeat }) => {
const event: CustomerUpdatedEvent = JSON.parse(message.value!.toString());
await this.customerService.applyUpdate(event);
await this.consumer.commitOffsets([{
topic,
partition,
offset: (Number(message.offset) + 1).toString(),
}]);
await heartbeat();
},
});
Producer disconnect
R-SHUT-KFK-4: producer.disconnect() в beforeApplicationShutdown — flush pending batch и закрытие соединений.
@Injectable()
export class OrderEventProducer implements BeforeApplicationShutdown {
private readonly producer: Producer;
constructor(private readonly kafka: Kafka) {
this.producer = kafka.producer({ idempotent: true });
}
async onModuleInit(): Promise<void> {
await this.producer.connect();
}
async send(topic: string, messages: Message[]): Promise<void> {
await this.producer.send({ topic, messages });
}
async beforeApplicationShutdown(): Promise<void> {
await Promise.race([
this.producer.disconnect(),
new Promise<void>((resolve) => setTimeout(resolve, 15_000).unref()),
]);
}
}
producer.disconnect() сначала флашит все ожидающие записи на broker, затем закрывает соединения. Без этого вызова pending records в памяти теряются при завершении процесса.
idempotent: true — защита от дублей при ретраях: kafka-сторона дедупликует по sequence number. Для денежных операций outbox-паттерн надёжнее — producer flush может не успеть при SIGKILL.
Что запрещено
| Антипаттерн | Правило | Что взамен |
|---|---|---|
resolveOffset() до await handler() | R-SHUT-KFK-X1 | resolveOffset() после успешной обработки |
eachBatch без isRunning() проверки | R-SHUT-KFK-1 | if (!isRunning()) break перед обработкой каждого message |
Fire-and-forget handler: void handler(msg) без await | R-SHUT-KFK-X1 | await handler(msg) — kafkajs ждёт завершения |
| Chain HTTP с retry внутри handler (cascade > 20s) | R-SHUT-KFK-2 | outbox + отдельный relay worker |
autoCommit: true без явного контроля | R-SHUT-KFK-3 | autoCommit: false + commitOffsets() после обработки |
consumer.disconnect() без таймаута | R-SHUT-KFK-1 | Promise.race с setTimeout(20_000).unref() |
producer.disconnect() без вызова на shutdown | R-SHUT-KFK-4 | явный disconnect в beforeApplicationShutdown |
| Consumer и producer в одном модуле без lifecycle | R-SHUT-KFK-1 | отдельные injectable с BeforeApplicationShutdown |
Куда дальше
- Бюджеты и observability — Kafka в общем 60s бюджете shutdown
- БД и persistence —
pool.end()вonApplicationShutdownпосле Kafka - HTTP drain —
server.close()+closeIdleConnections()параллельно с Kafka - Идемпотентность in-flight — replay-safety для consumer и producer
- Runtime-конфигурация NestJS —
enableShutdownHooks, readiness→503, force-deadline - Kubernetes —
terminationGracePeriodSeconds: 60, preStop, probes - Фоновые задачи и outbox — outbox-relay с проверкой draining-флага