Опирается на правила: R-CQRS-SYNC-1R-CQRS-SYNC-5 и R-CQRS-SYNC-X1R-CQRS-SYNC-X3 из CQRS Style Guide → раздел 5. Синхронизация через события.

Важно знать

  • Sync write → read идёт через outbox + Kafka, не synchronously. Outbox-запись вставляется в той же AsyncSession, что и изменение агрегата, и живёт до delivery.
  • Idempotent consumer обязателен: read-model UPDATE может прийти дважды. Защита — processed_event таблица или idempotent UPDATE … WHERE version < ?.
  • Bootstrap-rebuild при первом запуске или потере read-model: батч по агрегатам через write-сессию, не ждём события за 30 дней из Kafka.
  • Eventual consistency декларируется в API. FastAPI — code-first: описание задержки в docstring endpoint'а или в поле description response-модели.
  • Read-your-writes при необходимости — sticky session, polling в post-commit hook или отдельный endpoint из write-store.
  • Никакого синхронного INSERT в read-таблицу внутри command-UoW. Decoupling сразу теряется, при rollback read-model расходится.
  • Никаких PG-триггеров: невидимая магия, ломается на bulk, не переносится cross-DB.
  • Никаких schema-coupled events: payload — независимый @dataclass(frozen=True), не ORM-модель write-схемы.

Сердце CQRS — не разделение моделей, а способ их связать. Если write и read физически разнесены, нужен надёжный механизм передачи изменений, и он же определяет всё: latency, отказоустойчивость, формат данных. Outbox + Kafka — стандартная связка; всё остальное — детали её правильного применения в идиомах Python.

Outbox-pattern: атомарность с агрегатом

R-CQRS-SYNC-1: command-handler регистрирует событие через агрегат (order.confirm() поднимает domain-event); на await session.flush() / await uow.commit() событие попадает в outbox-таблицу той же БД, в той же AsyncSession, что и изменение агрегата.

# core/order/domain/events.py
@dataclass(frozen=True)
class OrderConfirmed:
    event_id: UUID
    order_id: OrderId
    status: str
    confirmed_at: datetime
    aggregate_version: int
# adapter/out/persistence/order_repository.py
class SqlAlchemyOrderRepository:
    def __init__(self, session: AsyncSession) -> None:
        self._session = session

    async def save(self, order: Order) -> None:
        orm_row = to_orm(order)
        await self._session.merge(orm_row)
        for event in order.pull_events():
            await self._session.execute(
                insert(OutboxRow).values(
                    id=uuid4(),
                    event_type=type(event).__name__,
                    payload=to_json(event),
                    aggregate_id=str(order.id.value),
                    created_at=utcnow(),
                    published=False,
                )
            )
# core/order/usecase/confirm_order.py
class ConfirmOrderHandler:
    def __init__(self, orders: OrderRepository, uow: UnitOfWork, clock: Clock) -> None:
        self._orders = orders
        self._uow = uow
        self._clock = clock

    async def handle(self, cmd: ConfirmOrder) -> OrderId:
        async with self._uow:
            order = await self._orders.by_id(cmd.order_id)
            order.confirm(self._clock)
            await self._orders.save(order)
            await self._uow.commit()
        return order.id

После commit:

  • Outbox-relay — asyncio-задача (или отдельный воркер), которая раз в N миллисекунд делает SELECT … FOR UPDATE SKIP LOCKED LIMIT 100 из outbox, публикует в Kafka через aiokafka.AIOKafkaProducer, помечает строки как опубликованные.
  • Продюсер настроен idempotent (enable_idempotence=True): Kafka обеспечивает at-least-once с дедупом на уровне partition.

Зачем outbox, а не «после commit — publish»:

  • Commit прошёл, Kafka недоступна — событие потеряно, read-model рассинхронизируется навсегда.
  • Commit не прошёл, Kafka уже получила — phantom-event, в БД нет соответствующего state.
  • Outbox решает обе проблемы: пока строка лежит в outbox, relay будет ретраить до delivery.

Idempotent consumer

R-CQRS-SYNC-2: consumer read-стороны обязан быть идемпотентным. At-least-once означает: одно сообщение придёт дважды. Два варианта защиты:

processed_event таблица

