← назад к разделу

Когда сервис получает SIGTERM, Uvicorn даёт активным запросам шанс завершиться — но только в рамках таймаута (обычно 60 секунд). Если долгая цепочка вызовов не укладывается, asyncio.CancelledError прерывает её на полуслове. Операция успела начаться, но не дошла до конца — и при перезапуске сервис попробует её повторить.

Это безопасно только если операция идемпотентна: повторный вызов с теми же данными даёт тот же результат, не создавая дубликатов. Именно об этом — статья.

Что такое идемпотентность и зачем она нужна

Представьте API платежей: клиент отправил запрос на списание, но ответ не получил — соединение упало. Что делать? Попробовать ещё раз? А если первый запрос всё-таки дошёл?

Если платёжный сервис не идемпотентен, повторный запрос создаст второе списание. Клиент заплатит дважды.

Идемпотентность решает это: если вы передаёте уникальный ключ операции (Idempotency-Key), сервер запоминает результат первого вызова и при повторном просто возвращает тот же ответ — без повторного действия.

При graceful shutdown та же проблема: SIGTERM прерывает запрос в произвольный момент. Новый под получит тот же запрос снова. Без идемпотентности — дубликат.

HTTP POST с Idempotency-Key в FastAPI

Клиент отправляет заголовок Idempotency-Key — один уникальный UUID на операцию. Даже если он повторит запрос десять раз, сервер вернёт сохранённый результат без повторного действия.

from fastapi import APIRouter, Header
from uuid import UUID

router = APIRouter()

@router.post("/payments", status_code=201)
async def charge_order(
    order_id: UUID,
    amount: int,
    idempotency_key: str = Header(..., alias="Idempotency-Key"),
    service: PaymentService = Depends(get_payment_service),
) -> PaymentReceipt:
    return await service.charge(order_id, amount, idempotency_key)

PaymentService.charge устроен так: сначала ищет запись по idempotency_key в базе. Если нашёл — возвращает сохранённый ответ, не делая повторного списания. Если нет — выполняет списание и сохраняет результат атомарно в одной транзакции.

Схема работы:

Клиент → POST /payments  Idempotency-Key: pay-abc
         FastAPI: создаёт запись, обрабатывает
         [SIGTERM, Uvicorn не успевает — таймаут]

Клиент → повтор: POST /payments  Idempotency-Key: pay-abc
         FastAPI (новый под): находит запись, возвращает сохранённый ответ
         Дубликата нет

Kafka-listener с защитой от повторной обработки

При работе с Kafka в ручном режиме (manual commit) offset подтверждается явно после обработки события. Риск: если CancelledError прилетит после side-effect, но до commit_offsets — при перезапуске сервис получит то же событие снова.

Защита — таблица processed_event: запись о том, что событие уже обработано, сохраняется в той же транзакции, что и само действие. При повторе обнаруживается конфликт по уникальному ключу — и обработка пропускается.

from aiokafka import AIOKafkaConsumer, TopicPartition
from sqlalchemy.ext.asyncio import AsyncSession

async def handle_order_confirmed(
    event: OrderConfirmedEvent,
    consumer: AIOKafkaConsumer,
    tp: TopicPartition,
    offset: int,
    session: AsyncSession,
) -> None:
    async with session.begin():
        already = await session.execute(
            select(ProcessedEvent).where(
                ProcessedEvent.event_id == event.event_id,
                ProcessedEvent.context == "billing",
            )
        )
        if already.scalar_one_or_none():
            await consumer.commit({tp: offset + 1})
            return
        session.add(ProcessedEvent(event_id=event.event_id, context="billing"))
        await billing_service.charge(
            order_id=event.order_id,
            amount=event.total_amount,
            idempotency_key=str(event.event_id),
        )
    await consumer.commit({tp: offset + 1})

Важная деталь: billing_service.charge тоже получает idempotency_key и передаёт его в httpx-запрос к платёжному провайдеру. Это двойная защита: и на уровне Kafka, и на уровне downstream-вызова.

Если транзакция откатилась из-за CancelledError — offset тоже не подтверждается, и при перезапуске событие придёт снова. Повторная вставка в processed_event упадёт по уникальному ограничению — обработка безопасно пропустится.

Outbox-relay с двух-фазным статусом

Outbox-паттерн используется для надёжной публикации событий в Kafka: события сначала пишутся в таблицу БД, а отдельный relay-процесс их оттуда читает и отправляет.

Проблема: если SIGTERM прервёт relay между отправкой в Kafka и пометкой строки как PUBLISHED, при перезапуске та же строка уйдёт повторно.

Решение — промежуточный статус PUBLISHING:

