Когда NestJS получает SIGTERM, он запускает штатное завершение: дренирует HTTP-сервер, выполняет хуки beforeApplicationShutdown и onApplicationShutdown, и через 60 секунд процесс всё равно убивают принудительно. Если в этот момент в работе была операция — она прерывается на полпути.
Проблема не в самом прерывании, а в том, что при перезапуске pod'а операция запустится заново: Kubernetes перезапустит pod, клиент повторит запрос, Kafka предложит то же сообщение снова. Если операция не готова к повторному запуску — возникает двойной эффект: два заказа, двойное списание, задублированное письмо.
Идемпотентность — это свойство операции давать один и тот же результат при любом количестве повторных запусков. Реализуется по-разному в зависимости от контекста: HTTP, Kafka или outbox.
HTTP POST и Idempotency-Key
HTTP GET и DELETE идемпотентны сами по себе. POST — нет: каждый вызов создаёт новую сущность. Чтобы защитить POST от дублей, клиент отправляет заголовок Idempotency-Key — уникальный идентификатор конкретного бизнес-намерения. Сервер запоминает ответ по этому ключу и при повторном запросе возвращает сохранённый результат, не запуская логику заново.
Схема работы:
Client → POST /orders Idempotency-Key: order-uuid-abc
Guard: ключа нет в базе → пропускаем запрос
OrderService.create() → вставляем заказ
Сохраняем ответ в idempotency_record
[SIGTERM: процесс убит, ответ не дошёл до клиента]
Client → retry: POST /orders Idempotency-Key: order-uuid-abc
Guard: ключ уже есть → возвращаем сохранённый ответ
Заказ не создаётся повторно
Guard проверяет ключ до бизнес-логики:
@Injectable()
export class IdempotencyGuard implements CanActivate {
constructor(private readonly db: Pool) {}
async canActivate(ctx: ExecutionContext): Promise<boolean> {
const req = ctx.switchToHttp().getRequest<Request>();
const key = req.headers['idempotency-key'] as string | undefined;
if (!key) throw new BadRequestException('Idempotency-Key required');
const existing = await this.db.query<{ response_body: unknown }>(
'SELECT response_body FROM idempotency_record WHERE key = $1',
[key],
);
if (existing.rows.length > 0) {
ctx.switchToHttp().getResponse().json(existing.rows[0].response_body);
return false;
}
req['idempotencyKey'] = key;
return true;
}
}
После успешной обработки контроллер сохраняет ответ:
@Post('/orders')
@UseGuards(IdempotencyGuard)
async create(
@Headers('idempotency-key') key: string,
@Body() dto: CreateOrderDto,
): Promise<OrderResponse> {
const order = await this.orderService.create(dto);
await this.db.query(
'INSERT INTO idempotency_record(key, response_body) VALUES($1, $2) ON CONFLICT DO NOTHING',
[key, order],
);
return order;
}
Без Idempotency-Key каждый повторный запрос при retry создаёт новую запись — для одного и того же клиентского действия.
Исходящий HTTP с retry
Та же проблема возникает, когда NestJS сам вызывает внешний сервис с автоматическим retry. Если SIGTERM приходит во время такого вызова, два запроса могут дойти до провайдера — и оба обработаются:
// Опасно: при retry provider получит два одинаковых запроса
async chargeCustomer(orderId: string, amount: Money): Promise<Receipt> {
return this.httpService.axiosRef.post(
`${this.paymentUrl}/charge`,
{ orderId, amount },
).then(r => r.data);
}
Правильно — передавать ключ идемпотентности, сгенерированный один раз на бизнес-операцию, через все попытки:
async chargeCustomer(idempotencyKey: string, orderId: string, amount: Money): Promise<Receipt> {
return this.httpService.axiosRef.post(
`${this.paymentUrl}/charge`,
{ orderId, amount },
{ headers: { 'Idempotency-Key': idempotencyKey } },
).then(r => r.data);
}
idempotencyKey в этом случае — например, event.eventId из Kafka: он неизменен при retry и однозначно идентифицирует бизнес-операцию.
kafkajs: processed_event в той же транзакции
Kafka гарантирует доставку минимум один раз. При перезапуске consumer может получить то же сообщение снова — особенно если offset не успел закоммититься до SIGTERM. Защита — таблица уже обработанных событий processed_event.
Ключевое правило: вставка в processed_event и side-effect (изменение данных) должны выполняться в одной транзакции. Если транзакция не закоммитилась — при повторном запуске всё повторится корректно. Если закоммитилась — повторная вставка даст конфликт по уникальному индексу, и обработчик вернётся раньше без дублирования.
Handler: получил event_id=XYZ
BEGIN TRANSACTION
INSERT processed_event(event_id='XYZ', handler='billing') -- уникальный индекс
UPDATE order SET status='BILLED' WHERE id=...
COMMIT
resolveOffset → commitOffsetsIfNecessary
[SIGTERM: транзакция уже закоммичена]
Restart: Handler получил event_id=XYZ снова
BEGIN TRANSACTION
INSERT processed_event(event_id='XYZ', handler='billing')
→ нарушение уникальности → ROLLBACK
Ранний return, resolveOffset → commit
Дубля нет
Реализация:
@Injectable()
export class OrderBillingConsumer implements OnApplicationBootstrap, BeforeApplicationShutdown {
private consumer!: Consumer;
constructor(
private readonly kafka: Kafka,
private readonly db: Pool,
) {}
async onApplicationBootstrap(): Promise<void> {
this.consumer = this.kafka.consumer({ groupId: 'billing-svc' });
await this.consumer.connect();
await this.consumer.subscribe({ topic: 'orders.confirmed', fromBeginning: false });
await this.consumer.run({
eachBatch: async ({ batch, resolveOffset, commitOffsetsIfNecessary, isRunning }) => {
for (const message of batch.messages) {
if (!isRunning()) break;
const event: OrderConfirmedEvent = JSON.parse(message.value!.toString());
const client = await this.db.connect();
try {
await client.query('BEGIN');
const dedup = await client.query(
'INSERT INTO processed_event(event_id, handler) VALUES($1, $2) ON CONFLICT DO NOTHING RETURNING event_id',
[event.eventId, 'billing'],
);
if (dedup.rowCount === 0) {
await client.query('ROLLBACK');
} else {
await client.query(
'UPDATE "order" SET status = $1 WHERE id = $2',
['BILLED', event.orderId],
);
await client.query('COMMIT');
}
} catch (err) {
await client.query('ROLLBACK');
throw err;
} finally {
client.release();
}
resolveOffset(message.offset);
await commitOffsetsIfNecessary();
}
},
});
}
async beforeApplicationShutdown(): Promise<void> {
await Promise.race([
this.consumer.disconnect(),
new Promise((_, reject) => setTimeout(() => reject(new Error('kafka timeout')), 20_000).unref()),
]);
}
}
Частая ошибка — вызывать resolveOffset и commitOffsetsIfNecessary до завершения транзакции:
// Неправильно: offset закоммичен до side-effect
for (const message of batch.messages) {
resolveOffset(message.offset);
await commitOffsetsIfNecessary(); // offset ушёл вперёд
await this.processEvent(event); // может не завершиться при SIGTERM
}
При SIGTERM после commit offset'а, но до конца processEvent — side-effect потерян без возможности replay.
Outbox-relay: двухфазный статус
Outbox-паттерн означает, что сервис сначала записывает события в таблицу outbox_event, а отдельный relay-процесс отправляет их в Kafka. Проблема возникает, если SIGTERM приходит между отправкой в Kafka и пометкой события как PUBLISHED:
Relay: SELECT ... WHERE status='PENDING' FOR UPDATE SKIP LOCKED
producer.send(...)
[SIGTERM — UPDATE до PUBLISHED не выполнился]
Restart: SELECT ... WHERE status='PENDING' ...
Тот же event → второй send → дубль в Kafka
Решение — промежуточный статус PUBLISHING. Событие переходит в него до отправки; при SIGTERM между фазами оно остаётся в PUBLISHING и не попадает в следующую выборку PENDING. Отдельный cleanup-job раз в час возвращает зависшие PUBLISHING обратно в PENDING:
@Injectable()
export class OutboxRelayService implements BeforeApplicationShutdown {
private running = false;
private currentBatch: Promise<void> = Promise.resolve();
constructor(
private readonly db: Pool,
private readonly producer: Producer,
) {}
@Interval(1_000)
async relay(): Promise<void> {
if (this.running) return;
this.running = true;
this.currentBatch = this.processBatch().finally(() => { this.running = false; });
await this.currentBatch;
}
private async processBatch(): Promise<void> {
const client = await this.db.connect();
try {
// Фаза 1: забираем PENDING и помечаем PUBLISHING
const { rows } = await client.query<{ id: string; topic: string; payload: unknown }>(
`UPDATE outbox_event SET status='PUBLISHING', locked_at=now()
WHERE id IN (
SELECT id FROM outbox_event WHERE status='PENDING' LIMIT 20 FOR UPDATE SKIP LOCKED
)
RETURNING id, topic, payload`,
);
for (const row of rows) {
// Фаза 2: отправляем в Kafka
await this.producer.send({
topic: row.topic,
messages: [{ value: JSON.stringify(row.payload), headers: { eventId: row.id } }],
});
// Фаза 3: помечаем PUBLISHED
await client.query(
`UPDATE outbox_event SET status='PUBLISHED', published_at=now() WHERE id=$1`,
[row.id],
);
}
} finally {
client.release();
}
}
async beforeApplicationShutdown(): Promise<void> {
await this.currentBatch;
}
}
Если SIGTERM приходит между Фазой 2 и Фазой 3 — Kafka уже получила событие, но в базе оно в PUBLISHING. После cleanup оно попадёт в повторную отправку. Поэтому consumer-side всё равно должен делать дедупликацию через processed_event — тогда дубль в Kafka безопасен.
Альтернатива — не использовать статус PUBLISHING вообще, а сделать consumer-side деduplication обязательным. Тогда relay может отправлять дубли спокойно, и операционно это проще.
Коротко
- Graceful shutdown даёт время завершить операцию, но не гарантирует это: при force-kill через 60 секунд операция прервётся, а при перезапуске выполнится снова.
- HTTP POST: клиент отправляет
Idempotency-Key, NestJS Guard проверяет ключ и возвращает закешированный ответ при повторном запросе. - Исходящий HTTP с retry: ключ идемпотентности генерируется один раз на бизнес-операцию и передаётся во все попытки — иначе provider обрабатывает все retry независимо.
- kafkajs handler:
processed_eventвставляется в той же транзакции, что и side-effect; конфликт уникальности при повторе = ранний возврат без дублирования. resolveOffsetиcommitOffsetsIfNecessary— только после коммита транзакции: иначе offset уходит вперёд и side-effect теряется без replay.- Outbox-relay: промежуточный статус
PUBLISHINGзащищает от двойной отправки; зависшие записи возвращает вPENDINGcleanup-job. - Идемпотентность нужна на всей цепочке: клиент → NestJS-handler → downstream-сервис.
Что почитать дальше
- Конфигурация graceful shutdown NestJS —
enableShutdownHooks, force-deadline, readiness-флаг. - HTTP drain —
server.close(),closeIdleConnections(), preStop sleep. - Kafka shutdown —
consumer.disconnect()с таймаутом, семантикаeachBatch. - БД и persistence — порядок
pool.end()/dataSource.destroy()вonApplicationShutdown. - Бюджеты и observability — как измерить фактическую длительность shutdown.