Опирается на правила: R-DIST-OBX-1R-DIST-OBX-3 и R-DIST-OBX-X1R-DIST-OBX-X2 из Distributed Patterns Style Guide → раздел 5. Outbox + Inbox.

Важно знать

  • Outbox решает «UoW commit + message publish атомарно»: один AsyncSession — одна транзакция в PG.
  • Command-handler делает INSERT в таблицу домена и INSERT в outbox_event в одной транзакции через Unit of Work; asyncio-relay берёт unpublished-строки с FOR UPDATE SKIP LOCKED и публикует через aiokafka.
  • Inbox — обратное: consumer пишет полученное сообщение в inbox_event (processed=False), отдельная задача обрабатывает строки в своём темпе. Применяется только для критических сценариев — в большинстве случаев достаточно processed_event dedup.
  • Single source of truth — БД сервиса. aiokafka — транспорт сообщений, не источник правды.
  • При потере данных в Kafka outbox-таблица продолжает накапливать; после восстановления Kafka relay публикует отложенное.
  • Запрет direct send из command-handler: await producer.send(...) внутри UoW-блока не атомарен с commit.
  • Запрет паттерна «publish after commit» через connection.add_done_callback / background-task без outbox: commit прошёл, send упал → downstream не знает.

Outbox — фундаментальный паттерн UCP. Все события из write-handler-ов идут через outbox-таблицу, без исключений. Это даёт at-least-once гарантию доставки без двухфазного коммита.

Проблема «commit + publish»

Что кажется естественным в asyncio-стеке:

async def handle(command: CreateOrderCommand, session: AsyncSession, producer: AIOKafkaProducer) -> Order:
    order = Order.create(command)
    session.add(order)
    await session.commit()
    await producer.send("order.events", OrderCreatedEvent.from_order(order).model_dump_json().encode())
    return order

Что не работает:

  • commit() прошёл, producer.send(...) упал (network, broker недоступен) → событие потеряно, downstream не знает.
  • producer.send(...) прошёл, commit() упал → событие в Kafka есть, заказа в БД нет → downstream обрабатывает фантом.
  • Асинхронный network-split между двумя операциями — никаких гарантий.

Distributed transaction между PostgreSQL и Kafka не существует: Kafka не поддерживает XA. Outbox решает это через локальную транзакцию в PG.

Outbox pattern

R-DIST-OBX-1: outbox для исходящих событий обязателен.

Схема outbox-таблицы

CREATE TABLE outbox_event (
    id              bigint GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
    event_id        uuid        NOT NULL UNIQUE,
    aggregate_type  text        NOT NULL,
    aggregate_id    text        NOT NULL,
    event_type      text        NOT NULL,
    payload         jsonb       NOT NULL,
    topic           text        NOT NULL,
    partition_key   text        NOT NULL,
    created_at      timestamptz NOT NULL DEFAULT now(),
    published_at    timestamptz
);

CREATE INDEX ix_outbox_event_unpublished ON outbox_event(id)
    WHERE published_at IS NULL;

Partial index WHERE published_at IS NULL — relay сканирует только unpublished. После публикации строка остаётся в таблице (для audit), но из горячего индекса исчезает.

Unit of Work и outbox-publisher

# domain/events.py
from dataclasses import dataclass, field
from datetime import datetime, timezone
from uuid import UUID, uuid4

@dataclass
class OrderCreatedEvent:
    event_id: UUID = field(default_factory=uuid4)
    order_id: UUID = field(default_factory=uuid4)
    customer_id: UUID = field(default_factory=uuid4)
    amount: int = 0
    created_at: datetime = field(default_factory=lambda: datetime.now(timezone.utc))
# infrastructure/outbox.py
import json
from uuid import UUID
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import text

class OutboxPublisher:
    def __init__(self, session: AsyncSession) -> None:
        self._session = session

    async def publish(self, topic: str, partition_key: str, event_id: UUID, aggregate_type: str, aggregate_id: str, event_type: str, payload: dict) -> None:
        await self._session.execute(
            text("""
                INSERT INTO outbox_event (event_id, aggregate_type, aggregate_id, event_type, payload, topic, partition_key)
                VALUES (:event_id, :aggregate_type, :aggregate_id, :event_type, :payload, :topic, :partition_key)
            """),
            {
                "event_id": str(event_id),
                "aggregate_type": aggregate_type,
                "aggregate_id": aggregate_id,
                "event_type": event_type,
                "payload": json.dumps(payload),
                "topic": topic,
                "partition_key": partition_key,
            }
        )

Write-handler

# application/commands/create_order.py
from dataclasses import dataclass
from uuid import UUID
from sqlalchemy.ext.asyncio import AsyncSession

from domain.order import Order, OrderCreatedEvent
from infrastructure.outbox import OutboxPublisher

@dataclass(frozen=True)
class CreateOrderCommand:
    customer_id: UUID
    product_id: UUID
    amount: int