CREATE TABLE processed_event (
    event_id    UUID        NOT NULL,
    consumer    TEXT        NOT NULL,
    processed_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    PRIMARY KEY (event_id, consumer)
);
# adapter/in/kafka/order_summary_projector.py
class OrderSummaryProjector:
    def __init__(self, session_factory: async_sessionmaker) -> None:
        self._session_factory = session_factory

    async def on_order_confirmed(self, raw: bytes) -> None:
        event: OrderConfirmed = from_json(raw, OrderConfirmed)
        async with self._session_factory() as session:
            async with session.begin():
                already = await session.scalar(
                    select(ProcessedEventRow).where(
                        ProcessedEventRow.event_id == event.event_id,
                        ProcessedEventRow.consumer == "order-summary-projector",
                    )
                )
                if already:
                    return
                session.add(ProcessedEventRow(
                    event_id=event.event_id,
                    consumer="order-summary-projector",
                ))
                await session.execute(
                    update(OrderSummaryRow)
                    .where(OrderSummaryRow.order_id == event.order_id.value)
                    .values(status=event.status, confirmed_at=event.confirmed_at)
                )

Плюс: точная гарантия дедупа. Минус: дополнительная запись под каждое событие.

Idempotent UPDATE по version

В read-таблице хранится version (номер версии агрегата). UPDATE применяется только если событие новее:

async def on_order_confirmed(self, raw: bytes) -> None:
    event: OrderConfirmed = from_json(raw, OrderConfirmed)
    async with self._session_factory() as session:
        async with session.begin():
            result = await session.execute(
                update(OrderSummaryRow)
                .where(
                    OrderSummaryRow.order_id == event.order_id.value,
                    OrderSummaryRow.version < event.aggregate_version,
                )
                .values(
                    status=event.status,
                    confirmed_at=event.confirmed_at,
                    version=event.aggregate_version,
                )
            )
            if result.rowcount == 0:
                logger.debug("skip stale or duplicate: event_id=%s", event.event_id)

Плюс: не нужна отдельная таблица. Минус: требует aggregate_version в каждом событии. Подходит когда события одного агрегата идут в одной Kafka-партиции по aggregate_id.

Bootstrap и disaster recovery — синхронный rebuild

R-CQRS-SYNC-3: при первом запуске нового read-store или после потери — не ждём пока события придут из Kafka. Запускаем батч-rebuild из write-стороны.

# adapter/out/persistence/order_summary_bootstrap.py
class OrderSummaryBootstrap:
    def __init__(
        self,
        read_session_factory: async_sessionmaker,
        write_session_factory: async_sessionmaker,
    ) -> None:
        self._read = read_session_factory
        self._write = write_session_factory

    async def run_if_empty(self) -> None:
        async with self._read() as session:
            count = await session.scalar(select(func.count()).select_from(OrderSummaryRow))
        if count:
            return
        logger.info("order_summary is empty — running bootstrap rebuild")
        await self._rebuild_all()

    async def _rebuild_all(self) -> None:
        last_id = 0
        while True:
            async with self._write() as session:
                rows: list[OrderRow] = (await session.scalars(
                    select(OrderRow)
                    .where(OrderRow.id > last_id)
                    .order_by(OrderRow.id)
                    .limit(500)
                )).all()
            if not rows:
                break
            async with self._read() as session:
                async with session.begin():
                    for row in rows:
                        await session.merge(to_summary(row))
            last_id = rows[-1].id

Bootstrap — разовый: при создании нового read-store, после disaster recovery или при структурной миграции (добавили колонку, нужно дозаполнить). Запускается через lifespan FastAPI до начала приёма трафика:

@asynccontextmanager
async def lifespan(app: FastAPI):
    await order_summary_bootstrap.run_if_empty()
    async with kafka_relay_task():
        yield

Eventual consistency декларируется в API

R-CQRS-SYNC-4: FastAPI — code-first, поэтому описание задержки живёт в docstring endpoint'а. Клиент не должен угадывать.

