Опирается на правила:
R-SHUT-DB-1…R-SHUT-DB-3иR-SHUT-DB-X1из Graceful Shutdown Style Guide → раздел 4. БД и persistence.
Важно знать
engine.dispose()вызывается в lifespan-shutdown после дрейна HTTP и фоновых задач, не в начале.- Не закрывай пул первым — фоновые asyncio-задачи ещё могут держать активные соединения.
- Активные транзакции завершаются своим каналом: HTTP — uvicorn graceful, фон —
CancelledErrorс дожатием итерации.- Alembic работает на startup, не на shutdown — миф «почистить схему при выходе» неверен.
AsyncSessionсexpire_on_commit=Falseне удерживает соединение вне транзакции — пул освобождается корректно.- Главный принцип: доверяй порядку lifespan — dispose в самом конце блока shutdown, не раньше.
DB-пул — критический ресурс. Закрыть его раньше, чем завершатся активные транзакции, означает InterfaceError: connection already closed в момент, когда outbox-relay пытается сделать commit последней итерации. UCP формулирует одно правило: не вмешивайся в порядок lifespan раньше времени.
SQLAlchemy engine.dispose() закрывается последним
R-SHUT-DB-1: порядок lifespan-shutdown определяет, что пул закрывается после всех задач.
FastAPI lifespan — асинхронный контекстный менеджер. Shutdown-секция выполняется в том порядке, в котором написана. Правильная раскладка:
from contextlib import asynccontextmanager
from fastapi import FastAPI
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker
engine = create_async_engine(
"postgresql+asyncpg://...",
pool_size=10,
max_overflow=5,
pool_pre_ping=True,
)
session_factory = async_sessionmaker(engine, expire_on_commit=False)
@asynccontextmanager
async def lifespan(app: FastAPI):
# startup
yield
# shutdown — порядок важен
await _stop_background_tasks() # 1. дожать фоновые задачи
await _stop_kafka() # 2. flush + close producer/consumer
await engine.dispose() # 3. закрыть пул — последним
_log_shutdown_complete()
app = FastAPI(lifespan=lifespan)
engine.dispose() с asyncpg-движком закрывает все idle-соединения в пуле и ждёт, пока active-соединения вернутся (они уже должны быть возвращены к этому моменту — фоновые задачи завершены на шаге 1). Если соединение не вернулось — пул закроется с предупреждением, но не зависнет.
Не добавляй отдельный signal.signal(SIGTERM, ...) для engine.dispose() — lifespan-shutdown uvicorn уже вызывается на SIGTERM.
Активные транзакции завершаются своим каналом
R-SHUT-DB-2: каждый тип контекста имеет свой механизм.
HTTP-handler в транзакции
from sqlalchemy.ext.asyncio import AsyncSession
from fastapi import Depends
async def get_session() -> AsyncSession:
async with session_factory() as session:
yield session
@router.post("/orders")
async def create_order(
body: CreateOrderRequest,
session: AsyncSession = Depends(get_session),
) -> OrderResponse:
order = Order(customer_id=body.customer_id, total=body.total)
session.add(order)
await session.commit()
return OrderResponse.model_validate(order)
При SIGTERM:
- uvicorn graceful (
--timeout-graceful-shutdown 30) даёт handler завершиться. async with session_factory()—__aexit__делает commit при успехе или rollback при исключении.- Если uvicorn timeout истёк — соединение прерывается, session-контекст делает rollback через
__aexit__.
Фоновая asyncio-задача в транзакции
import asyncio
async def process_outbox():
while app_state.is_ready:
async with session_factory() as session:
async with session.begin():
rows = await session.execute(
select(OutboxEvent)
.where(OutboxEvent.status == "pending")
.limit(20)
.with_for_update(skip_locked=True)
)
batch = rows.scalars().all()
for event in batch:
await publish_event(event)
event.status = "published"
await asyncio.sleep(1)
При SIGTERM — app_state.is_ready переходит в False (readiness-флаг, R-SHUT-CFG-3), цикл завершает текущую итерацию, транзакция commit/rollback, задача заканчивается. Не используй while True — цикл должен проверять readiness-флаг.
Дожатие задачи с CancelledError
async def _stop_background_tasks():
for task in _background_tasks:
task.cancel()
try:
await asyncio.wait_for(task, timeout=25.0)
except (asyncio.CancelledError, asyncio.TimeoutError):
pass
R-SHUT-SCHED-2: в самой задаче критическую секцию (commit транзакции) обрабатываем до re-raise:
async def sync_product_catalog():
async with session_factory() as session:
async with session.begin():
try:
products = await fetch_external_catalog()
session.add_all(products)
except asyncio.CancelledError:
# транзакция откатится через __aexit__, это нормально
raise
# commit происходит в __aexit__ session.begin()
aiokafka consumer в транзакции
async def consume_order_events():
async for msg in consumer:
async with session_factory() as session:
async with session.begin():
await session.execute(
insert(ProcessedEvent).values(event_id=msg.key)
.on_conflict_do_nothing()
)
order = await session.get(Order, msg.value["order_id"])
order.status = msg.value["status"]
await consumer.commit()
При shutdown — consumer.stop() в lifespan дожидается завершения текущей итерации. Транзакция коммитится до consumer.commit() — если SIGTERM застал посередине, транзакция откатывается, offset не коммитится, replay при следующем старте — дедуп через ProcessedEvent защищает.
Alembic — только startup
R-SHUT-DB-3: миф «миграции при выходе» не существует.
Alembic:
- Запускается при старте приложения (через
alembic upgrade headв entrypoint или в lifespan-startup). - Применяет pending-миграции.
- После применения — закрывает свои соединения.
На shutdown — ничего не делает. Не добавляй alembic downgrade или любой DDL в lifespan-shutdown. Нет «очистки схемы при выходе» — это не паттерн.
@asynccontextmanager
async def lifespan(app: FastAPI):
# startup: применить миграции
await run_migrations() # alembic upgrade head
yield
# shutdown: только закрытие ресурсов, никакого DDL
await engine.dispose()
Что запрещено
| Антипаттерн | Правило | Что взамен |
|---|---|---|
engine.dispose() в начале lifespan-shutdown, до отмены задач | R-SHUT-DB-X1 | dispose последним, после всех задач |
signal.signal(SIGTERM, lambda: engine.dispose()) в теле модуля | R-SHUT-DB-1 | lifespan-shutdown управляет порядком |
asyncio.get_event_loop().close() до engine.dispose() | R-SHUT-DB-1 | сначала dispose, потом loop останавливается сам |
while True без проверки readiness-флага в outbox-relay | R-SHUT-SCHED-3 | проверять app_state.is_ready в условии цикла |
alembic downgrade в shutdown | R-SHUT-DB-3 | только startup; shutdown — закрытие ресурсов |
Логирование asyncpg pool closed на ERROR | R-SHUT-OBS-X1 | нормальное событие, уровень INFO |
Открытие нового соединения в __del__ или финализаторе | R-SHUT-DB-2 | управление сессиями через dependency injection |
Куда дальше
- FastAPI/uvicorn конфигурация —
--timeout-graceful-shutdown, readiness-флаг, lifespan-шаблон. - HTTP drain — uvicorn graceful, preStop sleep, долгие эндпоинты 202+polling.
- Фоновые задачи и outbox — asyncio задачи, APScheduler,
CancelledError, outbox-relay. - Kafka shutdown — aiokafka consumer/producer stop, manual commit, cascade в outbox.
- Идемпотентность in-flight — защита от replay при SIGTERM,
Idempotency-Key,processed_event. - Бюджеты и observability — раскладка 60s budget,
app_shutdown_duration_seconds, структурный лог. - Kubernetes —
terminationGracePeriodSeconds, probes на/health/{live,ready},maxUnavailable 0.