Опирается на правила: R-SHUT-SCHED-1R-SHUT-SCHED-3 и R-SHUT-SCHED-X1 из Graceful Shutdown Style Guide → раздел 5. Scheduled / async / outbox.

Важно знать

  • task.cancel() + await task — корректный способ остановить фоновую задачу; без await задача зависает.
  • CancelledError нужно обрабатывать явно — критичную секцию (commit, producer.send) дожать, после re-raise.
  • APSchedulerscheduler.shutdown(wait=True), не wait=False; иначе текущая итерация убита.
  • Outbox-relay на SIGTERM завершает текущий batch (FOR UPDATE SKIP LOCKED), не начинает новый.
  • while True без проверки readiness-флага — relay нельзя остановить чисто.
  • engine.dispose() — после отмены задач, не до; пул нужен на время дожатия.
  • Total budget 60suvicorn graceful ≤25s + задачи ≤20s + Kafka ≤15s; не все на максимуме одновременно.

Фоновые asyncio-задачи — точка наибольшего риска при shutdown. Если задача убита на середине, транзакция откатывается, но side-effect (HTTP-вызов, producer.send) уже произошёл. UCP требует: дать задаче завершить текущую итерацию, новую не начинать.

asyncio.Task: отмена с дожатием

R-SHUT-SCHED-1 + R-SHUT-SCHED-2.

Типовой lifespan FastAPI с фоновой задачей:

from contextlib import asynccontextmanager
from fastapi import FastAPI
import asyncio
import structlog

log = structlog.get_logger()

async def order_sync_worker(ready: dict) -> None:
    while ready["accepting"]:
        try:
            await _process_pending_orders()
            await asyncio.sleep(0.5)
        except asyncio.CancelledError:
            log.info("order_sync_worker.cancelled")
            await _process_pending_orders()   # дожать последнюю итерацию
            raise
        except Exception:
            log.exception("order_sync_worker.error")
            await asyncio.sleep(2)

@asynccontextmanager
async def lifespan(app: FastAPI):
    ready = {"accepting": True}
    app.state.ready = ready

    task = asyncio.create_task(order_sync_worker(ready), name="order-sync")
    log.info("order_sync_worker.started")

    yield

    log.info("shutdown.begin")
    ready["accepting"] = False
    task.cancel()
    try:
        await asyncio.wait_for(asyncio.shield(task), timeout=20)
    except (asyncio.CancelledError, asyncio.TimeoutError):
        pass
    log.info("shutdown.tasks_done")

Что происходит на SIGTERM:

  1. uvicorn получает SIGTERM → входит в lifespan-shutdown.
  2. ready["accepting"] = False — следующая итерация не начнётся.
  3. task.cancel() — в задачу бросается CancelledError.
  4. Задача перехватывает CancelledError, дожимает последнюю порцию, re-raise.
  5. await asyncio.wait_for(..., timeout=20) — ждём максимум 20 секунд.
  6. Только после этого — engine.dispose() (см. секцию про порядок).

APScheduler: shutdown(wait=True)

R-SHUT-SCHED-1.

Если фоновая логика реализована через APScheduler:

from apscheduler.schedulers.asyncio import AsyncIOScheduler
from contextlib import asynccontextmanager
from fastapi import FastAPI
import structlog

log = structlog.get_logger()

async def sync_products() -> None:
    async with async_session() as session:
        pending = await session.execute(
            select(Product).where(Product.synced_at.is_(None)).limit(20)
        )
        for product in pending.scalars():
            await _push_to_warehouse(product)
            product.synced_at = datetime.utcnow()
        await session.commit()

@asynccontextmanager
async def lifespan(app: FastAPI):
    scheduler = AsyncIOScheduler()
    scheduler.add_job(sync_products, "interval", seconds=1, id="product-sync")
    scheduler.start()
    log.info("scheduler.started")

    yield

    log.info("shutdown.scheduler.begin")
    scheduler.shutdown(wait=True)   # дожать текущую итерацию
    log.info("shutdown.scheduler.done")

wait=True — APScheduler ждёт завершения исполняемого job. Новые старты не происходят. wait=False убивает job немедленно, что нарушает R-SHUT-SCHED-X1.

Outbox-relay: цикл через readiness-флаг

R-SHUT-SCHED-3.

Relay-цикл не должен быть while True — нечем остановить его чисто:

# ПЛОХО — нельзя остановить без жёсткой отмены посредине batch
async def relay_loop() -> None:
    while True:
        await _publish_batch()
        await asyncio.sleep(0.1)

Корректно — через readiness-флаг:

