Опирается на правила: R-SHUT-IDEM-1 и R-SHUT-IDEM-X1раздел 7. Идемпотентность in-flight.

Важно знать

  • Операции, которые SIGTERM может прервать, обязаны быть retry-safe (R-SHUT-IDEM-1).
  • Graceful-shutdown даёт время на завершение, но не гарантирует отсутствие partial-state при исчерпании бюджета.
  • HTTP POSTIdempotency-Key в заголовке обязателен (AUTH-19); FastAPI читает его через Header(...).
  • aiokafka listenerprocessed_event(event_id) коммитится в той же транзакции, что и side-effect; offset коммитится последним.
  • Outbox-relay — либо двух-фаза PENDING → PUBLISHING → PUBLISHED, либо receiver-side dedup через processed_event.
  • Money-операция с httpx-retry без Idempotency-Key — SIGTERM в момент retry → новый pod спишет повторно (R-SHUT-IDEM-X1).
  • Идемпотентность — последняя линия защиты, когда graceful не успел.

Uvicorn-graceful даёт операциям шанс завершиться, но shutdown — это deadline в 60 секунд. Долгий async-cascade (httpx retry × 3 × 30s) может не уложиться, asyncio.CancelledError прерывает посередине. Если операция не идемпотентна — partial state → инцидент. R-SHUT-IDEM-1 формулирует: каждая операция, которую graceful может прервать, обязана быть replay-safe.

Три типа in-flight операций

1. HTTP POST с Idempotency-Key

Client → POST /payments  Idempotency-Key: pay-abc
         FastAPI: создаёт idempotency_record, обрабатывает
         [SIGTERM посередине, uvicorn дожимает — timeout]

Client → retry: POST /payments  Idempotency-Key: pay-abc
         FastAPI (новый pod): находит idempotency_record, возвращает сохранённый response
         Дубля нет

FastAPI-маршрут обязан принимать ключ явно:

from fastapi import APIRouter, Header
from uuid import UUID

router = APIRouter()

@router.post("/payments", status_code=201)
async def charge_order(
    order_id: UUID,
    amount: int,
    idempotency_key: str = Header(..., alias="Idempotency-Key"),
    service: PaymentService = Depends(get_payment_service),
) -> PaymentReceipt:
    return await service.charge(order_id, amount, idempotency_key)

PaymentService.charge сначала ищет idempotency_record в БД. Если нашёл — возвращает сохранённый ответ без повторного списания. Если нет — выполняет списание и записывает результат атомарно.

2. aiokafka listener с processed_event

При ack-mode manual offset коммитится явно после side-effect. Риск: если CancelledError прилетит между side-effect и commit_offsets — restart получит тот же event снова.

Listener: получил event_id=EVT-XYZ
          Начал транзакцию:
            INSERT processed_event(event_id='EVT-XYZ', context='billing')
            await billing_svc.charge(...)   ← side-effect
          COMMIT
          await consumer.commit()           ← offset
          [SIGTERM после commit: всё хорошо]
          [SIGTERM между транзакцией и consumer.commit: rollback → offset не committed]

Restart:  Listener получил event_id=EVT-XYZ снова
          INSERT processed_event → UNIQUE violation → пропуск
          Дубля нет
from aiokafka import AIOKafkaConsumer, TopicPartition
from sqlalchemy.ext.asyncio import AsyncSession
from uuid import UUID

async def handle_order_confirmed(
    event: OrderConfirmedEvent,
    consumer: AIOKafkaConsumer,
    tp: TopicPartition,
    offset: int,
    session: AsyncSession,
) -> None:
    async with session.begin():
        already = await session.execute(
            select(ProcessedEvent).where(
                ProcessedEvent.event_id == event.event_id,
                ProcessedEvent.context == "billing",
            )
        )
        if already.scalar_one_or_none():
            await consumer.commit({tp: offset + 1})
            return
        session.add(ProcessedEvent(event_id=event.event_id, context="billing"))
        await billing_service.charge(
            order_id=event.order_id,
            amount=event.total_amount,
            idempotency_key=str(event.event_id),
        )
    await consumer.commit({tp: offset + 1})

processed_event и side-effect — одна транзакция. Если транзакция откатилась — offset тоже не коммитится, replay безопасен. billing_service.charge принимает idempotency_key и передаёт его в httpx-запрос к платёжному провайдеру — двойная защита.

3. Outbox-relay с двух-фазой

Relay: SELECT * FROM outbox_event WHERE status='PENDING' LIMIT 50 FOR UPDATE SKIP LOCKED
       Для каждого row:
         await producer.send('orders', value=payload)
         UPDATE outbox_event SET status='PUBLISHED' WHERE id=...
         [SIGTERM после send, до UPDATE]

Restart: SELECT * FROM outbox_event WHERE status='PENDING' LIMIT 50
         Тот же row → второй send
         Kafka получает дубль

Защита — промежуточный статус PUBLISHING:

from sqlalchemy import update, select
from datetime import datetime, timezone

