Опирается на правила: R-SHUT-SCHED-1R-SHUT-SCHED-3 и R-SHUT-SCHED-X1 из Graceful Shutdown Style Guide → раздел 5. Фоновые задачи / очереди / outbox.

Важно знать

  • @nestjs/schedule-интервал завершает текущую итерацию — хранить in-flight Promise и await его на shutdown (~25s).
  • SchedulerRegistry.deleteInterval() останавливает следующий тик, но не прерывает уже запущенную итерацию.
  • BullMQawait worker.close() (без force) ждёт активные джобы; worker.close(true) — антипаттерн (R-SHUT-SCHED-X1).
  • Долгий async-cascade — реагировать на shutdown-сигнал (AbortSignal/isDraining()): дожать критичную секцию, не начинать следующую.
  • Outbox-relay завершает текущий batch (атомарно через FOR UPDATE SKIP LOCKED), не начинает новый; цикл читает shutdownState.isDraining(), не while (true).
  • clearInterval без await in-flight Promise — задача брошена посреди итерации, частичные изменения без rollback (inconsistent state).
  • Total budget 60s — preStop 10s + HTTP drain ≤25s + фоновые задачи ≤20s + Kafka ≤15s ≤ 60s.

Фоновые задачи (@nestjs/schedule, BullMQ, outbox-relay) — место, где graceful shutdown ломается незаметно. Если задача убита на середине, транзакция откатывается, но side-effect (Kafka-send, HTTP-вызов) уже произошёл — это рассогласование. UCP формулирует: дать каждой задаче завершить текущую итерацию, следующую не начинать, force-shutdown только в крайнем случае.

@nestjs/schedule дожимает итерацию

R-SHUT-SCHED-1: @nestjs/schedule не предоставляет встроенного await in-flight. Нужно хранить Promise самостоятельно:

@Injectable()
export class OutboxRelayService implements BeforeApplicationShutdown {
  private runningJob: Promise<void> | null = null;

  constructor(
    private readonly schedulerRegistry: SchedulerRegistry,
    private readonly shutdownState: ShutdownStateService,
    private readonly outboxRepository: OutboxRepository,
    private readonly producer: KafkaProducerService,
  ) {}

  @Interval('outbox-relay', 500)
  async relay(): Promise<void> {
    if (this.shutdownState.isDraining()) return;

    this.runningJob = this.processBatch();
    await this.runningJob;
    this.runningJob = null;
  }

  async beforeApplicationShutdown(): Promise<void> {
    if (this.schedulerRegistry.doesExist('interval', 'outbox-relay')) {
      this.schedulerRegistry.deleteInterval('outbox-relay');
    }
    if (this.runningJob) {
      await Promise.race([this.runningJob, timeout(25_000)]);
    }
  }

  private async processBatch(): Promise<void> {
    const batch = await this.outboxRepository.fetchUnpublished(50);
    for (const event of batch) {
      await this.producer.send(event.topic, event.partitionKey, event.payload);
      await this.outboxRepository.markPublished(event.id);
    }
  }
}

Что происходит на SIGTERM:

  1. NestJS вызывает beforeApplicationShutdown всех Injectable.
  2. deleteInterval — тик больше не запускается.
  3. Если processBatch() выполняется прямо сейчас — await this.runningJob дожидается её завершения (или 25s timeout).
  4. Текущий batch доведён до конца; следующий не начат.

Запрос от сервиса Order (domain-пример):

@Injectable()
export class OrderOutboxRepository {
  constructor(private readonly pool: Pool) {}

  async fetchUnpublished(limit: number): Promise<OutboxEvent[]> {
    const { rows } = await this.pool.query<OutboxEvent>(
      `SELECT id, topic, partition_key, payload
         FROM order_outbox
        WHERE published_at IS NULL
        ORDER BY id
        LIMIT $1
          FOR UPDATE SKIP LOCKED`,
      [limit],
    );
    return rows;
  }

  async markPublished(id: string): Promise<void> {
    await this.pool.query(
      `UPDATE order_outbox SET published_at = NOW() WHERE id = $1`,
      [id],
    );
  }
}

FOR UPDATE SKIP LOCKED — атомарность: другой pod или новый pod после деплоя подхватывает разблокированные строки.

BullMQ — worker.close() без force

R-SHUT-SCHED-1, R-SHUT-SCHED-X1: BullMQ-worker ждёт активные джобы при корректном закрытии.

@Injectable()
export class PaymentWorkerService implements OnApplicationBootstrap, BeforeApplicationShutdown {
  private worker!: Worker;

  constructor(
    private readonly shutdownState: ShutdownStateService,
    private readonly paymentService: PaymentService,
  ) {}

