Опирается на правила: R-KFK-PROD-1R-KFK-PROD-4 и R-KFK-PROD-X1R-KFK-PROD-X4 из Kafka Style Guide → раздел 1. Producer.

Важно знать

  • enable_idempotence=True всегда. Автоматически даёт acks="all", retries=MAX, max_in_flight ≤ 5. Exactly-once на уровне partition.
  • Partition key обязателен для всех бизнес-событий. Дефолтный ключ — aggregate id.
  • JSON-сериализация (value_serializer=lambda v: json.dumps(v).encode()) по умолчанию. Avro/Protobuf — только для bandwidth-чувствительных топиков.
  • producer.send_and_wait напрямую из use case handler для domain events — запрещён. События идут через outbox.
  • acks=0/1 — никогда. Только acks="all".
  • Без partition key round-robin нарушает ordering для aggregate.
  • Kafka не XA — в одной транзакции с PG быть не может. Только outbox.

Kafka producer — точка, где сервис «публикует факт» во внешний мир. Ошибка здесь — потерянное событие или дубликат, который downstream не отличит от настоящего. UCP формулирует правила так, чтобы producer всегда был exactly-once на partition и атомарен с DB через outbox.

enable_idempotence=True — всегда

R-KFK-PROD-1: один флаг включает три гарантии сразу.

from aiokafka import AIOKafkaProducer

producer = AIOKafkaProducer(
    bootstrap_servers=settings.brokers,
    enable_idempotence=True,
    value_serializer=lambda v: json.dumps(v).encode(),
    key_serializer=lambda k: k.encode(),
)

Что aiokafka делает с enable_idempotence=True:

  • acks="all" — broker подтверждает только после репликации.
  • retries=MAX — producer retry-ит до победы или session-timeout.
  • max_in_flight_requests_per_connection ≤ 5 — гарантия ordering между retries.

Producer получает producer-id от broker-а; broker дедуплицирует записи по (producer-id, sequence-number) на partition. Это даёт exactly-once на уровне partition. Если producer retry-ит — broker молча отбрасывает дубль.

Без enable_idempotence retry на стороне producer создаёт дубликаты в Kafka, downstream consumer видит одно событие N раз.

Конфигурация через pydantic-settings (R-KFK-CFG-1):

from pydantic_settings import BaseSettings

class KafkaSettings(BaseSettings):
    brokers: str
    client_id: str = "order-service"

    model_config = {"env_prefix": "KAFKA_"}

Partition key — обязателен

R-KFK-PROD-2: ключ определяет, на какой partition уйдёт сообщение.

import json
from uuid import UUID

class OrderEventPublisher:
    def __init__(self, producer: AIOKafkaProducer) -> None:
        self._producer = producer

    async def publish_confirmed(self, order_id: UUID, payload: dict) -> None:
        await self._producer.send_and_wait(
            topic="order-service.order.confirmed",
            key=str(order_id),
            value=payload,
        )

Дефолтный ключ — str(order.id) (aggregate id). Это гарантирует: все события одного order_id уходят на один partition, а внутри partition Kafka сохраняет порядок.

Без ключа — producer.send_and_wait(topic, value=payload) — round-robin между partitions. Сценарий поломки:

  1. OrderCreated(order_id=42) уходит на partition 1.
  2. OrderConfirmed(order_id=42) уходит на partition 5.
  3. Consumer для partition 5 обрабатывает OrderConfirmed до того, как consumer для partition 1 обработал OrderCreated.
  4. Downstream получает confirm для несуществующего заказа.

Правило: для каждого aggregate — стабильный ключ. Обычно — str(aggregate_id).

JSON по умолчанию

R-KFK-PROD-3: дефолтный serializer.

import json

producer = AIOKafkaProducer(
    bootstrap_servers=settings.brokers,
    enable_idempotence=True,
    key_serializer=lambda k: k.encode(),
    value_serializer=lambda v: json.dumps(v).encode(),
)

JSON прост в отладке (kafka-console-consumer показывает читаемый payload), не требует Schema Registry. Цена — больше байт на bandwidth.

Avro/Protobuf — для high-throughput топиков (миллиарды событий в сутки). Требует Schema Registry, дополнительной инфры и compatibility-check на CI. В UCP-сервисах не дефолт.

Не send из use case handler

R-KFK-PROD-4: domain events публикуются через outbox, не прямым producer.send_and_wait.

