Опирается на правила: R-KFK-CONS-1R-KFK-CONS-6 и R-KFK-CONS-X1R-KFK-CONS-X4 из Kafka Style Guide → раздел 2. Consumer.

Важно знать

  • Уникальный group_id формата <service>-<consumer-purpose>. Один group = одна логическая роль.
  • Manual commit обязателенenable_auto_commit=False, await consumer.commit() строго после обработки.
  • Listener idempotent (R-KFK-IDEM-*) — at-least-once delivery, дубликаты возможны и ожидаемы.
  • auto_offset_reset="earliest" для critical-consumer'ов — "latest" пропускает события при новом group_id.
  • Concurrency — несколько asyncio-тасок на один топик, каждая ≤ числу partition'ов.
  • max_poll_interval_ms ≥ ожидаемого времени обработки — иначе Kafka считает consumer мёртвым.
  • asyncio.sleep/блокирующий вызов > 1s в теле цикла — держит poll-цикл и провоцирует rebalance.
  • HTTP к внешней системе без CB/bulkhead — listener зависает, rebalance, шторм дубликатов.

Consumer — точка, где сервис принимает факт из внешнего мира. Типичные поломки здесь — потерянное событие (offset закоммичен до успеха) и rebalance loop (consumer не успевает poll-нуть, Kafka отбирает partition).

group_id per logical purpose

R-KFK-CONS-1: формат <service>-<consumer-purpose>.

from aiokafka import AIOKafkaConsumer
from app.settings import KafkaSettings

settings = KafkaSettings()

consumer = AIOKafkaConsumer(
    "orders.confirmed",
    bootstrap_servers=settings.brokers,
    group_id="billing-order-confirmed",       # <service>-<purpose>
    enable_auto_commit=False,
    auto_offset_reset="earliest",
    value_deserializer=lambda b: json.loads(b),
)

Два listener-а в одном сервисе — два group_id:

order_consumer = AIOKafkaConsumer(
    "orders.confirmed",
    group_id="billing-order-confirmed",
    ...
)

product_consumer = AIOKafkaConsumer(
    "products.price_changed",
    group_id="billing-product-price-changed",
    ...
)

Один consumer-group = один offset, один rebalance. Если вынести оба listener'а под общий group_id="billing-service" — Kafka будет распределять partition'ы между ними как между однородными воркерами, и обработчик orders.confirmed может получить partition из топика products.price_changed.

Manual commit

R-KFK-CONS-2: коммитим offset только после успешной обработки.

import json
import asyncio
from aiokafka import AIOKafkaConsumer
from app.handlers import OrderConfirmedHandler
from app.events import OrderConfirmed
from app.settings import KafkaSettings

async def run_order_confirmed_consumer(handler: OrderConfirmedHandler) -> None:
    settings = KafkaSettings()
    consumer = AIOKafkaConsumer(
        "orders.confirmed",
        bootstrap_servers=settings.brokers,
        group_id="billing-order-confirmed",
        enable_auto_commit=False,
        auto_offset_reset="earliest",
        value_deserializer=lambda b: json.loads(b),
        max_poll_interval_ms=300_000,
    )
    await consumer.start()
    try:
        async for msg in consumer:
            event = OrderConfirmed(**msg.value)
            await handler.handle(event)        # идемпотентен по event_id (R-KFK-IDEM)
            await consumer.commit()            # коммит строго после обработки
    finally:
        await consumer.stop()

await consumer.commit() коммитит offset для всех присвоенных partition'ов. Если handle бросит исключение — commit не произойдёт, следующий poll повторит то же сообщение. Это at-least-once: ни одно событие не теряется, дубликаты обрабатываются через Idempotent consumer.

Listener idempotent

R-KFK-CONS-3: сообщение может прийти 2+ раз — это норма.

Типичные источники дубликатов в production:

  • Consumer rebalance до await consumer.commit().
  • DLQ replay.
  • Reset offset при миграции consumer-group.
  • Повторный старт pod'а после OOM-kill.

Защита — таблица processed_event в Postgres, проверка event_id до бизнес-операции. Подробнее — Idempotent consumer.

auto_offset_reset="earliest"

R-KFK-CONS-4: для critical-consumer'ов.

consumer = AIOKafkaConsumer(
    "orders.confirmed",
    auto_offset_reset="earliest",    # не "latest"
    ...
)

Срабатывает при первом старте consumer-group (нет сохранённого offset) или когда offset вышел за пределы retention.

  • "latest" (aiokafka default) — старт с самого нового. Все сообщения, которые успели прийти до первого poll, пропущены. Для денег и заказов — недопустимо.
  • "earliest" — старт с самого старого retained-сообщения. Прочитаем всё.

