Опирается на правила:
R-KFK-OBX-1…R-KFK-OBX-4иR-KFK-OBX-X1…R-KFK-OBX-X3из Kafka Style Guide → раздел 3. Outbox publishing.
Важно знать
- Domain events публикуются через outbox-relay, не напрямую
producer.send_and_wait(...)из handler.- Запись в
outbox_eventидёт в той же DB-транзакции, что бизнес-write. Либо обе commit, либо обе rollback.- Outbox-relay — отдельная asyncio-задача (APScheduler/arq), читает unpublished через
SELECT ... FOR UPDATE SKIP LOCKED, публикует, проставляетpublished_at.- Topic name derives от
event_type/aggregate_type:<service>.<aggregate-type>.<event-name>.- Relay в batch (10-50 events) — снижает overhead DB-poll и Kafka-roundtrip.
partial-index WHERE published_at IS NULLобязателен — без него full scan на каждом poll.- Публикация через after-commit hook без outbox — потеря событий при сбое между commit и publish.
producer.send_and_waitв той же транзакции, что DB-операция — Kafka не XA; rollback БД не откатит publish.
Outbox publishing — фундаментальный паттерн UCP. Все domain events публикуются через него. Это даёт at-least-once доставку с атомарностью «commit DB + публикация» через локальную транзакцию в PG. Теория — Distributed → outbox + inbox.
Схема outbox-таблицы
R-KFK-OBX-3, R-KFK-OBX-X3: схема с partial-индексом.
CREATE TABLE outbox_event (
id bigint GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
event_id uuid NOT NULL UNIQUE,
aggregate_type text NOT NULL,
aggregate_id text NOT NULL,
event_type text NOT NULL,
payload jsonb NOT NULL,
topic text NOT NULL,
partition_key text NOT NULL,
created_at timestamptz NOT NULL DEFAULT now(),
published_at timestamptz
);
CREATE INDEX ix_outbox_event_unpublished
ON outbox_event (id)
WHERE published_at IS NULL;
WHERE published_at IS NULL — partial index. После публикации запись выпадает из индекса; relay сканирует только «горячий» хвост. Без него — full scan миллиардной таблицы на каждом poll, CPU БД в 100% через неделю.
Доменное событие
R-KFK-EVT-4: событие — @dataclass(frozen=True), десериализация на границе — Pydantic-модель.
from dataclasses import dataclass
from uuid import UUID
from decimal import Decimal
@dataclass(frozen=True)
class OrderConfirmedEvent:
event_id: UUID
occurred_at: str
aggregate_id: UUID
order_id: UUID
customer_id: UUID
total_amount: Decimal
currency: str
event_type: str = "order.confirmed.v1"
Запись в outbox из handler
R-KFK-OBX-1: write-handler пишет в outbox_event в той же транзакции.
import uuid
from uuid6 import uuid7 # пакет uuid6 (PyPI) — UUID v7
import json
from dataclasses import asdict
from datetime import datetime, timezone
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession
from app.orders.domain.event import OrderConfirmedEvent
from app.orders.domain.repository import OrderRepository
class ConfirmOrderHandler:
def __init__(self, session: AsyncSession, order_repo: OrderRepository) -> None:
self._session = session
self._order_repo = order_repo
async def handle(self, order_id: uuid.UUID) -> None:
async with self._session.begin():
order = await self._order_repo.get_for_update(order_id)
order.confirm()
await self._order_repo.save(order)
event = OrderConfirmedEvent(
event_id=uuid7(),
occurred_at=datetime.now(timezone.utc).isoformat(),
aggregate_id=order.id,
order_id=order.id,
customer_id=order.customer_id,
total_amount=order.total_amount,
currency=order.currency,
)
await self._session.execute(
text("""
INSERT INTO outbox_event
(event_id, aggregate_type, aggregate_id, event_type, payload, topic, partition_key)
VALUES
(:event_id, :aggregate_type, :aggregate_id, :event_type, :payload, :topic, :partition_key)
"""),
{
"event_id": str(event.event_id),
"aggregate_type": "Order",
"aggregate_id": str(order.id),
"event_type": event.event_type,
"payload": json.dumps(asdict(event), default=str),
"topic": "order-service.order.confirmed",
"partition_key": str(order.id),
},
)
Атомарность гарантирует PG: session.begin() охватывает и бизнес-write, и запись в outbox. Никакой XA с Kafka не нужен.
Outbox-relay
R-KFK-OBX-2: отдельная asyncio-задача. Пример через APScheduler:
import asyncio
import json
import logging
from datetime import datetime, timezone
from sqlalchemy import text
from aiokafka import AIOKafkaProducer
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
logger = logging.getLogger(__name__)
class OutboxRelay:
def __init__(
self,
session_factory: async_sessionmaker[AsyncSession],
producer: AIOKafkaProducer,
batch_size: int = 50,
) -> None:
self._session_factory = session_factory
self._producer = producer
self._batch_size = batch_size
async def publish(self) -> None:
async with self._session_factory() as session:
async with session.begin():
rows = (
await session.execute(
text("""
SELECT id, event_id, topic, partition_key, payload
FROM outbox_event
WHERE published_at IS NULL
ORDER BY id
LIMIT :limit
FOR UPDATE SKIP LOCKED
"""),
{"limit": self._batch_size},
)
).fetchall()
if not rows:
return
for row in rows:
try:
await self._producer.send_and_wait(
topic=row.topic,
key=row.partition_key.encode(),
value=json.dumps(row.payload).encode(),
)
await session.execute(
text("UPDATE outbox_event SET published_at = :now WHERE id = :id"),
{"now": datetime.now(timezone.utc), "id": row.id},
)
except Exception:
logger.exception(
"Failed to publish outbox event id=%s topic=%s",
row.id,
row.topic,
)
raise
def start_relay(relay: OutboxRelay) -> AsyncIOScheduler:
scheduler = AsyncIOScheduler()
scheduler.add_job(relay.publish, "interval", seconds=0.5)
scheduler.start()
return scheduler
FOR UPDATE SKIP LOCKED — несколько pod-ов relay гонят параллельно, каждый забирает свою порцию без блокировок. Горизонтальное масштабирование без координации.
interval=0.5s даёт типичную задержку ~500ms. Для критичных flow — 0.1s, для аналитики — 5s.
Подключение через lifespan (FastAPI)
from contextlib import asynccontextmanager
from aiokafka import AIOKafkaProducer
from fastapi import FastAPI
from app.kafka.outbox_relay import OutboxRelay, start_relay
from app.config import Settings
@asynccontextmanager
async def lifespan(app: FastAPI):
settings = Settings()
producer = AIOKafkaProducer(
bootstrap_servers=settings.kafka_brokers,
enable_idempotence=True,
value_serializer=lambda v: v,
key_serializer=lambda k: k,
)
await producer.start()
relay = OutboxRelay(session_factory=app.state.session_factory, producer=producer)
scheduler = start_relay(relay)
yield
scheduler.shutdown(wait=False)
await producer.stop()
app = FastAPI(lifespan=lifespan)
Producer запускается один раз при старте приложения и разделяется между outbox-relay и всеми handler-ами, которым нужен прямой send (для нон-доменных событий).
Topic naming
R-KFK-OBX-3: convention <service>.<aggregate-type>.<event-name>.
| Сервис | Topic |
|---|---|
| order-service | order-service.order.created |
| order-service | order-service.order.confirmed |
| order-service | order-service.order.cancelled |
| payment-service | payment-service.payment.charged |
| payment-service | payment-service.payment.refunded |
| customer-service | customer-service.customer.registered |
Альтернатива — один топик на aggregate с разными event_type в payload:
order-service.order → {"event_type": "OrderCreated.v1", ...}
→ {"event_type": "OrderConfirmed.v1", ...}
→ {"event_type": "OrderCancelled.v1", ...}
Удобнее для consumer-side, который хочет «все события по заказу» — один consumer читает всё, фильтрует по event_type. Цена — нельзя subscribe только на OrderCancelled без чтения всего топика.
Batch-обработка
R-KFK-OBX-4: relay читает 10-50 events за раз.
LIMIT :limit -- 50 по умолчанию
Почему не по одному:
- DB-poll overhead — каждый запрос ~1-2ms даже с indexed scan.
- Kafka roundtrip —
send_and_waitждёт ACK, ~5-20ms per message. - При 100 events/s по одному — relay постоянно загружен, latency растёт.
С batch 50 — один запрос поднимает 50 events, отправляет последовательно через send_and_wait (порядок per-partition сохранён с enable_idempotence=True), published_at проставляется в одном UPDATE. Throughput x10-20.
Pydantic-конфигурация relay
R-KFK-CFG-1: все параметры через pydantic-settings.
from pydantic_settings import BaseSettings
class KafkaSettings(BaseSettings):
kafka_brokers: str
outbox_relay_interval_seconds: float = 0.5
outbox_relay_batch_size: int = 50
model_config = {"env_file": ".env", "env_file_encoding": "utf-8"}
bootstrap_servers — из env, не хардкодом (R-KFK-CFG-X2).
Что запрещено
| Антипаттерн | Правило | Что взамен |
|---|---|---|
producer.send_and_wait в той же транзакции, что DB-write | R-KFK-OBX-X1 | outbox в той же транзакции, relay публикует отдельно |
| Публикация через after-commit hook без outbox | R-KFK-OBX-X2 | outbox-relay (retry автоматически) |
Outbox без published_at partial index | R-KFK-OBX-X3 | WHERE published_at IS NULL |
Relay без FOR UPDATE SKIP LOCKED | R-KFK-OBX-2 | параллельные relay-поды через SKIP LOCKED |
| Relay по одному событию | R-KFK-OBX-4 | batch 10-50 |
Outbox без event_id UNIQUE | R-KFK-OBX-1 | UNIQUE constraint защищает от двойной записи |
producer.send_and_wait без enable_idempotence=True | R-KFK-PROD-X1 | enable_idempotence=True в producer |
Куда дальше
- Producer — почему нельзя send напрямую из handler.
- Idempotent consumer — receiver side at-least-once.
- Event design — формат payload в outbox.
- Consumer — manual commit, group_id, auto_offset_reset.
- Retry topic + DLQ — retry-топики и DLQ.
- Observability — lag alert, traceparent в headers.
- Конфигурация — KafkaSettings через pydantic-settings.
- Security — TLS, ACL, PII в топиках.