  onApplicationBootstrap(): void {
    this.worker = new Worker(
      'payment-jobs',
      async (job: Job<PaymentJobData>) => {
        if (this.shutdownState.isDraining()) {
          await job.moveToFailed(new Error('draining'), job.token ?? '');
          return;
        }
        await this.paymentService.processCharge(job.data);
      },
      { connection: redisConnection, concurrency: 5 },
    );
  }

  async beforeApplicationShutdown(): Promise<void> {
    await Promise.race([
      this.worker.close(),
      timeout(20_000),
    ]);
  }
}

worker.close() — graceful: ждёт активные джобы. worker.close(true) — force: обрывает немедленно, partial state.

Джоб payment по домену Customer:

interface PaymentJobData {
  customerId: string;
  orderId: string;
  amountKopecks: number;
  idempotencyKey: string;
}

idempotencyKey обязателен (R-SHUT-IDEM-1) — SIGTERM в момент повтора не должен давать двойное списание.

Долгий async-cascade с AbortSignal

R-SHUT-SCHED-2: если cascade не помещается в budget, разбить на checkpoint'ы и реагировать на сигнал остановки.

@Injectable()
export class ProductSyncService implements BeforeApplicationShutdown {
  private abortController = new AbortController();

  async beforeApplicationShutdown(): Promise<void> {
    this.abortController.abort();
  }

  async syncProductCatalog(): Promise<void> {
    const products = await this.fetchExternalProducts();

    for (const product of products) {
      if (this.abortController.signal.aborted) break;

      await this.productRepository.upsert(product);
      await this.searchIndexer.index(product);
    }
  }
}

При SIGTERM: abort() выставлен, следующая итерация цикла break-ается, транзакция коммитится по уже обработанным записям. Новый pod продолжит с первой необработанной.

Outbox-relay: только isDraining(), не while (true)

R-SHUT-SCHED-3: цикл проверяет draining-флаг.

// НЕЛЬЗЯ
@Interval('relay', 100)
async relay(): Promise<void> {
  while (true) {
    await this.processBatch();
  }
}

Такой цикл не завершается — beforeApplicationShutdown зависнет на await this.runningJob, пока не сработает timeout. Batch прерван посередине.

// ПРАВИЛЬНО — короткие итерации, флаг draining
@Interval('relay', 500)
async relay(): Promise<void> {
  if (this.shutdownState.isDraining()) return;
  this.runningJob = this.processBatch();
  await this.runningJob;
  this.runningJob = null;
}

Следующий тик не запускается после deleteInterval. Текущий batch завершается полностью.

Полная картина бюджета

SIGTERM
  │
  ├─ preStop sleep 10s          (k8s endpoint draining)
  ├─ HTTP drain         ≤ 25s   (app.close → server.close → in-flight)
  ├─ Scheduler/BullMQ   ≤ 20s   (await runningJob / worker.close)
  ├─ Kafka consumer     ≤ 15s   (consumer.disconnect)
  └─ pool.end()                 (onApplicationShutdown — после дренажа)
                        ──────
                        ≤ 60s = terminationGracePeriodSeconds

Overlapping-фазы: HTTP drain и scheduler/BullMQ идут параллельно, поэтому реальный budget не суммируется линейно. Но каждая фаза не должна превышать своего лимита.

Что запрещено

АнтипаттернПравилоЧто взамен
clearInterval без await in-flight PromiseR-SHUT-SCHED-X1deleteInterval + await runningJob
worker.close(true) (force)R-SHUT-SCHED-X1worker.close() (graceful)
while (true) в @Interval-методеR-SHUT-SCHED-3короткие итерации с draining-проверкой
isDraining() не проверяется в relayR-SHUT-SCHED-3if (shutdownState.isDraining()) return
Долгий cascade без AbortSignal/флагаR-SHUT-SCHED-2checkpoint-петля с signal.aborted
BullMQ-джоб без idempotencyKeyR-SHUT-IDEM-1обязательное поле в JobData
pool.end() в beforeApplicationShutdownR-SHUT-DB-X1onApplicationShutdown (после дренажа)

Куда дальше

  • Бюджеты и observability — метрика app_shutdown_duration_seconds, total 60s.
  • БД и persistence — pool.end() в правильной фазе.
  • HTTP drain — server.close(), closeIdleConnections(), preStop.
  • Идемпотентность in-flight — retry-safety на interrupt.
  • Runtime/конфигурация NestJSenableShutdownHooks, force-deadline, readiness→503.
  • Kafka shutdown — consumer.disconnect() с таймаутом в beforeApplicationShutdown.
  • Kubernetes — terminationGracePeriodSeconds: 60, preStop, probes.