Когда Kubernetes посылает SIGTERM, NestJS начинает останавливаться. Для HTTP-запросов это выглядит аккуратно: сервер перестаёт принимать новые соединения и ждёт, пока старые завершатся. Но фоновые задачи — @nestjs/schedule, BullMQ, outbox-relay — не останавливаются сами по себе. Если убить их посередине, получим рассогласование: транзакция откатилась, а Kafka-сообщение уже ушло.
Правильный подход: дать каждой задаче завершить текущую итерацию, следующую не начинать.
Scheduler: в NestJS нет встроенного ожидания
@nestjs/schedule запускает методы по расписанию, но не предоставляет способа узнать, выполняется ли метод прямо сейчас. SchedulerRegistry.deleteInterval() просто отменяет следующий тик — если метод уже работает, он продолжит выполнение, и никто его не дождётся.
Решение: хранить Promise текущей итерации в поле класса и ждать его в хуке beforeApplicationShutdown.
@Injectable()
export class OutboxRelayService implements BeforeApplicationShutdown {
private runningJob: Promise<void> | null = null;
constructor(
private readonly schedulerRegistry: SchedulerRegistry,
private readonly shutdownState: ShutdownStateService,
private readonly pool: Pool,
private readonly producer: KafkaProducerService,
) {}
@Interval('outbox-relay', 500)
async relay(): Promise<void> {
if (this.shutdownState.isDraining()) return;
this.runningJob = this.processBatch();
await this.runningJob;
this.runningJob = null;
}
async beforeApplicationShutdown(): Promise<void> {
if (this.schedulerRegistry.doesExist('interval', 'outbox-relay')) {
this.schedulerRegistry.deleteInterval('outbox-relay');
}
if (this.runningJob) {
await Promise.race([this.runningJob, timeout(25_000)]);
}
}
}
Что происходит при SIGTERM:
- NestJS вызывает
beforeApplicationShutdownу всех Injectable. deleteInterval— следующий тик больше не запустится.- Если
processBatch()выполняется прямо сейчас —await this.runningJobждёт её завершения (максимум 25 секунд). - Текущий пакет доходит до конца; следующий не начинается.
Почему нельзя делать while(true) внутри @Interval
Соблазнительно написать так:
@Interval('relay', 100)
async relay(): Promise<void> {
while (true) {
await this.processBatch();
}
}
Проблема: такой цикл никогда не завершится. beforeApplicationShutdown зависнет на await this.runningJob до истечения таймаута, и пакет будет прерван посередине.
Правильный подход — короткие итерации с проверкой флага остановки:
@Interval('relay', 500)
async relay(): Promise<void> {
if (this.shutdownState.isDraining()) return;
this.runningJob = this.processBatch();
await this.runningJob;
this.runningJob = null;
}
После deleteInterval следующий тик не запустится. Текущий пакет завершается полностью.
Outbox-relay: транзакция и SKIP LOCKED
Outbox-relay читает события из базы и публикует их в Kafka. Для корректной остановки важно, чтобы текущий пакет завершился атомарно.
private async processBatch(): Promise<void> {
const client = await this.pool.connect();
try {
await client.query('BEGIN');
const { rows: batch } = await client.query<OutboxEvent>(
`SELECT id, topic, partition_key AS "partitionKey", payload
FROM order_outbox
WHERE published_at IS NULL
ORDER BY id
LIMIT 50
FOR UPDATE SKIP LOCKED`,
);
const publishedIds: string[] = [];
for (const event of batch) {
await this.producer.send(event.topic, event.partitionKey, event.payload);
publishedIds.push(event.id);
}
if (publishedIds.length > 0) {
await client.query(
`UPDATE order_outbox SET published_at = NOW() WHERE id = ANY($1::uuid[])`,
[publishedIds],
);
}
await client.query('COMMIT');
} catch (err) {
await client.query('ROLLBACK');
throw err;
} finally {
client.release();
}
}
FOR UPDATE SKIP LOCKED внутри явной транзакции (BEGIN/COMMIT) блокирует строки до COMMIT: другой под при параллельном SELECT пропустит их через SKIP LOCKED. Если сделать pool.query() без BEGIN, это autocommit — блокировка снимается сразу после SELECT, и другой под может взять те же строки.
BullMQ: ждём активные джобы, не обрываем
BullMQ предоставляет два варианта закрытия воркера:
worker.close()— мягкое закрытие: ждёт все активные джобы.worker.close(true)— немедленное прерывание: джоб остановлен посередине, состояние неизвестно.
Всегда используйте worker.close() без аргументов:
@Injectable()
export class PaymentWorkerService implements OnApplicationBootstrap, BeforeApplicationShutdown {
private worker!: Worker;
constructor(
private readonly shutdownState: ShutdownStateService,
private readonly paymentService: PaymentService,
) {}
onApplicationBootstrap(): void {
this.worker = new Worker(
'payment-jobs',
async (job: Job<PaymentJobData>) => {
if (this.shutdownState.isDraining()) {
await job.moveToFailed(new Error('draining'), job.token ?? '');
return;
}
await this.paymentService.processCharge(job.data);
},
{ connection: redisConnection, concurrency: 5 },
);
}
async beforeApplicationShutdown(): Promise<void> {
await Promise.race([
this.worker.close(),
timeout(20_000),
]);
}
}
Если приложение начало останавливаться (isDraining()), новый джоб сразу переводится в состояние Failed — его заберёт другой под. Активные джобы дорабатывают до конца.
Идемпотентность джобов
BullMQ может повторить джоб после перезапуска. Если джоб списывает деньги — без идемпотентного ключа возникнет двойное списание. Поле idempotencyKey в данных джоба обязательно:
interface PaymentJobData {
customerId: string;
orderId: string;
amountKopecks: number;
idempotencyKey: string;
}
Сервис платежей проверяет этот ключ перед выполнением — повторный вызов с тем же ключом вернёт результат без повторного списания.
Долгий процесс: AbortSignal как флаг остановки
Иногда задача перебирает большой список записей — синхронизация каталога, миграция данных. Если не реагировать на сигнал остановки, задача продолжит работу до таймаута.
@Injectable()
export class ProductSyncService implements BeforeApplicationShutdown {
private abortController = new AbortController();
async beforeApplicationShutdown(): Promise<void> {
this.abortController.abort();
}
async syncProductCatalog(): Promise<void> {
const products = await this.fetchExternalProducts();
for (const product of products) {
if (this.abortController.signal.aborted) break;
await this.productRepository.upsert(product);
await this.searchIndexer.index(product);
}
}
}
При SIGTERM: abort() выставлен, следующая итерация цикла прерывается. Новый под продолжит с первой необработанной записи.
Бюджет времени
Kubernetes даёт 60 секунд на полную остановку пода (по умолчанию). Фазы перекрываются:
SIGTERM
│
├─ preStop sleep 10s (дрейнаж endpoint'ов k8s)
├─ HTTP drain ≤ 25s (server.close, дожидаемся запросов)
├─ Scheduler/BullMQ ≤ 20s (await runningJob / worker.close)
├─ Kafka consumer ≤ 20s (consumer.disconnect)
└─ pool.end() (после дренажа, в onApplicationShutdown)
──────
≤ 60s = terminationGracePeriodSeconds
HTTP drain и scheduler/BullMQ выполняются параллельно — реальное время не суммируется. Но каждая фаза не должна превышать своего лимита.
Частые ошибки
clearInterval без ожидания итерации. Задача убита посередине, транзакция откатилась, но Kafka-сообщение уже ушло. Нужно: deleteInterval плюс await runningJob.
worker.close(true) вместо worker.close(). Немедленное прерывание оставляет джоб в неизвестном состоянии. Используйте мягкое закрытие.
while (true) внутри @Interval-метода. Shutdown зависнет на таймауте, пакет прерван посередине. Используйте короткие итерации с проверкой isDraining().
pool.end() в beforeApplicationShutdown. База закрывается до того, как HTTP-запросы дренируются — активные транзакции обрываются. Закрывайте пул в onApplicationShutdown, который вызывается после beforeApplicationShutdown.
Коротко
@nestjs/scheduleне ждёт текущую итерацию сам — нужно хранить Promise и ожидать его вbeforeApplicationShutdown.SchedulerRegistry.deleteInterval()отменяет следующий тик, но не прерывает текущий.- BullMQ:
worker.close()ждёт активные джобы;worker.close(true)— немедленное прерывание, использовать нельзя. - Outbox-relay завершает текущий пакет атомарно через
FOR UPDATE SKIP LOCKED; следующий не начинает. - Цикл в relay-методе проверяет
isDraining()— неwhile (true). - Долгие процессы реагируют на
AbortSignalи прерываются по контрольным точкам. - Джобы с денежными операциями требуют
idempotencyKey— повтор не должен давать двойное действие. - Общий бюджет остановки — 60 секунд; пул базы закрывается последним, в
onApplicationShutdown.
Что почитать дальше
- HTTP drain в NestJS —
server.close(),closeIdleConnections(), preStop. - БД и persistence в NestJS —
pool.end()в правильной фазе. - Kafka shutdown в NestJS —
consumer.disconnect()с таймаутом. - Kubernetes и поды —
terminationGracePeriodSeconds, preStop, пробы.