Опирается на правила:
R-SHUT-KFK-1…R-SHUT-KFK-4иR-SHUT-KFK-X1из Graceful Shutdown Style Guide → раздел 3. Kafka shutdown.
Важно знать
await consumer.stop()в lifespan-shutdown дожимает текущее сообщение и коммитит offset перед остановкой.enable_auto_commit=False— обязательно; auto-commit может зафиксировать offset до завершения обработки.- Manual commit —
await consumer.commit()явно после успешной обработки каждого сообщения или batch.- Listener не запускает долгий cascade — HTTP-вызовы с retry на 20+ секунд не уложатся в shutdown; cascade выносится в outbox.
await producer.stop()— flush all pending batches + close connection; без этого отправленные но не сброшенные сообщения теряются.- Порядок в lifespan: сначала отменить consumer-задачу и дождаться
consumer.stop(), затемproducer.stop(), в конце —engine.dispose().asyncio.CancelledErrorв consumer-задаче — поймать, коммит не делать (offset не закоммичен → replay защищён идемпотентностью).
Kafka shutdown — место, где легко потерять данные или получить дубли. Если consumer убит до commit offset — следующий старт получит те же сообщения (replay). Если producer убит до flush — отправленные batch теряются. UCP формулирует одно условие: дожать сообщение, явный commit, flush producer — и только после этого закрыть пул БД.
Consumer stop в lifespan-shutdown
R-SHUT-KFK-1: aiokafka consumer на остановке коммитит offset и закрывается.
Типичная раскладка: consumer запускается как asyncio-задача в lifespan-startup, останавливается в lifespan-shutdown.
from contextlib import asynccontextmanager
from aiokafka import AIOKafkaConsumer, AIOKafkaProducer
from fastapi import FastAPI
import asyncio
consumer: AIOKafkaConsumer | None = None
producer: AIOKafkaProducer | None = None
consumer_task: asyncio.Task | None = None
@asynccontextmanager
async def lifespan(app: FastAPI):
global consumer, producer, consumer_task
consumer = AIOKafkaConsumer(
"orders.confirmed",
bootstrap_servers="kafka:9092",
group_id="billing-service-confirmations",
enable_auto_commit=False,
auto_offset_reset="earliest",
)
producer = AIOKafkaProducer(bootstrap_servers="kafka:9092")
await consumer.start()
await producer.start()
consumer_task = asyncio.create_task(consume_order_events())
yield
# shutdown — порядок важен
consumer_task.cancel()
try:
await asyncio.wait_for(consumer_task, timeout=20.0)
except (asyncio.CancelledError, asyncio.TimeoutError):
pass
await consumer.stop() # flush offset commit + отсоединение от group
await producer.stop() # flush pending batches + close
await engine.dispose() # пул БД — последним
app = FastAPI(lifespan=lifespan)
await consumer.stop() выполняет:
- Прекращает
getmany()/__aiter__. - Делает финальный commit накопленных offsets (если был вызов
consumer.commit()без немедленной отправки — отправляет). - Отправляет
LeaveGroup→ rebalance в группе, другой consumer подхватывает партиции.
Задачу нужно отменить до consumer.stop() — иначе она продолжает читать сообщения пока stop уже выполняется.
Обработка сообщений с manual commit
R-SHUT-KFK-3: manual commit после обработки каждого сообщения.
async def consume_order_events():
async for msg in consumer:
try:
await handle_order_confirmed(msg)
await consumer.commit()
except asyncio.CancelledError:
# shutdown: offset не коммитим, replay при следующем старте
raise
except Exception:
logger.exception(
"order_event_processing_failed",
topic=msg.topic,
partition=msg.partition,
offset=msg.offset,
)
# offset не коммитим — retry при следующем старте
asyncio.CancelledError при shutdown: consumer-задача получает cancel, текущее await handle_order_confirmed() прерывается. Offset не закоммичен — следующий старт прочитает то же сообщение. handle_order_confirmed должен быть идемпотентным (см. ниже).
Для batch-обработки — getmany():
async def consume_order_events():
while True:
batch = await consumer.getmany(timeout_ms=1000, max_records=50)
for tp, messages in batch.items():
for msg in messages:
await handle_order_confirmed(msg)
if batch:
await consumer.commit()
При CancelledError во время getmany или внутри цикла — offset не коммитится, весь batch replay-ится.
Listener не запускает долгий cascade
R-SHUT-KFK-2: обработчик не держит HTTP-вызовы с retry внутри listener-цикла.
# ОПАСНО — cascade HTTP внутри consumer
async def handle_order_confirmed(msg):
event = OrderConfirmedEvent.model_validate_json(msg.value)
await payment_client.charge(event.order_id, event.total) # 5s + retry 30s
await notification_client.send(event.order_id) # 3s + retry 15s
await analytics_client.track(event.order_id) # 2s + retry 10s
Худший случай — 65 секунд. При SIGTERM: cancel прилетает, payment_client.charge прерывается посередине. Если charge прошёл — ack не был сделан, replay при следующем старте → второй списание.
Корректно — только локальная транзакция + outbox:
async def handle_order_confirmed(msg):
event = OrderConfirmedEvent.model_validate_json(msg.value)
async with session_factory() as session:
async with session.begin():
exists = await session.scalar(
select(ProcessedEvent.id)
.where(ProcessedEvent.event_id == event.event_id)
.where(ProcessedEvent.consumer_group == "billing-confirmations")
)
if exists:
return # дедуп — уже обработан
session.add(ProcessedEvent(
event_id=event.event_id,
consumer_group="billing-confirmations",
))
session.add(OutboxEvent(
aggregate_id=str(event.order_id),
event_type="ChargePaymentRequested",
payload={"order_id": str(event.order_id), "total": str(event.total)},
))
Listener делает локальную транзакцию + outbox event. Реальный HTTP-charge — в отдельном outbox-relay worker. Listener завершается за < 100ms, shutdown безопасен.
Пример с доменом Customer — регистрация через outbox:
async def handle_customer_registered(msg):
event = CustomerRegisteredEvent.model_validate_json(msg.value)
async with session_factory() as session:
async with session.begin():
exists = await session.scalar(
select(ProcessedEvent.id)
.where(ProcessedEvent.event_id == event.event_id)
.where(ProcessedEvent.consumer_group == "crm-registrations")
)
if exists:
return
session.add(ProcessedEvent(
event_id=event.event_id,
consumer_group="crm-registrations",
))
session.add(OutboxEvent(
event_type="WelcomeEmailRequested",
payload={"customer_id": str(event.customer_id), "email": event.email},
))
Producer flush
R-SHUT-KFK-4: await producer.stop() — flush + close.
producer = AIOKafkaProducer(
bootstrap_servers="kafka:9092",
acks="all",
enable_idempotence=True,
)
При shutdown в lifespan:
await producer.stop() # flush all pending + close connection
producer.stop() выполняет flush() — все накопленные но ещё не отправленные сообщения дожимаются до broker, затем соединение закрывается. Без stop() — сообщения в internal buffer теряются.
Пример — producer в event-handler:
async def publish_product_price_updated(product_id: str, new_price: Decimal):
event = ProductPriceUpdatedEvent(
product_id=product_id,
new_price=str(new_price),
)
await producer.send_and_wait(
"products.price-updated",
value=event.model_dump_json().encode(),
key=product_id.encode(),
)
send_and_wait блокирует до получения ack от broker — для критичных сообщений это предпочтительно. Для высокотрафичных сценариев — send() (fire-and-forget) + await producer.flush() явно в конце batch, либо полагаться на producer.stop() при shutdown.
Что запрещено
| Антипаттерн | Правило | Что взамен |
|---|---|---|
enable_auto_commit=True | R-SHUT-KFK-X1 | False + await consumer.commit() после обработки |
Нет await consumer.stop() в lifespan-shutdown | R-SHUT-KFK-1 | stop() в lifespan после отмены задачи |
| Долгий cascade (HTTP + retry > 20s) в consumer-цикле | R-SHUT-KFK-2 | outbox + отдельный relay worker |
consumer.stop() без предварительной отмены consumer-задачи | R-SHUT-KFK-1 | сначала task.cancel() + await task, потом stop() |
producer.stop() до consumer.stop() | R-SHUT-KFK-4 | сначала consumer, потом producer |
await producer.send() без send_and_wait для критичных событий | R-SHUT-KFK-4 | send_and_wait или явный flush() |
engine.dispose() до await producer.stop() | R-SHUT-DB-X1 | producer stop → engine dispose |
asyncio.CancelledError подавлен в consumer-цикле | R-SHUT-KFK-1 | re-raise, offset не коммитить при cancel |
Куда дальше
- FastAPI/uvicorn конфигурация —
--timeout-graceful-shutdown, readiness-флаг, lifespan-шаблон. - HTTP drain — uvicorn graceful, preStop sleep, долгие эндпоинты 202+polling.
- Идемпотентность in-flight —
processed_eventдедуп, replay-safety при SIGTERM. - Бюджеты и observability — Kafka в общем 60s budget,
app_shutdown_duration_seconds. - БД и persistence —
engine.dispose()послеproducer.stop(), порядок lifespan. - Фоновые задачи и outbox — outbox-relay,
CancelledError,while app_state.is_ready. - Kubernetes —
terminationGracePeriodSeconds, probes на/health/{live,ready},maxUnavailable 0.