Когда сервис получает SIGTERM, Uvicorn даёт активным запросам шанс завершиться — но только в рамках таймаута (обычно 60 секунд). Если долгая цепочка вызовов не укладывается, asyncio.CancelledError прерывает её на полуслове. Операция успела начаться, но не дошла до конца — и при перезапуске сервис попробует её повторить.
Это безопасно только если операция идемпотентна: повторный вызов с теми же данными даёт тот же результат, не создавая дубликатов. Именно об этом — статья.
Что такое идемпотентность и зачем она нужна
Представьте API платежей: клиент отправил запрос на списание, но ответ не получил — соединение упало. Что делать? Попробовать ещё раз? А если первый запрос всё-таки дошёл?
Если платёжный сервис не идемпотентен, повторный запрос создаст второе списание. Клиент заплатит дважды.
Идемпотентность решает это: если вы передаёте уникальный ключ операции (Idempotency-Key), сервер запоминает результат первого вызова и при повторном просто возвращает тот же ответ — без повторного действия.
При graceful shutdown та же проблема: SIGTERM прерывает запрос в произвольный момент. Новый под получит тот же запрос снова. Без идемпотентности — дубликат.
HTTP POST с Idempotency-Key в FastAPI
Клиент отправляет заголовок Idempotency-Key — один уникальный UUID на операцию. Даже если он повторит запрос десять раз, сервер вернёт сохранённый результат без повторного действия.
from fastapi import APIRouter, Header
from uuid import UUID
router = APIRouter()
@router.post("/payments", status_code=201)
async def charge_order(
order_id: UUID,
amount: int,
idempotency_key: str = Header(..., alias="Idempotency-Key"),
service: PaymentService = Depends(get_payment_service),
) -> PaymentReceipt:
return await service.charge(order_id, amount, idempotency_key)
PaymentService.charge устроен так: сначала ищет запись по idempotency_key в базе. Если нашёл — возвращает сохранённый ответ, не делая повторного списания. Если нет — выполняет списание и сохраняет результат атомарно в одной транзакции.
Схема работы:
Клиент → POST /payments Idempotency-Key: pay-abc
FastAPI: создаёт запись, обрабатывает
[SIGTERM, Uvicorn не успевает — таймаут]
Клиент → повтор: POST /payments Idempotency-Key: pay-abc
FastAPI (новый под): находит запись, возвращает сохранённый ответ
Дубликата нет
Kafka-listener с защитой от повторной обработки
При работе с Kafka в ручном режиме (manual commit) offset подтверждается явно после обработки события. Риск: если CancelledError прилетит после side-effect, но до commit_offsets — при перезапуске сервис получит то же событие снова.
Защита — таблица processed_event: запись о том, что событие уже обработано, сохраняется в той же транзакции, что и само действие. При повторе обнаруживается конфликт по уникальному ключу — и обработка пропускается.
from aiokafka import AIOKafkaConsumer, TopicPartition
from sqlalchemy.ext.asyncio import AsyncSession
async def handle_order_confirmed(
event: OrderConfirmedEvent,
consumer: AIOKafkaConsumer,
tp: TopicPartition,
offset: int,
session: AsyncSession,
) -> None:
async with session.begin():
already = await session.execute(
select(ProcessedEvent).where(
ProcessedEvent.event_id == event.event_id,
ProcessedEvent.context == "billing",
)
)
if already.scalar_one_or_none():
await consumer.commit({tp: offset + 1})
return
session.add(ProcessedEvent(event_id=event.event_id, context="billing"))
await billing_service.charge(
order_id=event.order_id,
amount=event.total_amount,
idempotency_key=str(event.event_id),
)
await consumer.commit({tp: offset + 1})
Важная деталь: billing_service.charge тоже получает idempotency_key и передаёт его в httpx-запрос к платёжному провайдеру. Это двойная защита: и на уровне Kafka, и на уровне downstream-вызова.
Если транзакция откатилась из-за CancelledError — offset тоже не подтверждается, и при перезапуске событие придёт снова. Повторная вставка в processed_event упадёт по уникальному ограничению — обработка безопасно пропустится.
Outbox-relay с двух-фазным статусом
Outbox-паттерн используется для надёжной публикации событий в Kafka: события сначала пишутся в таблицу БД, а отдельный relay-процесс их оттуда читает и отправляет.
Проблема: если SIGTERM прервёт relay между отправкой в Kafka и пометкой строки как PUBLISHED, при перезапуске та же строка уйдёт повторно.
Решение — промежуточный статус PUBLISHING:
from sqlalchemy import update
from datetime import datetime, timezone
async def relay_batch(session: AsyncSession, producer: AIOKafkaProducer) -> int:
now = datetime.now(timezone.utc)
result = await session.execute(
update(OutboxEvent)
.where(OutboxEvent.status == "PENDING")
.values(status="PUBLISHING", locked_at=now)
.returning(OutboxEvent.id, OutboxEvent.payload, OutboxEvent.topic)
.limit(50)
)
rows = result.fetchall()
if not rows:
return 0
for row_id, payload, topic in rows:
await producer.send_and_wait(topic, value=payload)
await session.execute(
update(OutboxEvent)
.where(OutboxEvent.id == row_id)
.values(status="PUBLISHED", published_at=now)
)
await session.commit()
return len(rows)
Если SIGTERM случается между send_and_wait и UPDATE — строка зависает в статусе PUBLISHING. Отдельная фоновая задача через несколько минут возвращает такие строки обратно в PENDING. Kafka получит повторную отправку — но consumer-side processed_event защитит от дублирования.
Relay-цикл проверяет флаг готовности, а не крутится бесконечно:
async def outbox_loop(app_state: AppState) -> None:
while app_state.is_ready:
sent = await relay_batch(session, producer)
if sent == 0:
await asyncio.sleep(1.0)
Частая ошибка: httpx-retry без Idempotency-Key
# проблемный вариант
async def charge(order_id: UUID, amount: int) -> dict:
async with httpx.AsyncClient() as client:
for attempt in range(3):
try:
resp = await client.post(
f"{PAYMENT_URL}/charge",
json={"order_id": str(order_id), "amount": amount},
timeout=10.0,
)
resp.raise_for_status()
return resp.json()
except httpx.TransportError:
if attempt == 2:
raise
await asyncio.sleep(0.5)
При SIGTERM во время первой попытки: запрос ушёл, ответ не получен. Retry создаёт второй запрос. Провайдер обрабатывает оба — двойное списание.
Правильно — Idempotency-Key генерируется один раз до цикла и передаётся во всех попытках:
async def charge(order_id: UUID, amount: int, idempotency_key: str) -> dict:
async with httpx.AsyncClient() as client:
for attempt in range(3):
try:
resp = await client.post(
f"{PAYMENT_URL}/charge",
json={"order_id": str(order_id), "amount": amount},
headers={"Idempotency-Key": idempotency_key},
timeout=10.0,
)
resp.raise_for_status()
return resp.json()
except httpx.TransportError:
if attempt == 2:
raise
await asyncio.sleep(0.5)
CancelledError внутри транзакции
asyncio.CancelledError при завершении может прилететь в любом await. Если это случится внутри async with session.begin() после side-effect, но до commit — SQLAlchemy выполнит откат. Это правильно: offset тоже не подтверждается, повтор безопасен.
Если код перехватывает CancelledError — нужно либо пробросить его дальше, либо явно откатить транзакцию:
async def handle_customer_merge(event: CustomerMergeEvent, session: AsyncSession) -> None:
try:
async with session.begin():
session.add(ProcessedEvent(event_id=event.event_id, context="crm"))
await crm_service.merge(event.source_id, event.target_id)
except asyncio.CancelledError:
# откат выполнен context manager-ом; пробрасываем дальше
raise
Главное правило: except asyncio.CancelledError: pass внутри транзакции — это всегда ошибка.
Коротко
- Идемпотентность — повторный вызов с теми же данными даёт тот же результат без дубликатов. При graceful shutdown это обязательное свойство для любой операции, которую SIGTERM может прервать.
- HTTP POST —
Idempotency-Keyв заголовке; FastAPI читает его черезHeader(...), сервис сохраняет результат атомарно в одной транзакции. - Kafka-listener —
processed_eventи side-effect коммитятся в одной транзакции; offset подтверждается только после commit. При откате — повтор безопасен. - Outbox-relay — двух-фазный статус
PENDING → PUBLISHING → PUBLISHED; зависшие строки возвращаются вPENDINGфоновой задачей. - httpx-retry —
Idempotency-Keyгенерируется один раз до retry-цикла и передаётся во всех попытках. CancelledErrorвнутри транзакции нужно пробрасывать, а не подавлять — иначе откат не произойдёт.
Что почитать дальше
- Kafka: правильный shutdown consumer и producer
- Scheduled-задачи и outbox-relay
- База данных и persistence при завершении
- HTTP drain и таймауты Uvicorn