Опирается на правила: R-KFK-OBX-1R-KFK-OBX-4 и R-KFK-OBX-X1R-KFK-OBX-X3 из Kafka Style Guide → раздел 3. Outbox publishing.

Важно знать

  • Domain events публикуются через outbox-relay, не напрямую producer.send_and_wait(...) из handler.
  • Запись в outbox_event идёт в той же DB-транзакции, что бизнес-write. Либо обе commit, либо обе rollback.
  • Outbox-relay — отдельная asyncio-задача (APScheduler/arq), читает unpublished через SELECT ... FOR UPDATE SKIP LOCKED, публикует, проставляет published_at.
  • Topic name derives от event_type/aggregate_type: <service>.<aggregate-type>.<event-name>.
  • Relay в batch (10-50 events) — снижает overhead DB-poll и Kafka-roundtrip.
  • partial-index WHERE published_at IS NULL обязателен — без него full scan на каждом poll.
  • Публикация через after-commit hook без outbox — потеря событий при сбое между commit и publish.
  • producer.send_and_wait в той же транзакции, что DB-операция — Kafka не XA; rollback БД не откатит publish.

Outbox publishing — фундаментальный паттерн UCP. Все domain events публикуются через него. Это даёт at-least-once доставку с атомарностью «commit DB + публикация» через локальную транзакцию в PG. Теория — Distributed → outbox + inbox.

Схема outbox-таблицы

R-KFK-OBX-3, R-KFK-OBX-X3: схема с partial-индексом.

CREATE TABLE outbox_event (
    id              bigint GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
    event_id        uuid        NOT NULL UNIQUE,
    aggregate_type  text        NOT NULL,
    aggregate_id    text        NOT NULL,
    event_type      text        NOT NULL,
    payload         jsonb       NOT NULL,
    topic           text        NOT NULL,
    partition_key   text        NOT NULL,
    created_at      timestamptz NOT NULL DEFAULT now(),
    published_at    timestamptz
);

CREATE INDEX ix_outbox_event_unpublished
    ON outbox_event (id)
    WHERE published_at IS NULL;

WHERE published_at IS NULL — partial index. После публикации запись выпадает из индекса; relay сканирует только «горячий» хвост. Без него — full scan миллиардной таблицы на каждом poll, CPU БД в 100% через неделю.

Доменное событие

R-KFK-EVT-4: событие — @dataclass(frozen=True), десериализация на границе — Pydantic-модель.

from dataclasses import dataclass
from uuid import UUID
from decimal import Decimal

@dataclass(frozen=True)
class OrderConfirmedEvent:
    event_id: UUID
    occurred_at: str
    aggregate_id: UUID
    order_id: UUID
    customer_id: UUID
    total_amount: Decimal
    currency: str
    event_type: str = "order.confirmed.v1"

Запись в outbox из handler

R-KFK-OBX-1: write-handler пишет в outbox_event в той же транзакции.

import uuid
from uuid6 import uuid7  # пакет uuid6 (PyPI) — UUID v7
import json
from dataclasses import asdict
from datetime import datetime, timezone

from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession

from app.orders.domain.event import OrderConfirmedEvent
from app.orders.domain.repository import OrderRepository

class ConfirmOrderHandler:
    def __init__(self, session: AsyncSession, order_repo: OrderRepository) -> None:
        self._session = session
        self._order_repo = order_repo

    async def handle(self, order_id: uuid.UUID) -> None:
        async with self._session.begin():
            order = await self._order_repo.get_for_update(order_id)
            order.confirm()
            await self._order_repo.save(order)

            event = OrderConfirmedEvent(
                event_id=uuid7(),
                occurred_at=datetime.now(timezone.utc).isoformat(),
                aggregate_id=order.id,
                order_id=order.id,
                customer_id=order.customer_id,
                total_amount=order.total_amount,
                currency=order.currency,
            )

            await self._session.execute(
                text("""
                INSERT INTO outbox_event
                    (event_id, aggregate_type, aggregate_id, event_type, payload, topic, partition_key)
                VALUES
                    (:event_id, :aggregate_type, :aggregate_id, :event_type, :payload, :topic, :partition_key)
                """),
                {
                    "event_id": str(event.event_id),
                    "aggregate_type": "Order",
                    "aggregate_id": str(order.id),
                    "event_type": event.event_type,
                    "payload": json.dumps(asdict(event), default=str),
                    "topic": "order-service.order.confirmed",
                    "partition_key": str(order.id),
                },
            )

Атомарность гарантирует PG: session.begin() охватывает и бизнес-write, и запись в outbox. Никакой XA с Kafka не нужен.

Outbox-relay

R-KFK-OBX-2: отдельная asyncio-задача. Пример через APScheduler:

import asyncio
import json
import logging
from datetime import datetime, timezone

from sqlalchemy import text
from aiokafka import AIOKafkaProducer
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker

logger = logging.getLogger(__name__)

