← назад к разделу

Когда приложение получает сигнал SIGTERM, оно не должно обрывать соединения с базой данных прямо посреди работы. Незавершённая транзакция — это потеря данных или состояние, которое придётся чинить руками. Разберём, как правильно завершить работу с БД в NestJS.

Почему порядок закрытия имеет значение

Представьте: сервис получил SIGTERM и сразу закрыл пул соединений. В этот момент HTTP-хендлер ещё обрабатывает запрос с открытой транзакцией. Транзакция обрывается, данные не записаны — клиент получил ошибку, а в базе неизвестно что.

NestJS решает эту проблему через два жизненных цикла при остановке:

  • beforeApplicationShutdown — вызывается первым. Здесь нужно дождаться всех активных операций: HTTP-запросов, фоновых задач, очередей.
  • onApplicationShutdown — вызывается после. Только здесь закрывают пул соединений.

Правило простое: пул закрывается последним, после всего остального.

Закрытие пула pg

Реализуйте интерфейс OnApplicationShutdown в сервисе, который хранит пул:

import { Module, OnApplicationShutdown, Injectable } from '@nestjs/common';
import { Pool } from 'pg';

@Injectable()
export class DatabaseService implements OnApplicationShutdown {
  readonly pool: Pool;

  constructor() {
    this.pool = new Pool({
      connectionString: process.env.DATABASE_URL,
      max: 20,
      idleTimeoutMillis: 30_000,
      connectionTimeoutMillis: 5_000,
    });
  }

  async onApplicationShutdown(signal?: string): Promise<void> {
    this.pool.on('error', () => {});
    await this.pool.end();
  }
}

Строка this.pool.on('error', () => {}) перед pool.end() подавляет ошибки от соединений, которые закрываются в процессе — это ожидаемое поведение, не сбой.

TypeORM

Для TypeORM принцип тот же, только вместо pool.end() вызывают dataSource.destroy():

import { Injectable, OnApplicationShutdown } from '@nestjs/common';
import { InjectDataSource } from '@nestjs/typeorm';
import { DataSource } from 'typeorm';

@Injectable()
export class TypeOrmShutdownService implements OnApplicationShutdown {
  constructor(@InjectDataSource() private readonly dataSource: DataSource) {}

  async onApplicationShutdown(): Promise<void> {
    if (this.dataSource.isInitialized) {
      await this.dataSource.destroy();
    }
  }
}

Проверка isInitialized защищает от ошибки при повторном вызове: если DataSource уже закрыт, второй destroy() выбросит исключение.

Как дожидаться активных транзакций

Разные источники транзакций требуют разного подхода.

HTTP-хендлер с транзакцией

Хендлер открывает транзакцию, делает вставку в базу и коммитит:

@Injectable()
export class CreateOrderHandler {
  constructor(private readonly db: DatabaseService) {}

  async execute(customerId: string, amount: number): Promise<{ id: string }> {
    const client = await this.db.pool.connect();
    try {
      await client.query('BEGIN');
      const { rows } = await client.query<{ id: string }>(
        `INSERT INTO orders (customer_id, amount, status)
         VALUES ($1, $2, 'pending')
         RETURNING id`,
        [customerId, amount],
      );
      await client.query('COMMIT');
      return { id: rows[0].id };
    } catch (err) {
      await client.query('ROLLBACK');
      throw err;
    } finally {
      client.release();
    }
  }
}

Здесь ничего дополнительного делать не нужно. При получении SIGTERM NestJS вызывает app.close(), который закрывает HTTP-сервер через server.close(). Это значит: новые запросы не принимаются, а текущий запрос дожидается COMMIT или ROLLBACK и завершается. Только после этого Nest переходит к фазе onApplicationShutdown и закрывает пул.

Фоновый планировщик с транзакцией

Со планировщиком сложнее: он работает по таймеру и может быть в середине итерации в момент остановки. Нужно отслеживать текущую итерацию вручную:

@Injectable()
export class OutboxRelayService implements OnApplicationShutdown {
  private inflightPromise: Promise<void> | null = null;

  constructor(
    private readonly db: DatabaseService,
    private readonly shutdownState: ShutdownStateService,
  ) {}

  @Interval(5_000)
  async processOutboxBatch(): Promise<void> {
    if (this.shutdownState.isDraining()) return;

    const work = this._doProcessBatch();
    this.inflightPromise = work;
    await work;
    this.inflightPromise = null;
  }