# ОШИБКА — Kafka и DB не атомарны
class ConfirmOrderHandler:
    def __init__(
        self,
        repo: OrderRepository,
        producer: AIOKafkaProducer,
        db: AsyncSession,
    ) -> None:
        self._repo = repo
        self._producer = producer
        self._db = db

    async def handle(self, command: ConfirmOrderCommand) -> Order:
        order = await self._repo.find_by_id(command.order_id)
        order.confirm()
        await self._repo.save(order)
        await self._db.commit()

        # publish уже ПОСЛЕ commit — если упадёт здесь, событие потеряно
        await self._producer.send_and_wait(
            topic="order-service.order.confirmed",
            key=str(order.id),
            value=OrderConfirmedEvent.from_order(order).model_dump(),
        )
        return order

Сценарии поломки:

  1. send_and_wait прошёл, commit упал с deadlock → событие опубликовано, в БД заказ не подтверждён. Inconsistency.
  2. commit прошёл, send_and_wait упал с network error → заказ подтверждён, downstream не знает.

Корректно — через outbox:

from dataclasses import dataclass
from datetime import datetime
from uuid import UUID, uuid7
import json

@dataclass(frozen=True)
class OutboxEvent:
    aggregate_type: str
    aggregate_id: UUID
    event_type: str
    payload: str
    topic: str
    partition_key: str


class ConfirmOrderHandler:
    def __init__(
        self,
        repo: OrderRepository,
        outbox_repo: OutboxRepository,
        db: AsyncSession,
    ) -> None:
        self._repo = repo
        self._outbox_repo = outbox_repo
        self._db = db

    async def handle(self, command: ConfirmOrderCommand) -> Order:
        async with self._db.begin():
            order = await self._repo.find_by_id_for_update(command.order_id)
            order.confirm()
            await self._repo.save(order)

            event = OrderConfirmedEvent(
                event_id=uuid7(),
                occurred_at=datetime.utcnow(),
                order_id=order.id,
                customer_id=order.customer_id,
                total_amount=str(order.total_amount),
            )
            await self._outbox_repo.append(OutboxEvent(
                aggregate_type="Order",
                aggregate_id=order.id,
                event_type="order.confirmed.v1",
                payload=json.dumps(event.model_dump()),
                topic="order-service.order.confirmed",
                partition_key=str(order.id),
            ))
        return order

Запись в outbox_event идёт в той же DB-транзакции, что save. Атомарность гарантирована PG. Отдельный outbox-relay читает unpublished и публикует через aiokafka. Подробнее — Outbox publishing.

Допустимый прямой producer.send_and_wait:

  • Технические audit-events (в дополнение к таблице audit_log).
  • Метрики / health-сигналы.
  • Команды другим сервисам без транзакционного контекста (например запрос на отчёт от admin-инструмента).

Lifecycle producer-а в FastAPI

Producer создаётся один раз при старте приложения и закрывается при остановке. Передаётся через DI.

from contextlib import asynccontextmanager
from fastapi import FastAPI
from aiokafka import AIOKafkaProducer
import json

_producer: AIOKafkaProducer | None = None


@asynccontextmanager
async def lifespan(app: FastAPI):
    global _producer
    _producer = AIOKafkaProducer(
        bootstrap_servers=settings.brokers,
        enable_idempotence=True,
        client_id=settings.client_id,
        key_serializer=lambda k: k.encode(),
        value_serializer=lambda v: json.dumps(v).encode(),
    )
    await _producer.start()
    app.state.producer = _producer
    try:
        yield
    finally:
        await _producer.stop()


app = FastAPI(lifespan=lifespan)

Создание нового AIOKafkaProducer на каждый запрос — антипаттерн: каждый start() делает handshake с брокером, получает новый producer-id, разрывает sequence-numbering идемпотентности.

Что запрещено

АнтипаттернПравилоЧто взамен
enable_idempotence=FalseR-KFK-PROD-X1True всегда
acks=0 / acks=1R-KFK-PROD-X2acks="all"
Send без partition key для бизнес-событийR-KFK-PROD-X3aggregate id как key
producer.send_and_wait из handler вместо outboxR-KFK-PROD-X4outbox-relay
Новый AIOKafkaProducer на каждый запросR-KFK-PROD-1singleton через lifespan
max_in_flight_requests_per_connection > 5R-KFK-PROD-1≤ 5 (auto при idempotence=True)
Агрегат целиком в payloadR-KFK-PROD-3явные поля (см. event-design)

Куда дальше

  • Конфигурация — KafkaSettings, bootstrap_servers, env-substitution.
  • Outbox publishing — relay через APScheduler/arq, FOR UPDATE SKIP LOCKED, batch.
  • Event design — payload format, event_id, версионирование.
  • Consumer — manual commit, group_id, idempotent listener.
  • Idempotent consumer — processed_event, дедупликация под race.
  • Retry topic + DLQ — retry-топики, DLQ, max-attempts.
  • Observability — prometheus-client, consumer lag, traceparent.
  • Security — TLS, ACL, SASL_SSL.