Опирается на правила: R-SHUT-IDEM-1 и R-SHUT-IDEM-X1 из Graceful Shutdown Style Guide → раздел 7. Идемпотентность in-flight.

Важно знать

  • Операции, которые SIGTERM может прервать, обязаны быть retry-safe (R-SHUT-IDEM-1).
  • HTTP POST — Idempotency-Key обязателен (AUTH-19); проверка через NestJS Guard перед бизнес-логикой.
  • kafkajs handler — processed_event(event_id) вставляется в той же транзакции, что и side-effect.
  • outbox-relay — либо двух-фаза PENDING → PUBLISHING → PUBLISHED, либо receiver-side dedup через processed_event.
  • money-cascade без Idempotency-Key под retry — при SIGTERM в момент второго attempt новый pod спишет повторно.
  • Graceful shutdown даёт шанс завершить операцию, но не гарантирует отсутствие partial при force-kill.
  • Идемпотентность — последняя линия защиты, когда graceful не успел уложиться в бюджет 60s.
  • beforeApplicationShutdown и onApplicationShutdown не заменяют retry-safe дизайн: они снижают вероятность прерывания, но не исключают её.

Graceful shutdown в NestJS (app.enableShutdownHooks()) даёт операциям время завершиться: beforeApplicationShutdown → drain HTTP-сервера → onApplicationShutdown. Но бюджет — 60 секунд. Долгий cascade (kafkajs handler → HTTP-вызов к payment с retry × 3 × 10s) может не уложиться. Force-kill прерывает посередине. Если операция не идемпотентна — partial state → инцидент.

Три типа in-flight операций

R-SHUT-IDEM-1 — разные защиты для разных контекстов.

1. HTTP POST

Client → POST /orders (Idempotency-Key: order-uuid-abc)
         NestJS: Guard проверяет ключ, сохраняет idempotency_record
                 OrderService.create() → INSERT order, status=CONFIRMED
         [SIGTERM посередине, response не отправлен]

Client → retry: POST /orders (Idempotency-Key: order-uuid-abc)
         NestJS (новый pod): Guard находит idempotency_record, возвращает кешированный response
         Дубля нет

Guard на базе pg (node-postgres) или TypeORM:

@Injectable()
export class IdempotencyGuard implements CanActivate {
  constructor(private readonly db: Pool) {}

  async canActivate(ctx: ExecutionContext): Promise<boolean> {
    const req = ctx.switchToHttp().getRequest<Request>();
    const key = req.headers['idempotency-key'] as string | undefined;
    if (!key) throw new BadRequestException('Idempotency-Key required');

    const existing = await this.db.query<{ response_body: unknown }>(
      'SELECT response_body FROM idempotency_record WHERE key = $1',
      [key],
    );
    if (existing.rows.length > 0) {
      ctx.switchToHttp().getResponse().json(existing.rows[0].response_body);
      return false;
    }
    req['idempotencyKey'] = key;
    return true;
  }
}

После успешной обработки контроллер сохраняет ответ:

@Post('/orders')
@UseGuards(IdempotencyGuard)
async create(
  @Headers('idempotency-key') key: string,
  @Body() dto: CreateOrderDto,
): Promise<OrderResponse> {
  const order = await this.orderService.create(dto);
  await this.db.query(
    'INSERT INTO idempotency_record(key, response_body) VALUES($1, $2) ON CONFLICT DO NOTHING',
    [key, order],
  );
  return order;
}

Без Idempotency-Key — client retry создаёт второй заказ для того же Customer.

2. kafkajs handler

Handler: получил event_id=XYZ (orders.confirmed)
         BEGIN TRANSACTION
           INSERT processed_event(event_id='XYZ', handler='billing')
           UPDATE order SET status='BILLED' WHERE id=...   ← side-effect
         COMMIT
         resolveOffset(message.offset)
         commitOffsetsIfNecessary()
         [SIGTERM здесь — транзакция уже закоммичена, offset коммитится]

Restart:  Handler получил event_id=XYZ снова (offset не был закоммичен до SIGTERM)
          BEGIN TRANSACTION
            INSERT processed_event(event_id='XYZ', handler='billing')
            → UNIQUE violation → rollback
          Ранний return, resolveOffset → commitOffsetsIfNecessary
          Дубля нет

Реализация через pg в beforeApplicationShutdown-безопасном @Injectable():

