← назад к разделу

Когда Kubernetes посылает сигнал SIGTERM, приложение получает команду: «заверши работу аккуратно». HTTP-запросы ещё можно дообслужить, базу закрыть правильно — но что делать с фоновыми задачами, которые работают в цикле?

Это самое опасное место при остановке. Если убить задачу посередине, транзакция откатится, но побочный эффект — отправленный HTTP-запрос или сообщение в Kafka — уже случился. Разберём по частям, как это решать.

Чем опасна внезапная остановка asyncio-задачи

Представьте фоновую задачу, которая каждые полсекунды обрабатывает очередь заказов. Внутри она делает запрос к базе данных и отправляет событие во внешний сервис.

Если SIGTERM придёт в середине — между отправкой и коммитом в базу — получится несогласованность: внешний сервис думает, что заказ обработан, а база — нет. При следующем старте задача обработает тот же заказ снова.

Решение: дать задаче завершить текущую итерацию, но не начинать новую.

Как правильно останавливать asyncio.Task

В FastAPI фоновые задачи создают через asyncio.create_task() и запускают в lifespan-функции — обработчике жизненного цикла приложения.

Вот типовой пример с правильной остановкой:

from contextlib import asynccontextmanager
from fastapi import FastAPI
import asyncio
import structlog

log = structlog.get_logger()

async def order_sync_worker(ready: dict) -> None:
    while ready["accepting"]:
        try:
            await _process_pending_orders()
            await asyncio.sleep(0.5)
        except asyncio.CancelledError:
            log.info("order_sync_worker.cancelled")
            await _process_pending_orders()   # дожать последнюю итерацию
            raise
        except Exception:
            log.exception("order_sync_worker.error")
            await asyncio.sleep(2)

@asynccontextmanager
async def lifespan(app: FastAPI):
    ready = {"accepting": True}
    app.state.ready = ready

    task = asyncio.create_task(order_sync_worker(ready), name="order-sync")
    log.info("order_sync_worker.started")

    yield

    log.info("shutdown.begin")
    ready["accepting"] = False
    task.cancel()
    await asyncio.gather(task, return_exceptions=True)
    log.info("shutdown.tasks_done")

Что происходит при SIGTERM:

  1. uvicorn получает SIGTERM и входит в фазу завершения lifespan.
  2. ready["accepting"] = False — задача не начнёт новую итерацию после текущей.
  3. task.cancel() — в задачу бросается исключение CancelledError.
  4. Задача перехватывает CancelledError, дожимает текущую порцию работы, затем делает raise — возвращает исключение наверх.
  5. await asyncio.gather(task, return_exceptions=True) — ждём, пока задача действительно завершится.

Частая ошибка — написать task.cancel() и не дождаться: задача продолжает работать ещё некоторое время, но приложение уже считает себя остановленным. Всегда нужен await после отмены.

Ещё одна ошибка — перехватить CancelledError и поглотить его (не делать raise). Тогда задача останется активной и gather будет ждать вечно.

APScheduler: не убивать задачу посередине

Если фоновая логика реализована через APScheduler, принцип тот же — нужно дать текущему запуску завершиться:

from apscheduler.schedulers.asyncio import AsyncIOScheduler
from contextlib import asynccontextmanager
from fastapi import FastAPI
import structlog

log = structlog.get_logger()

async def sync_products() -> None:
    async with async_session() as session:
        pending = await session.execute(
            select(Product).where(Product.synced_at.is_(None)).limit(20)
        )
        for product in pending.scalars():
            await _push_to_warehouse(product)
            product.synced_at = datetime.now(timezone.utc)
        await session.commit()

@asynccontextmanager
async def lifespan(app: FastAPI):
    scheduler = AsyncIOScheduler()
    scheduler.add_job(sync_products, "interval", seconds=1, id="product-sync")
    scheduler.start()
    log.info("scheduler.started")

    yield

    log.info("shutdown.scheduler.begin")
    scheduler.shutdown(wait=True)   # дождаться текущего запуска
    log.info("shutdown.scheduler.done")

scheduler.shutdown(wait=True) — APScheduler ждёт завершения исполняемого задания. Новые запуски при этом не происходят. Если написать wait=False, планировщик убьёт задание немедленно — прямо посередине цикла сохранения в базу.

Outbox-relay: readiness-флаг вместо while True

Relay — это задача, которая читает события из таблицы-очереди (outbox) и отправляет их в Kafka или другой брокер. Работает в вечном цикле.

Плохой вариант:

# нельзя остановить без жёсткого прерывания посередине
async def relay_loop() -> None:
    while True:
        await _publish_batch()
        await asyncio.sleep(0.1)

