Опирается на правила: R-KFK-OBX-1R-KFK-OBX-4 и R-KFK-OBX-X1R-KFK-OBX-X3 из Kafka Style Guide → раздел 3. Outbox publishing.

Важно знать

  • Domain events публикуются через outbox-relay, не напрямую producer.send_and_wait(...) из handler.
  • Запись в outbox_event идёт в той же DB-транзакции, что бизнес-write. Либо обе commit, либо обе rollback.
  • Outbox-relay — отдельная asyncio-задача (APScheduler/arq), читает unpublished через SELECT ... FOR UPDATE SKIP LOCKED, публикует, проставляет published_at.
  • Topic name derives от event_type/aggregate_type: <service>.<aggregate-type>.<event-name>.
  • Relay в batch (10-50 events) — снижает overhead DB-poll и Kafka-roundtrip.
  • partial-index WHERE published_at IS NULL обязателен — без него full scan на каждом poll.
  • Публикация через after-commit hook без outbox — потеря событий при сбое между commit и publish.
  • producer.send_and_wait в той же транзакции, что DB-операция — Kafka не XA; rollback БД не откатит publish.

Outbox publishing — фундаментальный паттерн UCP. Все domain events публикуются через него. Это даёт at-least-once доставку с атомарностью «commit DB + публикация» через локальную транзакцию в PG. Теория — Distributed → outbox + inbox.

Схема outbox-таблицы

R-KFK-OBX-3, R-KFK-OBX-X3: схема с partial-индексом.

CREATE TABLE outbox_event (
    id              bigint GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
    event_id        uuid        NOT NULL UNIQUE,
    aggregate_type  text        NOT NULL,
    aggregate_id    text        NOT NULL,
    event_type      text        NOT NULL,
    payload         jsonb       NOT NULL,
    topic           text        NOT NULL,
    partition_key   text        NOT NULL,
    created_at      timestamptz NOT NULL DEFAULT now(),
    published_at    timestamptz
);

CREATE INDEX ix_outbox_event_unpublished
    ON outbox_event (id)
    WHERE published_at IS NULL;

WHERE published_at IS NULL — partial index. После публикации запись выпадает из индекса; relay сканирует только «горячий» хвост. Без него — full scan миллиардной таблицы на каждом poll, CPU БД в 100% через неделю.

Доменное событие

R-KFK-EVT-4: событие — @dataclass(frozen=True), десериализация на границе — Pydantic-модель.

from dataclasses import dataclass
from uuid import UUID
from decimal import Decimal

@dataclass(frozen=True)
class OrderConfirmedEvent:
    event_id: UUID
    occurred_at: str
    aggregate_id: UUID
    order_id: UUID
    customer_id: UUID
    total_amount: Decimal
    currency: str
    event_type: str = "order.confirmed.v1"

Запись в outbox из handler

R-KFK-OBX-1: write-handler пишет в outbox_event в той же транзакции.

import uuid
import json
from dataclasses import asdict
from datetime import datetime, timezone

from sqlalchemy.ext.asyncio import AsyncSession

from app.orders.domain.event import OrderConfirmedEvent
from app.orders.domain.repository import OrderRepository

class ConfirmOrderHandler:
    def __init__(self, session: AsyncSession, order_repo: OrderRepository) -> None:
        self._session = session
        self._order_repo = order_repo

    async def handle(self, order_id: uuid.UUID) -> None:
        async with self._session.begin():
            order = await self._order_repo.get_for_update(order_id)
            order.confirm()
            await self._order_repo.save(order)

            event = OrderConfirmedEvent(
                event_id=uuid.uuid7(),
                occurred_at=datetime.now(timezone.utc).isoformat(),
                aggregate_id=order.id,
                order_id=order.id,
                customer_id=order.customer_id,
                total_amount=order.total_amount,
                currency=order.currency,
            )

            await self._session.execute(
                """
                INSERT INTO outbox_event
                    (event_id, aggregate_type, aggregate_id, event_type, payload, topic, partition_key)
                VALUES
                    (:event_id, :aggregate_type, :aggregate_id, :event_type, :payload, :topic, :partition_key)
                """,
                {
                    "event_id": str(event.event_id),
                    "aggregate_type": "Order",
                    "aggregate_id": str(order.id),
                    "event_type": event.event_type,
                    "payload": json.dumps(asdict(event), default=str),
                    "topic": "order-service.order.confirmed",
                    "partition_key": str(order.id),
                },
            )

Атомарность гарантирует PG: session.begin() охватывает и бизнес-write, и запись в outbox. Никакой XA с Kafka не нужен.

Outbox-relay

R-KFK-OBX-2: отдельная asyncio-задача. Пример через APScheduler:

import asyncio
import json
import logging
from datetime import datetime, timezone

from aiokafka import AIOKafkaProducer
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker

logger = logging.getLogger(__name__)