@Injectable()
export class OrderBillingConsumer implements OnApplicationBootstrap, BeforeApplicationShutdown {
  private consumer!: Consumer;

  constructor(
    private readonly kafka: Kafka,
    private readonly db: Pool,
    private readonly shutdownState: ShutdownStateService,
  ) {}

  async onApplicationBootstrap(): Promise<void> {
    this.consumer = this.kafka.consumer({ groupId: 'billing-svc' });
    await this.consumer.connect();
    await this.consumer.subscribe({ topic: 'orders.confirmed', fromBeginning: false });

    await this.consumer.run({
      eachBatch: async ({ batch, resolveOffset, commitOffsetsIfNecessary, isStale }) => {
        for (const message of batch.messages) {
          if (isStale()) break;

          const event: OrderConfirmedEvent = JSON.parse(message.value!.toString());

          const client = await this.db.connect();
          try {
            await client.query('BEGIN');
            const dedup = await client.query(
              'INSERT INTO processed_event(event_id, handler) VALUES($1, $2) ON CONFLICT DO NOTHING RETURNING event_id',
              [event.eventId, 'billing'],
            );
            if (dedup.rowCount === 0) {
              await client.query('ROLLBACK');
            } else {
              await client.query(
                'UPDATE "order" SET status = $1 WHERE id = $2',
                ['BILLED', event.orderId],
              );
              await client.query('COMMIT');
            }
          } catch (err) {
            await client.query('ROLLBACK');
            throw err;
          } finally {
            client.release();
          }

          resolveOffset(message.offset);
          await commitOffsetsIfNecessary();
        }
      },
    });
  }

  async beforeApplicationShutdown(): Promise<void> {
    await Promise.race([
      this.consumer.disconnect(),
      new Promise((_, reject) => setTimeout(() => reject(new Error('kafka timeout')), 20_000).unref()),
    ]);
  }
}

processed_event в той же транзакции — атомарно. Если SIGTERM прерывает до COMMIT — offset не закоммичен, replay безопасен.

3. Outbox-relay

Relay: SELECT * FROM outbox_event WHERE status='PENDING' LIMIT 50 FOR UPDATE SKIP LOCKED
       Для каждого:
         producer.send({ topic, messages: [{ value: payload, headers: { eventId } }] })
         UPDATE outbox_event SET status='PUBLISHED', published_at=now() WHERE id=...
         [SIGTERM между send и UPDATE]

Restart: SELECT * FROM outbox_event WHERE status='PENDING' LIMIT 50 ...
         Тот же event → второй send
         kafkajs consumer получает дубль eventId=XYZ

Двух-фаза через статус PUBLISHING:

@Injectable()
export class OutboxRelayService implements BeforeApplicationShutdown {
  private running = false;
  private currentBatch: Promise<void> = Promise.resolve();

  constructor(
    private readonly db: Pool,
    private readonly producer: Producer,
    private readonly shutdownState: ShutdownStateService,
  ) {}

  @Interval(1_000)
  async relay(): Promise<void> {
    if (this.shutdownState.isDraining() || this.running) return;
    this.running = true;
    this.currentBatch = this.processBatch().finally(() => { this.running = false; });
    await this.currentBatch;
  }

  private async processBatch(): Promise<void> {
    const client = await this.db.connect();
    try {
      // Phase 1: lock + mark PUBLISHING
      const { rows } = await client.query<{ id: string; topic: string; payload: unknown }>(
        `UPDATE outbox_event SET status='PUBLISHING', locked_at=now()
         WHERE id IN (
           SELECT id FROM outbox_event WHERE status='PENDING' LIMIT 20 FOR UPDATE SKIP LOCKED
         )
         RETURNING id, topic, payload`,
      );

      for (const row of rows) {
        // Phase 2: send
        await this.producer.send({
          topic: row.topic,
          messages: [{ value: JSON.stringify(row.payload), headers: { eventId: row.id } }],
        });
        // Phase 3: mark PUBLISHED
        await client.query(
          `UPDATE outbox_event SET status='PUBLISHED', published_at=now() WHERE id=$1`,
          [row.id],
        );
      }
    } finally {
      client.release();
    }
  }

  async beforeApplicationShutdown(): Promise<void> {
    await this.currentBatch;
  }
}

