← назад к разделу

Когда Kubernetes посылает SIGTERM, NestJS начинает останавливаться. Для HTTP-запросов это выглядит аккуратно: сервер перестаёт принимать новые соединения и ждёт, пока старые завершатся. Но фоновые задачи — @nestjs/schedule, BullMQ, outbox-relay — не останавливаются сами по себе. Если убить их посередине, получим рассогласование: транзакция откатилась, а Kafka-сообщение уже ушло.

Правильный подход: дать каждой задаче завершить текущую итерацию, следующую не начинать.

Scheduler: в NestJS нет встроенного ожидания

@nestjs/schedule запускает методы по расписанию, но не предоставляет способа узнать, выполняется ли метод прямо сейчас. SchedulerRegistry.deleteInterval() просто отменяет следующий тик — если метод уже работает, он продолжит выполнение, и никто его не дождётся.

Решение: хранить Promise текущей итерации в поле класса и ждать его в хуке beforeApplicationShutdown.

@Injectable()
export class OutboxRelayService implements BeforeApplicationShutdown {
  private runningJob: Promise<void> | null = null;

  constructor(
    private readonly schedulerRegistry: SchedulerRegistry,
    private readonly shutdownState: ShutdownStateService,
    private readonly pool: Pool,
    private readonly producer: KafkaProducerService,
  ) {}

  @Interval('outbox-relay', 500)
  async relay(): Promise<void> {
    if (this.shutdownState.isDraining()) return;

    this.runningJob = this.processBatch();
    await this.runningJob;
    this.runningJob = null;
  }

  async beforeApplicationShutdown(): Promise<void> {
    if (this.schedulerRegistry.doesExist('interval', 'outbox-relay')) {
      this.schedulerRegistry.deleteInterval('outbox-relay');
    }
    if (this.runningJob) {
      await Promise.race([this.runningJob, timeout(25_000)]);
    }
  }
}

Что происходит при SIGTERM:

  1. NestJS вызывает beforeApplicationShutdown у всех Injectable.
  2. deleteInterval — следующий тик больше не запустится.
  3. Если processBatch() выполняется прямо сейчас — await this.runningJob ждёт её завершения (максимум 25 секунд).
  4. Текущий пакет доходит до конца; следующий не начинается.

Почему нельзя делать while(true) внутри @Interval

Соблазнительно написать так:

@Interval('relay', 100)
async relay(): Promise<void> {
  while (true) {
    await this.processBatch();
  }
}

Проблема: такой цикл никогда не завершится. beforeApplicationShutdown зависнет на await this.runningJob до истечения таймаута, и пакет будет прерван посередине.

Правильный подход — короткие итерации с проверкой флага остановки:

@Interval('relay', 500)
async relay(): Promise<void> {
  if (this.shutdownState.isDraining()) return;
  this.runningJob = this.processBatch();
  await this.runningJob;
  this.runningJob = null;
}

После deleteInterval следующий тик не запустится. Текущий пакет завершается полностью.

Outbox-relay: транзакция и SKIP LOCKED

Outbox-relay читает события из базы и публикует их в Kafka. Для корректной остановки важно, чтобы текущий пакет завершился атомарно.

private async processBatch(): Promise<void> {
  const client = await this.pool.connect();
  try {
    await client.query('BEGIN');
    const { rows: batch } = await client.query<OutboxEvent>(
      `SELECT id, topic, partition_key AS "partitionKey", payload
         FROM order_outbox
        WHERE published_at IS NULL
        ORDER BY id
        LIMIT 50
          FOR UPDATE SKIP LOCKED`,
    );
    const publishedIds: string[] = [];
    for (const event of batch) {
      await this.producer.send(event.topic, event.partitionKey, event.payload);
      publishedIds.push(event.id);
    }
    if (publishedIds.length > 0) {
      await client.query(
        `UPDATE order_outbox SET published_at = NOW() WHERE id = ANY($1::uuid[])`,
        [publishedIds],
      );
    }
    await client.query('COMMIT');
  } catch (err) {
    await client.query('ROLLBACK');
    throw err;
  } finally {
    client.release();
  }
}

FOR UPDATE SKIP LOCKED внутри явной транзакции (BEGIN/COMMIT) блокирует строки до COMMIT: другой под при параллельном SELECT пропустит их через SKIP LOCKED. Если сделать pool.query() без BEGIN, это autocommit — блокировка снимается сразу после SELECT, и другой под может взять те же строки.

BullMQ: ждём активные джобы, не обрываем

BullMQ предоставляет два варианта закрытия воркера:

  • worker.close() — мягкое закрытие: ждёт все активные джобы.
  • worker.close(true) — немедленное прерывание: джоб остановлен посередине, состояние неизвестно.

Всегда используйте worker.close() без аргументов:

@Injectable()
export class PaymentWorkerService implements OnApplicationBootstrap, BeforeApplicationShutdown {
  private worker!: Worker;

