Kafka — это место, где неправильная остановка приложения больнее всего бьёт по данным. Consumer убили до commit — следующий запуск прочитает те же сообщения снова. Producer убили до flush — последние сообщения просто исчезли. В этой статье разберём, как правильно останавливать aiokafka в FastAPI-приложении.
Почему обычный kill ломает Kafka
Когда процесс получает сигнал остановки (SIGTERM) и закрывается немедленно, происходят две вещи:
- Consumer не успевает зафиксировать (закоммитить) offset — координатор группы не знает, до какого сообщения дошла обработка. При следующем старте consumer начнёт читать с последнего зафиксированного offset — те же сообщения придут ещё раз.
- Producer не успевает отправить сообщения из внутреннего буфера. Эти сообщения теряются навсегда — клиент просто не знает, что они не дошли.
Решение: дать приложению время завершить текущую работу перед выходом. Это и есть корректная остановка.
Consumer stop в lifespan
Типичная схема для aiokafka в FastAPI — запустить consumer как asyncio-задачу в lifespan, а при остановке аккуратно её завершить:
from contextlib import asynccontextmanager
from aiokafka import AIOKafkaConsumer, AIOKafkaProducer
from fastapi import FastAPI
import asyncio
consumer: AIOKafkaConsumer | None = None
producer: AIOKafkaProducer | None = None
consumer_task: asyncio.Task | None = None
@asynccontextmanager
async def lifespan(app: FastAPI):
global consumer, producer, consumer_task
consumer = AIOKafkaConsumer(
"orders.confirmed",
bootstrap_servers="kafka:9092",
group_id="billing-service-confirmations",
enable_auto_commit=False,
auto_offset_reset="earliest",
)
producer = AIOKafkaProducer(bootstrap_servers="kafka:9092")
await consumer.start()
await producer.start()
ready = {"accepting": True}
consumer_task = asyncio.create_task(consume_order_events(ready))
yield
# Остановка — порядок важен
ready["accepting"] = False
consumer_task.cancel()
try:
await asyncio.wait_for(consumer_task, timeout=20.0)
except (asyncio.CancelledError, asyncio.TimeoutError):
pass
await consumer.stop() # финальный commit offset + выход из группы
await producer.stop() # flush pending сообщений + закрыть соединение
await engine.dispose() # пул БД — последним
app = FastAPI(lifespan=lifespan)
Что делает await consumer.stop():
- Останавливает чтение (
getmany()и__aiter__). - Делает финальный commit накопленных offsets.
- Отправляет
LeaveGroup— другой consumer в группе подхватывает партиции без лишнего ожидания.
Важный момент: задачу нужно отменить до вызова consumer.stop(). Иначе задача продолжает читать сообщения, пока stop уже выполняется — это создаёт гонку.
Почему auto commit нельзя оставлять включённым
enable_auto_commit=True — настройка по умолчанию во многих клиентах. Она удобна, но опасна: aiokafka фиксирует offset по таймеру, не дожидаясь завершения обработки сообщения.
Сценарий потери:
- Пришло сообщение, начался его длинный
handle(). - Таймер сработал, offset закоммичен.
- SIGTERM — процесс упал посередине
handle(). - При следующем старте сообщение не придёт — offset уже зафиксирован. Данные потеряны.
Правильный подход — enable_auto_commit=False и явный await consumer.commit() после успешной обработки каждого сообщения:
async def consume_order_events(ready: dict) -> None:
async for msg in consumer:
try:
await handle_order_confirmed(msg)
await consumer.commit()
except asyncio.CancelledError:
# При остановке: offset не коммитим, сообщение придёт снова
raise
except Exception:
logger.exception(
"order_event_processing_failed",
topic=msg.topic,
partition=msg.partition,
offset=msg.offset,
)
# Не коммитим — при следующем старте обработка повторится
asyncio.CancelledError при остановке нужно пробрасывать дальше (raise), а не подавлять. Если подавить — задача продолжит работу после cancel, и остановка зависнет.
Обработка пачками
Если сообщений много, удобно читать их пачками через getmany(). Флаг ready["accepting"] позволяет остановить чтение новых пачек при получении сигнала остановки:
async def consume_order_events(ready: dict) -> None:
while ready["accepting"]:
batch = await consumer.getmany(timeout_ms=1000, max_records=50)
for tp, messages in batch.items():
for msg in messages:
await handle_order_confirmed(msg)
if batch:
await consumer.commit()
При остановке lifespan выставляет ready["accepting"] = False — текущая пачка дообрабатывается, новая не начинается. Задача выходит из цикла чисто. Если прилетит CancelledError внутри цикла — offset не коммитится, вся пачка придёт снова при следующем старте.
Долгие вызовы в обработчике — частая ошибка
Внутри consumer-цикла не стоит делать длинные HTTP-запросы с повторными попытками:
# Опасно: если это прервётся на полпути — получим дубль
async def handle_order_confirmed(msg):
event = OrderConfirmedEvent.model_validate_json(msg.value)
await payment_client.charge(event.order_id, event.total) # до 35 секунд с повторами
await notification_client.send(event.order_id) # ещё 15 секунд
Проблема: если SIGTERM пришёл пока payment_client.charge в процессе — платёж мог уйти в банк, но offset ещё не закоммичен. Следующий старт повторит обработку — второй платёж.
Правильно — делать только локальную транзакцию и записывать задание в outbox. Отдельный worker потом отправит HTTP-запросы с нужными повторами:
async def handle_order_confirmed(msg):
event = OrderConfirmedEvent.model_validate_json(msg.value)
async with session_factory() as session:
async with session.begin():
exists = await session.scalar(
select(ProcessedEvent.id)
.where(ProcessedEvent.event_id == event.event_id)
.where(ProcessedEvent.consumer_group == "billing-confirmations")
)
if exists:
return # уже обработано, пропускаем
session.add(ProcessedEvent(
event_id=event.event_id,
consumer_group="billing-confirmations",
))
session.add(OutboxEvent(
aggregate_id=str(event.order_id),
event_type="ChargePaymentRequested",
payload={"order_id": str(event.order_id), "total": str(event.total)},
))
Такой обработчик работает менее 100 миллисекунд — остановка безопасна в любой момент.
Producer flush
Producer тоже нужно остановить явно. await producer.stop() делает две вещи: сначала flush() — дожимает все сообщения из внутреннего буфера до брокера, потом закрывает соединение.
producer = AIOKafkaProducer(
bootstrap_servers="kafka:9092",
acks="all",
enable_idempotence=True,
)
При остановке в lifespan:
await producer.stop() # flush всех pending сообщений + закрыть соединение
Без этого вызова сообщения, которые клиент принял но ещё не отправил, потеряются.
Для критичных событий удобен send_and_wait — он ждёт подтверждения от брокера прямо при отправке:
async def publish_product_price_updated(product_id: str, new_price: Decimal):
event = ProductPriceUpdatedEvent(
product_id=product_id,
new_price=str(new_price),
)
await producer.send_and_wait(
"products.price-updated",
value=event.model_dump_json().encode(),
key=product_id.encode(),
)
Для высокой нагрузки — send() (не ждёт ack) плюс явный await producer.flush() в конце пачки или полагаться на producer.stop() при остановке.
Порядок остановки
Последовательность в lifespan-shutdown важна:
- Выставить флаг
ready["accepting"] = False. - Отменить consumer-задачу:
consumer_task.cancel(). - Дождаться завершения задачи:
await asyncio.wait_for(consumer_task, timeout=20.0). await consumer.stop()— финальный commit offset, выход из группы.await producer.stop()— flush pending сообщений, закрыть соединение.await engine.dispose()— закрыть пул базы данных.
Если поменять порядок — например, закрыть пул базы до consumer.stop() — обработчик, который пытается записать outbox event в БД при завершении, получит ошибку соединения.
Коротко
enable_auto_commit=Falseобязательно — с auto commit offset фиксируется до завершения обработки, при сбое данные теряются.- Явный
await consumer.commit()после каждого успешно обработанного сообщения или пачки. asyncio.CancelledErrorнужно пробрасывать, не подавлять — иначе задача не завершится при отмене.- Долгие HTTP-вызовы с повторами не должны быть внутри consumer-цикла — только локальная транзакция плюс outbox.
await producer.stop()делает flush всех pending сообщений перед закрытием соединения.- Порядок в lifespan-shutdown: сначала consumer (commit + LeaveGroup), потом producer (flush), последним — пул БД.
Что почитать дальше
- HTTP drain — uvicorn graceful, preStop sleep, долгие эндпоинты.
- Фоновые задачи и outbox — outbox-relay,
CancelledError,while app_state.is_ready. - БД и persistence —
engine.dispose()послеproducer.stop(), порядок lifespan. - Kubernetes —
terminationGracePeriodSeconds, пробы на/health/{live,ready}.