← назад к разделу

Когда NestJS получает SIGTERM, он запускает штатное завершение: дренирует HTTP-сервер, выполняет хуки beforeApplicationShutdown и onApplicationShutdown, и через 60 секунд процесс всё равно убивают принудительно. Если в этот момент в работе была операция — она прерывается на полпути.

Проблема не в самом прерывании, а в том, что при перезапуске pod'а операция запустится заново: Kubernetes перезапустит pod, клиент повторит запрос, Kafka предложит то же сообщение снова. Если операция не готова к повторному запуску — возникает двойной эффект: два заказа, двойное списание, задублированное письмо.

Идемпотентность — это свойство операции давать один и тот же результат при любом количестве повторных запусков. Реализуется по-разному в зависимости от контекста: HTTP, Kafka или outbox.

HTTP POST и Idempotency-Key

HTTP GET и DELETE идемпотентны сами по себе. POST — нет: каждый вызов создаёт новую сущность. Чтобы защитить POST от дублей, клиент отправляет заголовок Idempotency-Key — уникальный идентификатор конкретного бизнес-намерения. Сервер запоминает ответ по этому ключу и при повторном запросе возвращает сохранённый результат, не запуская логику заново.

Схема работы:

Client → POST /orders  Idempotency-Key: order-uuid-abc
         Guard: ключа нет в базе → пропускаем запрос
         OrderService.create() → вставляем заказ
         Сохраняем ответ в idempotency_record
         [SIGTERM: процесс убит, ответ не дошёл до клиента]

Client → retry: POST /orders  Idempotency-Key: order-uuid-abc
         Guard: ключ уже есть → возвращаем сохранённый ответ
         Заказ не создаётся повторно

Guard проверяет ключ до бизнес-логики:

@Injectable()
export class IdempotencyGuard implements CanActivate {
  constructor(private readonly db: Pool) {}

  async canActivate(ctx: ExecutionContext): Promise<boolean> {
    const req = ctx.switchToHttp().getRequest<Request>();
    const key = req.headers['idempotency-key'] as string | undefined;
    if (!key) throw new BadRequestException('Idempotency-Key required');

    const existing = await this.db.query<{ response_body: unknown }>(
      'SELECT response_body FROM idempotency_record WHERE key = $1',
      [key],
    );
    if (existing.rows.length > 0) {
      ctx.switchToHttp().getResponse().json(existing.rows[0].response_body);
      return false;
    }
    req['idempotencyKey'] = key;
    return true;
  }
}

После успешной обработки контроллер сохраняет ответ:

@Post('/orders')
@UseGuards(IdempotencyGuard)
async create(
  @Headers('idempotency-key') key: string,
  @Body() dto: CreateOrderDto,
): Promise<OrderResponse> {
  const order = await this.orderService.create(dto);
  await this.db.query(
    'INSERT INTO idempotency_record(key, response_body) VALUES($1, $2) ON CONFLICT DO NOTHING',
    [key, order],
  );
  return order;
}

Без Idempotency-Key каждый повторный запрос при retry создаёт новую запись — для одного и того же клиентского действия.

Исходящий HTTP с retry

Та же проблема возникает, когда NestJS сам вызывает внешний сервис с автоматическим retry. Если SIGTERM приходит во время такого вызова, два запроса могут дойти до провайдера — и оба обработаются:

// Опасно: при retry provider получит два одинаковых запроса
async chargeCustomer(orderId: string, amount: Money): Promise<Receipt> {
  return this.httpService.axiosRef.post(
    `${this.paymentUrl}/charge`,
    { orderId, amount },
  ).then(r => r.data);
}

Правильно — передавать ключ идемпотентности, сгенерированный один раз на бизнес-операцию, через все попытки:

async chargeCustomer(idempotencyKey: string, orderId: string, amount: Money): Promise<Receipt> {
  return this.httpService.axiosRef.post(
    `${this.paymentUrl}/charge`,
    { orderId, amount },
    { headers: { 'Idempotency-Key': idempotencyKey } },
  ).then(r => r.data);
}