Для аналитических топиков, где исторические данные не нужны, "latest" приемлемо. Но это исключение, не дефолт.

Concurrency через asyncio-таски

R-KFK-CONS-5: количество параллельных consumer-тасок ≤ числу partition'ов топика.

async def start_billing_consumers(handler: OrderConfirmedHandler) -> None:
    tasks = [
        asyncio.create_task(run_order_confirmed_consumer(handler))
        for _ in range(3)    # 3 таски = 3 consumer-instance в одном group
    ]
    await asyncio.gather(*tasks)

Каждая таска — отдельный экземпляр AIOKafkaConsumer с тем же group_id. Kafka раздаст им partition'ы в рамках одного group. Правило: количество тасок × количество pod'ов ≤ partitions. Лишние экземпляры просто простаивают без назначенных partition'ов.

Если топик orders.confirmed имеет 6 partition'ов, а сервис деплоится в 2 pod'а — 3 таски на pod = 6 экземпляров = 6 partition'ов. Нет смысла ставить 4 таски.

max_poll_interval_ms

R-KFK-CONS-6: лимит времени между двумя вызовами poll.

consumer = AIOKafkaConsumer(
    "orders.confirmed",
    max_poll_interval_ms=300_000,    # 5 минут (default)
    ...
)

В aiokafka async for msg in consumer неявно вызывает poll за каждое сообщение. Если обработка одного сообщения занимает долго — Kafka может зафиксировать consumer как мёртвый и отдать partition другому.

Типичный сценарий поломки:

  1. Consumer получил сообщение из топика customers.kyc_requested.
  2. Handler делает HTTP-запрос к внешней KYC-системе, та зависает на 6 минут.
  3. Kafka не видит poll 5+ минут → rebalance, partition отдан другому consumer.
  4. KYC-запрос завершился, handler попытался закоммитить offset — но partition уже не его → commit проигнорирован.
  5. Другой consumer получает то же сообщение → повторный KYC-запрос.

Варианты:

  • Ограничить timeout внешнего вызова (aiohttp.ClientTimeout), обработать ошибку через retry-топик.
  • Увеличить max_poll_interval_ms при заведомо долгой обработке.
  • Вынести тяжёлую работу в отдельную очередь (arq/Celery), не блокируя poll-цикл.

pydantic-settings для конфигурации

R-KFK-CFG-1: параметры через pydantic-settings, не хардкод.

from pydantic_settings import BaseSettings

class KafkaSettings(BaseSettings):
    brokers: str
    group_prefix: str = "billing"
    auto_offset_reset: str = "earliest"
    max_poll_interval_ms: int = 300_000

    class Config:
        env_prefix = "KAFKA_"

bootstrap_servers берётся из KAFKA_BROKERS в окружении. В production — адрес брокерного кластера, в тестах — localhost:9092. Никакого хардкода в коде (R-KFK-CFG-X2).

Что запрещено

АнтипаттернПравилоЧто взамен
enable_auto_commit=True в продеR-KFK-CONS-X1enable_auto_commit=False + await consumer.commit() после обработки
asyncio.sleep(N) / блокирующий вызов > 1s в циклеR-KFK-CONS-X2retry-топик с non-blocking delay
group_id отсутствует или общий для разных listener'овR-KFK-CONS-X3уникальный <service>-<purpose>
HTTP к внешней системе без CB/bulkheadR-KFK-CONS-X4timeout + circuit breaker + retry-топик
auto_offset_reset="latest" для criticalR-KFK-CONS-4"earliest"
Количество тасок > числа partition'овR-KFK-CONS-5≤ partitions
Тяжёлая обработка без настройки max_poll_interval_msR-KFK-CONS-6увеличить или ограничить timeout внешних вызовов
Проглатывание исключения + await consumer.commit()R-KFK-CONS-2DLQ + commit только при успехе

Куда дальше

  • Idempotent consumer — processed_event, dedup по event_id.
  • Retry topic + DLQ — non-blocking retry через отдельные топики.
  • Конфигурация — полный KafkaSettings + pydantic-settings.
  • Observability — consumer lag alerts, tracing через traceparent.
  • Producer — идемпотентный producer, partition key.
  • Outbox publishing — outbox-relay на aiokafka + APScheduler.
  • Event design — структура события, Pydantic-модель на границе consumer.
  • Security — TLS (security_protocol="SSL"), ACL per-сервис.