Опирается на правила:
R-SHUT-IDEM-1иR-SHUT-IDEM-X1из Graceful Shutdown Style Guide → раздел 7. Идемпотентность in-flight.
Важно знать
- Операции, которые SIGTERM может прервать, обязаны быть retry-safe (
R-SHUT-IDEM-1).- HTTP POST —
Idempotency-Keyобязателен (AUTH-19); проверка через NestJS Guard перед бизнес-логикой.- kafkajs handler —
processed_event(event_id)вставляется в той же транзакции, что и side-effect.- outbox-relay — либо двух-фаза
PENDING → PUBLISHING → PUBLISHED, либо receiver-side dedup черезprocessed_event.- money-cascade без
Idempotency-Keyпод retry — при SIGTERM в момент второго attempt новый pod спишет повторно.- Graceful shutdown даёт шанс завершить операцию, но не гарантирует отсутствие partial при force-kill.
- Идемпотентность — последняя линия защиты, когда graceful не успел уложиться в бюджет 60s.
beforeApplicationShutdownиonApplicationShutdownне заменяют retry-safe дизайн: они снижают вероятность прерывания, но не исключают её.
Graceful shutdown в NestJS (app.enableShutdownHooks()) даёт операциям время завершиться: beforeApplicationShutdown → drain HTTP-сервера → onApplicationShutdown. Но бюджет — 60 секунд. Долгий cascade (kafkajs handler → HTTP-вызов к payment с retry × 3 × 10s) может не уложиться. Force-kill прерывает посередине. Если операция не идемпотентна — partial state → инцидент.
Три типа in-flight операций
R-SHUT-IDEM-1 — разные защиты для разных контекстов.
1. HTTP POST
Client → POST /orders (Idempotency-Key: order-uuid-abc)
NestJS: Guard проверяет ключ, сохраняет idempotency_record
OrderService.create() → INSERT order, status=CONFIRMED
[SIGTERM посередине, response не отправлен]
Client → retry: POST /orders (Idempotency-Key: order-uuid-abc)
NestJS (новый pod): Guard находит idempotency_record, возвращает кешированный response
Дубля нет
Guard на базе pg (node-postgres) или TypeORM:
@Injectable()
export class IdempotencyGuard implements CanActivate {
constructor(private readonly db: Pool) {}
async canActivate(ctx: ExecutionContext): Promise<boolean> {
const req = ctx.switchToHttp().getRequest<Request>();
const key = req.headers['idempotency-key'] as string | undefined;
if (!key) throw new BadRequestException('Idempotency-Key required');
const existing = await this.db.query<{ response_body: unknown }>(
'SELECT response_body FROM idempotency_record WHERE key = $1',
[key],
);
if (existing.rows.length > 0) {
ctx.switchToHttp().getResponse().json(existing.rows[0].response_body);
return false;
}
req['idempotencyKey'] = key;
return true;
}
}
После успешной обработки контроллер сохраняет ответ:
@Post('/orders')
@UseGuards(IdempotencyGuard)
async create(
@Headers('idempotency-key') key: string,
@Body() dto: CreateOrderDto,
): Promise<OrderResponse> {
const order = await this.orderService.create(dto);
await this.db.query(
'INSERT INTO idempotency_record(key, response_body) VALUES($1, $2) ON CONFLICT DO NOTHING',
[key, order],
);
return order;
}
Без Idempotency-Key — client retry создаёт второй заказ для того же Customer.
2. kafkajs handler
Handler: получил event_id=XYZ (orders.confirmed)
BEGIN TRANSACTION
INSERT processed_event(event_id='XYZ', handler='billing')
UPDATE order SET status='BILLED' WHERE id=... ← side-effect
COMMIT
resolveOffset(message.offset)
commitOffsetsIfNecessary()
[SIGTERM здесь — транзакция уже закоммичена, offset коммитится]
Restart: Handler получил event_id=XYZ снова (offset не был закоммичен до SIGTERM)
BEGIN TRANSACTION
INSERT processed_event(event_id='XYZ', handler='billing')
→ UNIQUE violation → rollback
Ранний return, resolveOffset → commitOffsetsIfNecessary
Дубля нет
Реализация через pg в beforeApplicationShutdown-безопасном @Injectable():
@Injectable()
export class OrderBillingConsumer implements OnApplicationBootstrap, BeforeApplicationShutdown {
private consumer!: Consumer;
constructor(
private readonly kafka: Kafka,
private readonly db: Pool,
private readonly shutdownState: ShutdownStateService,
) {}
async onApplicationBootstrap(): Promise<void> {
this.consumer = this.kafka.consumer({ groupId: 'billing-svc' });
await this.consumer.connect();
await this.consumer.subscribe({ topic: 'orders.confirmed', fromBeginning: false });
await this.consumer.run({
eachBatch: async ({ batch, resolveOffset, commitOffsetsIfNecessary, isStale }) => {
for (const message of batch.messages) {
if (isStale()) break;
const event: OrderConfirmedEvent = JSON.parse(message.value!.toString());
const client = await this.db.connect();
try {
await client.query('BEGIN');
const dedup = await client.query(
'INSERT INTO processed_event(event_id, handler) VALUES($1, $2) ON CONFLICT DO NOTHING RETURNING event_id',
[event.eventId, 'billing'],
);
if (dedup.rowCount === 0) {
await client.query('ROLLBACK');
} else {
await client.query(
'UPDATE "order" SET status = $1 WHERE id = $2',
['BILLED', event.orderId],
);
await client.query('COMMIT');
}
} catch (err) {
await client.query('ROLLBACK');
throw err;
} finally {
client.release();
}
resolveOffset(message.offset);
await commitOffsetsIfNecessary();
}
},
});
}
async beforeApplicationShutdown(): Promise<void> {
await Promise.race([
this.consumer.disconnect(),
new Promise((_, reject) => setTimeout(() => reject(new Error('kafka timeout')), 20_000).unref()),
]);
}
}
processed_event в той же транзакции — атомарно. Если SIGTERM прерывает до COMMIT — offset не закоммичен, replay безопасен.
3. Outbox-relay
Relay: SELECT * FROM outbox_event WHERE status='PENDING' LIMIT 50 FOR UPDATE SKIP LOCKED
Для каждого:
producer.send({ topic, messages: [{ value: payload, headers: { eventId } }] })
UPDATE outbox_event SET status='PUBLISHED', published_at=now() WHERE id=...
[SIGTERM между send и UPDATE]
Restart: SELECT * FROM outbox_event WHERE status='PENDING' LIMIT 50 ...
Тот же event → второй send
kafkajs consumer получает дубль eventId=XYZ
Двух-фаза через статус PUBLISHING:
@Injectable()
export class OutboxRelayService implements BeforeApplicationShutdown {
private running = false;
private currentBatch: Promise<void> = Promise.resolve();
constructor(
private readonly db: Pool,
private readonly producer: Producer,
private readonly shutdownState: ShutdownStateService,
) {}
@Interval(1_000)
async relay(): Promise<void> {
if (this.shutdownState.isDraining() || this.running) return;
this.running = true;
this.currentBatch = this.processBatch().finally(() => { this.running = false; });
await this.currentBatch;
}
private async processBatch(): Promise<void> {
const client = await this.db.connect();
try {
// Phase 1: lock + mark PUBLISHING
const { rows } = await client.query<{ id: string; topic: string; payload: unknown }>(
`UPDATE outbox_event SET status='PUBLISHING', locked_at=now()
WHERE id IN (
SELECT id FROM outbox_event WHERE status='PENDING' LIMIT 20 FOR UPDATE SKIP LOCKED
)
RETURNING id, topic, payload`,
);
for (const row of rows) {
// Phase 2: send
await this.producer.send({
topic: row.topic,
messages: [{ value: JSON.stringify(row.payload), headers: { eventId: row.id } }],
});
// Phase 3: mark PUBLISHED
await client.query(
`UPDATE outbox_event SET status='PUBLISHED', published_at=now() WHERE id=$1`,
[row.id],
);
}
} finally {
client.release();
}
}
async beforeApplicationShutdown(): Promise<void> {
await this.currentBatch;
}
}
Если SIGTERM между Phase 2 и Phase 3 — rows остаются в PUBLISHING. Cleanup-job раз в час возвращает их в PENDING. Retry безопасен только если consumer-side делает dedup через processed_event.
Альтернатива — всегда иметь receiver-side dedup (см. секцию 2). Тогда relay может публиковать дубли, consumer их игнорирует. Операционно проще, небольшой overhead в Kafka.
Граничные случаи
Money-HTTP без Idempotency-Key под retry
// НЕЛЬЗЯ — нет Idempotency-Key
async chargeCustomer(orderId: string, amount: Money): Promise<Receipt> {
return this.httpService.axiosRef.post(
`${this.paymentUrl}/charge`,
{ orderId, amount },
).then(r => r.data);
}
При shutdown в момент вызова:
POST /chargeуходит к payment-provider, network timeout.@nestjs/axiosretry — второй attempt.- Оба POST дошли до provider (network timeout ≠ «запрос не отправлен»).
- Provider без дедупа обрабатывает оба → двойное списание для Customer.
Корректно — Idempotency-Key генерируется один раз на бизнес-операцию:
async chargeCustomer(idempotencyKey: string, orderId: string, amount: Money): Promise<Receipt> {
return this.httpService.axiosRef.post(
`${this.paymentUrl}/charge`,
{ orderId, amount },
{ headers: { 'Idempotency-Key': idempotencyKey } },
).then(r => r.data);
}
idempotencyKey — например event.eventId из Kafka, передаётся через все retry.
kafkajs offset commit до side-effect
// НЕЛЬЗЯ — resolveOffset до завершения side-effect
eachBatch: async ({ batch, resolveOffset, commitOffsetsIfNecessary }) => {
for (const message of batch.messages) {
const event = JSON.parse(message.value!.toString());
resolveOffset(message.offset); // offset помечен
await commitOffsetsIfNecessary(); // offset закоммичен
await this.processEvent(event); // side-effect — может не завершиться
}
},
При SIGTERM после commitOffsetsIfNecessary но до завершения processEvent — offset ушёл вперёд, replay не произойдёт, side-effect потерян.
Правильно: resolveOffset и commit — только после того, как транзакция с processed_event и side-effect закоммичена (см. секцию 2).
ShutdownStateService не подключён к /health/ready
// НЕЛЬЗЯ — разрозненный флаг, k8s не узнает о draining
let shuttingDown = false;
process.on('SIGTERM', () => { shuttingDown = true; });
Readiness-проба не видит этот флаг, k8s продолжает слать трафик на умирающий pod. Правильно — единственный ShutdownStateService, реализующий BeforeApplicationShutdown, который terminus-проба читает через isDraining() (R-SHUT-CFG-3).
Что запрещено
| Антипаттерн | Правило | Что взамен |
|---|---|---|
Money-HTTP без Idempotency-Key с retry | R-SHUT-IDEM-X1 | header обязателен, генерировать один раз на операцию |
resolveOffset до завершения side-effect | R-SHUT-IDEM-1 | commit после транзакции с processed_event |
| outbox-relay без receiver-side dedup и без двух-фазы | R-SHUT-IDEM-1 | PUBLISHING-статус или processed_event на consumer |
let shuttingDown вместо ShutdownStateService | R-SHUT-CFG-X1 | единственный сервис, завязанный на terminus |
fire-and-forget handler без await | R-SHUT-IDEM-1 | await обязателен, иначе offset обгоняет side-effect |
ON CONFLICT DO NOTHING без проверки rowCount | R-SHUT-IDEM-1 | проверять rowCount === 0 → ранний return |
| Идемпотентность только на одном сервисе | R-SHUT-IDEM-1 | end-to-end: client + NestJS-handler + downstream |
Куда дальше
- Бюджеты и observability — как измерить фактическую длительность shutdown и не выйти за 60s.
- БД и persistence — порядок
pool.end()/dataSource.destroy()вonApplicationShutdown. - HTTP drain —
server.close(),closeIdleConnections(), preStop sleep. - Конфигурация graceful shutdown NestJS —
enableShutdownHooks, force-deadline, readiness-флаг. - Kafka shutdown —
consumer.disconnect()с таймаутом,eachBatchсемантика. - Kubernetes —
terminationGracePeriodSeconds, probes,maxUnavailable: 0. - Scheduled / async / outbox — outbox-relay с draining-флагом, BullMQ
worker.close().