Опирается на правила: R-SHUT-DB-1R-SHUT-DB-3 и R-SHUT-DB-X1 из Graceful Shutdown Style Guide → раздел 4. БД и persistence.

Важно знать

  • engine.dispose() вызывается в lifespan-shutdown после дрейна HTTP и фоновых задач, не в начале.
  • Не закрывай пул первым — фоновые asyncio-задачи ещё могут держать активные соединения.
  • Активные транзакции завершаются своим каналом: HTTP — uvicorn graceful, фон — CancelledError с дожатием итерации.
  • Alembic работает на startup, не на shutdown — миф «почистить схему при выходе» неверен.
  • AsyncSession с expire_on_commit=False не удерживает соединение вне транзакции — пул освобождается корректно.
  • Главный принцип: доверяй порядку lifespan — dispose в самом конце блока shutdown, не раньше.

DB-пул — критический ресурс. Закрыть его раньше, чем завершатся активные транзакции, означает InterfaceError: connection already closed в момент, когда outbox-relay пытается сделать commit последней итерации. UCP формулирует одно правило: не вмешивайся в порядок lifespan раньше времени.

SQLAlchemy engine.dispose() закрывается последним

R-SHUT-DB-1: порядок lifespan-shutdown определяет, что пул закрывается после всех задач.

FastAPI lifespan — асинхронный контекстный менеджер. Shutdown-секция выполняется в том порядке, в котором написана. Правильная раскладка:

from contextlib import asynccontextmanager
from fastapi import FastAPI
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker

engine = create_async_engine(
    "postgresql+asyncpg://...",
    pool_size=10,
    max_overflow=5,
    pool_pre_ping=True,
)
session_factory = async_sessionmaker(engine, expire_on_commit=False)


@asynccontextmanager
async def lifespan(app: FastAPI):
    # startup
    yield
    # shutdown — порядок важен
    await _stop_background_tasks()   # 1. дожать фоновые задачи
    await _stop_kafka()              # 2. flush + close producer/consumer
    await engine.dispose()           # 3. закрыть пул — последним
    _log_shutdown_complete()


app = FastAPI(lifespan=lifespan)

engine.dispose() с asyncpg-движком закрывает все idle-соединения в пуле и ждёт, пока active-соединения вернутся (они уже должны быть возвращены к этому моменту — фоновые задачи завершены на шаге 1). Если соединение не вернулось — пул закроется с предупреждением, но не зависнет.

Не добавляй отдельный signal.signal(SIGTERM, ...) для engine.dispose() — lifespan-shutdown uvicorn уже вызывается на SIGTERM.

Активные транзакции завершаются своим каналом

R-SHUT-DB-2: каждый тип контекста имеет свой механизм.

HTTP-handler в транзакции

from sqlalchemy.ext.asyncio import AsyncSession
from fastapi import Depends


async def get_session() -> AsyncSession:
    async with session_factory() as session:
        yield session


@router.post("/orders")
async def create_order(
    body: CreateOrderRequest,
    session: AsyncSession = Depends(get_session),
) -> OrderResponse:
    order = Order(customer_id=body.customer_id, total=body.total)
    session.add(order)
    await session.commit()
    return OrderResponse.model_validate(order)

При SIGTERM:

  • uvicorn graceful (--timeout-graceful-shutdown 30) даёт handler завершиться.
  • async with session_factory()__aexit__ делает commit при успехе или rollback при исключении.
  • Если uvicorn timeout истёк — соединение прерывается, session-контекст делает rollback через __aexit__.

Фоновая asyncio-задача в транзакции

import asyncio


async def process_outbox():
    while app_state.is_ready:
        async with session_factory() as session:
            async with session.begin():
                rows = await session.execute(
                    select(OutboxEvent)
                    .where(OutboxEvent.status == "pending")
                    .limit(20)
                    .with_for_update(skip_locked=True)
                )
                batch = rows.scalars().all()
                for event in batch:
                    await publish_event(event)
                    event.status = "published"
        await asyncio.sleep(1)