  private async _doProcessBatch(): Promise<void> {
    const client = await this.db.pool.connect();
    try {
      await client.query('BEGIN');
      const { rows } = await client.query<{ id: string; payload: string }>(
        `SELECT id, payload FROM outbox
         WHERE dispatched_at IS NULL
         ORDER BY created_at
         FOR UPDATE SKIP LOCKED
         LIMIT 20`,
      );
      for (const event of rows) {
        await client.query(
          `UPDATE outbox SET dispatched_at = now() WHERE id = $1`,
          [event.id],
        );
      }
      await client.query('COMMIT');
    } catch (err) {
      await client.query('ROLLBACK');
      throw err;
    } finally {
      client.release();
    }
  }

  async beforeApplicationShutdown(): Promise<void> {
    if (this.inflightPromise) {
      await this.inflightPromise;
    }
  }
}

isDraining() проверяется перед стартом каждой новой итерации — уже начатую не прерываем. В beforeApplicationShutdown ждём текущую итерацию через inflightPromise. Пул закроется только после.

BullMQ-воркер с транзакцией

BullMQ предоставляет собственный механизм завершения — worker.close(). Он дожидается текущего джоба перед остановкой:

@Processor('product-index')
@Injectable()
export class ProductIndexWorker extends WorkerHost implements BeforeApplicationShutdown {
  constructor(private readonly db: DatabaseService) {
    super();
  }

  async process(job: { data: { productId: string } }): Promise<void> {
    const client = await this.db.pool.connect();
    try {
      await client.query('BEGIN');
      await client.query(
        `UPDATE products SET indexed_at = now() WHERE id = $1`,
        [job.data.productId],
      );
      await client.query('COMMIT');
    } catch (err) {
      await client.query('ROLLBACK');
      throw err;
    } finally {
      client.release();
    }
  }

  async beforeApplicationShutdown(): Promise<void> {
    await Promise.race([
      (this.worker as Worker).close(),
      new Promise<void>((resolve) => setTimeout(resolve, 20_000).unref()),
    ]);
  }
}

Promise.race с таймаутом в 20 секунд — защита от зависшего джоба. Если воркер не завершился за 20 секунд, beforeApplicationShutdown всё равно возвращает управление Nest, который продолжает остановку. Незавершённая транзакция получит ROLLBACK от pg при разрыве соединения.

Миграции запускают только при старте

Миграции применяются один раз — до того как сервис начинает принимать запросы:

async function bootstrap(): Promise<void> {
  const app = await NestFactory.create(AppModule);
  app.enableShutdownHooks();

  const dataSource = app.get(DataSource);
  await dataSource.runMigrations();

  await app.listen(3000);
}
bootstrap();

При остановке никаких «обратных миграций» не запускают. dataSource.destroy() закрывает соединения, а не откатывает схему. Это намеренно: схема базы данных не должна зависеть от того, работает сервис или нет.

Частые ошибки

Закрытие пула в beforeApplicationShutdown. На этом этапе HTTP-запросы и фоновые задачи ещё могут работать — пул нужен им. Закрывать пул только в onApplicationShutdown.

Нет await inflightPromise в планировщике. Если не дождаться текущей итерации в beforeApplicationShutdown, пул закроется раньше, чем планировщик закончит транзакцию — получите ошибку в середине записи.

pool.end() в process.on('SIGTERM', ...) до app.close(). Не обходите lifecycle-хуки NestJS — они дают правильный порядок без ручной оркестрации.

worker.close(true) с флагом force. Этот режим убивает джоб не дожидаясь завершения. Используйте worker.close() без аргументов — BullMQ сам дождётся активного джоба.

logger.error при нормальном pool.end(). Штатное закрытие пула — это ожидаемое событие, логируйте его на уровне INFO, иначе каждый деплой будет шуметь в канале алертов.

Коротко

  • Пул соединений закрывается в onApplicationShutdown — после HTTP drain и фоновых задач.
  • Не закрывайте пул в beforeApplicationShutdown: в этой фазе ещё могут идти транзакции.
  • HTTP-транзакции дожидаются автоматически через server.close() при app.close().
  • Планировщики: сохраняйте текущую итерацию в inflightPromise и ждите её в beforeApplicationShutdown.
  • BullMQ: worker.close() без force — воркер сам дождётся активного джоба.
  • TypeORM: dataSource.destroy() с проверкой isInitialized перед вызовом.
  • Миграции запускают только на старте, до app.listen(); при остановке схему не трогают.

Что почитать дальше

  • HTTP drain в Node.js — server.close(), долгие эндпоинты, таймауты.
  • Фоновые задачи и очереди — планировщик, outbox, BullMQ.
  • Kafka при остановке — consumer.disconnect(), семантика коммита.