  constructor(
    private readonly shutdownState: ShutdownStateService,
    private readonly paymentService: PaymentService,
  ) {}

  onApplicationBootstrap(): void {
    this.worker = new Worker(
      'payment-jobs',
      async (job: Job<PaymentJobData>) => {
        if (this.shutdownState.isDraining()) {
          await job.moveToFailed(new Error('draining'), job.token ?? '');
          return;
        }
        await this.paymentService.processCharge(job.data);
      },
      { connection: redisConnection, concurrency: 5 },
    );
  }

  async beforeApplicationShutdown(): Promise<void> {
    await Promise.race([
      this.worker.close(),
      timeout(20_000),
    ]);
  }
}

Если приложение начало останавливаться (isDraining()), новый джоб сразу переводится в состояние Failed — его заберёт другой под. Активные джобы дорабатывают до конца.

Идемпотентность джобов

BullMQ может повторить джоб после перезапуска. Если джоб списывает деньги — без идемпотентного ключа возникнет двойное списание. Поле idempotencyKey в данных джоба обязательно:

interface PaymentJobData {
  customerId: string;
  orderId: string;
  amountKopecks: number;
  idempotencyKey: string;
}

Сервис платежей проверяет этот ключ перед выполнением — повторный вызов с тем же ключом вернёт результат без повторного списания.

Долгий процесс: AbortSignal как флаг остановки

Иногда задача перебирает большой список записей — синхронизация каталога, миграция данных. Если не реагировать на сигнал остановки, задача продолжит работу до таймаута.

@Injectable()
export class ProductSyncService implements BeforeApplicationShutdown {
  private abortController = new AbortController();

  async beforeApplicationShutdown(): Promise<void> {
    this.abortController.abort();
  }

  async syncProductCatalog(): Promise<void> {
    const products = await this.fetchExternalProducts();

    for (const product of products) {
      if (this.abortController.signal.aborted) break;

      await this.productRepository.upsert(product);
      await this.searchIndexer.index(product);
    }
  }
}

При SIGTERM: abort() выставлен, следующая итерация цикла прерывается. Новый под продолжит с первой необработанной записи.

Бюджет времени

Kubernetes даёт 60 секунд на полную остановку пода (по умолчанию). Фазы перекрываются:

SIGTERM
  │
  ├─ preStop sleep 10s        (дрейнаж endpoint'ов k8s)
  ├─ HTTP drain       ≤ 25s   (server.close, дожидаемся запросов)
  ├─ Scheduler/BullMQ ≤ 20s   (await runningJob / worker.close)
  ├─ Kafka consumer   ≤ 20s   (consumer.disconnect)
  └─ pool.end()               (после дренажа, в onApplicationShutdown)
                      ──────
                      ≤ 60s = terminationGracePeriodSeconds

HTTP drain и scheduler/BullMQ выполняются параллельно — реальное время не суммируется. Но каждая фаза не должна превышать своего лимита.

Частые ошибки

clearInterval без ожидания итерации. Задача убита посередине, транзакция откатилась, но Kafka-сообщение уже ушло. Нужно: deleteInterval плюс await runningJob.

worker.close(true) вместо worker.close(). Немедленное прерывание оставляет джоб в неизвестном состоянии. Используйте мягкое закрытие.

while (true) внутри @Interval-метода. Shutdown зависнет на таймауте, пакет прерван посередине. Используйте короткие итерации с проверкой isDraining().

pool.end() в beforeApplicationShutdown. База закрывается до того, как HTTP-запросы дренируются — активные транзакции обрываются. Закрывайте пул в onApplicationShutdown, который вызывается после beforeApplicationShutdown.

Коротко

  • @nestjs/schedule не ждёт текущую итерацию сам — нужно хранить Promise и ожидать его в beforeApplicationShutdown.
  • SchedulerRegistry.deleteInterval() отменяет следующий тик, но не прерывает текущий.
  • BullMQ: worker.close() ждёт активные джобы; worker.close(true) — немедленное прерывание, использовать нельзя.
  • Outbox-relay завершает текущий пакет атомарно через FOR UPDATE SKIP LOCKED; следующий не начинает.
  • Цикл в relay-методе проверяет isDraining() — не while (true).
  • Долгие процессы реагируют на AbortSignal и прерываются по контрольным точкам.
  • Джобы с денежными операциями требуют idempotencyKey — повтор не должен давать двойное действие.
  • Общий бюджет остановки — 60 секунд; пул базы закрывается последним, в onApplicationShutdown.

Что почитать дальше

  • HTTP drain в NestJS — server.close(), closeIdleConnections(), preStop.
  • БД и persistence в NestJS — pool.end() в правильной фазе.
  • Kafka shutdown в NestJS — consumer.disconnect() с таймаутом.
  • Kubernetes и поды — terminationGracePeriodSeconds, preStop, пробы.