При SIGTERM — app_state.is_ready переходит в False (readiness-флаг, R-SHUT-CFG-3), цикл завершает текущую итерацию, транзакция commit/rollback, задача заканчивается. Не используй while True — цикл должен проверять readiness-флаг.

Дожатие задачи с CancelledError

async def _stop_background_tasks():
    for task in _background_tasks:
        task.cancel()
        try:
            await asyncio.wait_for(task, timeout=25.0)
        except (asyncio.CancelledError, asyncio.TimeoutError):
            pass

R-SHUT-SCHED-2: в самой задаче критическую секцию (commit транзакции) обрабатываем до re-raise:

async def sync_product_catalog():
    async with session_factory() as session:
        async with session.begin():
            try:
                products = await fetch_external_catalog()
                session.add_all(products)
            except asyncio.CancelledError:
                # транзакция откатится через __aexit__, это нормально
                raise
            # commit происходит в __aexit__ session.begin()

aiokafka consumer в транзакции

async def consume_order_events():
    async for msg in consumer:
        async with session_factory() as session:
            async with session.begin():
                await session.execute(
                    insert(ProcessedEvent).values(event_id=msg.key)
                    .on_conflict_do_nothing()
                )
                order = await session.get(Order, msg.value["order_id"])
                order.status = msg.value["status"]
        await consumer.commit()

При shutdown — consumer.stop() в lifespan дожидается завершения текущей итерации. Транзакция коммитится до consumer.commit() — если SIGTERM застал посередине, транзакция откатывается, offset не коммитится, replay при следующем старте — дедуп через ProcessedEvent защищает.

Alembic — только startup

R-SHUT-DB-3: миф «миграции при выходе» не существует.

Alembic:

  • Запускается при старте приложения (через alembic upgrade head в entrypoint или в lifespan-startup).
  • Применяет pending-миграции.
  • После применения — закрывает свои соединения.

На shutdown — ничего не делает. Не добавляй alembic downgrade или любой DDL в lifespan-shutdown. Нет «очистки схемы при выходе» — это не паттерн.

@asynccontextmanager
async def lifespan(app: FastAPI):
    # startup: применить миграции
    await run_migrations()  # alembic upgrade head
    yield
    # shutdown: только закрытие ресурсов, никакого DDL
    await engine.dispose()

Что запрещено

АнтипаттернПравилоЧто взамен
engine.dispose() в начале lifespan-shutdown, до отмены задачR-SHUT-DB-X1dispose последним, после всех задач
signal.signal(SIGTERM, lambda: engine.dispose()) в теле модуляR-SHUT-DB-1lifespan-shutdown управляет порядком
asyncio.get_event_loop().close() до engine.dispose()R-SHUT-DB-1сначала dispose, потом loop останавливается сам
while True без проверки readiness-флага в outbox-relayR-SHUT-SCHED-3проверять app_state.is_ready в условии цикла
alembic downgrade в shutdownR-SHUT-DB-3только startup; shutdown — закрытие ресурсов
Логирование asyncpg pool closed на ERRORR-SHUT-OBS-X1нормальное событие, уровень INFO
Открытие нового соединения в __del__ или финализатореR-SHUT-DB-2управление сессиями через dependency injection

Куда дальше

  • FastAPI/uvicorn конфигурация--timeout-graceful-shutdown, readiness-флаг, lifespan-шаблон.
  • HTTP drain — uvicorn graceful, preStop sleep, долгие эндпоинты 202+polling.
  • Фоновые задачи и outbox — asyncio задачи, APScheduler, CancelledError, outbox-relay.
  • Kafka shutdown — aiokafka consumer/producer stop, manual commit, cascade в outbox.
  • Идемпотентность in-flight — защита от replay при SIGTERM, Idempotency-Key, processed_event.
  • Бюджеты и observability — раскладка 60s budget, app_shutdown_duration_seconds, структурный лог.
  • Kubernetes — terminationGracePeriodSeconds, probes на /health/{live,ready}, maxUnavailable 0.