async def relay_batch(session: AsyncSession, producer: AIOKafkaProducer) -> int:
    now = datetime.now(timezone.utc)
    result = await session.execute(
        update(OutboxEvent)
        .where(OutboxEvent.status == "PENDING")
        .values(status="PUBLISHING", locked_at=now)
        .returning(OutboxEvent.id, OutboxEvent.payload, OutboxEvent.topic)
        .limit(50)
    )
    rows = result.fetchall()
    if not rows:
        return 0

    for row_id, payload, topic in rows:
        await producer.send_and_wait(topic, value=payload)
        await session.execute(
            update(OutboxEvent)
            .where(OutboxEvent.id == row_id)
            .values(status="PUBLISHED", published_at=now)
        )
    await session.commit()
    return len(rows)

Если SIGTERM между send_and_wait и UPDATE — row остаётся в PUBLISHING. Cleanup-задача через N минут возвращает зависшие rows в PENDING. Kafka получит повторный send — consumer-side processed_event дедуплицирует.

Relay-цикл обязан проверять readiness-флаг, не while True:

async def outbox_loop(app_state: AppState) -> None:
    while app_state.is_ready:
        sent = await relay_batch(session, producer)
        if sent == 0:
            await asyncio.sleep(1.0)

Граничные случаи

httpx-retry без Idempotency-Key

# payment_client.py — платёж без ключа
async def charge(order_id: UUID, amount: int) -> dict:
    async with httpx.AsyncClient() as client:
        for attempt in range(3):
            try:
                resp = await client.post(
                    f"{PAYMENT_URL}/charge",
                    json={"order_id": str(order_id), "amount": amount},
                    timeout=10.0,
                )
                resp.raise_for_status()
                return resp.json()
            except httpx.TransportError:
                if attempt == 2:
                    raise
                await asyncio.sleep(0.5)

При SIGTERM в момент первого attempt: запрос ушёл, ответ не получен. Retry создаёт второй запрос. Провайдер обрабатывает оба — двойное списание.

Корректно — Idempotency-Key генерируется один раз, используется во всех retry:

async def charge(order_id: UUID, amount: int, idempotency_key: str) -> dict:
    async with httpx.AsyncClient() as client:
        for attempt in range(3):
            try:
                resp = await client.post(
                    f"{PAYMENT_URL}/charge",
                    json={"order_id": str(order_id), "amount": amount},
                    headers={"Idempotency-Key": idempotency_key},
                    timeout=10.0,
                )
                resp.raise_for_status()
                return resp.json()
            except httpx.TransportError:
                if attempt == 2:
                    raise
                await asyncio.sleep(0.5)

CancelledError внутри транзакции

asyncio.CancelledError при shutdown прилетает в любом await. Если он прилетит внутри async with session.begin() после side-effect, но до commit — SQLAlchemy выполнит rollback. Это правильное поведение: offset тоже не закоммитился → replay безопасен.

Если код перехватывает CancelledError и продолжает работу — он должен либо re-raise, либо явно откатить транзакцию:

async def handle_customer_merge(event: CustomerMergeEvent, session: AsyncSession) -> None:
    try:
        async with session.begin():
            session.add(ProcessedEvent(event_id=event.event_id, context="crm"))
            await crm_service.merge(event.source_id, event.target_id)
    except asyncio.CancelledError:
        # rollback выполнен context-manager-ом; re-raise обязателен
        raise

Что запрещено

АнтипаттернПравилоЧто взамен
httpx-retry без Idempotency-Key к money-endpointR-SHUT-IDEM-X1ключ генерируется до retry, передаётся заголовком
aiokafka offset commit до завершения side-effectR-SHUT-IDEM-1commit после session.commit()
Outbox-relay без receiver-side dedup и без двух-фазыR-SHUT-IDEM-1processed_event на consumer или статус PUBLISHING
except asyncio.CancelledError: pass внутри транзакцииR-SHUT-IDEM-1re-raise после критичной секции
Отдельный shutting_down: bool вместо readiness-флагаR-SHUT-CFG-X1единый AppState.is_ready, связанный с /health/ready
while True в outbox-relay без проверки readinessR-SHUT-SCHED-3while app_state.is_ready
Idempotency-Key только на стороне FastAPI, не к downstreamR-SHUT-IDEM-1end-to-end: client → FastAPI → downstream

Куда дальше

  • Бюджеты и observability — cumulative-бюджет, метрика app_shutdown_duration_seconds, структурный лог SIGTERM.
  • БД и persistence — порядок engine.dispose(), транзакции на shutdown.
  • HTTP drain — uvicorn graceful-timeout, preStop sleep, долгие эндпоинты → 202 Accepted.
  • Конфигурация uvicorn graceful shutdown — lifespan, readiness-флаг, раздельные probes.
  • Kafka shutdown — await consumer.stop() / producer.stop() в lifespan, manual commit.
  • Kubernetes — terminationGracePeriodSeconds: 60, preStop, maxUnavailable 0.
  • Scheduled / Async / Outbox — APScheduler wait=True, обработка CancelledError, outbox-relay цикл.