Опирается на правила:
R-KFK-OBX-1…R-KFK-OBX-4иR-KFK-OBX-X1…R-KFK-OBX-X3из Kafka Style Guide → раздел 3. Outbox publishing.
Важно знать
- Domain events публикуются через outbox-relay, не напрямую
producer.send_and_wait(...)из handler.- Запись в
outbox_eventидёт в той же DB-транзакции, что бизнес-write. Либо обе commit, либо обе rollback.- Outbox-relay — отдельная asyncio-задача (APScheduler/arq), читает unpublished через
SELECT ... FOR UPDATE SKIP LOCKED, публикует, проставляетpublished_at.- Topic name derives от
event_type/aggregate_type:<service>.<aggregate-type>.<event-name>.- Relay в batch (10-50 events) — снижает overhead DB-poll и Kafka-roundtrip.
partial-index WHERE published_at IS NULLобязателен — без него full scan на каждом poll.- Публикация через after-commit hook без outbox — потеря событий при сбое между commit и publish.
producer.send_and_waitв той же транзакции, что DB-операция — Kafka не XA; rollback БД не откатит publish.
Outbox publishing — фундаментальный паттерн UCP. Все domain events публикуются через него. Это даёт at-least-once доставку с атомарностью «commit DB + публикация» через локальную транзакцию в PG. Теория — Distributed → outbox + inbox.
Схема outbox-таблицы
R-KFK-OBX-3, R-KFK-OBX-X3: схема с partial-индексом.
CREATE TABLE outbox_event (
id bigint GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
event_id uuid NOT NULL UNIQUE,
aggregate_type text NOT NULL,
aggregate_id text NOT NULL,
event_type text NOT NULL,
payload jsonb NOT NULL,
topic text NOT NULL,
partition_key text NOT NULL,
created_at timestamptz NOT NULL DEFAULT now(),
published_at timestamptz
);
CREATE INDEX ix_outbox_event_unpublished
ON outbox_event (id)
WHERE published_at IS NULL;
WHERE published_at IS NULL — partial index. После публикации запись выпадает из индекса; relay сканирует только «горячий» хвост. Без него — full scan миллиардной таблицы на каждом poll, CPU БД в 100% через неделю.
Доменное событие
R-KFK-EVT-4: событие — @dataclass(frozen=True), десериализация на границе — Pydantic-модель.
from dataclasses import dataclass
from uuid import UUID
from decimal import Decimal
@dataclass(frozen=True)
class OrderConfirmedEvent:
event_id: UUID
occurred_at: str
aggregate_id: UUID
order_id: UUID
customer_id: UUID
total_amount: Decimal
currency: str
event_type: str = "order.confirmed.v1"
Запись в outbox из handler
R-KFK-OBX-1: write-handler пишет в outbox_event в той же транзакции.
import uuid
import json
from dataclasses import asdict
from datetime import datetime, timezone
from sqlalchemy.ext.asyncio import AsyncSession
from app.orders.domain.event import OrderConfirmedEvent
from app.orders.domain.repository import OrderRepository
class ConfirmOrderHandler:
def __init__(self, session: AsyncSession, order_repo: OrderRepository) -> None:
self._session = session
self._order_repo = order_repo
async def handle(self, order_id: uuid.UUID) -> None:
async with self._session.begin():
order = await self._order_repo.get_for_update(order_id)
order.confirm()
await self._order_repo.save(order)
event = OrderConfirmedEvent(
event_id=uuid.uuid7(),
occurred_at=datetime.now(timezone.utc).isoformat(),
aggregate_id=order.id,
order_id=order.id,
customer_id=order.customer_id,
total_amount=order.total_amount,
currency=order.currency,
)
await self._session.execute(
"""
INSERT INTO outbox_event
(event_id, aggregate_type, aggregate_id, event_type, payload, topic, partition_key)
VALUES
(:event_id, :aggregate_type, :aggregate_id, :event_type, :payload, :topic, :partition_key)
""",
{
"event_id": str(event.event_id),
"aggregate_type": "Order",
"aggregate_id": str(order.id),
"event_type": event.event_type,
"payload": json.dumps(asdict(event), default=str),
"topic": "order-service.order.confirmed",
"partition_key": str(order.id),
},
)
Атомарность гарантирует PG: session.begin() охватывает и бизнес-write, и запись в outbox. Никакой XA с Kafka не нужен.
Outbox-relay
R-KFK-OBX-2: отдельная asyncio-задача. Пример через APScheduler:
import asyncio
import json
import logging
from datetime import datetime, timezone
from aiokafka import AIOKafkaProducer
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
logger = logging.getLogger(__name__)
class OutboxRelay:
def __init__(
self,
session_factory: async_sessionmaker[AsyncSession],
producer: AIOKafkaProducer,
batch_size: int = 50,
) -> None:
self._session_factory = session_factory
self._producer = producer
self._batch_size = batch_size
async def publish(self) -> None:
async with self._session_factory() as session:
async with session.begin():
rows = (
await session.execute(
"""
SELECT id, event_id, topic, partition_key, payload
FROM outbox_event
WHERE published_at IS NULL
ORDER BY id
LIMIT :limit
FOR UPDATE SKIP LOCKED
""",
{"limit": self._batch_size},
)
).fetchall()
if not rows:
return
for row in rows:
try:
await self._producer.send_and_wait(
topic=row.topic,
key=row.partition_key.encode(),
value=json.dumps(row.payload).encode(),
)
await session.execute(
"UPDATE outbox_event SET published_at = :now WHERE id = :id",
{"now": datetime.now(timezone.utc), "id": row.id},
)
except Exception:
logger.exception(
"Failed to publish outbox event id=%s topic=%s",
row.id,
row.topic,
)
raise
def start_relay(relay: OutboxRelay) -> AsyncIOScheduler:
scheduler = AsyncIOScheduler()
scheduler.add_job(relay.publish, "interval", seconds=0.5)
scheduler.start()
return scheduler
FOR UPDATE SKIP LOCKED — несколько pod-ов relay гонят параллельно, каждый забирает свою порцию без блокировок. Горизонтальное масштабирование без координации.
interval=0.5s даёт типичную задержку ~500ms. Для критичных flow — 0.1s, для аналитики — 5s.
Подключение через lifespan (FastAPI)
from contextlib import asynccontextmanager
from aiokafka import AIOKafkaProducer
from fastapi import FastAPI
from app.kafka.outbox_relay import OutboxRelay, start_relay
from app.config import Settings
@asynccontextmanager
async def lifespan(app: FastAPI):
settings = Settings()
producer = AIOKafkaProducer(
bootstrap_servers=settings.kafka_brokers,
enable_idempotence=True,
value_serializer=lambda v: v,
key_serializer=lambda k: k,
)
await producer.start()
relay = OutboxRelay(session_factory=app.state.session_factory, producer=producer)
scheduler = start_relay(relay)
yield
scheduler.shutdown(wait=False)
await producer.stop()
app = FastAPI(lifespan=lifespan)
Producer запускается один раз при старте приложения и разделяется между outbox-relay и всеми handler-ами, которым нужен прямой send (для нон-доменных событий).
Topic naming
R-KFK-OBX-3: convention <service>.<aggregate-type>.<event-name>.
| Сервис | Topic |
|---|---|
| order-service | order-service.order.created |
| order-service | order-service.order.confirmed |
| order-service | order-service.order.cancelled |
| payment-service | payment-service.payment.charged |
| payment-service | payment-service.payment.refunded |
| customer-service | customer-service.customer.registered |
Альтернатива — один топик на aggregate с разными event_type в payload:
order-service.order → {"event_type": "OrderCreated.v1", ...}
→ {"event_type": "OrderConfirmed.v1", ...}
→ {"event_type": "OrderCancelled.v1", ...}
Удобнее для consumer-side, который хочет «все события по заказу» — один consumer читает всё, фильтрует по event_type. Цена — нельзя subscribe только на OrderCancelled без чтения всего топика.
Batch-обработка
R-KFK-OBX-4: relay читает 10-50 events за раз.
LIMIT :limit -- 50 по умолчанию
Почему не по одному:
- DB-poll overhead — каждый запрос ~1-2ms даже с indexed scan.
- Kafka roundtrip —
send_and_waitждёт ACK, ~5-20ms per message. - При 100 events/s по одному — relay постоянно загружен, latency растёт.
С batch 50 — один запрос поднимает 50 events, отправляет последовательно через send_and_wait (порядок per-partition сохранён с enable_idempotence=True), published_at проставляется в одном UPDATE. Throughput x10-20.
Pydantic-конфигурация relay
R-KFK-CFG-1: все параметры через pydantic-settings.
from pydantic_settings import BaseSettings
class KafkaSettings(BaseSettings):
kafka_brokers: str
outbox_relay_interval_seconds: float = 0.5
outbox_relay_batch_size: int = 50
model_config = {"env_file": ".env", "env_file_encoding": "utf-8"}
bootstrap_servers — из env, не хардкодом (R-KFK-CFG-X2).
Что запрещено
| Антипаттерн | Правило | Что взамен |
|---|---|---|
producer.send_and_wait в той же транзакции, что DB-write | R-KFK-OBX-X1 | outbox в той же транзакции, relay публикует отдельно |
| Публикация через after-commit hook без outbox | R-KFK-OBX-X2 | outbox-relay (retry автоматически) |
Outbox без published_at partial index | R-KFK-OBX-X3 | WHERE published_at IS NULL |
Relay без FOR UPDATE SKIP LOCKED | R-KFK-OBX-2 | параллельные relay-поды через SKIP LOCKED |
| Relay по одному событию | R-KFK-OBX-4 | batch 10-50 |
Outbox без event_id UNIQUE | R-KFK-OBX-1 | UNIQUE constraint защищает от двойной записи |
producer.send_and_wait без enable_idempotence=True | R-KFK-PROD-X1 | enable_idempotence=True в producer |
Куда дальше
- Producer — почему нельзя send напрямую из handler.
- Idempotent consumer — receiver side at-least-once.
- Event design — формат payload в outbox.
- Consumer — manual commit, group_id, auto_offset_reset.
- Retry topic + DLQ — retry-топики и DLQ.
- Observability — lag alert, traceparent в headers.
- Конфигурация — KafkaSettings через pydantic-settings.
- Security — TLS, ACL, PII в топиках.