← назад к разделу

Когда сервис получает SIGTERM, он должен остановиться аккуратно: дать запросам завершиться, сбросить буферы, закрыть соединения с базой. Если закрыть пул соединений слишком рано — фоновая задача, которая ещё не закончила commit, получит InterfaceError: connection already closed. Слишком поздно — соединения утекут и база будет держать их открытыми без причины.

Разберём, как всё устроено правильно.

engine.dispose() вызывается последним

Пул соединений — это общий ресурс, которым пользуются и HTTP-обработчики, и фоновые задачи. Закрыть пул нужно только тогда, когда все, кто им пользуется, уже завершили работу.

FastAPI управляет жизненным циклом через lifespan — асинхронный контекстный менеджер. Shutdown-секция выполняется в том порядке, в котором написана:

from contextlib import asynccontextmanager
from fastapi import FastAPI
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker

engine = create_async_engine(
    "postgresql+asyncpg://...",
    pool_size=10,
    max_overflow=5,
    pool_pre_ping=True,
)
session_factory = async_sessionmaker(engine, expire_on_commit=False)


@asynccontextmanager
async def lifespan(app: FastAPI):
    # startup
    yield
    # shutdown — порядок важен
    await _stop_background_tasks()   # 1. дожать фоновые задачи
    await _stop_kafka()              # 2. завершить producer/consumer
    await engine.dispose()           # 3. закрыть пул — последним


app = FastAPI(lifespan=lifespan)

engine.dispose() с asyncpg-движком закрывает все простаивающие соединения и ждёт, пока активные вернутся в пул. К этому моменту они уже должны быть возвращены — фоновые задачи завершены на шаге 1. Если соединение так и не вернулось — пул закроется с предупреждением, но не зависнет.

Не добавляй отдельный signal.signal(SIGTERM, ...) для engine.dispose() — uvicorn уже вызывает lifespan-shutdown при получении SIGTERM, дублирование только запутает порядок.

Что происходит с активными транзакциями при SIGTERM

Разные виды кода завершают транзакции по-разному.

HTTP-обработчик

Когда HTTP-обработчик находится внутри транзакции в момент получения SIGTERM, uvicorn даёт ему время завершиться (опция --timeout-graceful-shutdown, по умолчанию 30 секунд). Сессия, открытая через async with session_factory(), автоматически сделает commit при успехе или rollback при исключении через __aexit__.

async def get_session() -> AsyncSession:
    async with session_factory() as session:
        yield session


@router.post("/orders")
async def create_order(
    body: CreateOrderRequest,
    session: AsyncSession = Depends(get_session),
) -> OrderResponse:
    order = Order(customer_id=body.customer_id, total=body.total)
    session.add(order)
    await session.commit()
    return OrderResponse.model_validate(order)

Если timeout истёк — uvicorn разрывает соединение, __aexit__ делает rollback.

Фоновая задача

Фоновые задачи завершаются через флаг готовности. Цикл должен проверять этот флаг, чтобы мягко остановиться после текущей итерации:

async def process_outbox():
    while app_state.is_ready:
        async with session_factory() as session:
            async with session.begin():
                rows = await session.execute(
                    select(OutboxEvent)
                    .where(OutboxEvent.status == "pending")
                    .limit(20)
                    .with_for_update(skip_locked=True)
                )
                batch = rows.scalars().all()
                for event in batch:
                    await publish_event(event)
                    event.status = "published"
        await asyncio.sleep(1)

При SIGTERM app_state.is_ready переходит в False, задача завершает текущую итерацию, транзакция делает commit, цикл выходит. Не используй while True — без флага задача не остановится.

Принудительная отмена с CancelledError

Если задача не успела завершиться сама, её отменяют через cancel():

async def _stop_background_tasks():
    for task in _background_tasks:
        task.cancel()
        try:
            await asyncio.wait_for(task, timeout=25.0)
        except (asyncio.CancelledError, asyncio.TimeoutError):
            pass

Внутри самой задачи при CancelledError транзакция откатится автоматически через __aexit__ контекстного менеджера — это нормально:

async def sync_product_catalog():
    async with session_factory() as session:
        async with session.begin():
            try:
                products = await fetch_external_catalog()
                session.add_all(products)
            except asyncio.CancelledError:
                # транзакция откатится через __aexit__, это нормально
                raise

Consumer Kafka

Если потребитель Kafka обрабатывает сообщения в транзакции, останов работает так: consumer.stop() в lifespan ждёт завершения текущей итерации, транзакция коммитится, затем коммитится offset. Если SIGTERM застал посередине — транзакция откатывается, offset не коммитится, при следующем старте сообщение будет обработано повторно. Защиту от дублирования обеспечивает таблица обработанных событий:

async def consume_order_events():
    async for msg in consumer:
        async with session_factory() as session:
            async with session.begin():
                await session.execute(
                    insert(ProcessedEvent).values(event_id=msg.key)
                    .on_conflict_do_nothing()
                )
                order = await session.get(Order, msg.value["order_id"])
                order.status = msg.value["status"]
        await consumer.commit()

Alembic запускается только при старте

Частое заблуждение: «нужно почистить схему при выходе» или «запустить alembic downgrade в shutdown». Это не паттерн.

Alembic применяет миграции при старте приложения и после этого просто закрывает свои соединения. При завершении Alembic ничего не делает — это правильно:

@asynccontextmanager
async def lifespan(app: FastAPI):
    # startup: применить миграции
    await run_migrations()  # alembic upgrade head
    yield
    # shutdown: только закрытие ресурсов, никакого DDL
    await engine.dispose()

Не добавляй никакой DDL в секцию shutdown.

Частые ошибки

engine.dispose() в начале shutdown. Если вызвать dispose() до отмены фоновых задач — задачи потеряют доступ к базе в середине транзакции. Dispose всегда идёт последним.

signal.signal(SIGTERM, ...) для dispose в теле модуля. Это создаёт параллельный канал завершения в обход lifespan. Uvicorn уже обрабатывает SIGTERM и вызывает lifespan — дополнительный обработчик не нужен.

while True в фоновом цикле. Без проверки флага готовности задача не остановится по команде. Используй while app_state.is_ready.

alembic downgrade в shutdown. Это не паттерн. Shutdown — это закрытие ресурсов, не откат схемы.

Логирование закрытия пула как ошибки. Сообщение asyncpg pool closed — нормальное событие завершения, не ошибка. Используй уровень INFO.

Коротко

  • engine.dispose() вызывается в lifespan-shutdown последним — после фоновых задач и Kafka.
  • HTTP-транзакции завершаются через uvicorn graceful (--timeout-graceful-shutdown).
  • Фоновые задачи останавливаются через флаг готовности (app_state.is_ready), а не while True.
  • CancelledError в задаче — транзакция откатывается автоматически через __aexit__.
  • Alembic работает только при старте, в shutdown никакого DDL.
  • async with session_factory() всегда делает commit при успехе и rollback при исключении — ручное управление не нужно.

Что почитать дальше

  • HTTP drain — uvicorn graceful, preStop sleep, долгие эндпоинты.
  • Фоновые задачи и outbox — asyncio-задачи, APScheduler, CancelledError.
  • Kafka shutdown — aiokafka consumer и producer, ручной commit.
  • Бюджеты и наблюдаемость — раскладка 60s-бюджета, метрики завершения.