Опирается на правила:
R-SHUT-SCHED-1…R-SHUT-SCHED-3иR-SHUT-SCHED-X1из Graceful Shutdown Style Guide → раздел 5. Scheduled / async / outbox.
Важно знать
task.cancel()+await task— корректный способ остановить фоновую задачу; безawaitзадача зависает.CancelledErrorнужно обрабатывать явно — критичную секцию (commit,producer.send) дожать, после re-raise.- APScheduler —
scheduler.shutdown(wait=True), неwait=False; иначе текущая итерация убита.- Outbox-relay на SIGTERM завершает текущий batch (
FOR UPDATE SKIP LOCKED), не начинает новый.while Trueбез проверки readiness-флага — relay нельзя остановить чисто.engine.dispose()— после отмены задач, не до; пул нужен на время дожатия.- Total budget 60s —
uvicorn graceful ≤25s+ задачи≤20s+ Kafka≤15s; не все на максимуме одновременно.
Фоновые asyncio-задачи — точка наибольшего риска при shutdown. Если задача убита на середине, транзакция откатывается, но side-effect (HTTP-вызов, producer.send) уже произошёл. UCP требует: дать задаче завершить текущую итерацию, новую не начинать.
asyncio.Task: отмена с дожатием
R-SHUT-SCHED-1 + R-SHUT-SCHED-2.
Типовой lifespan FastAPI с фоновой задачей:
from contextlib import asynccontextmanager
from fastapi import FastAPI
import asyncio
import structlog
log = structlog.get_logger()
async def order_sync_worker(ready: dict) -> None:
while ready["accepting"]:
try:
await _process_pending_orders()
await asyncio.sleep(0.5)
except asyncio.CancelledError:
log.info("order_sync_worker.cancelled")
await _process_pending_orders() # дожать последнюю итерацию
raise
except Exception:
log.exception("order_sync_worker.error")
await asyncio.sleep(2)
@asynccontextmanager
async def lifespan(app: FastAPI):
ready = {"accepting": True}
app.state.ready = ready
task = asyncio.create_task(order_sync_worker(ready), name="order-sync")
log.info("order_sync_worker.started")
yield
log.info("shutdown.begin")
ready["accepting"] = False
task.cancel()
try:
await asyncio.wait_for(asyncio.shield(task), timeout=20)
except (asyncio.CancelledError, asyncio.TimeoutError):
pass
log.info("shutdown.tasks_done")
Что происходит на SIGTERM:
- uvicorn получает SIGTERM → входит в lifespan-shutdown.
ready["accepting"] = False— следующая итерация не начнётся.task.cancel()— в задачу бросаетсяCancelledError.- Задача перехватывает
CancelledError, дожимает последнюю порцию, re-raise. await asyncio.wait_for(..., timeout=20)— ждём максимум 20 секунд.- Только после этого —
engine.dispose()(см. секцию про порядок).
APScheduler: shutdown(wait=True)
R-SHUT-SCHED-1.
Если фоновая логика реализована через APScheduler:
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from contextlib import asynccontextmanager
from fastapi import FastAPI
import structlog
log = structlog.get_logger()
async def sync_products() -> None:
async with async_session() as session:
pending = await session.execute(
select(Product).where(Product.synced_at.is_(None)).limit(20)
)
for product in pending.scalars():
await _push_to_warehouse(product)
product.synced_at = datetime.utcnow()
await session.commit()
@asynccontextmanager
async def lifespan(app: FastAPI):
scheduler = AsyncIOScheduler()
scheduler.add_job(sync_products, "interval", seconds=1, id="product-sync")
scheduler.start()
log.info("scheduler.started")
yield
log.info("shutdown.scheduler.begin")
scheduler.shutdown(wait=True) # дожать текущую итерацию
log.info("shutdown.scheduler.done")
wait=True — APScheduler ждёт завершения исполняемого job. Новые старты не происходят. wait=False убивает job немедленно, что нарушает R-SHUT-SCHED-X1.
Outbox-relay: цикл через readiness-флаг
R-SHUT-SCHED-3.
Relay-цикл не должен быть while True — нечем остановить его чисто:
# ПЛОХО — нельзя остановить без жёсткой отмены посредине batch
async def relay_loop() -> None:
while True:
await _publish_batch()
await asyncio.sleep(0.1)
Корректно — через readiness-флаг:
async def outbox_relay(ready: dict) -> None:
while ready["accepting"]:
try:
published = await _publish_outbox_batch()
if published == 0:
await asyncio.sleep(0.5)
except asyncio.CancelledError:
log.info("outbox_relay.cancelled, finishing current batch")
await _publish_outbox_batch() # дожать последний batch
raise
except Exception:
log.exception("outbox_relay.error")
await asyncio.sleep(2)
async def _publish_outbox_batch() -> int:
async with async_session() as session:
async with session.begin():
rows = await session.execute(
select(OutboxEvent)
.where(OutboxEvent.published_at.is_(None))
.order_by(OutboxEvent.id)
.limit(50)
.with_for_update(skip_locked=True)
)
events = rows.scalars().all()
if not events:
return 0
for event in events:
await producer.send_and_wait(
event.topic,
key=event.partition_key.encode(),
value=event.payload.encode(),
)
event.published_at = datetime.utcnow()
return len(events)
На SIGTERM:
ready["accepting"] = False— после текущего batch новая итерация не начнётся.task.cancel()— если batch ещё идёт,CancelledErrorперехватывается, batch дожимается.FOR UPDATE SKIP LOCKED— другой pod подхватывает незаблокированные строки.
Долгий cascade: outbox вместо inline
R-SHUT-SCHED-2.
Если внутри фоновой задачи есть долгий HTTP-вызов с retry (например, вызов payment-provider по заказам Customer):
# ОПАСНО — SIGTERM в момент retry → payment прошёл, БД не обновлена
async def charge_customer(customer_id: UUID, amount: Decimal) -> None:
result = await payment_client.charge(customer_id, amount) # 200 OK
await session.commit() # SIGTERM здесь → inconsistency
Безопасный вариант — записать намерение в outbox, relay отправит:
async def enqueue_charge(
session: AsyncSession,
customer_id: UUID,
amount: Decimal,
idempotency_key: str,
) -> None:
session.add(OutboxEvent(
topic="payments.charge-requested",
partition_key=str(customer_id),
payload=json.dumps({
"customer_id": str(customer_id),
"amount": str(amount),
"idempotency_key": idempotency_key,
}),
))
await session.flush()
Тогда SIGTERM в любой момент — outbox строка либо записана (relay отправит), либо нет (транзакция откатилась). Inconsistency невозможна. idempotency_key защищает от двойной отправки при retry.
Порядок закрытия в lifespan
R-SHUT-DB-X1: engine.dispose() — после задач, не до.
@asynccontextmanager
async def lifespan(app: FastAPI):
ready = {"accepting": True}
app.state.ready = ready
relay_task = asyncio.create_task(outbox_relay(ready), name="outbox-relay")
sync_task = asyncio.create_task(order_sync_worker(ready), name="order-sync")
yield
log.info("shutdown.begin", pod=os.environ.get("HOSTNAME"))
ready["accepting"] = False
relay_task.cancel()
sync_task.cancel()
await asyncio.gather(relay_task, sync_task, return_exceptions=True)
log.info("shutdown.tasks_done")
await producer.stop() # flush + close Kafka producer
await consumer.stop() # commit offset + close
engine.dispose() # закрыть пул SQLAlchemy — последним
log.info("shutdown.complete")
Что запрещено
| Антипаттерн | Правило | Что взамен |
|---|---|---|
scheduler.shutdown(wait=False) | R-SHUT-SCHED-X1 | wait=True |
task.cancel() без await task | R-SHUT-SCHED-1 | await asyncio.gather(task, return_exceptions=True) |
while True в relay без readiness-флага | R-SHUT-SCHED-3 | while ready["accepting"] |
CancelledError перехвачен и поглощён | R-SHUT-SCHED-2 | дожать критичную секцию, затем raise |
engine.dispose() до отмены задач | R-SHUT-DB-X1 | dispose — последним в lifespan |
| Долгий HTTP с retry inline в задаче | R-SHUT-SCHED-2 | outbox + relay |
asyncio.wait_for(task, timeout=0) | R-SHUT-SCHED-1 | минимум 15–20s |
Куда дальше
- Бюджеты и observability — cumulative budget 60s, метрика
app_shutdown_duration_seconds. - БД и persistence —
engine.dispose(), транзакции на shutdown. - HTTP drain — uvicorn graceful, preStop sleep.
- Идемпотентность in-flight —
Idempotency-Key, outbox-дедуп. - Runtime/конфигурация uvicorn —
--timeout-graceful-shutdown, readiness-флаг. - Kafka shutdown —
consumer.stop(),producer.stop(), manual commit. - Kubernetes —
terminationGracePeriodSeconds, preStop, probes.