from sqlalchemy import update
from datetime import datetime, timezone

async def relay_batch(session: AsyncSession, producer: AIOKafkaProducer) -> int:
    now = datetime.now(timezone.utc)
    result = await session.execute(
        update(OutboxEvent)
        .where(OutboxEvent.status == "PENDING")
        .values(status="PUBLISHING", locked_at=now)
        .returning(OutboxEvent.id, OutboxEvent.payload, OutboxEvent.topic)
        .limit(50)
    )
    rows = result.fetchall()
    if not rows:
        return 0

    for row_id, payload, topic in rows:
        await producer.send_and_wait(topic, value=payload)
        await session.execute(
            update(OutboxEvent)
            .where(OutboxEvent.id == row_id)
            .values(status="PUBLISHED", published_at=now)
        )
    await session.commit()
    return len(rows)

Если SIGTERM случается между send_and_wait и UPDATE — строка зависает в статусе PUBLISHING. Отдельная фоновая задача через несколько минут возвращает такие строки обратно в PENDING. Kafka получит повторную отправку — но consumer-side processed_event защитит от дублирования.

Relay-цикл проверяет флаг готовности, а не крутится бесконечно:

async def outbox_loop(app_state: AppState) -> None:
    while app_state.is_ready:
        sent = await relay_batch(session, producer)
        if sent == 0:
            await asyncio.sleep(1.0)

Частая ошибка: httpx-retry без Idempotency-Key

# проблемный вариант
async def charge(order_id: UUID, amount: int) -> dict:
    async with httpx.AsyncClient() as client:
        for attempt in range(3):
            try:
                resp = await client.post(
                    f"{PAYMENT_URL}/charge",
                    json={"order_id": str(order_id), "amount": amount},
                    timeout=10.0,
                )
                resp.raise_for_status()
                return resp.json()
            except httpx.TransportError:
                if attempt == 2:
                    raise
                await asyncio.sleep(0.5)

При SIGTERM во время первой попытки: запрос ушёл, ответ не получен. Retry создаёт второй запрос. Провайдер обрабатывает оба — двойное списание.

Правильно — Idempotency-Key генерируется один раз до цикла и передаётся во всех попытках:

async def charge(order_id: UUID, amount: int, idempotency_key: str) -> dict:
    async with httpx.AsyncClient() as client:
        for attempt in range(3):
            try:
                resp = await client.post(
                    f"{PAYMENT_URL}/charge",
                    json={"order_id": str(order_id), "amount": amount},
                    headers={"Idempotency-Key": idempotency_key},
                    timeout=10.0,
                )
                resp.raise_for_status()
                return resp.json()
            except httpx.TransportError:
                if attempt == 2:
                    raise
                await asyncio.sleep(0.5)

CancelledError внутри транзакции

asyncio.CancelledError при завершении может прилететь в любом await. Если это случится внутри async with session.begin() после side-effect, но до commit — SQLAlchemy выполнит откат. Это правильно: offset тоже не подтверждается, повтор безопасен.

Если код перехватывает CancelledError — нужно либо пробросить его дальше, либо явно откатить транзакцию:

async def handle_customer_merge(event: CustomerMergeEvent, session: AsyncSession) -> None:
    try:
        async with session.begin():
            session.add(ProcessedEvent(event_id=event.event_id, context="crm"))
            await crm_service.merge(event.source_id, event.target_id)
    except asyncio.CancelledError:
        # откат выполнен context manager-ом; пробрасываем дальше
        raise

Главное правило: except asyncio.CancelledError: pass внутри транзакции — это всегда ошибка.

Коротко

  • Идемпотентность — повторный вызов с теми же данными даёт тот же результат без дубликатов. При graceful shutdown это обязательное свойство для любой операции, которую SIGTERM может прервать.
  • HTTP POSTIdempotency-Key в заголовке; FastAPI читает его через Header(...), сервис сохраняет результат атомарно в одной транзакции.
  • Kafka-listenerprocessed_event и side-effect коммитятся в одной транзакции; offset подтверждается только после commit. При откате — повтор безопасен.
  • Outbox-relay — двух-фазный статус PENDING → PUBLISHING → PUBLISHED; зависшие строки возвращаются в PENDING фоновой задачей.
  • httpx-retryIdempotency-Key генерируется один раз до retry-цикла и передаётся во всех попытках.
  • CancelledError внутри транзакции нужно пробрасывать, а не подавлять — иначе откат не произойдёт.

Что почитать дальше

  • Kafka: правильный shutdown consumer и producer
  • Scheduled-задачи и outbox-relay
  • База данных и persistence при завершении
  • HTTP drain и таймауты Uvicorn