Опирается на правила:
R-SHUT-IDEM-1иR-SHUT-IDEM-X1— раздел 7. Идемпотентность in-flight.
Важно знать
- Операции, которые SIGTERM может прервать, обязаны быть retry-safe (
R-SHUT-IDEM-1).- Graceful-shutdown даёт время на завершение, но не гарантирует отсутствие partial-state при исчерпании бюджета.
- HTTP POST —
Idempotency-Keyв заголовке обязателен (AUTH-19); FastAPI читает его черезHeader(...).- aiokafka listener —
processed_event(event_id)коммитится в той же транзакции, что и side-effect; offset коммитится последним.- Outbox-relay — либо двух-фаза
PENDING → PUBLISHING → PUBLISHED, либо receiver-side dedup черезprocessed_event.- Money-операция с httpx-retry без
Idempotency-Key— SIGTERM в момент retry → новый pod спишет повторно (R-SHUT-IDEM-X1).- Идемпотентность — последняя линия защиты, когда graceful не успел.
Uvicorn-graceful даёт операциям шанс завершиться, но shutdown — это deadline в 60 секунд. Долгий async-cascade (httpx retry × 3 × 30s) может не уложиться, asyncio.CancelledError прерывает посередине. Если операция не идемпотентна — partial state → инцидент. R-SHUT-IDEM-1 формулирует: каждая операция, которую graceful может прервать, обязана быть replay-safe.
Три типа in-flight операций
1. HTTP POST с Idempotency-Key
Client → POST /payments Idempotency-Key: pay-abc
FastAPI: создаёт idempotency_record, обрабатывает
[SIGTERM посередине, uvicorn дожимает — timeout]
Client → retry: POST /payments Idempotency-Key: pay-abc
FastAPI (новый pod): находит idempotency_record, возвращает сохранённый response
Дубля нет
FastAPI-маршрут обязан принимать ключ явно:
from fastapi import APIRouter, Header
from uuid import UUID
router = APIRouter()
@router.post("/payments", status_code=201)
async def charge_order(
order_id: UUID,
amount: int,
idempotency_key: str = Header(..., alias="Idempotency-Key"),
service: PaymentService = Depends(get_payment_service),
) -> PaymentReceipt:
return await service.charge(order_id, amount, idempotency_key)
PaymentService.charge сначала ищет idempotency_record в БД. Если нашёл — возвращает сохранённый ответ без повторного списания. Если нет — выполняет списание и записывает результат атомарно.
2. aiokafka listener с processed_event
При ack-mode manual offset коммитится явно после side-effect. Риск: если CancelledError прилетит между side-effect и commit_offsets — restart получит тот же event снова.
Listener: получил event_id=EVT-XYZ
Начал транзакцию:
INSERT processed_event(event_id='EVT-XYZ', context='billing')
await billing_svc.charge(...) ← side-effect
COMMIT
await consumer.commit() ← offset
[SIGTERM после commit: всё хорошо]
[SIGTERM между транзакцией и consumer.commit: rollback → offset не committed]
Restart: Listener получил event_id=EVT-XYZ снова
INSERT processed_event → UNIQUE violation → пропуск
Дубля нет
from aiokafka import AIOKafkaConsumer, TopicPartition
from sqlalchemy.ext.asyncio import AsyncSession
from uuid import UUID
async def handle_order_confirmed(
event: OrderConfirmedEvent,
consumer: AIOKafkaConsumer,
tp: TopicPartition,
offset: int,
session: AsyncSession,
) -> None:
async with session.begin():
already = await session.execute(
select(ProcessedEvent).where(
ProcessedEvent.event_id == event.event_id,
ProcessedEvent.context == "billing",
)
)
if already.scalar_one_or_none():
await consumer.commit({tp: offset + 1})
return
session.add(ProcessedEvent(event_id=event.event_id, context="billing"))
await billing_service.charge(
order_id=event.order_id,
amount=event.total_amount,
idempotency_key=str(event.event_id),
)
await consumer.commit({tp: offset + 1})
processed_event и side-effect — одна транзакция. Если транзакция откатилась — offset тоже не коммитится, replay безопасен. billing_service.charge принимает idempotency_key и передаёт его в httpx-запрос к платёжному провайдеру — двойная защита.
3. Outbox-relay с двух-фазой
Relay: SELECT * FROM outbox_event WHERE status='PENDING' LIMIT 50 FOR UPDATE SKIP LOCKED
Для каждого row:
await producer.send('orders', value=payload)
UPDATE outbox_event SET status='PUBLISHED' WHERE id=...
[SIGTERM после send, до UPDATE]
Restart: SELECT * FROM outbox_event WHERE status='PENDING' LIMIT 50
Тот же row → второй send
Kafka получает дубль
Защита — промежуточный статус PUBLISHING:
from sqlalchemy import update, select
from datetime import datetime, timezone
async def relay_batch(session: AsyncSession, producer: AIOKafkaProducer) -> int:
now = datetime.now(timezone.utc)
result = await session.execute(
update(OutboxEvent)
.where(OutboxEvent.status == "PENDING")
.values(status="PUBLISHING", locked_at=now)
.returning(OutboxEvent.id, OutboxEvent.payload, OutboxEvent.topic)
.limit(50)
)
rows = result.fetchall()
if not rows:
return 0
for row_id, payload, topic in rows:
await producer.send_and_wait(topic, value=payload)
await session.execute(
update(OutboxEvent)
.where(OutboxEvent.id == row_id)
.values(status="PUBLISHED", published_at=now)
)
await session.commit()
return len(rows)
Если SIGTERM между send_and_wait и UPDATE — row остаётся в PUBLISHING. Cleanup-задача через N минут возвращает зависшие rows в PENDING. Kafka получит повторный send — consumer-side processed_event дедуплицирует.
Relay-цикл обязан проверять readiness-флаг, не while True:
async def outbox_loop(app_state: AppState) -> None:
while app_state.is_ready:
sent = await relay_batch(session, producer)
if sent == 0:
await asyncio.sleep(1.0)
Граничные случаи
httpx-retry без Idempotency-Key
# payment_client.py — платёж без ключа
async def charge(order_id: UUID, amount: int) -> dict:
async with httpx.AsyncClient() as client:
for attempt in range(3):
try:
resp = await client.post(
f"{PAYMENT_URL}/charge",
json={"order_id": str(order_id), "amount": amount},
timeout=10.0,
)
resp.raise_for_status()
return resp.json()
except httpx.TransportError:
if attempt == 2:
raise
await asyncio.sleep(0.5)
При SIGTERM в момент первого attempt: запрос ушёл, ответ не получен. Retry создаёт второй запрос. Провайдер обрабатывает оба — двойное списание.
Корректно — Idempotency-Key генерируется один раз, используется во всех retry:
async def charge(order_id: UUID, amount: int, idempotency_key: str) -> dict:
async with httpx.AsyncClient() as client:
for attempt in range(3):
try:
resp = await client.post(
f"{PAYMENT_URL}/charge",
json={"order_id": str(order_id), "amount": amount},
headers={"Idempotency-Key": idempotency_key},
timeout=10.0,
)
resp.raise_for_status()
return resp.json()
except httpx.TransportError:
if attempt == 2:
raise
await asyncio.sleep(0.5)
CancelledError внутри транзакции
asyncio.CancelledError при shutdown прилетает в любом await. Если он прилетит внутри async with session.begin() после side-effect, но до commit — SQLAlchemy выполнит rollback. Это правильное поведение: offset тоже не закоммитился → replay безопасен.
Если код перехватывает CancelledError и продолжает работу — он должен либо re-raise, либо явно откатить транзакцию:
async def handle_customer_merge(event: CustomerMergeEvent, session: AsyncSession) -> None:
try:
async with session.begin():
session.add(ProcessedEvent(event_id=event.event_id, context="crm"))
await crm_service.merge(event.source_id, event.target_id)
except asyncio.CancelledError:
# rollback выполнен context-manager-ом; re-raise обязателен
raise
Что запрещено
| Антипаттерн | Правило | Что взамен |
|---|---|---|
httpx-retry без Idempotency-Key к money-endpoint | R-SHUT-IDEM-X1 | ключ генерируется до retry, передаётся заголовком |
| aiokafka offset commit до завершения side-effect | R-SHUT-IDEM-1 | commit после session.commit() |
| Outbox-relay без receiver-side dedup и без двух-фазы | R-SHUT-IDEM-1 | processed_event на consumer или статус PUBLISHING |
except asyncio.CancelledError: pass внутри транзакции | R-SHUT-IDEM-1 | re-raise после критичной секции |
Отдельный shutting_down: bool вместо readiness-флага | R-SHUT-CFG-X1 | единый AppState.is_ready, связанный с /health/ready |
while True в outbox-relay без проверки readiness | R-SHUT-SCHED-3 | while app_state.is_ready |
| Idempotency-Key только на стороне FastAPI, не к downstream | R-SHUT-IDEM-1 | end-to-end: client → FastAPI → downstream |
Куда дальше
- Бюджеты и observability — cumulative-бюджет, метрика
app_shutdown_duration_seconds, структурный лог SIGTERM. - БД и persistence — порядок
engine.dispose(), транзакции на shutdown. - HTTP drain — uvicorn graceful-timeout, preStop sleep, долгие эндпоинты → 202 Accepted.
- Конфигурация uvicorn graceful shutdown — lifespan, readiness-флаг, раздельные probes.
- Kafka shutdown —
await consumer.stop()/producer.stop()в lifespan, manual commit. - Kubernetes —
terminationGracePeriodSeconds: 60, preStop, maxUnavailable 0. - Scheduled / Async / Outbox — APScheduler
wait=True, обработкаCancelledError, outbox-relay цикл.