async def handle_create_order(
    command: CreateOrderCommand,
    session: AsyncSession,
) -> Order:
    async with session.begin():
        order = Order.create(
            customer_id=command.customer_id,
            product_id=command.product_id,
            amount=command.amount,
        )
        session.add(order)

        event = OrderCreatedEvent(
            order_id=order.id,
            customer_id=order.customer_id,
            amount=order.amount,
        )

        outbox = OutboxPublisher(session)
        await outbox.publish(
            topic="order.events",
            partition_key=str(order.id),
            event_id=event.event_id,
            aggregate_type="Order",
            aggregate_id=str(order.id),
            event_type="OrderCreated",
            payload={
                "event_id": str(event.event_id),
                "order_id": str(event.order_id),
                "customer_id": str(event.customer_id),
                "amount": event.amount,
                "created_at": event.created_at.isoformat(),
            },
        )

    return order

session.begin() охватывает оба INSERT: либо оба commit, либо оба откатываются. OutboxPublisher.publish делает INSERT в той же открытой транзакции.

FastAPI-router

# api/orders.py
from fastapi import APIRouter, Depends
from sqlalchemy.ext.asyncio import AsyncSession
from uuid import UUID

from api.deps import get_session
from application.commands.create_order import CreateOrderCommand, handle_create_order

router = APIRouter(prefix="/orders", tags=["orders"])

class CreateOrderRequest(BaseModel):
    customer_id: UUID
    product_id: UUID
    amount: int

@router.post("/", status_code=201)
async def create_order(
    body: CreateOrderRequest,
    session: AsyncSession = Depends(get_session),
):
    command = CreateOrderCommand(
        customer_id=body.customer_id,
        product_id=body.product_id,
        amount=body.amount,
    )
    order = await handle_create_order(command, session)
    return {"order_id": str(order.id)}

Outbox-relay

# infrastructure/outbox_relay.py
import asyncio
import json
from datetime import datetime, timezone
from aiokafka import AIOKafkaProducer
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker

BATCH_SIZE = 100
POLL_INTERVAL_SECONDS = 0.5

async def run_outbox_relay(
    session_factory: async_sessionmaker[AsyncSession],
    producer: AIOKafkaProducer,
) -> None:
    while True:
        await _publish_batch(session_factory, producer)
        await asyncio.sleep(POLL_INTERVAL_SECONDS)

async def _publish_batch(
    session_factory: async_sessionmaker[AsyncSession],
    producer: AIOKafkaProducer,
) -> None:
    async with session_factory() as session:
        async with session.begin():
            rows = (await session.execute(
                text("""
                    SELECT id, event_id, topic, partition_key, payload
                    FROM outbox_event
                    WHERE published_at IS NULL
                    ORDER BY id
                    LIMIT :limit
                    FOR UPDATE SKIP LOCKED
                """),
                {"limit": BATCH_SIZE},
            )).mappings().all()

            for row in rows:
                await producer.send_and_wait(
                    row["topic"],
                    key=row["partition_key"].encode(),
                    value=row["payload"].encode() if isinstance(row["payload"], str) else json.dumps(row["payload"]).encode(),
                )
                await session.execute(
                    text("UPDATE outbox_event SET published_at = :now WHERE id = :id"),
                    {"now": datetime.now(timezone.utc), "id": row["id"]},
                )

FOR UPDATE SKIP LOCKED — несколько инстансов relay-я могут работать параллельно: каждый берёт свою порцию unpublished строк, не блокируя других. Это даёт горизонтальное масштабирование без внешней координации.

После публикации published_at = now(). Если relay упал между send_and_wait и UPDATE — следующий запуск возьмёт то же событие и опубликует снова. At-least-once, именно поэтому receiver обязан быть идемпотентным.

Запуск relay как lifespan-задачи

# main.py
from contextlib import asynccontextmanager
from fastapi import FastAPI
from aiokafka import AIOKafkaProducer

from infrastructure.outbox_relay import run_outbox_relay
from infrastructure.db import session_factory

@asynccontextmanager
async def lifespan(app: FastAPI):
    producer = AIOKafkaProducer(bootstrap_servers="kafka:9092", enable_idempotence=True)
    await producer.start()

    relay_task = asyncio.create_task(run_outbox_relay(session_factory, producer))
    try:
        yield
    finally:
        relay_task.cancel()
        await asyncio.gather(relay_task, return_exceptions=True)
        await producer.stop()

app = FastAPI(lifespan=lifespan)

enable_idempotence=True для aiokafka-producer — R-DIST-IDEM-X2: producer тоже exactly-once на уровне брокера.

Single source of truth

R-DIST-OBX-3: БД сервиса — единственный источник правды. Kafka — транспорт.

Что это даёт:

  • Потеря Kafka-данных (retention истёк, broker восстановлен) — outbox-таблица продолжает накапливать, relay публикует после восстановления.
  • Rebuild read-projection — скрипт перечитывает всю outbox_event и заново публикует события; receiver обрабатывает их идемпотентно.
  • Audit — каждое событие, которое сервис когда-либо породил, остаётся в БД.

Inbox pattern

