Когда сервис получает SIGTERM, он должен остановиться аккуратно: дать запросам завершиться, сбросить буферы, закрыть соединения с базой. Если закрыть пул соединений слишком рано — фоновая задача, которая ещё не закончила commit, получит InterfaceError: connection already closed. Слишком поздно — соединения утекут и база будет держать их открытыми без причины.
Разберём, как всё устроено правильно.
engine.dispose() вызывается последним
Пул соединений — это общий ресурс, которым пользуются и HTTP-обработчики, и фоновые задачи. Закрыть пул нужно только тогда, когда все, кто им пользуется, уже завершили работу.
FastAPI управляет жизненным циклом через lifespan — асинхронный контекстный менеджер. Shutdown-секция выполняется в том порядке, в котором написана:
from contextlib import asynccontextmanager
from fastapi import FastAPI
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker
engine = create_async_engine(
"postgresql+asyncpg://...",
pool_size=10,
max_overflow=5,
pool_pre_ping=True,
)
session_factory = async_sessionmaker(engine, expire_on_commit=False)
@asynccontextmanager
async def lifespan(app: FastAPI):
# startup
yield
# shutdown — порядок важен
await _stop_background_tasks() # 1. дожать фоновые задачи
await _stop_kafka() # 2. завершить producer/consumer
await engine.dispose() # 3. закрыть пул — последним
app = FastAPI(lifespan=lifespan)
engine.dispose() с asyncpg-движком закрывает все простаивающие соединения и ждёт, пока активные вернутся в пул. К этому моменту они уже должны быть возвращены — фоновые задачи завершены на шаге 1. Если соединение так и не вернулось — пул закроется с предупреждением, но не зависнет.
Не добавляй отдельный signal.signal(SIGTERM, ...) для engine.dispose() — uvicorn уже вызывает lifespan-shutdown при получении SIGTERM, дублирование только запутает порядок.
Что происходит с активными транзакциями при SIGTERM
Разные виды кода завершают транзакции по-разному.
HTTP-обработчик
Когда HTTP-обработчик находится внутри транзакции в момент получения SIGTERM, uvicorn даёт ему время завершиться (опция --timeout-graceful-shutdown, по умолчанию 30 секунд). Сессия, открытая через async with session_factory(), автоматически сделает commit при успехе или rollback при исключении через __aexit__.
async def get_session() -> AsyncSession:
async with session_factory() as session:
yield session
@router.post("/orders")
async def create_order(
body: CreateOrderRequest,
session: AsyncSession = Depends(get_session),
) -> OrderResponse:
order = Order(customer_id=body.customer_id, total=body.total)
session.add(order)
await session.commit()
return OrderResponse.model_validate(order)
Если timeout истёк — uvicorn разрывает соединение, __aexit__ делает rollback.
Фоновая задача
Фоновые задачи завершаются через флаг готовности. Цикл должен проверять этот флаг, чтобы мягко остановиться после текущей итерации:
async def process_outbox():
while app_state.is_ready:
async with session_factory() as session:
async with session.begin():
rows = await session.execute(
select(OutboxEvent)
.where(OutboxEvent.status == "pending")
.limit(20)
.with_for_update(skip_locked=True)
)
batch = rows.scalars().all()
for event in batch:
await publish_event(event)
event.status = "published"
await asyncio.sleep(1)
При SIGTERM app_state.is_ready переходит в False, задача завершает текущую итерацию, транзакция делает commit, цикл выходит. Не используй while True — без флага задача не остановится.
Принудительная отмена с CancelledError
Если задача не успела завершиться сама, её отменяют через cancel():
async def _stop_background_tasks():
for task in _background_tasks:
task.cancel()
try:
await asyncio.wait_for(task, timeout=25.0)
except (asyncio.CancelledError, asyncio.TimeoutError):
pass
Внутри самой задачи при CancelledError транзакция откатится автоматически через __aexit__ контекстного менеджера — это нормально:
async def sync_product_catalog():
async with session_factory() as session:
async with session.begin():
try:
products = await fetch_external_catalog()
session.add_all(products)
except asyncio.CancelledError:
# транзакция откатится через __aexit__, это нормально
raise
Consumer Kafka
Если потребитель Kafka обрабатывает сообщения в транзакции, останов работает так: consumer.stop() в lifespan ждёт завершения текущей итерации, транзакция коммитится, затем коммитится offset. Если SIGTERM застал посередине — транзакция откатывается, offset не коммитится, при следующем старте сообщение будет обработано повторно. Защиту от дублирования обеспечивает таблица обработанных событий:
async def consume_order_events():
async for msg in consumer:
async with session_factory() as session:
async with session.begin():
await session.execute(
insert(ProcessedEvent).values(event_id=msg.key)
.on_conflict_do_nothing()
)
order = await session.get(Order, msg.value["order_id"])
order.status = msg.value["status"]
await consumer.commit()
Alembic запускается только при старте
Частое заблуждение: «нужно почистить схему при выходе» или «запустить alembic downgrade в shutdown». Это не паттерн.
Alembic применяет миграции при старте приложения и после этого просто закрывает свои соединения. При завершении Alembic ничего не делает — это правильно:
@asynccontextmanager
async def lifespan(app: FastAPI):
# startup: применить миграции
await run_migrations() # alembic upgrade head
yield
# shutdown: только закрытие ресурсов, никакого DDL
await engine.dispose()
Не добавляй никакой DDL в секцию shutdown.
Частые ошибки
engine.dispose() в начале shutdown. Если вызвать dispose() до отмены фоновых задач — задачи потеряют доступ к базе в середине транзакции. Dispose всегда идёт последним.
signal.signal(SIGTERM, ...) для dispose в теле модуля. Это создаёт параллельный канал завершения в обход lifespan. Uvicorn уже обрабатывает SIGTERM и вызывает lifespan — дополнительный обработчик не нужен.
while True в фоновом цикле. Без проверки флага готовности задача не остановится по команде. Используй while app_state.is_ready.
alembic downgrade в shutdown. Это не паттерн. Shutdown — это закрытие ресурсов, не откат схемы.
Логирование закрытия пула как ошибки. Сообщение asyncpg pool closed — нормальное событие завершения, не ошибка. Используй уровень INFO.
Коротко
engine.dispose()вызывается в lifespan-shutdown последним — после фоновых задач и Kafka.- HTTP-транзакции завершаются через uvicorn graceful (
--timeout-graceful-shutdown). - Фоновые задачи останавливаются через флаг готовности (
app_state.is_ready), а неwhile True. CancelledErrorв задаче — транзакция откатывается автоматически через__aexit__.- Alembic работает только при старте, в shutdown никакого DDL.
async with session_factory()всегда делает commit при успехе и rollback при исключении — ручное управление не нужно.
Что почитать дальше
- HTTP drain — uvicorn graceful, preStop sleep, долгие эндпоинты.
- Фоновые задачи и outbox — asyncio-задачи, APScheduler, CancelledError.
- Kafka shutdown — aiokafka consumer и producer, ручной commit.
- Бюджеты и наблюдаемость — раскладка 60s-бюджета, метрики завершения.