Правильный — через readiness-флаг, как в примере выше с order_sync_worker:

async def outbox_relay(ready: dict) -> None:
    while ready["accepting"]:
        try:
            published = await _publish_outbox_batch()
            if published == 0:
                await asyncio.sleep(0.5)
        except asyncio.CancelledError:
            log.info("outbox_relay.cancelled, finishing current batch")
            await _publish_outbox_batch()   # дожать последний пакет
            raise
        except Exception:
            log.exception("outbox_relay.error")
            await asyncio.sleep(2)

async def _publish_outbox_batch() -> int:
    async with async_session() as session:
        async with session.begin():
            rows = await session.execute(
                select(OutboxEvent)
                .where(OutboxEvent.published_at.is_(None))
                .order_by(OutboxEvent.id)
                .limit(50)
                .with_for_update(skip_locked=True)
            )
            events = rows.scalars().all()
            if not events:
                return 0

            for event in events:
                await producer.send_and_wait(
                    event.topic,
                    key=event.partition_key.encode(),
                    value=event.payload.encode(),
                )
                event.published_at = datetime.now(timezone.utc)

            return len(events)

.with_for_update(skip_locked=True) — пессимистическая блокировка строк. Пока один pod обрабатывает пакет, другой pod не возьмёт те же строки. При SIGTERM текущий пакет дожимается, незаблокированные строки подхватит другой pod.

Долгий HTTP-вызов внутри задачи — опасная зона

Иногда фоновая задача делает внешний HTTP-вызов с повторными попытками — например, списание с карты:

# опасно: SIGTERM в момент retry → платёж прошёл, база не обновлена
async def charge_customer(customer_id: UUID, amount: Decimal) -> None:
    result = await payment_client.charge(customer_id, amount)  # 200 OK
    await session.commit()  # если SIGTERM здесь — несогласованность

Безопасный подход — записать намерение в outbox и выйти. Relay сам отправит, когда придёт время:

async def enqueue_charge(
    session: AsyncSession,
    customer_id: UUID,
    amount: Decimal,
    idempotency_key: str,
) -> None:
    session.add(OutboxEvent(
        topic="payments.charge-requested",
        partition_key=str(customer_id),
        payload=json.dumps({
            "customer_id": str(customer_id),
            "amount": str(amount),
            "idempotency_key": idempotency_key,
        }),
    ))
    await session.flush()

Теперь SIGTERM в любой момент безопасен: либо строка записана в outbox (relay её отправит), либо транзакция откатилась (строки нет). idempotency_key защищает от двойной отправки при повторных попытках.

Порядок закрытия ресурсов

Важная деталь: пул соединений с базой (engine.dispose()) нужно закрывать после того, как все задачи завершились, а не до. Задачи ещё могут выполнять запросы в базу при дожатии — пул им нужен.

@asynccontextmanager
async def lifespan(app: FastAPI):
    ready = {"accepting": True}
    app.state.ready = ready

    relay_task = asyncio.create_task(outbox_relay(ready), name="outbox-relay")
    sync_task = asyncio.create_task(order_sync_worker(ready), name="order-sync")

    yield

    log.info("shutdown.begin")

    ready["accepting"] = False
    relay_task.cancel()
    sync_task.cancel()

    await asyncio.gather(relay_task, sync_task, return_exceptions=True)
    log.info("shutdown.tasks_done")

    await producer.stop()        # сбросить буфер и закрыть Kafka producer
    await consumer.stop()        # зафиксировать смещения и закрыть

    await engine.dispose()       # закрыть пул SQLAlchemy — последним
    log.info("shutdown.complete")

Коротко

  • task.cancel() без await — задача зависает. Всегда используй await asyncio.gather(task, return_exceptions=True).
  • CancelledError нужно перехватить, дожать критичную секцию (commit, отправка), затем сделать raise.
  • APScheduler — всегда scheduler.shutdown(wait=True). wait=False убивает задание посередине итерации.
  • Relay-цикл должен проверять readiness-флаг (while ready["accepting"]), а не быть while True.
  • Долгий HTTP с повторными попытками — выносить в outbox, не делать inline в фоновой задаче.
  • engine.dispose() — самый последний шаг. Задачи при дожатии ещё используют базу.

Что почитать дальше

  • Бюджеты и наблюдаемость — суммарный бюджет 60 секунд и метрики остановки.
  • База данных и persistence — engine.dispose(), транзакции при остановке.
  • HTTP drain — uvicorn graceful, preStop sleep.
  • Идемпотентность in-flight — защита от двойной обработки при retry.
  • Kafka при остановке — consumer.stop(), producer.stop(), ручной коммит.