idempotencyKey в этом случае — например, event.eventId из Kafka: он неизменен при retry и однозначно идентифицирует бизнес-операцию.

kafkajs: processed_event в той же транзакции

Kafka гарантирует доставку минимум один раз. При перезапуске consumer может получить то же сообщение снова — особенно если offset не успел закоммититься до SIGTERM. Защита — таблица уже обработанных событий processed_event.

Ключевое правило: вставка в processed_event и side-effect (изменение данных) должны выполняться в одной транзакции. Если транзакция не закоммитилась — при повторном запуске всё повторится корректно. Если закоммитилась — повторная вставка даст конфликт по уникальному индексу, и обработчик вернётся раньше без дублирования.

Handler: получил event_id=XYZ
         BEGIN TRANSACTION
           INSERT processed_event(event_id='XYZ', handler='billing')  -- уникальный индекс
           UPDATE order SET status='BILLED' WHERE id=...
         COMMIT
         resolveOffset → commitOffsetsIfNecessary
         [SIGTERM: транзакция уже закоммичена]

Restart: Handler получил event_id=XYZ снова
         BEGIN TRANSACTION
           INSERT processed_event(event_id='XYZ', handler='billing')
           → нарушение уникальности → ROLLBACK
         Ранний return, resolveOffset → commit
         Дубля нет

Реализация:

@Injectable()
export class OrderBillingConsumer implements OnApplicationBootstrap, BeforeApplicationShutdown {
  private consumer!: Consumer;

  constructor(
    private readonly kafka: Kafka,
    private readonly db: Pool,
  ) {}

  async onApplicationBootstrap(): Promise<void> {
    this.consumer = this.kafka.consumer({ groupId: 'billing-svc' });
    await this.consumer.connect();
    await this.consumer.subscribe({ topic: 'orders.confirmed', fromBeginning: false });

    await this.consumer.run({
      eachBatch: async ({ batch, resolveOffset, commitOffsetsIfNecessary, isRunning }) => {
        for (const message of batch.messages) {
          if (!isRunning()) break;

          const event: OrderConfirmedEvent = JSON.parse(message.value!.toString());
          const client = await this.db.connect();
          try {
            await client.query('BEGIN');
            const dedup = await client.query(
              'INSERT INTO processed_event(event_id, handler) VALUES($1, $2) ON CONFLICT DO NOTHING RETURNING event_id',
              [event.eventId, 'billing'],
            );
            if (dedup.rowCount === 0) {
              await client.query('ROLLBACK');
            } else {
              await client.query(
                'UPDATE "order" SET status = $1 WHERE id = $2',
                ['BILLED', event.orderId],
              );
              await client.query('COMMIT');
            }
          } catch (err) {
            await client.query('ROLLBACK');
            throw err;
          } finally {
            client.release();
          }

          resolveOffset(message.offset);
          await commitOffsetsIfNecessary();
        }
      },
    });
  }

  async beforeApplicationShutdown(): Promise<void> {
    await Promise.race([
      this.consumer.disconnect(),
      new Promise((_, reject) => setTimeout(() => reject(new Error('kafka timeout')), 20_000).unref()),
    ]);
  }
}

Частая ошибка — вызывать resolveOffset и commitOffsetsIfNecessary до завершения транзакции:

// Неправильно: offset закоммичен до side-effect
for (const message of batch.messages) {
  resolveOffset(message.offset);
  await commitOffsetsIfNecessary();   // offset ушёл вперёд
  await this.processEvent(event);     // может не завершиться при SIGTERM
}

При SIGTERM после commit offset'а, но до конца processEvent — side-effect потерян без возможности replay.

Outbox-relay: двухфазный статус

Outbox-паттерн означает, что сервис сначала записывает события в таблицу outbox_event, а отдельный relay-процесс отправляет их в Kafka. Проблема возникает, если SIGTERM приходит между отправкой в Kafka и пометкой события как PUBLISHED:

Relay: SELECT ... WHERE status='PENDING' FOR UPDATE SKIP LOCKED
       producer.send(...)
       [SIGTERM — UPDATE до PUBLISHED не выполнился]

