Опирается на правила:
R-DIST-OBX-1…R-DIST-OBX-3иR-DIST-OBX-X1…R-DIST-OBX-X2из Distributed Patterns Style Guide → раздел 5. Outbox + Inbox.
Важно знать
- Outbox решает «UoW commit + message publish атомарно»: один
AsyncSession— одна транзакция в PG.- Command-handler делает
INSERTв таблицу домена иINSERTвoutbox_eventв одной транзакции через Unit of Work; asyncio-relay берёт unpublished-строки сFOR UPDATE SKIP LOCKEDи публикует черезaiokafka.- Inbox — обратное: consumer пишет полученное сообщение в
inbox_event(processed=False), отдельная задача обрабатывает строки в своём темпе. Применяется только для критических сценариев — в большинстве случаев достаточноprocessed_eventdedup.- Single source of truth — БД сервиса. aiokafka — транспорт сообщений, не источник правды.
- При потере данных в Kafka outbox-таблица продолжает накапливать; после восстановления Kafka relay публикует отложенное.
- Запрет direct send из command-handler:
await producer.send(...)внутри UoW-блока не атомарен с commit.- Запрет паттерна «publish after commit» через
connection.add_done_callback/ background-task без outbox: commit прошёл, send упал → downstream не знает.
Outbox — фундаментальный паттерн UCP. Все события из write-handler-ов идут через outbox-таблицу, без исключений. Это даёт at-least-once гарантию доставки без двухфазного коммита.
Проблема «commit + publish»
Что кажется естественным в asyncio-стеке:
async def handle(command: CreateOrderCommand, session: AsyncSession, producer: AIOKafkaProducer) -> Order:
order = Order.create(command)
session.add(order)
await session.commit()
await producer.send("order.events", OrderCreatedEvent.from_order(order).model_dump_json().encode())
return order
Что не работает:
commit()прошёл,producer.send(...)упал (network, broker недоступен) → событие потеряно, downstream не знает.producer.send(...)прошёл,commit()упал → событие в Kafka есть, заказа в БД нет → downstream обрабатывает фантом.- Асинхронный network-split между двумя операциями — никаких гарантий.
Distributed transaction между PostgreSQL и Kafka не существует: Kafka не поддерживает XA. Outbox решает это через локальную транзакцию в PG.
Outbox pattern
R-DIST-OBX-1: outbox для исходящих событий обязателен.
Схема outbox-таблицы
CREATE TABLE outbox_event (
id bigint GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
event_id uuid NOT NULL UNIQUE,
aggregate_type text NOT NULL,
aggregate_id text NOT NULL,
event_type text NOT NULL,
payload jsonb NOT NULL,
topic text NOT NULL,
partition_key text NOT NULL,
created_at timestamptz NOT NULL DEFAULT now(),
published_at timestamptz
);
CREATE INDEX ix_outbox_event_unpublished ON outbox_event(id)
WHERE published_at IS NULL;
Partial index WHERE published_at IS NULL — relay сканирует только unpublished. После публикации строка остаётся в таблице (для audit), но из горячего индекса исчезает.
Unit of Work и outbox-publisher
# domain/events.py
from dataclasses import dataclass, field
from datetime import datetime, timezone
from uuid import UUID, uuid4
@dataclass
class OrderCreatedEvent:
event_id: UUID = field(default_factory=uuid4)
order_id: UUID = field(default_factory=uuid4)
customer_id: UUID = field(default_factory=uuid4)
amount: int = 0
created_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
# infrastructure/outbox.py
import json
from uuid import UUID
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import text
class OutboxPublisher:
def __init__(self, session: AsyncSession) -> None:
self._session = session
async def publish(self, topic: str, partition_key: str, event_id: UUID, aggregate_type: str, aggregate_id: str, event_type: str, payload: dict) -> None:
await self._session.execute(
text("""
INSERT INTO outbox_event (event_id, aggregate_type, aggregate_id, event_type, payload, topic, partition_key)
VALUES (:event_id, :aggregate_type, :aggregate_id, :event_type, :payload, :topic, :partition_key)
"""),
{
"event_id": str(event_id),
"aggregate_type": aggregate_type,
"aggregate_id": aggregate_id,
"event_type": event_type,
"payload": json.dumps(payload),
"topic": topic,
"partition_key": partition_key,
}
)
Write-handler
# application/commands/create_order.py
from dataclasses import dataclass
from uuid import UUID
from sqlalchemy.ext.asyncio import AsyncSession
from domain.order import Order, OrderCreatedEvent
from infrastructure.outbox import OutboxPublisher
@dataclass(frozen=True)
class CreateOrderCommand:
customer_id: UUID
product_id: UUID
amount: int
async def handle_create_order(
command: CreateOrderCommand,
session: AsyncSession,
) -> Order:
async with session.begin():
order = Order.create(
customer_id=command.customer_id,
product_id=command.product_id,
amount=command.amount,
)
session.add(order)
event = OrderCreatedEvent(
order_id=order.id,
customer_id=order.customer_id,
amount=order.amount,
)
outbox = OutboxPublisher(session)
await outbox.publish(
topic="order.events",
partition_key=str(order.id),
event_id=event.event_id,
aggregate_type="Order",
aggregate_id=str(order.id),
event_type="OrderCreated",
payload={
"event_id": str(event.event_id),
"order_id": str(event.order_id),
"customer_id": str(event.customer_id),
"amount": event.amount,
"created_at": event.created_at.isoformat(),
},
)
return order
session.begin() охватывает оба INSERT: либо оба commit, либо оба откатываются. OutboxPublisher.publish делает INSERT в той же открытой транзакции.
FastAPI-router
# api/orders.py
from fastapi import APIRouter, Depends
from sqlalchemy.ext.asyncio import AsyncSession
from uuid import UUID
from api.deps import get_session
from application.commands.create_order import CreateOrderCommand, handle_create_order
router = APIRouter(prefix="/orders", tags=["orders"])
class CreateOrderRequest(BaseModel):
customer_id: UUID
product_id: UUID
amount: int
@router.post("/", status_code=201)
async def create_order(
body: CreateOrderRequest,
session: AsyncSession = Depends(get_session),
):
command = CreateOrderCommand(
customer_id=body.customer_id,
product_id=body.product_id,
amount=body.amount,
)
order = await handle_create_order(command, session)
return {"order_id": str(order.id)}
Outbox-relay
# infrastructure/outbox_relay.py
import asyncio
import json
from datetime import datetime, timezone
from aiokafka import AIOKafkaProducer
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
BATCH_SIZE = 100
POLL_INTERVAL_SECONDS = 0.5
async def run_outbox_relay(
session_factory: async_sessionmaker[AsyncSession],
producer: AIOKafkaProducer,
) -> None:
while True:
await _publish_batch(session_factory, producer)
await asyncio.sleep(POLL_INTERVAL_SECONDS)
async def _publish_batch(
session_factory: async_sessionmaker[AsyncSession],
producer: AIOKafkaProducer,
) -> None:
async with session_factory() as session:
async with session.begin():
rows = (await session.execute(
text("""
SELECT id, event_id, topic, partition_key, payload
FROM outbox_event
WHERE published_at IS NULL
ORDER BY id
LIMIT :limit
FOR UPDATE SKIP LOCKED
"""),
{"limit": BATCH_SIZE},
)).mappings().all()
for row in rows:
await producer.send_and_wait(
row["topic"],
key=row["partition_key"].encode(),
value=row["payload"].encode() if isinstance(row["payload"], str) else json.dumps(row["payload"]).encode(),
)
await session.execute(
text("UPDATE outbox_event SET published_at = :now WHERE id = :id"),
{"now": datetime.now(timezone.utc), "id": row["id"]},
)
FOR UPDATE SKIP LOCKED — несколько инстансов relay-я могут работать параллельно: каждый берёт свою порцию unpublished строк, не блокируя других. Это даёт горизонтальное масштабирование без внешней координации.
После публикации published_at = now(). Если relay упал между send_and_wait и UPDATE — следующий запуск возьмёт то же событие и опубликует снова. At-least-once, именно поэтому receiver обязан быть идемпотентным.
Запуск relay как lifespan-задачи
# main.py
from contextlib import asynccontextmanager
from fastapi import FastAPI
from aiokafka import AIOKafkaProducer
from infrastructure.outbox_relay import run_outbox_relay
from infrastructure.db import session_factory
@asynccontextmanager
async def lifespan(app: FastAPI):
producer = AIOKafkaProducer(bootstrap_servers="kafka:9092", enable_idempotence=True)
await producer.start()
relay_task = asyncio.create_task(run_outbox_relay(session_factory, producer))
try:
yield
finally:
relay_task.cancel()
await asyncio.gather(relay_task, return_exceptions=True)
await producer.stop()
app = FastAPI(lifespan=lifespan)
enable_idempotence=True для aiokafka-producer — R-DIST-IDEM-X2: producer тоже exactly-once на уровне брокера.
Single source of truth
R-DIST-OBX-3: БД сервиса — единственный источник правды. Kafka — транспорт.
Что это даёт:
- Потеря Kafka-данных (retention истёк, broker восстановлен) — outbox-таблица продолжает накапливать, relay публикует после восстановления.
- Rebuild read-projection — скрипт перечитывает всю
outbox_eventи заново публикует события; receiver обрабатывает их идемпотентно. - Audit — каждое событие, которое сервис когда-либо породил, остаётся в БД.
Inbox pattern
R-DIST-OBX-2: inbox — обратная сторона outbox. Consumer сохраняет полученное сообщение в inbox_event с processed=False, отдельная asyncio-задача обрабатывает unprocessed-строки в локальных транзакциях.
CREATE TABLE inbox_event (
event_id uuid PRIMARY KEY,
received_at timestamptz NOT NULL DEFAULT now(),
payload jsonb NOT NULL,
processed boolean NOT NULL DEFAULT false,
processed_at timestamptz
);
CREATE INDEX ix_inbox_event_unprocessed ON inbox_event(received_at)
WHERE NOT processed;
# infrastructure/inbox_consumer.py
from aiokafka import AIOKafkaConsumer
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
from sqlalchemy import text
import json
async def run_inbox_consumer(
session_factory: async_sessionmaker[AsyncSession],
consumer: AIOKafkaConsumer,
) -> None:
async for msg in consumer:
payload = json.loads(msg.value)
event_id = payload["event_id"]
async with session_factory() as session:
async with session.begin():
await session.execute(
text("""
INSERT INTO inbox_event (event_id, payload)
VALUES (:event_id, :payload)
ON CONFLICT (event_id) DO NOTHING
"""),
{"event_id": event_id, "payload": json.dumps(payload)},
)
# infrastructure/inbox_processor.py
import asyncio
from datetime import datetime, timezone
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
from sqlalchemy import text
async def run_inbox_processor(
session_factory: async_sessionmaker[AsyncSession],
) -> None:
while True:
await _process_batch(session_factory)
await asyncio.sleep(0.2)
async def _process_batch(session_factory: async_sessionmaker[AsyncSession]) -> None:
async with session_factory() as session:
async with session.begin():
rows = (await session.execute(
text("""
SELECT event_id, payload FROM inbox_event
WHERE NOT processed
ORDER BY received_at
LIMIT 100
FOR UPDATE SKIP LOCKED
""")
)).mappings().all()
for row in rows:
await _handle_payment_event(row["payload"], session)
await session.execute(
text("UPDATE inbox_event SET processed = true, processed_at = :now WHERE event_id = :eid"),
{"now": datetime.now(timezone.utc), "eid": row["event_id"]},
)
async def _handle_payment_event(payload: dict, session: AsyncSession) -> None:
order_id = payload["order_id"]
await session.execute(
text("UPDATE orders SET payment_status = 'paid' WHERE id = :order_id"),
{"order_id": order_id},
)
Когда использовать inbox
- Bursty traffic — Kafka даёт burst 10 000 msg/s, обработка тяжёлая. Inbox развязывает приём и обработку: consumer принимает быстро, processor работает в своём темпе.
- Critical financial flows — нужно жёстко гарантировать, что ни одно сообщение не потеряется между приёмом и обработкой даже при перезапуске.
В большинстве случаев inbox избыточен — достаточно processed_event dedup-таблицы с ON CONFLICT DO NOTHING. Если обработка лёгкая и retry-topic + DLQ достаточно — inbox добавляет сложности без выигрыша.
| Критерий | Только processed_event | Inbox + processor |
|---|---|---|
| Сложность | низкая | средняя |
| Burst handling | ограничен concurrency consumer-а | развязка приёма и обработки |
| Recovery после перезапуска | re-consume из Kafka | отдельная обработка inbox |
| Когда применять | дефолт | financial / high burst |
Что запрещено
| Антипаттерн | Правило | Что взамен |
|---|---|---|
await producer.send(...) в command-handler без outbox | R-DIST-OBX-X1 | outbox-таблица + asyncio relay |
| Publish после commit через background-task без outbox | R-DIST-OBX-X2 | INSERT в outbox в той же транзакции |
Relay без FOR UPDATE SKIP LOCKED | R-DIST-OBX-1 | FOR UPDATE SKIP LOCKED для конкурентных relay-задач |
| Outbox без partial-index unpublished | R-DIST-OBX-1 | WHERE published_at IS NULL |
| Kafka как источник правды | R-DIST-OBX-3 | PG единственный SoT, Kafka — транспорт |
| Inbox для каждого consumer-а | R-DIST-OBX-2 | processed_event dedup дефолт, inbox только для critical |
DELETE опубликованных строк из outbox | R-DIST-OBX-3 | hold для audit/rebuild, cleanup отдельным job-ом по retention |
enable_idempotence=False в aiokafka-producer relay-я | R-DIST-IDEM-X2 | enable_idempotence=True — exactly-once на уровне брокера |
Куда дальше
- Idempotency — получатель обязан быть идемпотентным при at-least-once доставке.
- Saga — saga-state-таблица и outbox работают вместе; orchestrator шлёт команды через outbox.
- Eventual consistency — outbox — главный механизм eventual consistency в UCP.
- Compensation — compensation-команды публикуются через outbox.
- Distributed transactions — почему 2PC и multi-datasource-commit-цепочки запрещены.
- Когда нужны распределённые паттерны — проверь альтернативы перед введением outbox.