Если SIGTERM между Phase 2 и Phase 3 — rows остаются в PUBLISHING. Cleanup-job раз в час возвращает их в PENDING. Retry безопасен только если consumer-side делает dedup через processed_event.

Альтернатива — всегда иметь receiver-side dedup (см. секцию 2). Тогда relay может публиковать дубли, consumer их игнорирует. Операционно проще, небольшой overhead в Kafka.

Граничные случаи

Money-HTTP без Idempotency-Key под retry

// НЕЛЬЗЯ — нет Idempotency-Key
async chargeCustomer(orderId: string, amount: Money): Promise<Receipt> {
  return this.httpService.axiosRef.post(
    `${this.paymentUrl}/charge`,
    { orderId, amount },
  ).then(r => r.data);
}

При shutdown в момент вызова:

  1. POST /charge уходит к payment-provider, network timeout.
  2. @nestjs/axios retry — второй attempt.
  3. Оба POST дошли до provider (network timeout ≠ «запрос не отправлен»).
  4. Provider без дедупа обрабатывает оба → двойное списание для Customer.

Корректно — Idempotency-Key генерируется один раз на бизнес-операцию:

async chargeCustomer(idempotencyKey: string, orderId: string, amount: Money): Promise<Receipt> {
  return this.httpService.axiosRef.post(
    `${this.paymentUrl}/charge`,
    { orderId, amount },
    { headers: { 'Idempotency-Key': idempotencyKey } },
  ).then(r => r.data);
}

idempotencyKey — например event.eventId из Kafka, передаётся через все retry.

kafkajs offset commit до side-effect

// НЕЛЬЗЯ — resolveOffset до завершения side-effect
eachBatch: async ({ batch, resolveOffset, commitOffsetsIfNecessary }) => {
  for (const message of batch.messages) {
    const event = JSON.parse(message.value!.toString());
    resolveOffset(message.offset);           // offset помечен
    await commitOffsetsIfNecessary();        // offset закоммичен
    await this.processEvent(event);          // side-effect — может не завершиться
  }
},

При SIGTERM после commitOffsetsIfNecessary но до завершения processEvent — offset ушёл вперёд, replay не произойдёт, side-effect потерян.

Правильно: resolveOffset и commit — только после того, как транзакция с processed_event и side-effect закоммичена (см. секцию 2).

ShutdownStateService не подключён к /health/ready

// НЕЛЬЗЯ — разрозненный флаг, k8s не узнает о draining
let shuttingDown = false;
process.on('SIGTERM', () => { shuttingDown = true; });

Readiness-проба не видит этот флаг, k8s продолжает слать трафик на умирающий pod. Правильно — единственный ShutdownStateService, реализующий BeforeApplicationShutdown, который terminus-проба читает через isDraining() (R-SHUT-CFG-3).

Что запрещено

АнтипаттернПравилоЧто взамен
Money-HTTP без Idempotency-Key с retryR-SHUT-IDEM-X1header обязателен, генерировать один раз на операцию
resolveOffset до завершения side-effectR-SHUT-IDEM-1commit после транзакции с processed_event
outbox-relay без receiver-side dedup и без двух-фазыR-SHUT-IDEM-1PUBLISHING-статус или processed_event на consumer
let shuttingDown вместо ShutdownStateServiceR-SHUT-CFG-X1единственный сервис, завязанный на terminus
fire-and-forget handler без awaitR-SHUT-IDEM-1await обязателен, иначе offset обгоняет side-effect
ON CONFLICT DO NOTHING без проверки rowCountR-SHUT-IDEM-1проверять rowCount === 0 → ранний return
Идемпотентность только на одном сервисеR-SHUT-IDEM-1end-to-end: client + NestJS-handler + downstream

Куда дальше

  • Бюджеты и observability — как измерить фактическую длительность shutdown и не выйти за 60s.
  • БД и persistence — порядок pool.end() / dataSource.destroy() в onApplicationShutdown.
  • HTTP drain — server.close(), closeIdleConnections(), preStop sleep.
  • Конфигурация graceful shutdown NestJSenableShutdownHooks, force-deadline, readiness-флаг.
  • Kafka shutdown — consumer.disconnect() с таймаутом, eachBatch семантика.
  • Kubernetes — terminationGracePeriodSeconds, probes, maxUnavailable: 0.
  • Scheduled / async / outbox — outbox-relay с draining-флагом, BullMQ worker.close().