Опирается на правила: R-KFK-CFG-1R-KFK-CFG-4 и R-KFK-CFG-X1R-KFK-CFG-X2 из Kafka Style Guide → раздел 7. Конфигурация.

Важно знать

  • pydantic-settings (KafkaSettings) — типизированный, валидируемый источник параметров; R-KFK-CFG-1.
  • bootstrap_servers только через env (KAFKA_BROKERS) — никакого hard-code; R-KFK-CFG-X2.
  • Producer: enable_idempotence=True, acks="all" — включаются единожды в ProducerSettings; R-KFK-PROD-1.
  • Consumer: enable_auto_commit=False, auto_offset_reset="earliest" для critical-топиков; R-KFK-CONS-2, R-KFK-CONS-4.
  • Реестр event_type → Pydantic-модель — статический allow-list вместо importlib по строке из payload; R-KFK-CFG-3.
  • Проверка топиков на стартеAdminClient.describe_topics(...) в lifespan, fail-fast при отсутствии; R-KFK-CFG-4.
  • dynamical import по строке из payload — RCE-риск, аналог Java trusted.packages: '*'; R-KFK-CFG-X1.

Конфигурация Kafka — место, где малая ошибка стоит больно. enable_idempotence=False → дубликаты во всём пайплайне. Динамический импорт класса по строке из payload → RCE. Отсутствие проверки топиков на старте → consumer запускается, тихо ничего не читает, никто не замечает неделями.

KafkaSettings через pydantic-settings

R-KFK-CFG-1: параметры через валидируемый конфиг.

from pydantic import AnyUrl, Field, model_validator
from pydantic_settings import BaseSettings, SettingsConfigDict


class ProducerSettings(BaseSettings):
    model_config = SettingsConfigDict(env_prefix="KAFKA_PRODUCER_")

    enable_idempotence: bool = True
    acks: str = "all"
    request_timeout_ms: int = 30_000
    retry_backoff_ms: int = 100


class ConsumerSettings(BaseSettings):
    model_config = SettingsConfigDict(env_prefix="KAFKA_CONSUMER_")

    group_id: str
    auto_offset_reset: str = "earliest"
    enable_auto_commit: bool = False
    max_poll_interval_ms: int = 600_000
    max_poll_records: int = 100


class KafkaSettings(BaseSettings):
    model_config = SettingsConfigDict(env_prefix="KAFKA_")

    brokers: str = Field(..., alias="KAFKA_BROKERS")
    producer: ProducerSettings = Field(default_factory=ProducerSettings)
    consumer: ConsumerSettings = Field(default_factory=ConsumerSettings)
    topics: list[str] = Field(default_factory=list)

    @model_validator(mode="after")
    def check_brokers_not_hardcoded(self) -> "KafkaSettings":
        hardcoded = {"localhost:9092", "kafka:9092"}
        if self.brokers in hardcoded and not self._is_dev_env():
            raise ValueError("KAFKA_BROKERS must be set via env in non-dev environments")
        return self

    def _is_dev_env(self) -> bool:
        import os
        return os.getenv("APP_ENV", "dev") == "dev"

Что это даёт:

  • При старте FastAPI проверяет все Field(...) — если KAFKA_BROKERS не задан, приложение падает с ясным сообщением, не со KeyError в недрах aiokafka.
  • IDE и mypy видят типизированный settings.kafka.brokers вместо os.environ["KAFKA_BROKERS"] россыпью.
  • model_validator дополнительно запрещает hard-code адреса вне dev-среды.

Подробнее — Validation → @ConfigurationProperties.

Инициализация AIOKafkaProducer и AIOKafkaConsumer

R-KFK-CFG-2: полный пример с централизованной инициализацией.

from aiokafka import AIOKafkaConsumer, AIOKafkaProducer
from contextlib import asynccontextmanager
import json


_producer: AIOKafkaProducer | None = None
_consumer: AIOKafkaConsumer | None = None


def get_producer() -> AIOKafkaProducer:
    assert _producer is not None, "Producer not initialized"
    return _producer


@asynccontextmanager
async def kafka_lifespan(settings: KafkaSettings):
    global _producer, _consumer

    _producer = AIOKafkaProducer(
        bootstrap_servers=settings.brokers,
        enable_idempotence=settings.producer.enable_idempotence,   # True
        acks=settings.producer.acks,                               # "all"
        request_timeout_ms=settings.producer.request_timeout_ms,
        retry_backoff_ms=settings.producer.retry_backoff_ms,
        key_serializer=lambda k: k.encode() if isinstance(k, str) else k,
        value_serializer=lambda v: json.dumps(v).encode(),
    )
    _consumer = AIOKafkaConsumer(
        *settings.topics,
        bootstrap_servers=settings.brokers,
        group_id=settings.consumer.group_id,
        enable_auto_commit=settings.consumer.enable_auto_commit,    # False
        auto_offset_reset=settings.consumer.auto_offset_reset,      # "earliest"
        max_poll_interval_ms=settings.consumer.max_poll_interval_ms,
        value_deserializer=lambda b: json.loads(b),
    )

    await _check_topics(settings)      # fail-fast (R-KFK-CFG-4)

    await _producer.start()
    await _consumer.start()
    try:
        yield
    finally:
        await _producer.stop()
        await _consumer.stop()

Каждая опция соответствует правилу:

  • enable_idempotence, acks — Producer.
  • enable_auto_commit, auto_offset_reset — Consumer.
  • observation-enabled аналог — Observability.

Статический реестр event_type → Pydantic-модель

R-KFK-CFG-3: allow-list десериализации.

