Когда Kubernetes посылает сигнал SIGTERM, приложение получает команду: «заверши работу аккуратно». HTTP-запросы ещё можно дообслужить, базу закрыть правильно — но что делать с фоновыми задачами, которые работают в цикле?
Это самое опасное место при остановке. Если убить задачу посередине, транзакция откатится, но побочный эффект — отправленный HTTP-запрос или сообщение в Kafka — уже случился. Разберём по частям, как это решать.
Чем опасна внезапная остановка asyncio-задачи
Представьте фоновую задачу, которая каждые полсекунды обрабатывает очередь заказов. Внутри она делает запрос к базе данных и отправляет событие во внешний сервис.
Если SIGTERM придёт в середине — между отправкой и коммитом в базу — получится несогласованность: внешний сервис думает, что заказ обработан, а база — нет. При следующем старте задача обработает тот же заказ снова.
Решение: дать задаче завершить текущую итерацию, но не начинать новую.
Как правильно останавливать asyncio.Task
В FastAPI фоновые задачи создают через asyncio.create_task() и запускают в lifespan-функции — обработчике жизненного цикла приложения.
Вот типовой пример с правильной остановкой:
from contextlib import asynccontextmanager
from fastapi import FastAPI
import asyncio
import structlog
log = structlog.get_logger()
async def order_sync_worker(ready: dict) -> None:
while ready["accepting"]:
try:
await _process_pending_orders()
await asyncio.sleep(0.5)
except asyncio.CancelledError:
log.info("order_sync_worker.cancelled")
await _process_pending_orders() # дожать последнюю итерацию
raise
except Exception:
log.exception("order_sync_worker.error")
await asyncio.sleep(2)
@asynccontextmanager
async def lifespan(app: FastAPI):
ready = {"accepting": True}
app.state.ready = ready
task = asyncio.create_task(order_sync_worker(ready), name="order-sync")
log.info("order_sync_worker.started")
yield
log.info("shutdown.begin")
ready["accepting"] = False
task.cancel()
await asyncio.gather(task, return_exceptions=True)
log.info("shutdown.tasks_done")
Что происходит при SIGTERM:
- uvicorn получает SIGTERM и входит в фазу завершения lifespan.
ready["accepting"] = False— задача не начнёт новую итерацию после текущей.task.cancel()— в задачу бросается исключениеCancelledError.- Задача перехватывает
CancelledError, дожимает текущую порцию работы, затем делаетraise— возвращает исключение наверх. await asyncio.gather(task, return_exceptions=True)— ждём, пока задача действительно завершится.
Частая ошибка — написать task.cancel() и не дождаться: задача продолжает работать ещё некоторое время, но приложение уже считает себя остановленным. Всегда нужен await после отмены.
Ещё одна ошибка — перехватить CancelledError и поглотить его (не делать raise). Тогда задача останется активной и gather будет ждать вечно.
APScheduler: не убивать задачу посередине
Если фоновая логика реализована через APScheduler, принцип тот же — нужно дать текущему запуску завершиться:
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from contextlib import asynccontextmanager
from fastapi import FastAPI
import structlog
log = structlog.get_logger()
async def sync_products() -> None:
async with async_session() as session:
pending = await session.execute(
select(Product).where(Product.synced_at.is_(None)).limit(20)
)
for product in pending.scalars():
await _push_to_warehouse(product)
product.synced_at = datetime.now(timezone.utc)
await session.commit()
@asynccontextmanager
async def lifespan(app: FastAPI):
scheduler = AsyncIOScheduler()
scheduler.add_job(sync_products, "interval", seconds=1, id="product-sync")
scheduler.start()
log.info("scheduler.started")
yield
log.info("shutdown.scheduler.begin")
scheduler.shutdown(wait=True) # дождаться текущего запуска
log.info("shutdown.scheduler.done")
scheduler.shutdown(wait=True) — APScheduler ждёт завершения исполняемого задания. Новые запуски при этом не происходят. Если написать wait=False, планировщик убьёт задание немедленно — прямо посередине цикла сохранения в базу.
Outbox-relay: readiness-флаг вместо while True
Relay — это задача, которая читает события из таблицы-очереди (outbox) и отправляет их в Kafka или другой брокер. Работает в вечном цикле.
Плохой вариант:
# нельзя остановить без жёсткого прерывания посередине
async def relay_loop() -> None:
while True:
await _publish_batch()
await asyncio.sleep(0.1)
Правильный — через readiness-флаг, как в примере выше с order_sync_worker:
async def outbox_relay(ready: dict) -> None:
while ready["accepting"]:
try:
published = await _publish_outbox_batch()
if published == 0:
await asyncio.sleep(0.5)
except asyncio.CancelledError:
log.info("outbox_relay.cancelled, finishing current batch")
await _publish_outbox_batch() # дожать последний пакет
raise
except Exception:
log.exception("outbox_relay.error")
await asyncio.sleep(2)
async def _publish_outbox_batch() -> int:
async with async_session() as session:
async with session.begin():
rows = await session.execute(
select(OutboxEvent)
.where(OutboxEvent.published_at.is_(None))
.order_by(OutboxEvent.id)
.limit(50)
.with_for_update(skip_locked=True)
)
events = rows.scalars().all()
if not events:
return 0
for event in events:
await producer.send_and_wait(
event.topic,
key=event.partition_key.encode(),
value=event.payload.encode(),
)
event.published_at = datetime.now(timezone.utc)
return len(events)
.with_for_update(skip_locked=True) — пессимистическая блокировка строк. Пока один pod обрабатывает пакет, другой pod не возьмёт те же строки. При SIGTERM текущий пакет дожимается, незаблокированные строки подхватит другой pod.
Долгий HTTP-вызов внутри задачи — опасная зона
Иногда фоновая задача делает внешний HTTP-вызов с повторными попытками — например, списание с карты:
# опасно: SIGTERM в момент retry → платёж прошёл, база не обновлена
async def charge_customer(customer_id: UUID, amount: Decimal) -> None:
result = await payment_client.charge(customer_id, amount) # 200 OK
await session.commit() # если SIGTERM здесь — несогласованность
Безопасный подход — записать намерение в outbox и выйти. Relay сам отправит, когда придёт время:
async def enqueue_charge(
session: AsyncSession,
customer_id: UUID,
amount: Decimal,
idempotency_key: str,
) -> None:
session.add(OutboxEvent(
topic="payments.charge-requested",
partition_key=str(customer_id),
payload=json.dumps({
"customer_id": str(customer_id),
"amount": str(amount),
"idempotency_key": idempotency_key,
}),
))
await session.flush()
Теперь SIGTERM в любой момент безопасен: либо строка записана в outbox (relay её отправит), либо транзакция откатилась (строки нет). idempotency_key защищает от двойной отправки при повторных попытках.
Порядок закрытия ресурсов
Важная деталь: пул соединений с базой (engine.dispose()) нужно закрывать после того, как все задачи завершились, а не до. Задачи ещё могут выполнять запросы в базу при дожатии — пул им нужен.
@asynccontextmanager
async def lifespan(app: FastAPI):
ready = {"accepting": True}
app.state.ready = ready
relay_task = asyncio.create_task(outbox_relay(ready), name="outbox-relay")
sync_task = asyncio.create_task(order_sync_worker(ready), name="order-sync")
yield
log.info("shutdown.begin")
ready["accepting"] = False
relay_task.cancel()
sync_task.cancel()
await asyncio.gather(relay_task, sync_task, return_exceptions=True)
log.info("shutdown.tasks_done")
await producer.stop() # сбросить буфер и закрыть Kafka producer
await consumer.stop() # зафиксировать смещения и закрыть
await engine.dispose() # закрыть пул SQLAlchemy — последним
log.info("shutdown.complete")
Коротко
task.cancel()безawait— задача зависает. Всегда используйawait asyncio.gather(task, return_exceptions=True).CancelledErrorнужно перехватить, дожать критичную секцию (commit, отправка), затем сделатьraise.APScheduler— всегдаscheduler.shutdown(wait=True).wait=Falseубивает задание посередине итерации.- Relay-цикл должен проверять readiness-флаг (
while ready["accepting"]), а не бытьwhile True. - Долгий HTTP с повторными попытками — выносить в outbox, не делать inline в фоновой задаче.
engine.dispose()— самый последний шаг. Задачи при дожатии ещё используют базу.
Что почитать дальше
- Бюджеты и наблюдаемость — суммарный бюджет 60 секунд и метрики остановки.
- База данных и persistence —
engine.dispose(), транзакции при остановке. - HTTP drain — uvicorn graceful, preStop sleep.
- Идемпотентность in-flight — защита от двойной обработки при retry.
- Kafka при остановке —
consumer.stop(),producer.stop(), ручной коммит.