class OutboxRelay:
    def __init__(
        self,
        session_factory: async_sessionmaker[AsyncSession],
        producer: AIOKafkaProducer,
        batch_size: int = 50,
    ) -> None:
        self._session_factory = session_factory
        self._producer = producer
        self._batch_size = batch_size

    async def publish(self) -> None:
        async with self._session_factory() as session:
            async with session.begin():
                rows = (
                    await session.execute(
                        """
                        SELECT id, event_id, topic, partition_key, payload
                        FROM outbox_event
                        WHERE published_at IS NULL
                        ORDER BY id
                        LIMIT :limit
                        FOR UPDATE SKIP LOCKED
                        """,
                        {"limit": self._batch_size},
                    )
                ).fetchall()

                if not rows:
                    return

                for row in rows:
                    try:
                        await self._producer.send_and_wait(
                            topic=row.topic,
                            key=row.partition_key.encode(),
                            value=json.dumps(row.payload).encode(),
                        )
                        await session.execute(
                            "UPDATE outbox_event SET published_at = :now WHERE id = :id",
                            {"now": datetime.now(timezone.utc), "id": row.id},
                        )
                    except Exception:
                        logger.exception(
                            "Failed to publish outbox event id=%s topic=%s",
                            row.id,
                            row.topic,
                        )
                        raise


def start_relay(relay: OutboxRelay) -> AsyncIOScheduler:
    scheduler = AsyncIOScheduler()
    scheduler.add_job(relay.publish, "interval", seconds=0.5)
    scheduler.start()
    return scheduler

FOR UPDATE SKIP LOCKED — несколько pod-ов relay гонят параллельно, каждый забирает свою порцию без блокировок. Горизонтальное масштабирование без координации.

interval=0.5s даёт типичную задержку ~500ms. Для критичных flow — 0.1s, для аналитики — 5s.

Подключение через lifespan (FastAPI)

from contextlib import asynccontextmanager
from aiokafka import AIOKafkaProducer
from fastapi import FastAPI

from app.kafka.outbox_relay import OutboxRelay, start_relay
from app.config import Settings

@asynccontextmanager
async def lifespan(app: FastAPI):
    settings = Settings()
    producer = AIOKafkaProducer(
        bootstrap_servers=settings.kafka_brokers,
        enable_idempotence=True,
        value_serializer=lambda v: v,
        key_serializer=lambda k: k,
    )
    await producer.start()

    relay = OutboxRelay(session_factory=app.state.session_factory, producer=producer)
    scheduler = start_relay(relay)

    yield

    scheduler.shutdown(wait=False)
    await producer.stop()

app = FastAPI(lifespan=lifespan)

Producer запускается один раз при старте приложения и разделяется между outbox-relay и всеми handler-ами, которым нужен прямой send (для нон-доменных событий).

Topic naming

R-KFK-OBX-3: convention <service>.<aggregate-type>.<event-name>.

СервисTopic
order-serviceorder-service.order.created
order-serviceorder-service.order.confirmed
order-serviceorder-service.order.cancelled
payment-servicepayment-service.payment.charged
payment-servicepayment-service.payment.refunded
customer-servicecustomer-service.customer.registered

Альтернатива — один топик на aggregate с разными event_type в payload:

order-service.order  → {"event_type": "OrderCreated.v1", ...}
                     → {"event_type": "OrderConfirmed.v1", ...}
                     → {"event_type": "OrderCancelled.v1", ...}

Удобнее для consumer-side, который хочет «все события по заказу» — один consumer читает всё, фильтрует по event_type. Цена — нельзя subscribe только на OrderCancelled без чтения всего топика.

Batch-обработка

R-KFK-OBX-4: relay читает 10-50 events за раз.

LIMIT :limit  -- 50 по умолчанию

Почему не по одному:

  • DB-poll overhead — каждый запрос ~1-2ms даже с indexed scan.
  • Kafka roundtripsend_and_wait ждёт ACK, ~5-20ms per message.
  • При 100 events/s по одному — relay постоянно загружен, latency растёт.

С batch 50 — один запрос поднимает 50 events, отправляет последовательно через send_and_wait (порядок per-partition сохранён с enable_idempotence=True), published_at проставляется в одном UPDATE. Throughput x10-20.

Pydantic-конфигурация relay

R-KFK-CFG-1: все параметры через pydantic-settings.

from pydantic_settings import BaseSettings

class KafkaSettings(BaseSettings):
    kafka_brokers: str
    outbox_relay_interval_seconds: float = 0.5
    outbox_relay_batch_size: int = 50

    model_config = {"env_file": ".env", "env_file_encoding": "utf-8"}

bootstrap_servers — из env, не хардкодом (R-KFK-CFG-X2).

Что запрещено

АнтипаттернПравилоЧто взамен
producer.send_and_wait в той же транзакции, что DB-writeR-KFK-OBX-X1outbox в той же транзакции, relay публикует отдельно
Публикация через after-commit hook без outboxR-KFK-OBX-X2outbox-relay (retry автоматически)
Outbox без published_at partial indexR-KFK-OBX-X3WHERE published_at IS NULL
Relay без FOR UPDATE SKIP LOCKEDR-KFK-OBX-2параллельные relay-поды через SKIP LOCKED
Relay по одному событиюR-KFK-OBX-4batch 10-50
Outbox без event_id UNIQUER-KFK-OBX-1UNIQUE constraint защищает от двойной записи
producer.send_and_wait без enable_idempotence=TrueR-KFK-PROD-X1enable_idempotence=True в producer

Куда дальше

  • Producer — почему нельзя send напрямую из handler.
  • Idempotent consumer — receiver side at-least-once.
  • Event design — формат payload в outbox.
  • Consumer — manual commit, group_id, auto_offset_reset.
  • Retry topic + DLQ — retry-топики и DLQ.
  • Observability — lag alert, traceparent в headers.
  • Конфигурация — KafkaSettings через pydantic-settings.
  • Security — TLS, ACL, PII в топиках.