from pydantic import BaseModel
from typing import Type


class OrderConfirmed(BaseModel):
    event_id: str
    order_id: str
    customer_id: str
    amount_rub: int
    occurred_at: str


class ProductReserved(BaseModel):
    event_id: str
    product_id: str
    quantity: int
    occurred_at: str


class CustomerRegistered(BaseModel):
    event_id: str
    customer_id: str
    occurred_at: str


EVENT_REGISTRY: dict[str, Type[BaseModel]] = {
    "OrderConfirmed": OrderConfirmed,
    "ProductReserved": ProductReserved,
    "CustomerRegistered": CustomerRegistered,
}


def deserialize_event(event_type: str, payload: dict) -> BaseModel:
    model_class = EVENT_REGISTRY.get(event_type)
    if model_class is None:
        raise ValueError(f"Unknown event_type: {event_type!r}")
    return model_class.model_validate(payload)

Почему статический реестр, а не importlib:

# ЗАПРЕЩЕНО — R-KFK-CFG-X1
import importlib

def deserialize_dynamic(event_type: str, payload: dict):
    module_name, class_name = event_type.rsplit(".", 1)
    module = importlib.import_module(module_name)   # импортирует ЛЮБОЙ модуль
    cls = getattr(module, class_name)               # инстанцирует ЛЮБОЙ класс
    return cls(**payload)

Если в Kafka-сообщении event_type: "os.system" или любой другой gadget — importlib выполнит его. Это прямой аналог Java trusted.packages: '*'. Только статический EVENT_REGISTRY разрешает конкретные модели.

Pydantic дополнительно валидирует поля — если в payload нет обязательного event_id, model_validate бросает ValidationError до любой бизнес-логики.

Проверка топиков на старте

R-KFK-CFG-4: fail-fast при отсутствии ожидаемого топика.

from aiokafka.admin import AIOKafkaAdminClient


async def _check_topics(settings: KafkaSettings) -> None:
    if not settings.topics:
        return

    admin = AIOKafkaAdminClient(bootstrap_servers=settings.brokers)
    await admin.start()
    try:
        cluster_topics = await admin.list_topics()
        missing = [t for t in settings.topics if t not in cluster_topics]
        if missing:
            raise RuntimeError(
                f"Required Kafka topics not found: {missing}. "
                "Create them before starting the service."
            )
    finally:
        await admin.close()

Сценарий без этой проверки применительно к сервису сбора заказов Sber:

  1. Деплой v2.3 c новым consumer топика sber.orders.risk-scored.
  2. Команда инфраструктуры забыла создать топик в проде.
  3. AIOKafkaConsumer стартует — aiokafka просто ждёт появления топика (поведение по умолчанию при auto_offset_reset="earliest").
  4. Никаких событий не приходит, никаких алертов, функционал скоринга рисков не работает.
  5. Через две недели QA замечает аномалию в отчётах.

С _check_topics — lifespan падает при старте, K8s не поднимает pod, дежурный сразу видит ошибку.

В dev-среде топик можно создавать автоматически через AIOKafkaAdminClient.create_topics(...), но в prod/staging — только fail-fast.

Разделение конфигов по сервисам

При нескольких сервисах (заказы, продукты, клиенты) — отдельные KafkaSettings с разными group_id и topics:

# billing_service/config.py
kafka = KafkaSettings(
    brokers=os.environ["KAFKA_BROKERS"],
    consumer=ConsumerSettings(
        group_id="billing-order-confirmed",        # R-KFK-CONS-1
        auto_offset_reset="earliest",
    ),
    topics=["orders.confirmed"],
)

# inventory_service/config.py
kafka = KafkaSettings(
    brokers=os.environ["KAFKA_BROKERS"],
    consumer=ConsumerSettings(
        group_id="inventory-order-confirmed",      # уникален per-роль
        auto_offset_reset="earliest",
    ),
    topics=["orders.confirmed", "products.reserved"],
)

group_id всегда в формате <service>-<purpose> (R-KFK-CONS-1). Два сервиса с одним group_id делят партиции — каждое сообщение уходит только одному из них.

Что запрещено

АнтипаттернПравилоЧто взамен
importlib.import_module по строке из payloadR-KFK-CFG-X1статический EVENT_REGISTRY
bootstrap_servers="kafka-prod:9092" в кодеR-KFK-CFG-X2os.environ["KAFKA_BROKERS"] / pydantic-settings
Нет проверки топиков на стартеR-KFK-CFG-4_check_topics(settings) в lifespan
KafkaSettings без валидации полейR-KFK-CFG-1pydantic-settings + Field(...)
enable_auto_commit=True в конфигеR-KFK-CONS-2False + await consumer.commit()
auto_offset_reset="latest" для critical-топиковR-KFK-CONS-4"earliest"
Учётные данные брокера в кодеR-KFK-CFG-X2env / Vault / Secrets Manager
Один group_id на разные consumer'ы сервисаR-KFK-CONS-1уникальный <service>-<purpose>

Куда дальше

  • Producer — enable_idempotence=True, acks="all", partition key.
  • Consumer — manual commit, group_id, auto_offset_reset.
  • Security — TLS (security_protocol="SSL"), ACL, credentials через env.
  • Observability — метрики, consumer lag alert, traceparent в headers.
  • Event design — структура payload, event_id, forward-compat.
  • Idempotent consumer — processed_event, дедупликация по event_id.
  • Outbox publishing — outbox-relay, FOR UPDATE SKIP LOCKED, batch.
  • Retry topic + DLQ — retry-топики, DLQ, max-attempts.