R-DIST-OBX-2: inbox — обратная сторона outbox. Consumer сохраняет полученное сообщение в inbox_event с processed=False, отдельная asyncio-задача обрабатывает unprocessed-строки в локальных транзакциях.

CREATE TABLE inbox_event (
    event_id     uuid        PRIMARY KEY,
    received_at  timestamptz NOT NULL DEFAULT now(),
    payload      jsonb       NOT NULL,
    processed    boolean     NOT NULL DEFAULT false,
    processed_at timestamptz
);

CREATE INDEX ix_inbox_event_unprocessed ON inbox_event(received_at)
    WHERE NOT processed;
# infrastructure/inbox_consumer.py
from aiokafka import AIOKafkaConsumer
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
from sqlalchemy import text
import json

async def run_inbox_consumer(
    session_factory: async_sessionmaker[AsyncSession],
    consumer: AIOKafkaConsumer,
) -> None:
    async for msg in consumer:
        payload = json.loads(msg.value)
        event_id = payload["event_id"]

        async with session_factory() as session:
            async with session.begin():
                await session.execute(
                    text("""
                        INSERT INTO inbox_event (event_id, payload)
                        VALUES (:event_id, :payload)
                        ON CONFLICT (event_id) DO NOTHING
                    """),
                    {"event_id": event_id, "payload": json.dumps(payload)},
                )
# infrastructure/inbox_processor.py
import asyncio
from datetime import datetime, timezone
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
from sqlalchemy import text

async def run_inbox_processor(
    session_factory: async_sessionmaker[AsyncSession],
) -> None:
    while True:
        await _process_batch(session_factory)
        await asyncio.sleep(0.2)

async def _process_batch(session_factory: async_sessionmaker[AsyncSession]) -> None:
    async with session_factory() as session:
        async with session.begin():
            rows = (await session.execute(
                text("""
                    SELECT event_id, payload FROM inbox_event
                    WHERE NOT processed
                    ORDER BY received_at
                    LIMIT 100
                    FOR UPDATE SKIP LOCKED
                """)
            )).mappings().all()

            for row in rows:
                await _handle_payment_event(row["payload"], session)
                await session.execute(
                    text("UPDATE inbox_event SET processed = true, processed_at = :now WHERE event_id = :eid"),
                    {"now": datetime.now(timezone.utc), "eid": row["event_id"]},
                )

async def _handle_payment_event(payload: dict, session: AsyncSession) -> None:
    order_id = payload["order_id"]
    await session.execute(
        text("UPDATE orders SET payment_status = 'paid' WHERE id = :order_id"),
        {"order_id": order_id},
    )

Когда использовать inbox

  • Bursty traffic — Kafka даёт burst 10 000 msg/s, обработка тяжёлая. Inbox развязывает приём и обработку: consumer принимает быстро, processor работает в своём темпе.
  • Critical financial flows — нужно жёстко гарантировать, что ни одно сообщение не потеряется между приёмом и обработкой даже при перезапуске.

В большинстве случаев inbox избыточен — достаточно processed_event dedup-таблицы с ON CONFLICT DO NOTHING. Если обработка лёгкая и retry-topic + DLQ достаточно — inbox добавляет сложности без выигрыша.

КритерийТолько processed_eventInbox + processor
Сложностьнизкаясредняя
Burst handlingограничен concurrency consumer-аразвязка приёма и обработки
Recovery после перезапускаre-consume из Kafkaотдельная обработка inbox
Когда применятьдефолтfinancial / high burst

Что запрещено

АнтипаттернПравилоЧто взамен
await producer.send(...) в command-handler без outboxR-DIST-OBX-X1outbox-таблица + asyncio relay
Publish после commit через background-task без outboxR-DIST-OBX-X2INSERT в outbox в той же транзакции
Relay без FOR UPDATE SKIP LOCKEDR-DIST-OBX-1FOR UPDATE SKIP LOCKED для конкурентных relay-задач
Outbox без partial-index unpublishedR-DIST-OBX-1WHERE published_at IS NULL
Kafka как источник правдыR-DIST-OBX-3PG единственный SoT, Kafka — транспорт
Inbox для каждого consumer-аR-DIST-OBX-2processed_event dedup дефолт, inbox только для critical
DELETE опубликованных строк из outboxR-DIST-OBX-3hold для audit/rebuild, cleanup отдельным job-ом по retention
enable_idempotence=False в aiokafka-producer relay-яR-DIST-IDEM-X2enable_idempotence=True — exactly-once на уровне брокера

Куда дальше

  • Idempotency — получатель обязан быть идемпотентным при at-least-once доставке.
  • Saga — saga-state-таблица и outbox работают вместе; orchestrator шлёт команды через outbox.
  • Eventual consistency — outbox — главный механизм eventual consistency в UCP.
  • Compensation — compensation-команды публикуются через outbox.
  • Distributed transactions — почему 2PC и multi-datasource-commit-цепочки запрещены.
  • Когда нужны распределённые паттерны — проверь альтернативы перед введением outbox.