Опирается на правила:
R-KFK-PROD-1…R-KFK-PROD-4иR-KFK-PROD-X1…R-KFK-PROD-X4из Kafka Style Guide → раздел 1. Producer.
Важно знать
enable_idempotence=Trueвсегда. Автоматически даётacks="all",retries=MAX,max_in_flight ≤ 5. Exactly-once на уровне partition.- Partition key обязателен для всех бизнес-событий. Дефолтный ключ — aggregate id.
- JSON-сериализация (
value_serializer=lambda v: json.dumps(v).encode()) по умолчанию. Avro/Protobuf — только для bandwidth-чувствительных топиков.producer.send_and_waitнапрямую из use case handler для domain events — запрещён. События идут через outbox.acks=0/1— никогда. Толькоacks="all".- Без partition key round-robin нарушает ordering для aggregate.
- Kafka не XA — в одной транзакции с PG быть не может. Только outbox.
Kafka producer — точка, где сервис «публикует факт» во внешний мир. Ошибка здесь — потерянное событие или дубликат, который downstream не отличит от настоящего. UCP формулирует правила так, чтобы producer всегда был exactly-once на partition и атомарен с DB через outbox.
enable_idempotence=True — всегда
R-KFK-PROD-1: один флаг включает три гарантии сразу.
from aiokafka import AIOKafkaProducer
producer = AIOKafkaProducer(
bootstrap_servers=settings.brokers,
enable_idempotence=True,
value_serializer=lambda v: json.dumps(v).encode(),
key_serializer=lambda k: k.encode(),
)
Что aiokafka делает с enable_idempotence=True:
acks="all"— broker подтверждает только после репликации.retries=MAX— producer retry-ит до победы или session-timeout.max_in_flight_requests_per_connection ≤ 5— гарантия ordering между retries.
Producer получает producer-id от broker-а; broker дедуплицирует записи по (producer-id, sequence-number) на partition. Это даёт exactly-once на уровне partition. Если producer retry-ит — broker молча отбрасывает дубль.
Без enable_idempotence retry на стороне producer создаёт дубликаты в Kafka, downstream consumer видит одно событие N раз.
Конфигурация через pydantic-settings (R-KFK-CFG-1):
from pydantic_settings import BaseSettings
class KafkaSettings(BaseSettings):
brokers: str
client_id: str = "order-service"
model_config = {"env_prefix": "KAFKA_"}
Partition key — обязателен
R-KFK-PROD-2: ключ определяет, на какой partition уйдёт сообщение.
import json
from uuid import UUID
class OrderEventPublisher:
def __init__(self, producer: AIOKafkaProducer) -> None:
self._producer = producer
async def publish_confirmed(self, order_id: UUID, payload: dict) -> None:
await self._producer.send_and_wait(
topic="order-service.order.confirmed",
key=str(order_id),
value=payload,
)
Дефолтный ключ — str(order.id) (aggregate id). Это гарантирует: все события одного order_id уходят на один partition, а внутри partition Kafka сохраняет порядок.
Без ключа — producer.send_and_wait(topic, value=payload) — round-robin между partitions. Сценарий поломки:
OrderCreated(order_id=42)уходит на partition 1.OrderConfirmed(order_id=42)уходит на partition 5.- Consumer для partition 5 обрабатывает
OrderConfirmedдо того, как consumer для partition 1 обработалOrderCreated. - Downstream получает confirm для несуществующего заказа.
Правило: для каждого aggregate — стабильный ключ. Обычно — str(aggregate_id).
JSON по умолчанию
R-KFK-PROD-3: дефолтный serializer.
import json
producer = AIOKafkaProducer(
bootstrap_servers=settings.brokers,
enable_idempotence=True,
key_serializer=lambda k: k.encode(),
value_serializer=lambda v: json.dumps(v).encode(),
)
JSON прост в отладке (kafka-console-consumer показывает читаемый payload), не требует Schema Registry. Цена — больше байт на bandwidth.
Avro/Protobuf — для high-throughput топиков (миллиарды событий в сутки). Требует Schema Registry, дополнительной инфры и compatibility-check на CI. В UCP-сервисах не дефолт.
Не send из use case handler
R-KFK-PROD-4: domain events публикуются через outbox, не прямым producer.send_and_wait.
# ОШИБКА — Kafka и DB не атомарны
class ConfirmOrderHandler:
def __init__(
self,
repo: OrderRepository,
producer: AIOKafkaProducer,
db: AsyncSession,
) -> None:
self._repo = repo
self._producer = producer
self._db = db
async def handle(self, command: ConfirmOrderCommand) -> Order:
order = await self._repo.find_by_id(command.order_id)
order.confirm()
await self._repo.save(order)
await self._db.commit()
# publish уже ПОСЛЕ commit — если упадёт здесь, событие потеряно
await self._producer.send_and_wait(
topic="order-service.order.confirmed",
key=str(order.id),
value=OrderConfirmedEvent.from_order(order).model_dump(),
)
return order
Сценарии поломки:
send_and_waitпрошёл,commitупал с deadlock → событие опубликовано, в БД заказ не подтверждён. Inconsistency.commitпрошёл,send_and_waitупал с network error → заказ подтверждён, downstream не знает.
Корректно — через outbox:
from dataclasses import dataclass
from datetime import datetime
from uuid import UUID, uuid7
import json
@dataclass(frozen=True)
class OutboxEvent:
aggregate_type: str
aggregate_id: UUID
event_type: str
payload: str
topic: str
partition_key: str
class ConfirmOrderHandler:
def __init__(
self,
repo: OrderRepository,
outbox_repo: OutboxRepository,
db: AsyncSession,
) -> None:
self._repo = repo
self._outbox_repo = outbox_repo
self._db = db
async def handle(self, command: ConfirmOrderCommand) -> Order:
async with self._db.begin():
order = await self._repo.find_by_id_for_update(command.order_id)
order.confirm()
await self._repo.save(order)
event = OrderConfirmedEvent(
event_id=uuid7(),
occurred_at=datetime.utcnow(),
order_id=order.id,
customer_id=order.customer_id,
total_amount=str(order.total_amount),
)
await self._outbox_repo.append(OutboxEvent(
aggregate_type="Order",
aggregate_id=order.id,
event_type="order.confirmed.v1",
payload=json.dumps(event.model_dump()),
topic="order-service.order.confirmed",
partition_key=str(order.id),
))
return order
Запись в outbox_event идёт в той же DB-транзакции, что save. Атомарность гарантирована PG. Отдельный outbox-relay читает unpublished и публикует через aiokafka. Подробнее — Outbox publishing.
Допустимый прямой producer.send_and_wait:
- Технические audit-events (в дополнение к таблице
audit_log). - Метрики / health-сигналы.
- Команды другим сервисам без транзакционного контекста (например запрос на отчёт от admin-инструмента).
Lifecycle producer-а в FastAPI
Producer создаётся один раз при старте приложения и закрывается при остановке. Передаётся через DI.
from contextlib import asynccontextmanager
from fastapi import FastAPI
from aiokafka import AIOKafkaProducer
import json
_producer: AIOKafkaProducer | None = None
@asynccontextmanager
async def lifespan(app: FastAPI):
global _producer
_producer = AIOKafkaProducer(
bootstrap_servers=settings.brokers,
enable_idempotence=True,
client_id=settings.client_id,
key_serializer=lambda k: k.encode(),
value_serializer=lambda v: json.dumps(v).encode(),
)
await _producer.start()
app.state.producer = _producer
try:
yield
finally:
await _producer.stop()
app = FastAPI(lifespan=lifespan)
Создание нового AIOKafkaProducer на каждый запрос — антипаттерн: каждый start() делает handshake с брокером, получает новый producer-id, разрывает sequence-numbering идемпотентности.
Что запрещено
| Антипаттерн | Правило | Что взамен |
|---|---|---|
enable_idempotence=False | R-KFK-PROD-X1 | True всегда |
acks=0 / acks=1 | R-KFK-PROD-X2 | acks="all" |
| Send без partition key для бизнес-событий | R-KFK-PROD-X3 | aggregate id как key |
producer.send_and_wait из handler вместо outbox | R-KFK-PROD-X4 | outbox-relay |
Новый AIOKafkaProducer на каждый запрос | R-KFK-PROD-1 | singleton через lifespan |
max_in_flight_requests_per_connection > 5 | R-KFK-PROD-1 | ≤ 5 (auto при idempotence=True) |
| Агрегат целиком в payload | R-KFK-PROD-3 | явные поля (см. event-design) |
Куда дальше
- Конфигурация —
KafkaSettings, bootstrap_servers, env-substitution. - Outbox publishing — relay через APScheduler/arq,
FOR UPDATE SKIP LOCKED, batch. - Event design — payload format,
event_id, версионирование. - Consumer — manual commit, group_id, idempotent listener.
- Idempotent consumer —
processed_event, дедупликация под race. - Retry topic + DLQ — retry-топики, DLQ, max-attempts.
- Observability — prometheus-client, consumer lag, traceparent.
- Security — TLS, ACL, SASL_SSL.