Опирается на правила:
R-CQRS-SYNC-1…R-CQRS-SYNC-5иR-CQRS-SYNC-X1…R-CQRS-SYNC-X3из CQRS Style Guide → раздел 5. Синхронизация через события.
Важно знать
- Sync write → read идёт через outbox + Kafka, не synchronously. Outbox-запись вставляется в той же
AsyncSession, что и изменение агрегата, и живёт до delivery.- Idempotent consumer обязателен: read-model UPDATE может прийти дважды. Защита —
processed_eventтаблица или idempotentUPDATE … WHERE version < ?.- Bootstrap-rebuild при первом запуске или потере read-model: батч по агрегатам через write-сессию, не ждём события за 30 дней из Kafka.
- Eventual consistency декларируется в API. FastAPI — code-first: описание задержки в docstring endpoint'а или в поле
descriptionresponse-модели.- Read-your-writes при необходимости — sticky session, polling в post-commit hook или отдельный endpoint из write-store.
- Никакого синхронного
INSERTв read-таблицу внутри command-UoW. Decoupling сразу теряется, при rollback read-model расходится.- Никаких PG-триггеров: невидимая магия, ломается на bulk, не переносится cross-DB.
- Никаких schema-coupled events: payload — независимый
@dataclass(frozen=True), не ORM-модель write-схемы.
Сердце CQRS — не разделение моделей, а способ их связать. Если write и read физически разнесены, нужен надёжный механизм передачи изменений, и он же определяет всё: latency, отказоустойчивость, формат данных. Outbox + Kafka — стандартная связка; всё остальное — детали её правильного применения в идиомах Python.
Outbox-pattern: атомарность с агрегатом
R-CQRS-SYNC-1: command-handler регистрирует событие через агрегат (order.confirm() поднимает domain-event); на await session.flush() / await uow.commit() событие попадает в outbox-таблицу той же БД, в той же AsyncSession, что и изменение агрегата.
# core/order/domain/events.py
@dataclass(frozen=True)
class OrderConfirmed:
event_id: UUID
order_id: OrderId
status: str
confirmed_at: datetime
aggregate_version: int
# adapter/out/persistence/order_repository.py
class SqlAlchemyOrderRepository:
def __init__(self, session: AsyncSession) -> None:
self._session = session
async def save(self, order: Order) -> None:
orm_row = to_orm(order)
await self._session.merge(orm_row)
for event in order.pull_events():
await self._session.execute(
insert(OutboxRow).values(
id=uuid4(),
event_type=type(event).__name__,
payload=to_json(event),
aggregate_id=str(order.id.value),
created_at=utcnow(),
published=False,
)
)
# core/order/usecase/confirm_order.py
class ConfirmOrderHandler:
def __init__(self, orders: OrderRepository, uow: UnitOfWork, clock: Clock) -> None:
self._orders = orders
self._uow = uow
self._clock = clock
async def handle(self, cmd: ConfirmOrder) -> OrderId:
async with self._uow:
order = await self._orders.by_id(cmd.order_id)
order.confirm(self._clock)
await self._orders.save(order)
await self._uow.commit()
return order.id
После commit:
- Outbox-relay — asyncio-задача (или отдельный воркер), которая раз в N миллисекунд делает
SELECT … FOR UPDATE SKIP LOCKED LIMIT 100изoutbox, публикует в Kafka черезaiokafka.AIOKafkaProducer, помечает строки как опубликованные. - Продюсер настроен idempotent (
enable_idempotence=True): Kafka обеспечивает at-least-once с дедупом на уровне partition.
Зачем outbox, а не «после commit — publish»:
- Commit прошёл, Kafka недоступна — событие потеряно, read-model рассинхронизируется навсегда.
- Commit не прошёл, Kafka уже получила — phantom-event, в БД нет соответствующего state.
- Outbox решает обе проблемы: пока строка лежит в
outbox, relay будет ретраить до delivery.
Idempotent consumer
R-CQRS-SYNC-2: consumer read-стороны обязан быть идемпотентным. At-least-once означает: одно сообщение придёт дважды. Два варианта защиты:
processed_event таблица
CREATE TABLE processed_event (
event_id UUID NOT NULL,
consumer TEXT NOT NULL,
processed_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
PRIMARY KEY (event_id, consumer)
);
# adapter/in/kafka/order_summary_projector.py
class OrderSummaryProjector:
def __init__(self, session_factory: async_sessionmaker) -> None:
self._session_factory = session_factory
async def on_order_confirmed(self, raw: bytes) -> None:
event: OrderConfirmed = from_json(raw, OrderConfirmed)
async with self._session_factory() as session:
async with session.begin():
already = await session.scalar(
select(ProcessedEventRow).where(
ProcessedEventRow.event_id == event.event_id,
ProcessedEventRow.consumer == "order-summary-projector",
)
)
if already:
return
session.add(ProcessedEventRow(
event_id=event.event_id,
consumer="order-summary-projector",
))
await session.execute(
update(OrderSummaryRow)
.where(OrderSummaryRow.order_id == event.order_id.value)
.values(status=event.status, confirmed_at=event.confirmed_at)
)
Плюс: точная гарантия дедупа. Минус: дополнительная запись под каждое событие.
Idempotent UPDATE по version
В read-таблице хранится version (номер версии агрегата). UPDATE применяется только если событие новее:
async def on_order_confirmed(self, raw: bytes) -> None:
event: OrderConfirmed = from_json(raw, OrderConfirmed)
async with self._session_factory() as session:
async with session.begin():
result = await session.execute(
update(OrderSummaryRow)
.where(
OrderSummaryRow.order_id == event.order_id.value,
OrderSummaryRow.version < event.aggregate_version,
)
.values(
status=event.status,
confirmed_at=event.confirmed_at,
version=event.aggregate_version,
)
)
if result.rowcount == 0:
logger.debug("skip stale or duplicate: event_id=%s", event.event_id)
Плюс: не нужна отдельная таблица. Минус: требует aggregate_version в каждом событии. Подходит когда события одного агрегата идут в одной Kafka-партиции по aggregate_id.
Bootstrap и disaster recovery — синхронный rebuild
R-CQRS-SYNC-3: при первом запуске нового read-store или после потери — не ждём пока события придут из Kafka. Запускаем батч-rebuild из write-стороны.
# adapter/out/persistence/order_summary_bootstrap.py
class OrderSummaryBootstrap:
def __init__(
self,
read_session_factory: async_sessionmaker,
write_session_factory: async_sessionmaker,
) -> None:
self._read = read_session_factory
self._write = write_session_factory
async def run_if_empty(self) -> None:
async with self._read() as session:
count = await session.scalar(select(func.count()).select_from(OrderSummaryRow))
if count:
return
logger.info("order_summary is empty — running bootstrap rebuild")
await self._rebuild_all()
async def _rebuild_all(self) -> None:
last_id = 0
while True:
async with self._write() as session:
rows: list[OrderRow] = (await session.scalars(
select(OrderRow)
.where(OrderRow.id > last_id)
.order_by(OrderRow.id)
.limit(500)
)).all()
if not rows:
break
async with self._read() as session:
async with session.begin():
for row in rows:
await session.merge(to_summary(row))
last_id = rows[-1].id
Bootstrap — разовый: при создании нового read-store, после disaster recovery или при структурной миграции (добавили колонку, нужно дозаполнить). Запускается через lifespan FastAPI до начала приёма трафика:
@asynccontextmanager
async def lifespan(app: FastAPI):
await order_summary_bootstrap.run_if_empty()
async with kafka_relay_task():
yield
Eventual consistency декларируется в API
R-CQRS-SYNC-4: FastAPI — code-first, поэтому описание задержки живёт в docstring endpoint'а. Клиент не должен угадывать.
@router.get(
"/orders/{order_id}/summary",
response_model=OrderSummaryResponse,
summary="Get order summary (read-projection)",
)
async def get_order_summary(
order_id: UUID,
handler: Annotated[GetOrderSummaryHandler, Depends(get_order_summary_handler)],
) -> OrderSummaryResponse:
"""
Возвращает read-проекцию заказа из денормализованной таблицы `order_summary`.
**Eventual consistency**: возможна задержка до 1 секунды между write-операцией
и появлением обновления в этой проекции.
Для immediate consistency (например, сразу после `POST /orders`)
используйте `GET /orders/{order_id}` — полный агрегат из write-store.
"""
result = await handler.handle(GetOrderSummary(order_id=OrderId(order_id)))
return to_response(result)
Зачем:
- Клиент знает: после
POST /ordersнемедленныйGET /orders/{id}/summaryможет вернуть устаревшие данные или404. - В тестах и production troubleshooting eventual consistency — архитектурное свойство, а не баг, и оно задокументировано.
- Документация генерируется автоматически в Swagger UI — видно без открытия кода.
Read-your-writes — три механизма
R-CQRS-SYNC-5: иногда нужно гарантировать, что клиент после своего write увидит этот write в read-проекции. Три варианта, в порядке возрастания инвазивности:
1. Sticky session в gateway
Запросы одного клиента приходят на тот же pod, который только что обработал write. Если read-model — локальный in-memory cache этого же сервиса, обновляемый consumer-ом синхронно после commit, клиент увидит свежие данные.
Не работает кросс-pod, кросс-сервис. Хрупко.
2. Polling в post-commit hook
class ConfirmOrderHandler:
async def handle(self, cmd: ConfirmOrder) -> OrderId:
async with self._uow:
order = await self._orders.by_id(cmd.order_id)
order.confirm(self._clock)
await self._orders.save(order)
await self._uow.commit()
await self._poll_until_visible(order.id, timeout=2.0)
return order.id
async def _poll_until_visible(self, order_id: OrderId, timeout: float) -> None:
deadline = monotonic() + timeout
while monotonic() < deadline:
exists = await self._summary_view.exists(order_id)
if exists:
return
await asyncio.sleep(0.05)
Контракт: 99% укладывается в 2 секунды, в остальных — 504 / async-pattern на стороне клиента. Минус: реальный latency POST становится p99 = 2 с. Не подходит для частых операций.
3. Отдельный endpoint из write-store
Самое простое и честное решение: для сценариев, критичных к immediate consistency, — отдельный endpoint прямо из write-стороны.
@router.get("/orders/{order_id}/summary", response_model=OrderSummaryResponse)
async def get_order_summary(...) -> OrderSummaryResponse:
"""Eventual consistency, from read-projection (order_summary)."""
...
@router.get("/orders/{order_id}", response_model=OrderFullResponse)
async def get_order(...) -> OrderFullResponse:
"""Immediate consistency, from write-store (orders aggregate)."""
...
Два endpoint'а явно показывают trade-off клиенту. В большинстве случаев — правильный выбор.
Что запрещено
| Антипаттерн | Правило | Что взамен |
|---|---|---|
Синхронный INSERT/UPDATE read-таблицы внутри command-UoW | R-CQRS-SYNC-X1 | Outbox + Kafka + consumer |
| PG-триггер write-таблица → read-таблица | R-CQRS-SYNC-X2 | Явный consumer в Python |
Event payload — ORM-модель write-схемы (OrderRow) | R-CQRS-SYNC-X3 | Независимый @dataclass(frozen=True) с версионированием |
| Consumer без idempotency-защиты | R-CQRS-SYNC-2 | processed_event или idempotent UPDATE по version |
| Eventual consistency не задекларирована в API | R-CQRS-SYNC-4 | Явный docstring с описанием задержки в FastAPI |
| Ожидание событий из Kafka на bootstrap нового read-store | R-CQRS-SYNC-3 | Батч-rebuild из write-сессии |
Куда дальше
- Command side — как outbox-событие регистрируется в write-handler через
order.pull_events(). - Query side —
<X>ViewRepository-Protocol и read-only сессия. - Read-model — где и в каком виде хранится проекция.
- Уровень и эволюция — уровни CQRS и когда переходить к event-driven read-model.
- Когда CQRS оправдан — когда lightweight, когда full split, когда не нужен вовсе.
- CQRS → раздел 5. Синхронизация через события — нормативные
R-CQRS-SYNC-*.