Опирается на правила: R-SHUT-KFK-1R-SHUT-KFK-4 и R-SHUT-KFK-X1 из Graceful Shutdown Style Guide → раздел 3. Kafka shutdown.

Важно знать

  • await consumer.stop() в lifespan-shutdown дожимает текущее сообщение и коммитит offset перед остановкой.
  • enable_auto_commit=False — обязательно; auto-commit может зафиксировать offset до завершения обработки.
  • Manual commitawait consumer.commit() явно после успешной обработки каждого сообщения или batch.
  • Listener не запускает долгий cascade — HTTP-вызовы с retry на 20+ секунд не уложатся в shutdown; cascade выносится в outbox.
  • await producer.stop() — flush all pending batches + close connection; без этого отправленные но не сброшенные сообщения теряются.
  • Порядок в lifespan: сначала отменить consumer-задачу и дождаться consumer.stop(), затем producer.stop(), в конце — engine.dispose().
  • asyncio.CancelledError в consumer-задаче — поймать, коммит не делать (offset не закоммичен → replay защищён идемпотентностью).

Kafka shutdown — место, где легко потерять данные или получить дубли. Если consumer убит до commit offset — следующий старт получит те же сообщения (replay). Если producer убит до flush — отправленные batch теряются. UCP формулирует одно условие: дожать сообщение, явный commit, flush producer — и только после этого закрыть пул БД.

Consumer stop в lifespan-shutdown

R-SHUT-KFK-1: aiokafka consumer на остановке коммитит offset и закрывается.

Типичная раскладка: consumer запускается как asyncio-задача в lifespan-startup, останавливается в lifespan-shutdown.

from contextlib import asynccontextmanager
from aiokafka import AIOKafkaConsumer, AIOKafkaProducer
from fastapi import FastAPI
import asyncio

consumer: AIOKafkaConsumer | None = None
producer: AIOKafkaProducer | None = None
consumer_task: asyncio.Task | None = None


@asynccontextmanager
async def lifespan(app: FastAPI):
    global consumer, producer, consumer_task

    consumer = AIOKafkaConsumer(
        "orders.confirmed",
        bootstrap_servers="kafka:9092",
        group_id="billing-service-confirmations",
        enable_auto_commit=False,
        auto_offset_reset="earliest",
    )
    producer = AIOKafkaProducer(bootstrap_servers="kafka:9092")

    await consumer.start()
    await producer.start()

    consumer_task = asyncio.create_task(consume_order_events())

    yield

    # shutdown — порядок важен
    consumer_task.cancel()
    try:
        await asyncio.wait_for(consumer_task, timeout=20.0)
    except (asyncio.CancelledError, asyncio.TimeoutError):
        pass
    await consumer.stop()    # flush offset commit + отсоединение от group
    await producer.stop()    # flush pending batches + close
    await engine.dispose()   # пул БД — последним


app = FastAPI(lifespan=lifespan)

await consumer.stop() выполняет:

  1. Прекращает getmany() / __aiter__.
  2. Делает финальный commit накопленных offsets (если был вызов consumer.commit() без немедленной отправки — отправляет).
  3. Отправляет LeaveGroup → rebalance в группе, другой consumer подхватывает партиции.

Задачу нужно отменить до consumer.stop() — иначе она продолжает читать сообщения пока stop уже выполняется.

Обработка сообщений с manual commit

R-SHUT-KFK-3: manual commit после обработки каждого сообщения.

async def consume_order_events():
    async for msg in consumer:
        try:
            await handle_order_confirmed(msg)
            await consumer.commit()
        except asyncio.CancelledError:
            # shutdown: offset не коммитим, replay при следующем старте
            raise
        except Exception:
            logger.exception(
                "order_event_processing_failed",
                topic=msg.topic,
                partition=msg.partition,
                offset=msg.offset,
            )
            # offset не коммитим — retry при следующем старте

asyncio.CancelledError при shutdown: consumer-задача получает cancel, текущее await handle_order_confirmed() прерывается. Offset не закоммичен — следующий старт прочитает то же сообщение. handle_order_confirmed должен быть идемпотентным (см. ниже).

Для batch-обработки — getmany():

async def consume_order_events():
    while True:
        batch = await consumer.getmany(timeout_ms=1000, max_records=50)
        for tp, messages in batch.items():
            for msg in messages:
                await handle_order_confirmed(msg)
        if batch:
            await consumer.commit()

При CancelledError во время getmany или внутри цикла — offset не коммитится, весь batch replay-ится.

Listener не запускает долгий cascade

R-SHUT-KFK-2: обработчик не держит HTTP-вызовы с retry внутри listener-цикла.

# ОПАСНО — cascade HTTP внутри consumer
async def handle_order_confirmed(msg):
    event = OrderConfirmedEvent.model_validate_json(msg.value)
    await payment_client.charge(event.order_id, event.total)    # 5s + retry 30s
    await notification_client.send(event.order_id)              # 3s + retry 15s
    await analytics_client.track(event.order_id)               # 2s + retry 10s

