← назад к разделу

Kafka — это место, где неправильная остановка приложения больнее всего бьёт по данным. Consumer убили до commit — следующий запуск прочитает те же сообщения снова. Producer убили до flush — последние сообщения просто исчезли. В этой статье разберём, как правильно останавливать aiokafka в FastAPI-приложении.

Почему обычный kill ломает Kafka

Когда процесс получает сигнал остановки (SIGTERM) и закрывается немедленно, происходят две вещи:

  • Consumer не успевает зафиксировать (закоммитить) offset — координатор группы не знает, до какого сообщения дошла обработка. При следующем старте consumer начнёт читать с последнего зафиксированного offset — те же сообщения придут ещё раз.
  • Producer не успевает отправить сообщения из внутреннего буфера. Эти сообщения теряются навсегда — клиент просто не знает, что они не дошли.

Решение: дать приложению время завершить текущую работу перед выходом. Это и есть корректная остановка.

Consumer stop в lifespan

Типичная схема для aiokafka в FastAPI — запустить consumer как asyncio-задачу в lifespan, а при остановке аккуратно её завершить:

from contextlib import asynccontextmanager
from aiokafka import AIOKafkaConsumer, AIOKafkaProducer
from fastapi import FastAPI
import asyncio

consumer: AIOKafkaConsumer | None = None
producer: AIOKafkaProducer | None = None
consumer_task: asyncio.Task | None = None


@asynccontextmanager
async def lifespan(app: FastAPI):
    global consumer, producer, consumer_task

    consumer = AIOKafkaConsumer(
        "orders.confirmed",
        bootstrap_servers="kafka:9092",
        group_id="billing-service-confirmations",
        enable_auto_commit=False,
        auto_offset_reset="earliest",
    )
    producer = AIOKafkaProducer(bootstrap_servers="kafka:9092")

    await consumer.start()
    await producer.start()

    ready = {"accepting": True}
    consumer_task = asyncio.create_task(consume_order_events(ready))

    yield

    # Остановка — порядок важен
    ready["accepting"] = False
    consumer_task.cancel()
    try:
        await asyncio.wait_for(consumer_task, timeout=20.0)
    except (asyncio.CancelledError, asyncio.TimeoutError):
        pass
    await consumer.stop()    # финальный commit offset + выход из группы
    await producer.stop()    # flush pending сообщений + закрыть соединение
    await engine.dispose()   # пул БД — последним


app = FastAPI(lifespan=lifespan)

Что делает await consumer.stop():

  1. Останавливает чтение (getmany() и __aiter__).
  2. Делает финальный commit накопленных offsets.
  3. Отправляет LeaveGroup — другой consumer в группе подхватывает партиции без лишнего ожидания.

Важный момент: задачу нужно отменить до вызова consumer.stop(). Иначе задача продолжает читать сообщения, пока stop уже выполняется — это создаёт гонку.

Почему auto commit нельзя оставлять включённым

enable_auto_commit=True — настройка по умолчанию во многих клиентах. Она удобна, но опасна: aiokafka фиксирует offset по таймеру, не дожидаясь завершения обработки сообщения.

Сценарий потери:

  1. Пришло сообщение, начался его длинный handle().
  2. Таймер сработал, offset закоммичен.
  3. SIGTERM — процесс упал посередине handle().
  4. При следующем старте сообщение не придёт — offset уже зафиксирован. Данные потеряны.

Правильный подход — enable_auto_commit=False и явный await consumer.commit() после успешной обработки каждого сообщения:

async def consume_order_events(ready: dict) -> None:
    async for msg in consumer:
        try:
            await handle_order_confirmed(msg)
            await consumer.commit()
        except asyncio.CancelledError:
            # При остановке: offset не коммитим, сообщение придёт снова
            raise
        except Exception:
            logger.exception(
                "order_event_processing_failed",
                topic=msg.topic,
                partition=msg.partition,
                offset=msg.offset,
            )
            # Не коммитим — при следующем старте обработка повторится

asyncio.CancelledError при остановке нужно пробрасывать дальше (raise), а не подавлять. Если подавить — задача продолжит работу после cancel, и остановка зависнет.

Обработка пачками

Если сообщений много, удобно читать их пачками через getmany(). Флаг ready["accepting"] позволяет остановить чтение новых пачек при получении сигнала остановки:

async def consume_order_events(ready: dict) -> None:
    while ready["accepting"]:
        batch = await consumer.getmany(timeout_ms=1000, max_records=50)
        for tp, messages in batch.items():
            for msg in messages:
                await handle_order_confirmed(msg)
        if batch:
            await consumer.commit()

