Опирается на правила:
R-SHUT-SCHED-1…R-SHUT-SCHED-3иR-SHUT-SCHED-X1из Graceful Shutdown Style Guide → раздел 5. Фоновые задачи / очереди / outbox.
Важно знать
@nestjs/schedule-интервал завершает текущую итерацию — хранить in-flight Promise иawaitего на shutdown (~25s).SchedulerRegistry.deleteInterval()останавливает следующий тик, но не прерывает уже запущенную итерацию.- BullMQ —
await worker.close()(безforce) ждёт активные джобы;worker.close(true)— антипаттерн (R-SHUT-SCHED-X1).- Долгий async-cascade — реагировать на shutdown-сигнал (
AbortSignal/isDraining()): дожать критичную секцию, не начинать следующую.- Outbox-relay завершает текущий batch (атомарно через
FOR UPDATE SKIP LOCKED), не начинает новый; цикл читаетshutdownState.isDraining(), неwhile (true).clearIntervalбезawaitin-flight Promise — задача брошена посреди итерации, частичные изменения без rollback (inconsistent state).- Total budget 60s — preStop 10s + HTTP drain ≤25s + фоновые задачи ≤20s + Kafka ≤15s ≤ 60s.
Фоновые задачи (@nestjs/schedule, BullMQ, outbox-relay) — место, где graceful shutdown ломается незаметно. Если задача убита на середине, транзакция откатывается, но side-effect (Kafka-send, HTTP-вызов) уже произошёл — это рассогласование. UCP формулирует: дать каждой задаче завершить текущую итерацию, следующую не начинать, force-shutdown только в крайнем случае.
@nestjs/schedule дожимает итерацию
R-SHUT-SCHED-1: @nestjs/schedule не предоставляет встроенного await in-flight. Нужно хранить Promise самостоятельно:
@Injectable()
export class OutboxRelayService implements BeforeApplicationShutdown {
private runningJob: Promise<void> | null = null;
constructor(
private readonly schedulerRegistry: SchedulerRegistry,
private readonly shutdownState: ShutdownStateService,
private readonly outboxRepository: OutboxRepository,
private readonly producer: KafkaProducerService,
) {}
@Interval('outbox-relay', 500)
async relay(): Promise<void> {
if (this.shutdownState.isDraining()) return;
this.runningJob = this.processBatch();
await this.runningJob;
this.runningJob = null;
}
async beforeApplicationShutdown(): Promise<void> {
if (this.schedulerRegistry.doesExist('interval', 'outbox-relay')) {
this.schedulerRegistry.deleteInterval('outbox-relay');
}
if (this.runningJob) {
await Promise.race([this.runningJob, timeout(25_000)]);
}
}
private async processBatch(): Promise<void> {
const batch = await this.outboxRepository.fetchUnpublished(50);
for (const event of batch) {
await this.producer.send(event.topic, event.partitionKey, event.payload);
await this.outboxRepository.markPublished(event.id);
}
}
}
Что происходит на SIGTERM:
- NestJS вызывает
beforeApplicationShutdownвсех Injectable. deleteInterval— тик больше не запускается.- Если
processBatch()выполняется прямо сейчас —await this.runningJobдожидается её завершения (или 25s timeout). - Текущий batch доведён до конца; следующий не начат.
Запрос от сервиса Order (domain-пример):
@Injectable()
export class OrderOutboxRepository {
constructor(private readonly pool: Pool) {}
async fetchUnpublished(limit: number): Promise<OutboxEvent[]> {
const { rows } = await this.pool.query<OutboxEvent>(
`SELECT id, topic, partition_key, payload
FROM order_outbox
WHERE published_at IS NULL
ORDER BY id
LIMIT $1
FOR UPDATE SKIP LOCKED`,
[limit],
);
return rows;
}
async markPublished(id: string): Promise<void> {
await this.pool.query(
`UPDATE order_outbox SET published_at = NOW() WHERE id = $1`,
[id],
);
}
}
FOR UPDATE SKIP LOCKED — атомарность: другой pod или новый pod после деплоя подхватывает разблокированные строки.
BullMQ — worker.close() без force
R-SHUT-SCHED-1, R-SHUT-SCHED-X1: BullMQ-worker ждёт активные джобы при корректном закрытии.
@Injectable()
export class PaymentWorkerService implements OnApplicationBootstrap, BeforeApplicationShutdown {
private worker!: Worker;
constructor(
private readonly shutdownState: ShutdownStateService,
private readonly paymentService: PaymentService,
) {}
onApplicationBootstrap(): void {
this.worker = new Worker(
'payment-jobs',
async (job: Job<PaymentJobData>) => {
if (this.shutdownState.isDraining()) {
await job.moveToFailed(new Error('draining'), job.token ?? '');
return;
}
await this.paymentService.processCharge(job.data);
},
{ connection: redisConnection, concurrency: 5 },
);
}
async beforeApplicationShutdown(): Promise<void> {
await Promise.race([
this.worker.close(),
timeout(20_000),
]);
}
}
worker.close() — graceful: ждёт активные джобы. worker.close(true) — force: обрывает немедленно, partial state.
Джоб payment по домену Customer:
interface PaymentJobData {
customerId: string;
orderId: string;
amountKopecks: number;
idempotencyKey: string;
}
idempotencyKey обязателен (R-SHUT-IDEM-1) — SIGTERM в момент повтора не должен давать двойное списание.
Долгий async-cascade с AbortSignal
R-SHUT-SCHED-2: если cascade не помещается в budget, разбить на checkpoint'ы и реагировать на сигнал остановки.
@Injectable()
export class ProductSyncService implements BeforeApplicationShutdown {
private abortController = new AbortController();
async beforeApplicationShutdown(): Promise<void> {
this.abortController.abort();
}
async syncProductCatalog(): Promise<void> {
const products = await this.fetchExternalProducts();
for (const product of products) {
if (this.abortController.signal.aborted) break;
await this.productRepository.upsert(product);
await this.searchIndexer.index(product);
}
}
}
При SIGTERM: abort() выставлен, следующая итерация цикла break-ается, транзакция коммитится по уже обработанным записям. Новый pod продолжит с первой необработанной.
Outbox-relay: только isDraining(), не while (true)
R-SHUT-SCHED-3: цикл проверяет draining-флаг.
// НЕЛЬЗЯ
@Interval('relay', 100)
async relay(): Promise<void> {
while (true) {
await this.processBatch();
}
}
Такой цикл не завершается — beforeApplicationShutdown зависнет на await this.runningJob, пока не сработает timeout. Batch прерван посередине.
// ПРАВИЛЬНО — короткие итерации, флаг draining
@Interval('relay', 500)
async relay(): Promise<void> {
if (this.shutdownState.isDraining()) return;
this.runningJob = this.processBatch();
await this.runningJob;
this.runningJob = null;
}
Следующий тик не запускается после deleteInterval. Текущий batch завершается полностью.
Полная картина бюджета
SIGTERM
│
├─ preStop sleep 10s (k8s endpoint draining)
├─ HTTP drain ≤ 25s (app.close → server.close → in-flight)
├─ Scheduler/BullMQ ≤ 20s (await runningJob / worker.close)
├─ Kafka consumer ≤ 15s (consumer.disconnect)
└─ pool.end() (onApplicationShutdown — после дренажа)
──────
≤ 60s = terminationGracePeriodSeconds
Overlapping-фазы: HTTP drain и scheduler/BullMQ идут параллельно, поэтому реальный budget не суммируется линейно. Но каждая фаза не должна превышать своего лимита.
Что запрещено
| Антипаттерн | Правило | Что взамен |
|---|---|---|
clearInterval без await in-flight Promise | R-SHUT-SCHED-X1 | deleteInterval + await runningJob |
worker.close(true) (force) | R-SHUT-SCHED-X1 | worker.close() (graceful) |
while (true) в @Interval-методе | R-SHUT-SCHED-3 | короткие итерации с draining-проверкой |
isDraining() не проверяется в relay | R-SHUT-SCHED-3 | if (shutdownState.isDraining()) return |
Долгий cascade без AbortSignal/флага | R-SHUT-SCHED-2 | checkpoint-петля с signal.aborted |
BullMQ-джоб без idempotencyKey | R-SHUT-IDEM-1 | обязательное поле в JobData |
pool.end() в beforeApplicationShutdown | R-SHUT-DB-X1 | onApplicationShutdown (после дренажа) |
Куда дальше
- Бюджеты и observability — метрика
app_shutdown_duration_seconds, total 60s. - БД и persistence —
pool.end()в правильной фазе. - HTTP drain —
server.close(),closeIdleConnections(), preStop. - Идемпотентность in-flight — retry-safety на interrupt.
- Runtime/конфигурация NestJS —
enableShutdownHooks, force-deadline, readiness→503. - Kafka shutdown —
consumer.disconnect()с таймаутом вbeforeApplicationShutdown. - Kubernetes —
terminationGracePeriodSeconds: 60, preStop, probes.