Худший случай — 65 секунд. При SIGTERM: cancel прилетает, payment_client.charge прерывается посередине. Если charge прошёл — ack не был сделан, replay при следующем старте → второй списание.

Корректно — только локальная транзакция + outbox:

async def handle_order_confirmed(msg):
    event = OrderConfirmedEvent.model_validate_json(msg.value)

    async with session_factory() as session:
        async with session.begin():
            exists = await session.scalar(
                select(ProcessedEvent.id)
                .where(ProcessedEvent.event_id == event.event_id)
                .where(ProcessedEvent.consumer_group == "billing-confirmations")
            )
            if exists:
                return  # дедуп — уже обработан

            session.add(ProcessedEvent(
                event_id=event.event_id,
                consumer_group="billing-confirmations",
            ))
            session.add(OutboxEvent(
                aggregate_id=str(event.order_id),
                event_type="ChargePaymentRequested",
                payload={"order_id": str(event.order_id), "total": str(event.total)},
            ))

Listener делает локальную транзакцию + outbox event. Реальный HTTP-charge — в отдельном outbox-relay worker. Listener завершается за < 100ms, shutdown безопасен.

Пример с доменом Customer — регистрация через outbox:

async def handle_customer_registered(msg):
    event = CustomerRegisteredEvent.model_validate_json(msg.value)

    async with session_factory() as session:
        async with session.begin():
            exists = await session.scalar(
                select(ProcessedEvent.id)
                .where(ProcessedEvent.event_id == event.event_id)
                .where(ProcessedEvent.consumer_group == "crm-registrations")
            )
            if exists:
                return

            session.add(ProcessedEvent(
                event_id=event.event_id,
                consumer_group="crm-registrations",
            ))
            session.add(OutboxEvent(
                event_type="WelcomeEmailRequested",
                payload={"customer_id": str(event.customer_id), "email": event.email},
            ))

Producer flush

R-SHUT-KFK-4: await producer.stop() — flush + close.

producer = AIOKafkaProducer(
    bootstrap_servers="kafka:9092",
    acks="all",
    enable_idempotence=True,
)

При shutdown в lifespan:

await producer.stop()  # flush all pending + close connection

producer.stop() выполняет flush() — все накопленные но ещё не отправленные сообщения дожимаются до broker, затем соединение закрывается. Без stop() — сообщения в internal buffer теряются.

Пример — producer в event-handler:

async def publish_product_price_updated(product_id: str, new_price: Decimal):
    event = ProductPriceUpdatedEvent(
        product_id=product_id,
        new_price=str(new_price),
    )
    await producer.send_and_wait(
        "products.price-updated",
        value=event.model_dump_json().encode(),
        key=product_id.encode(),
    )

send_and_wait блокирует до получения ack от broker — для критичных сообщений это предпочтительно. Для высокотрафичных сценариев — send() (fire-and-forget) + await producer.flush() явно в конце batch, либо полагаться на producer.stop() при shutdown.

Что запрещено

АнтипаттернПравилоЧто взамен
enable_auto_commit=TrueR-SHUT-KFK-X1False + await consumer.commit() после обработки
Нет await consumer.stop() в lifespan-shutdownR-SHUT-KFK-1stop() в lifespan после отмены задачи
Долгий cascade (HTTP + retry > 20s) в consumer-циклеR-SHUT-KFK-2outbox + отдельный relay worker
consumer.stop() без предварительной отмены consumer-задачиR-SHUT-KFK-1сначала task.cancel() + await task, потом stop()
producer.stop() до consumer.stop()R-SHUT-KFK-4сначала consumer, потом producer
await producer.send() без send_and_wait для критичных событийR-SHUT-KFK-4send_and_wait или явный flush()
engine.dispose() до await producer.stop()R-SHUT-DB-X1producer stop → engine dispose
asyncio.CancelledError подавлен в consumer-циклеR-SHUT-KFK-1re-raise, offset не коммитить при cancel

Куда дальше

  • FastAPI/uvicorn конфигурация--timeout-graceful-shutdown, readiness-флаг, lifespan-шаблон.
  • HTTP drain — uvicorn graceful, preStop sleep, долгие эндпоинты 202+polling.
  • Идемпотентность in-flight — processed_event дедуп, replay-safety при SIGTERM.
  • Бюджеты и observability — Kafka в общем 60s budget, app_shutdown_duration_seconds.
  • БД и persistence — engine.dispose() после producer.stop(), порядок lifespan.
  • Фоновые задачи и outbox — outbox-relay, CancelledError, while app_state.is_ready.
  • Kubernetes — terminationGracePeriodSeconds, probes на /health/{live,ready}, maxUnavailable 0.