Restart: SELECT ... WHERE status='PENDING' ...
         Тот же event → второй send → дубль в Kafka

Решение — промежуточный статус PUBLISHING. Событие переходит в него до отправки; при SIGTERM между фазами оно остаётся в PUBLISHING и не попадает в следующую выборку PENDING. Отдельный cleanup-job раз в час возвращает зависшие PUBLISHING обратно в PENDING:

@Injectable()
export class OutboxRelayService implements BeforeApplicationShutdown {
  private running = false;
  private currentBatch: Promise<void> = Promise.resolve();

  constructor(
    private readonly db: Pool,
    private readonly producer: Producer,
  ) {}

  @Interval(1_000)
  async relay(): Promise<void> {
    if (this.running) return;
    this.running = true;
    this.currentBatch = this.processBatch().finally(() => { this.running = false; });
    await this.currentBatch;
  }

  private async processBatch(): Promise<void> {
    const client = await this.db.connect();
    try {
      // Фаза 1: забираем PENDING и помечаем PUBLISHING
      const { rows } = await client.query<{ id: string; topic: string; payload: unknown }>(
        `UPDATE outbox_event SET status='PUBLISHING', locked_at=now()
         WHERE id IN (
           SELECT id FROM outbox_event WHERE status='PENDING' LIMIT 20 FOR UPDATE SKIP LOCKED
         )
         RETURNING id, topic, payload`,
      );

      for (const row of rows) {
        // Фаза 2: отправляем в Kafka
        await this.producer.send({
          topic: row.topic,
          messages: [{ value: JSON.stringify(row.payload), headers: { eventId: row.id } }],
        });
        // Фаза 3: помечаем PUBLISHED
        await client.query(
          `UPDATE outbox_event SET status='PUBLISHED', published_at=now() WHERE id=$1`,
          [row.id],
        );
      }
    } finally {
      client.release();
    }
  }

  async beforeApplicationShutdown(): Promise<void> {
    await this.currentBatch;
  }
}

Если SIGTERM приходит между Фазой 2 и Фазой 3 — Kafka уже получила событие, но в базе оно в PUBLISHING. После cleanup оно попадёт в повторную отправку. Поэтому consumer-side всё равно должен делать дедупликацию через processed_event — тогда дубль в Kafka безопасен.

Альтернатива — не использовать статус PUBLISHING вообще, а сделать consumer-side деduplication обязательным. Тогда relay может отправлять дубли спокойно, и операционно это проще.

Коротко

  • Graceful shutdown даёт время завершить операцию, но не гарантирует это: при force-kill через 60 секунд операция прервётся, а при перезапуске выполнится снова.
  • HTTP POST: клиент отправляет Idempotency-Key, NestJS Guard проверяет ключ и возвращает закешированный ответ при повторном запросе.
  • Исходящий HTTP с retry: ключ идемпотентности генерируется один раз на бизнес-операцию и передаётся во все попытки — иначе provider обрабатывает все retry независимо.
  • kafkajs handler: processed_event вставляется в той же транзакции, что и side-effect; конфликт уникальности при повторе = ранний возврат без дублирования.
  • resolveOffset и commitOffsetsIfNecessary — только после коммита транзакции: иначе offset уходит вперёд и side-effect теряется без replay.
  • Outbox-relay: промежуточный статус PUBLISHING защищает от двойной отправки; зависшие записи возвращает в PENDING cleanup-job.
  • Идемпотентность нужна на всей цепочке: клиент → NestJS-handler → downstream-сервис.

Что почитать дальше

  • Конфигурация graceful shutdown NestJS — enableShutdownHooks, force-deadline, readiness-флаг.
  • HTTP drain — server.close(), closeIdleConnections(), preStop sleep.
  • Kafka shutdown — consumer.disconnect() с таймаутом, семантика eachBatch.
  • БД и persistence — порядок pool.end() / dataSource.destroy() в onApplicationShutdown.
  • Бюджеты и observability — как измерить фактическую длительность shutdown.