Опирается на правила:
R-KFK-CFG-1…R-KFK-CFG-4иR-KFK-CFG-X1…R-KFK-CFG-X2из Kafka Style Guide → раздел 7. Конфигурация.
Важно знать
pydantic-settings(KafkaSettings) — типизированный, валидируемый источник параметров;R-KFK-CFG-1.bootstrap_serversтолько через env (KAFKA_BROKERS) — никакого hard-code;R-KFK-CFG-X2.- Producer:
enable_idempotence=True,acks="all"— включаются единожды вProducerSettings;R-KFK-PROD-1.- Consumer:
enable_auto_commit=False,auto_offset_reset="earliest"для critical-топиков;R-KFK-CONS-2,R-KFK-CONS-4.- Реестр
event_type → Pydantic-модель— статический allow-list вместоimportlibпо строке из payload;R-KFK-CFG-3.- Проверка топиков на старте —
AdminClient.describe_topics(...)в lifespan, fail-fast при отсутствии;R-KFK-CFG-4.dynamical importпо строке из payload — RCE-риск, аналог Javatrusted.packages: '*';R-KFK-CFG-X1.
Конфигурация Kafka — место, где малая ошибка стоит больно. enable_idempotence=False → дубликаты во всём пайплайне. Динамический импорт класса по строке из payload → RCE. Отсутствие проверки топиков на старте → consumer запускается, тихо ничего не читает, никто не замечает неделями.
KafkaSettings через pydantic-settings
R-KFK-CFG-1: параметры через валидируемый конфиг.
from pydantic import AnyUrl, Field, model_validator
from pydantic_settings import BaseSettings, SettingsConfigDict
class ProducerSettings(BaseSettings):
model_config = SettingsConfigDict(env_prefix="KAFKA_PRODUCER_")
enable_idempotence: bool = True
acks: str = "all"
request_timeout_ms: int = 30_000
retry_backoff_ms: int = 100
class ConsumerSettings(BaseSettings):
model_config = SettingsConfigDict(env_prefix="KAFKA_CONSUMER_")
group_id: str
auto_offset_reset: str = "earliest"
enable_auto_commit: bool = False
max_poll_interval_ms: int = 600_000
max_poll_records: int = 100
class KafkaSettings(BaseSettings):
model_config = SettingsConfigDict(env_prefix="KAFKA_")
brokers: str = Field(..., alias="KAFKA_BROKERS")
producer: ProducerSettings = Field(default_factory=ProducerSettings)
consumer: ConsumerSettings = Field(default_factory=ConsumerSettings)
topics: list[str] = Field(default_factory=list)
@model_validator(mode="after")
def check_brokers_not_hardcoded(self) -> "KafkaSettings":
hardcoded = {"localhost:9092", "kafka:9092"}
if self.brokers in hardcoded and not self._is_dev_env():
raise ValueError("KAFKA_BROKERS must be set via env in non-dev environments")
return self
def _is_dev_env(self) -> bool:
import os
return os.getenv("APP_ENV", "dev") == "dev"
Что это даёт:
- При старте FastAPI проверяет все
Field(...)— еслиKAFKA_BROKERSне задан, приложение падает с ясным сообщением, не соKeyErrorв недрах aiokafka. - IDE и
mypyвидят типизированныйsettings.kafka.brokersвместоos.environ["KAFKA_BROKERS"]россыпью. model_validatorдополнительно запрещает hard-code адреса вне dev-среды.
Подробнее — Validation → @ConfigurationProperties.
Инициализация AIOKafkaProducer и AIOKafkaConsumer
R-KFK-CFG-2: полный пример с централизованной инициализацией.
from aiokafka import AIOKafkaConsumer, AIOKafkaProducer
from contextlib import asynccontextmanager
import json
_producer: AIOKafkaProducer | None = None
_consumer: AIOKafkaConsumer | None = None
def get_producer() -> AIOKafkaProducer:
assert _producer is not None, "Producer not initialized"
return _producer
@asynccontextmanager
async def kafka_lifespan(settings: KafkaSettings):
global _producer, _consumer
_producer = AIOKafkaProducer(
bootstrap_servers=settings.brokers,
enable_idempotence=settings.producer.enable_idempotence, # True
acks=settings.producer.acks, # "all"
request_timeout_ms=settings.producer.request_timeout_ms,
retry_backoff_ms=settings.producer.retry_backoff_ms,
key_serializer=lambda k: k.encode() if isinstance(k, str) else k,
value_serializer=lambda v: json.dumps(v).encode(),
)
_consumer = AIOKafkaConsumer(
*settings.topics,
bootstrap_servers=settings.brokers,
group_id=settings.consumer.group_id,
enable_auto_commit=settings.consumer.enable_auto_commit, # False
auto_offset_reset=settings.consumer.auto_offset_reset, # "earliest"
max_poll_interval_ms=settings.consumer.max_poll_interval_ms,
value_deserializer=lambda b: json.loads(b),
)
await _check_topics(settings) # fail-fast (R-KFK-CFG-4)
await _producer.start()
await _consumer.start()
try:
yield
finally:
await _producer.stop()
await _consumer.stop()
Каждая опция соответствует правилу:
enable_idempotence,acks— Producer.enable_auto_commit,auto_offset_reset— Consumer.observation-enabledаналог — Observability.
Статический реестр event_type → Pydantic-модель
R-KFK-CFG-3: allow-list десериализации.
from pydantic import BaseModel
from typing import Type
class OrderConfirmed(BaseModel):
event_id: str
order_id: str
customer_id: str
amount_rub: int
occurred_at: str
class ProductReserved(BaseModel):
event_id: str
product_id: str
quantity: int
occurred_at: str
class CustomerRegistered(BaseModel):
event_id: str
customer_id: str
occurred_at: str
EVENT_REGISTRY: dict[str, Type[BaseModel]] = {
"OrderConfirmed": OrderConfirmed,
"ProductReserved": ProductReserved,
"CustomerRegistered": CustomerRegistered,
}
def deserialize_event(event_type: str, payload: dict) -> BaseModel:
model_class = EVENT_REGISTRY.get(event_type)
if model_class is None:
raise ValueError(f"Unknown event_type: {event_type!r}")
return model_class.model_validate(payload)
Почему статический реестр, а не importlib:
# ЗАПРЕЩЕНО — R-KFK-CFG-X1
import importlib
def deserialize_dynamic(event_type: str, payload: dict):
module_name, class_name = event_type.rsplit(".", 1)
module = importlib.import_module(module_name) # импортирует ЛЮБОЙ модуль
cls = getattr(module, class_name) # инстанцирует ЛЮБОЙ класс
return cls(**payload)
Если в Kafka-сообщении event_type: "os.system" или любой другой gadget — importlib выполнит его. Это прямой аналог Java trusted.packages: '*'. Только статический EVENT_REGISTRY разрешает конкретные модели.
Pydantic дополнительно валидирует поля — если в payload нет обязательного event_id, model_validate бросает ValidationError до любой бизнес-логики.
Проверка топиков на старте
R-KFK-CFG-4: fail-fast при отсутствии ожидаемого топика.
from aiokafka.admin import AIOKafkaAdminClient
async def _check_topics(settings: KafkaSettings) -> None:
if not settings.topics:
return
admin = AIOKafkaAdminClient(bootstrap_servers=settings.brokers)
await admin.start()
try:
cluster_topics = await admin.list_topics()
missing = [t for t in settings.topics if t not in cluster_topics]
if missing:
raise RuntimeError(
f"Required Kafka topics not found: {missing}. "
"Create them before starting the service."
)
finally:
await admin.close()
Сценарий без этой проверки применительно к сервису сбора заказов Sber:
- Деплой v2.3 c новым consumer топика
sber.orders.risk-scored. - Команда инфраструктуры забыла создать топик в проде.
AIOKafkaConsumerстартует — aiokafka просто ждёт появления топика (поведение по умолчанию приauto_offset_reset="earliest").- Никаких событий не приходит, никаких алертов, функционал скоринга рисков не работает.
- Через две недели QA замечает аномалию в отчётах.
С _check_topics — lifespan падает при старте, K8s не поднимает pod, дежурный сразу видит ошибку.
В dev-среде топик можно создавать автоматически через AIOKafkaAdminClient.create_topics(...), но в prod/staging — только fail-fast.
Разделение конфигов по сервисам
При нескольких сервисах (заказы, продукты, клиенты) — отдельные KafkaSettings с разными group_id и topics:
# billing_service/config.py
kafka = KafkaSettings(
brokers=os.environ["KAFKA_BROKERS"],
consumer=ConsumerSettings(
group_id="billing-order-confirmed", # R-KFK-CONS-1
auto_offset_reset="earliest",
),
topics=["orders.confirmed"],
)
# inventory_service/config.py
kafka = KafkaSettings(
brokers=os.environ["KAFKA_BROKERS"],
consumer=ConsumerSettings(
group_id="inventory-order-confirmed", # уникален per-роль
auto_offset_reset="earliest",
),
topics=["orders.confirmed", "products.reserved"],
)
group_id всегда в формате <service>-<purpose> (R-KFK-CONS-1). Два сервиса с одним group_id делят партиции — каждое сообщение уходит только одному из них.
Что запрещено
| Антипаттерн | Правило | Что взамен |
|---|---|---|
importlib.import_module по строке из payload | R-KFK-CFG-X1 | статический EVENT_REGISTRY |
bootstrap_servers="kafka-prod:9092" в коде | R-KFK-CFG-X2 | os.environ["KAFKA_BROKERS"] / pydantic-settings |
| Нет проверки топиков на старте | R-KFK-CFG-4 | _check_topics(settings) в lifespan |
KafkaSettings без валидации полей | R-KFK-CFG-1 | pydantic-settings + Field(...) |
enable_auto_commit=True в конфиге | R-KFK-CONS-2 | False + await consumer.commit() |
auto_offset_reset="latest" для critical-топиков | R-KFK-CONS-4 | "earliest" |
| Учётные данные брокера в коде | R-KFK-CFG-X2 | env / Vault / Secrets Manager |
Один group_id на разные consumer'ы сервиса | R-KFK-CONS-1 | уникальный <service>-<purpose> |
Куда дальше
- Producer —
enable_idempotence=True,acks="all", partition key. - Consumer — manual commit,
group_id,auto_offset_reset. - Security — TLS (
security_protocol="SSL"), ACL, credentials через env. - Observability — метрики, consumer lag alert,
traceparentв headers. - Event design — структура payload,
event_id, forward-compat. - Idempotent consumer —
processed_event, дедупликация поevent_id. - Outbox publishing — outbox-relay,
FOR UPDATE SKIP LOCKED, batch. - Retry topic + DLQ — retry-топики, DLQ, max-attempts.