class OutboxRelay:
    def __init__(
        self,
        session_factory: async_sessionmaker[AsyncSession],
        producer: AIOKafkaProducer,
        batch_size: int = 50,
    ) -> None:
        self._session_factory = session_factory
        self._producer = producer
        self._batch_size = batch_size

    async def publish(self) -> None:
        async with self._session_factory() as session:
            async with session.begin():
                rows = (
                    await session.execute(
                        text("""
                        SELECT id, event_id, topic, partition_key, payload
                        FROM outbox_event
                        WHERE published_at IS NULL
                        ORDER BY id
                        LIMIT :limit
                        FOR UPDATE SKIP LOCKED
                        """),
                        {"limit": self._batch_size},
                    )
                ).fetchall()

                if not rows:
                    return

                for row in rows:
                    try:
                        await self._producer.send_and_wait(
                            topic=row.topic,
                            key=row.partition_key.encode(),
                            value=json.dumps(row.payload).encode(),
                        )
                        await session.execute(
                            text("UPDATE outbox_event SET published_at = :now WHERE id = :id"),
                            {"now": datetime.now(timezone.utc), "id": row.id},
                        )
                    except Exception:
                        logger.exception(
                            "Failed to publish outbox event id=%s topic=%s",
                            row.id,
                            row.topic,
                        )
                        raise


def start_relay(relay: OutboxRelay) -> AsyncIOScheduler:
    scheduler = AsyncIOScheduler()
    scheduler.add_job(relay.publish, "interval", seconds=0.5)
    scheduler.start()
    return scheduler

FOR UPDATE SKIP LOCKED — несколько pod-ов relay гонят параллельно, каждый забирает свою порцию без блокировок. Горизонтальное масштабирование без координации.

interval=0.5s даёт типичную задержку ~500ms. Для критичных flow — 0.1s, для аналитики — 5s.

Подключение через lifespan (FastAPI)

from contextlib import asynccontextmanager
from aiokafka import AIOKafkaProducer
from fastapi import FastAPI

from app.kafka.outbox_relay import OutboxRelay, start_relay
from app.config import Settings

@asynccontextmanager
async def lifespan(app: FastAPI):
    settings = Settings()
    producer = AIOKafkaProducer(
        bootstrap_servers=settings.kafka_brokers,
        enable_idempotence=True,
        value_serializer=lambda v: v,
        key_serializer=lambda k: k,
    )
    await producer.start()

    relay = OutboxRelay(session_factory=app.state.session_factory, producer=producer)
    scheduler = start_relay(relay)

    yield

    scheduler.shutdown(wait=False)
    await producer.stop()

app = FastAPI(lifespan=lifespan)

Producer запускается один раз при старте приложения и разделяется между outbox-relay и всеми handler-ами, которым нужен прямой send (для нон-доменных событий).

Topic naming

R-KFK-OBX-3: convention <service>.<aggregate-type>.<event-name>.

СервисTopic
order-serviceorder-service.order.created
order-serviceorder-service.order.confirmed
order-serviceorder-service.order.cancelled
payment-servicepayment-service.payment.charged
payment-servicepayment-service.payment.refunded
customer-servicecustomer-service.customer.registered

Альтернатива — один топик на aggregate с разными event_type в payload:

order-service.order  → {"event_type": "OrderCreated.v1", ...}
                     → {"event_type": "OrderConfirmed.v1", ...}
                     → {"event_type": "OrderCancelled.v1", ...}

Удобнее для consumer-side, который хочет «все события по заказу» — один consumer читает всё, фильтрует по event_type. Цена — нельзя subscribe только на OrderCancelled без чтения всего топика.

Batch-обработка

R-KFK-OBX-4: relay читает 10-50 events за раз.

LIMIT :limit  -- 50 по умолчанию

Почему не по одному:

  • DB-poll overhead — каждый запрос ~1-2ms даже с indexed scan.
  • Kafka roundtripsend_and_wait ждёт ACK, ~5-20ms per message.
  • При 100 events/s по одному — relay постоянно загружен, latency растёт.

С batch 50 — один запрос поднимает 50 events, отправляет последовательно через send_and_wait (порядок per-partition сохранён с enable_idempotence=True), published_at проставляется в одном UPDATE. Throughput x10-20.

Pydantic-конфигурация relay

R-KFK-CFG-1: все параметры через pydantic-settings.

from pydantic_settings import BaseSettings

class KafkaSettings(BaseSettings):
    kafka_brokers: str
    outbox_relay_interval_seconds: float = 0.5
    outbox_relay_batch_size: int = 50

    model_config = {"env_file": ".env", "env_file_encoding": "utf-8"}

bootstrap_servers — из env, не хардкодом (R-KFK-CFG-X2).

Что запрещено

АнтипаттернПравилоЧто взамен
producer.send_and_wait в той же транзакции, что DB-writeR-KFK-OBX-X1outbox в той же транзакции, relay публикует отдельно
Публикация через after-commit hook без outboxR-KFK-OBX-X2outbox-relay (retry автоматически)
Outbox без published_at partial indexR-KFK-OBX-X3WHERE published_at IS NULL
Relay без FOR UPDATE SKIP LOCKEDR-KFK-OBX-2параллельные relay-поды через SKIP LOCKED
Relay по одному событиюR-KFK-OBX-4batch 10-50
Outbox без event_id UNIQUER-KFK-OBX-1UNIQUE constraint защищает от двойной записи
producer.send_and_wait без enable_idempotence=TrueR-KFK-PROD-X1enable_idempotence=True в producer

Куда дальше

  • Producer — почему нельзя send напрямую из handler.
  • Idempotent consumer — receiver side at-least-once.
  • Event design — формат payload в outbox.
  • Consumer — manual commit, group_id, auto_offset_reset.
  • Retry topic + DLQ — retry-топики и DLQ.
  • Observability — lag alert, traceparent в headers.
  • Конфигурация — KafkaSettings через pydantic-settings.
  • Security — TLS, ACL, PII в топиках.