async def outbox_relay(ready: dict) -> None:
    while ready["accepting"]:
        try:
            published = await _publish_outbox_batch()
            if published == 0:
                await asyncio.sleep(0.5)
        except asyncio.CancelledError:
            log.info("outbox_relay.cancelled, finishing current batch")
            await _publish_outbox_batch()   # дожать последний batch
            raise
        except Exception:
            log.exception("outbox_relay.error")
            await asyncio.sleep(2)

async def _publish_outbox_batch() -> int:
    async with async_session() as session:
        async with session.begin():
            rows = await session.execute(
                select(OutboxEvent)
                .where(OutboxEvent.published_at.is_(None))
                .order_by(OutboxEvent.id)
                .limit(50)
                .with_for_update(skip_locked=True)
            )
            events = rows.scalars().all()
            if not events:
                return 0

            for event in events:
                await producer.send_and_wait(
                    event.topic,
                    key=event.partition_key.encode(),
                    value=event.payload.encode(),
                )
                event.published_at = datetime.utcnow()

            return len(events)

На SIGTERM:

  1. ready["accepting"] = False — после текущего batch новая итерация не начнётся.
  2. task.cancel() — если batch ещё идёт, CancelledError перехватывается, batch дожимается.
  3. FOR UPDATE SKIP LOCKED — другой pod подхватывает незаблокированные строки.

Долгий cascade: outbox вместо inline

R-SHUT-SCHED-2.

Если внутри фоновой задачи есть долгий HTTP-вызов с retry (например, вызов payment-provider по заказам Customer):

# ОПАСНО — SIGTERM в момент retry → payment прошёл, БД не обновлена
async def charge_customer(customer_id: UUID, amount: Decimal) -> None:
    result = await payment_client.charge(customer_id, amount)  # 200 OK
    await session.commit()  # SIGTERM здесь → inconsistency

Безопасный вариант — записать намерение в outbox, relay отправит:

async def enqueue_charge(
    session: AsyncSession,
    customer_id: UUID,
    amount: Decimal,
    idempotency_key: str,
) -> None:
    session.add(OutboxEvent(
        topic="payments.charge-requested",
        partition_key=str(customer_id),
        payload=json.dumps({
            "customer_id": str(customer_id),
            "amount": str(amount),
            "idempotency_key": idempotency_key,
        }),
    ))
    await session.flush()

Тогда SIGTERM в любой момент — outbox строка либо записана (relay отправит), либо нет (транзакция откатилась). Inconsistency невозможна. idempotency_key защищает от двойной отправки при retry.

Порядок закрытия в lifespan

R-SHUT-DB-X1: engine.dispose() — после задач, не до.

@asynccontextmanager
async def lifespan(app: FastAPI):
    ready = {"accepting": True}
    app.state.ready = ready

    relay_task = asyncio.create_task(outbox_relay(ready), name="outbox-relay")
    sync_task = asyncio.create_task(order_sync_worker(ready), name="order-sync")

    yield

    log.info("shutdown.begin", pod=os.environ.get("HOSTNAME"))

    ready["accepting"] = False
    relay_task.cancel()
    sync_task.cancel()

    await asyncio.gather(relay_task, sync_task, return_exceptions=True)
    log.info("shutdown.tasks_done")

    await producer.stop()        # flush + close Kafka producer
    await consumer.stop()        # commit offset + close

    engine.dispose()             # закрыть пул SQLAlchemy — последним
    log.info("shutdown.complete")

Что запрещено

АнтипаттернПравилоЧто взамен
scheduler.shutdown(wait=False)R-SHUT-SCHED-X1wait=True
task.cancel() без await taskR-SHUT-SCHED-1await asyncio.gather(task, return_exceptions=True)
while True в relay без readiness-флагаR-SHUT-SCHED-3while ready["accepting"]
CancelledError перехвачен и поглощёнR-SHUT-SCHED-2дожать критичную секцию, затем raise
engine.dispose() до отмены задачR-SHUT-DB-X1dispose — последним в lifespan
Долгий HTTP с retry inline в задачеR-SHUT-SCHED-2outbox + relay
asyncio.wait_for(task, timeout=0)R-SHUT-SCHED-1минимум 15–20s

Куда дальше

  • Бюджеты и observability — cumulative budget 60s, метрика app_shutdown_duration_seconds.
  • БД и persistence — engine.dispose(), транзакции на shutdown.
  • HTTP drain — uvicorn graceful, preStop sleep.
  • Идемпотентность in-flight — Idempotency-Key, outbox-дедуп.
  • Runtime/конфигурация uvicorn — --timeout-graceful-shutdown, readiness-флаг.
  • Kafka shutdown — consumer.stop(), producer.stop(), manual commit.
  • Kubernetes — terminationGracePeriodSeconds, preStop, probes.