@router.get(
    "/orders/{order_id}/summary",
    response_model=OrderSummaryResponse,
    summary="Get order summary (read-projection)",
)
async def get_order_summary(
    order_id: UUID,
    handler: Annotated[GetOrderSummaryHandler, Depends(get_order_summary_handler)],
) -> OrderSummaryResponse:
    """
    Возвращает read-проекцию заказа из денормализованной таблицы `order_summary`.

    **Eventual consistency**: возможна задержка до 1 секунды между write-операцией
    и появлением обновления в этой проекции.

    Для immediate consistency (например, сразу после `POST /orders`)
    используйте `GET /orders/{order_id}` — полный агрегат из write-store.
    """
    result = await handler.handle(GetOrderSummary(order_id=OrderId(order_id)))
    return to_response(result)

Зачем:

  • Клиент знает: после POST /orders немедленный GET /orders/{id}/summary может вернуть устаревшие данные или 404.
  • В тестах и production troubleshooting eventual consistency — архитектурное свойство, а не баг, и оно задокументировано.
  • Документация генерируется автоматически в Swagger UI — видно без открытия кода.

Read-your-writes — три механизма

R-CQRS-SYNC-5: иногда нужно гарантировать, что клиент после своего write увидит этот write в read-проекции. Три варианта, в порядке возрастания инвазивности:

1. Sticky session в gateway

Запросы одного клиента приходят на тот же pod, который только что обработал write. Если read-model — локальный in-memory cache этого же сервиса, обновляемый consumer-ом синхронно после commit, клиент увидит свежие данные.

Не работает кросс-pod, кросс-сервис. Хрупко.

2. Polling в post-commit hook

class ConfirmOrderHandler:
    async def handle(self, cmd: ConfirmOrder) -> OrderId:
        async with self._uow:
            order = await self._orders.by_id(cmd.order_id)
            order.confirm(self._clock)
            await self._orders.save(order)
            await self._uow.commit()
        await self._poll_until_visible(order.id, timeout=2.0)
        return order.id

    async def _poll_until_visible(self, order_id: OrderId, timeout: float) -> None:
        deadline = monotonic() + timeout
        while monotonic() < deadline:
            exists = await self._summary_view.exists(order_id)
            if exists:
                return
            await asyncio.sleep(0.05)

Контракт: 99% укладывается в 2 секунды, в остальных — 504 / async-pattern на стороне клиента. Минус: реальный latency POST становится p99 = 2 с. Не подходит для частых операций.

3. Отдельный endpoint из write-store

Самое простое и честное решение: для сценариев, критичных к immediate consistency, — отдельный endpoint прямо из write-стороны.

@router.get("/orders/{order_id}/summary", response_model=OrderSummaryResponse)
async def get_order_summary(...) -> OrderSummaryResponse:
    """Eventual consistency, from read-projection (order_summary)."""
    ...

@router.get("/orders/{order_id}", response_model=OrderFullResponse)
async def get_order(...) -> OrderFullResponse:
    """Immediate consistency, from write-store (orders aggregate)."""
    ...

Два endpoint'а явно показывают trade-off клиенту. В большинстве случаев — правильный выбор.

Что запрещено

АнтипаттернПравилоЧто взамен
Синхронный INSERT/UPDATE read-таблицы внутри command-UoWR-CQRS-SYNC-X1Outbox + Kafka + consumer
PG-триггер write-таблица → read-таблицаR-CQRS-SYNC-X2Явный consumer в Python
Event payload — ORM-модель write-схемы (OrderRow)R-CQRS-SYNC-X3Независимый @dataclass(frozen=True) с версионированием
Consumer без idempotency-защитыR-CQRS-SYNC-2processed_event или idempotent UPDATE по version
Eventual consistency не задекларирована в APIR-CQRS-SYNC-4Явный docstring с описанием задержки в FastAPI
Ожидание событий из Kafka на bootstrap нового read-storeR-CQRS-SYNC-3Батч-rebuild из write-сессии

Куда дальше

  • Command side — как outbox-событие регистрируется в write-handler через order.pull_events().
  • Query side — <X>ViewRepository-Protocol и read-only сессия.
  • Read-model — где и в каком виде хранится проекция.
  • Уровень и эволюция — уровни CQRS и когда переходить к event-driven read-model.
  • Когда CQRS оправдан — когда lightweight, когда full split, когда не нужен вовсе.
  • CQRS → раздел 5. Синхронизация через события — нормативные R-CQRS-SYNC-*.