При остановке lifespan выставляет ready["accepting"] = False — текущая пачка дообрабатывается, новая не начинается. Задача выходит из цикла чисто. Если прилетит CancelledError внутри цикла — offset не коммитится, вся пачка придёт снова при следующем старте.

Долгие вызовы в обработчике — частая ошибка

Внутри consumer-цикла не стоит делать длинные HTTP-запросы с повторными попытками:

# Опасно: если это прервётся на полпути — получим дубль
async def handle_order_confirmed(msg):
    event = OrderConfirmedEvent.model_validate_json(msg.value)
    await payment_client.charge(event.order_id, event.total)    # до 35 секунд с повторами
    await notification_client.send(event.order_id)              # ещё 15 секунд

Проблема: если SIGTERM пришёл пока payment_client.charge в процессе — платёж мог уйти в банк, но offset ещё не закоммичен. Следующий старт повторит обработку — второй платёж.

Правильно — делать только локальную транзакцию и записывать задание в outbox. Отдельный worker потом отправит HTTP-запросы с нужными повторами:

async def handle_order_confirmed(msg):
    event = OrderConfirmedEvent.model_validate_json(msg.value)

    async with session_factory() as session:
        async with session.begin():
            exists = await session.scalar(
                select(ProcessedEvent.id)
                .where(ProcessedEvent.event_id == event.event_id)
                .where(ProcessedEvent.consumer_group == "billing-confirmations")
            )
            if exists:
                return  # уже обработано, пропускаем

            session.add(ProcessedEvent(
                event_id=event.event_id,
                consumer_group="billing-confirmations",
            ))
            session.add(OutboxEvent(
                aggregate_id=str(event.order_id),
                event_type="ChargePaymentRequested",
                payload={"order_id": str(event.order_id), "total": str(event.total)},
            ))

Такой обработчик работает менее 100 миллисекунд — остановка безопасна в любой момент.

Producer flush

Producer тоже нужно остановить явно. await producer.stop() делает две вещи: сначала flush() — дожимает все сообщения из внутреннего буфера до брокера, потом закрывает соединение.

producer = AIOKafkaProducer(
    bootstrap_servers="kafka:9092",
    acks="all",
    enable_idempotence=True,
)

При остановке в lifespan:

await producer.stop()  # flush всех pending сообщений + закрыть соединение

Без этого вызова сообщения, которые клиент принял но ещё не отправил, потеряются.

Для критичных событий удобен send_and_wait — он ждёт подтверждения от брокера прямо при отправке:

async def publish_product_price_updated(product_id: str, new_price: Decimal):
    event = ProductPriceUpdatedEvent(
        product_id=product_id,
        new_price=str(new_price),
    )
    await producer.send_and_wait(
        "products.price-updated",
        value=event.model_dump_json().encode(),
        key=product_id.encode(),
    )

Для высокой нагрузки — send() (не ждёт ack) плюс явный await producer.flush() в конце пачки или полагаться на producer.stop() при остановке.

Порядок остановки

Последовательность в lifespan-shutdown важна:

  1. Выставить флаг ready["accepting"] = False.
  2. Отменить consumer-задачу: consumer_task.cancel().
  3. Дождаться завершения задачи: await asyncio.wait_for(consumer_task, timeout=20.0).
  4. await consumer.stop() — финальный commit offset, выход из группы.
  5. await producer.stop() — flush pending сообщений, закрыть соединение.
  6. await engine.dispose() — закрыть пул базы данных.

Если поменять порядок — например, закрыть пул базы до consumer.stop() — обработчик, который пытается записать outbox event в БД при завершении, получит ошибку соединения.

Коротко

  • enable_auto_commit=False обязательно — с auto commit offset фиксируется до завершения обработки, при сбое данные теряются.
  • Явный await consumer.commit() после каждого успешно обработанного сообщения или пачки.
  • asyncio.CancelledError нужно пробрасывать, не подавлять — иначе задача не завершится при отмене.
  • Долгие HTTP-вызовы с повторами не должны быть внутри consumer-цикла — только локальная транзакция плюс outbox.
  • await producer.stop() делает flush всех pending сообщений перед закрытием соединения.
  • Порядок в lifespan-shutdown: сначала consumer (commit + LeaveGroup), потом producer (flush), последним — пул БД.

Что почитать дальше

  • HTTP drain — uvicorn graceful, preStop sleep, долгие эндпоинты.
  • Фоновые задачи и outbox — outbox-relay, CancelledError, while app_state.is_ready.
  • БД и persistence — engine.dispose() после producer.stop(), порядок lifespan.
  • Kubernetes — terminationGracePeriodSeconds, пробы на /health/{live,ready}.