Опирается на правила:
R-KFK-CONS-1…R-KFK-CONS-6иR-KFK-CONS-X1…R-KFK-CONS-X4из Kafka Style Guide → раздел 2. Consumer.
Важно знать
- Уникальный
group_idформата<service>-<consumer-purpose>. Один group = одна логическая роль.- Manual commit обязателен —
enable_auto_commit=False,await consumer.commit()строго после обработки.- Listener idempotent (
R-KFK-IDEM-*) — at-least-once delivery, дубликаты возможны и ожидаемы.auto_offset_reset="earliest"для critical-consumer'ов —"latest"пропускает события при новом group_id.- Concurrency — несколько asyncio-тасок на один топик, каждая ≤ числу partition'ов.
max_poll_interval_ms≥ ожидаемого времени обработки — иначе Kafka считает consumer мёртвым.asyncio.sleep/блокирующий вызов > 1s в теле цикла — держит poll-цикл и провоцирует rebalance.- HTTP к внешней системе без CB/bulkhead — listener зависает, rebalance, шторм дубликатов.
Consumer — точка, где сервис принимает факт из внешнего мира. Типичные поломки здесь — потерянное событие (offset закоммичен до успеха) и rebalance loop (consumer не успевает poll-нуть, Kafka отбирает partition).
group_id per logical purpose
R-KFK-CONS-1: формат <service>-<consumer-purpose>.
from aiokafka import AIOKafkaConsumer
from app.settings import KafkaSettings
settings = KafkaSettings()
consumer = AIOKafkaConsumer(
"orders.confirmed",
bootstrap_servers=settings.brokers,
group_id="billing-order-confirmed", # <service>-<purpose>
enable_auto_commit=False,
auto_offset_reset="earliest",
value_deserializer=lambda b: json.loads(b),
)
Два listener-а в одном сервисе — два group_id:
order_consumer = AIOKafkaConsumer(
"orders.confirmed",
group_id="billing-order-confirmed",
...
)
product_consumer = AIOKafkaConsumer(
"products.price_changed",
group_id="billing-product-price-changed",
...
)
Один consumer-group = один offset, один rebalance. Если вынести оба listener'а под общий group_id="billing-service" — Kafka будет распределять partition'ы между ними как между однородными воркерами, и обработчик orders.confirmed может получить partition из топика products.price_changed.
Manual commit
R-KFK-CONS-2: коммитим offset только после успешной обработки.
import json
import asyncio
from aiokafka import AIOKafkaConsumer
from app.handlers import OrderConfirmedHandler
from app.events import OrderConfirmed
from app.settings import KafkaSettings
async def run_order_confirmed_consumer(handler: OrderConfirmedHandler) -> None:
settings = KafkaSettings()
consumer = AIOKafkaConsumer(
"orders.confirmed",
bootstrap_servers=settings.brokers,
group_id="billing-order-confirmed",
enable_auto_commit=False,
auto_offset_reset="earliest",
value_deserializer=lambda b: json.loads(b),
max_poll_interval_ms=300_000,
)
await consumer.start()
try:
async for msg in consumer:
event = OrderConfirmed(**msg.value)
await handler.handle(event) # идемпотентен по event_id (R-KFK-IDEM)
await consumer.commit() # коммит строго после обработки
finally:
await consumer.stop()
await consumer.commit() коммитит offset для всех присвоенных partition'ов. Если handle бросит исключение — commit не произойдёт, следующий poll повторит то же сообщение. Это at-least-once: ни одно событие не теряется, дубликаты обрабатываются через Idempotent consumer.
Listener idempotent
R-KFK-CONS-3: сообщение может прийти 2+ раз — это норма.
Типичные источники дубликатов в production:
- Consumer rebalance до
await consumer.commit(). - DLQ replay.
- Reset offset при миграции consumer-group.
- Повторный старт pod'а после OOM-kill.
Защита — таблица processed_event в Postgres, проверка event_id до бизнес-операции. Подробнее — Idempotent consumer.
auto_offset_reset="earliest"
R-KFK-CONS-4: для critical-consumer'ов.
consumer = AIOKafkaConsumer(
"orders.confirmed",
auto_offset_reset="earliest", # не "latest"
...
)
Срабатывает при первом старте consumer-group (нет сохранённого offset) или когда offset вышел за пределы retention.
"latest"(aiokafka default) — старт с самого нового. Все сообщения, которые успели прийти до первого poll, пропущены. Для денег и заказов — недопустимо."earliest"— старт с самого старого retained-сообщения. Прочитаем всё.
Для аналитических топиков, где исторические данные не нужны, "latest" приемлемо. Но это исключение, не дефолт.
Concurrency через asyncio-таски
R-KFK-CONS-5: количество параллельных consumer-тасок ≤ числу partition'ов топика.
async def start_billing_consumers(handler: OrderConfirmedHandler) -> None:
tasks = [
asyncio.create_task(run_order_confirmed_consumer(handler))
for _ in range(3) # 3 таски = 3 consumer-instance в одном group
]
await asyncio.gather(*tasks)
Каждая таска — отдельный экземпляр AIOKafkaConsumer с тем же group_id. Kafka раздаст им partition'ы в рамках одного group. Правило: количество тасок × количество pod'ов ≤ partitions. Лишние экземпляры просто простаивают без назначенных partition'ов.
Если топик orders.confirmed имеет 6 partition'ов, а сервис деплоится в 2 pod'а — 3 таски на pod = 6 экземпляров = 6 partition'ов. Нет смысла ставить 4 таски.
max_poll_interval_ms
R-KFK-CONS-6: лимит времени между двумя вызовами poll.
consumer = AIOKafkaConsumer(
"orders.confirmed",
max_poll_interval_ms=300_000, # 5 минут (default)
...
)
В aiokafka async for msg in consumer неявно вызывает poll за каждое сообщение. Если обработка одного сообщения занимает долго — Kafka может зафиксировать consumer как мёртвый и отдать partition другому.
Типичный сценарий поломки:
- Consumer получил сообщение из топика
customers.kyc_requested. - Handler делает HTTP-запрос к внешней KYC-системе, та зависает на 6 минут.
- Kafka не видит poll 5+ минут → rebalance, partition отдан другому consumer.
- KYC-запрос завершился, handler попытался закоммитить offset — но partition уже не его → commit проигнорирован.
- Другой consumer получает то же сообщение → повторный KYC-запрос.
Варианты:
- Ограничить timeout внешнего вызова (
aiohttp.ClientTimeout), обработать ошибку через retry-топик. - Увеличить
max_poll_interval_msпри заведомо долгой обработке. - Вынести тяжёлую работу в отдельную очередь (arq/Celery), не блокируя poll-цикл.
pydantic-settings для конфигурации
R-KFK-CFG-1: параметры через pydantic-settings, не хардкод.
from pydantic_settings import BaseSettings
class KafkaSettings(BaseSettings):
brokers: str
group_prefix: str = "billing"
auto_offset_reset: str = "earliest"
max_poll_interval_ms: int = 300_000
class Config:
env_prefix = "KAFKA_"
bootstrap_servers берётся из KAFKA_BROKERS в окружении. В production — адрес брокерного кластера, в тестах — localhost:9092. Никакого хардкода в коде (R-KFK-CFG-X2).
Что запрещено
| Антипаттерн | Правило | Что взамен |
|---|---|---|
enable_auto_commit=True в проде | R-KFK-CONS-X1 | enable_auto_commit=False + await consumer.commit() после обработки |
asyncio.sleep(N) / блокирующий вызов > 1s в цикле | R-KFK-CONS-X2 | retry-топик с non-blocking delay |
group_id отсутствует или общий для разных listener'ов | R-KFK-CONS-X3 | уникальный <service>-<purpose> |
| HTTP к внешней системе без CB/bulkhead | R-KFK-CONS-X4 | timeout + circuit breaker + retry-топик |
auto_offset_reset="latest" для critical | R-KFK-CONS-4 | "earliest" |
| Количество тасок > числа partition'ов | R-KFK-CONS-5 | ≤ partitions |
Тяжёлая обработка без настройки max_poll_interval_ms | R-KFK-CONS-6 | увеличить или ограничить timeout внешних вызовов |
Проглатывание исключения + await consumer.commit() | R-KFK-CONS-2 | DLQ + commit только при успехе |
Куда дальше
- Idempotent consumer —
processed_event, dedup поevent_id. - Retry topic + DLQ — non-blocking retry через отдельные топики.
- Конфигурация — полный
KafkaSettings+pydantic-settings. - Observability — consumer lag alerts, tracing через
traceparent. - Producer — идемпотентный producer, partition key.
- Outbox publishing — outbox-relay на aiokafka + APScheduler.
- Event design — структура события, Pydantic-модель на границе consumer.
- Security — TLS (
security_protocol="SSL"), ACL per-сервис.