Разделить запись и чтение легко. Труднее — объяснить, как изменения, которые произошли на write-стороне, попадают в read-модель. Если связь ненадёжная, read-модель «отстаёт» навсегда: данные устарели, а клиент об этом не знает.
Здесь разберём стандартную схему для Python-стека: outbox + Kafka + idempotent consumer.
Почему нельзя просто «опубликовать событие после коммита»
Кажется, логично: сохранили агрегат, коммит прошёл — отправили событие в Kafka. Но тут два сценария поломки:
- Коммит прошёл, Kafka в этот момент недоступна — событие потеряно, read-модель расходится навсегда.
- Kafka получила событие, но коммит упал с ошибкой — в read-модели появились данные, которых нет на write-стороне.
Оба случая ломают систему по-разному, и оба трудно диагностировать.
Outbox-паттерн решает проблему иначе: событие записывается в ту же базу данных, в той же транзакции, что и изменение агрегата. Отдельный процесс (relay) читает эту таблицу и публикует в Kafka. Пока запись лежит в outbox — relay будет пытаться доставить её снова. Транзакция упала — ни агрегат, ни событие не сохранились. Kafka упала — событие никуда не делось, оно в базе.
Outbox: сохраняем событие вместе с агрегатом
Когда обработчик команды сохраняет агрегат, репозиторий берёт все «незафиксированные» события агрегата и записывает их в таблицу outbox той же AsyncSession:
# adapter/out/persistence/order_repository.py
class SqlAlchemyOrderRepository:
def __init__(self, session: AsyncSession) -> None:
self._session = session
async def save(self, order: Order) -> None:
orm_row = to_orm(order)
await self._session.merge(orm_row)
for event in order.pull_events():
await self._session.execute(
insert(OutboxRow).values(
id=uuid4(),
event_type=type(event).__name__,
payload=to_json(event),
aggregate_id=str(order.id.value),
created_at=utcnow(),
published=False,
)
)
Обработчик команды просто вызывает save и commit — о записи в outbox он ничего не знает:
# core/order/usecase/confirm_order.py
class ConfirmOrderHandler:
def __init__(self, orders: OrderRepository, uow: UnitOfWork, clock: Clock) -> None:
self._orders = orders
self._uow = uow
self._clock = clock
async def handle(self, cmd: ConfirmOrder) -> OrderId:
async with self._uow:
order = await self._orders.by_id(cmd.order_id)
order.confirm(self._clock)
await self._orders.save(order)
await self._uow.commit()
return order.id
Само событие — обычный dataclass, не ORM-модель:
@dataclass(frozen=True)
class OrderConfirmed:
event_id: UUID
order_id: OrderId
status: str
confirmed_at: datetime
aggregate_version: int
Важно: событие не должно быть привязано к ORM-классу write-схемы. Если структура таблицы orders изменится, формат события остаётся прежним — иначе любая миграция сломает читателей.
Relay: доставляем из outbox в Kafka
Relay — это asyncio-задача (или отдельный воркер), которая периодически берёт непубликованные строки из outbox и отправляет их в Kafka:
async def relay_loop(session_factory, producer):
while True:
async with session_factory() as session:
async with session.begin():
rows = (await session.scalars(
select(OutboxRow)
.where(OutboxRow.published == False)
.order_by(OutboxRow.created_at)
.limit(100)
.with_for_update(skip_locked=True)
)).all()
for row in rows:
await producer.send_and_wait(
topic="order-events",
key=row.aggregate_id.encode(),
value=row.payload.encode(),
)
row.published = True
await asyncio.sleep(0.1)
SKIP LOCKED позволяет запускать несколько relay-воркеров параллельно без конфликтов. Kafka-продюсер настроен с enable_idempotence=True — это защита от дублей на уровне Kafka.
Idempotent consumer: что делать, когда событие пришло дважды
Kafka гарантирует доставку «хотя бы один раз» (at-least-once). На практике одно сообщение может прийти дважды: сбой сети, перебалансировка партиций. Если consumer не защищён от дублей, read-модель обновится дважды — и это может привести к некорректному состоянию.
Два способа защиты:
Таблица обработанных событий
Храним (event_id, consumer) — пара, которую уже видели. Перед обновлением read-модели проверяем:
async def on_order_confirmed(self, raw: bytes) -> None:
event: OrderConfirmed = from_json(raw, OrderConfirmed)
async with self._session_factory() as session:
async with session.begin():
already = await session.scalar(
select(ProcessedEventRow).where(
ProcessedEventRow.event_id == event.event_id,
ProcessedEventRow.consumer == "order-summary-projector",
)
)
if already:
return
session.add(ProcessedEventRow(
event_id=event.event_id,
consumer="order-summary-projector",
))
await session.execute(
update(OrderSummaryRow)
.where(OrderSummaryRow.order_id == event.order_id.value)
.values(status=event.status, confirmed_at=event.confirmed_at)
)
Точная гарантия, но каждое событие добавляет строку в таблицу.
UPDATE по версии агрегата
Если в событии есть aggregate_version, можно применять UPDATE только тогда, когда событие новее текущего состояния:
async def on_order_confirmed(self, raw: bytes) -> None:
event: OrderConfirmed = from_json(raw, OrderConfirmed)
async with self._session_factory() as session:
async with session.begin():
result = await session.execute(
update(OrderSummaryRow)
.where(
OrderSummaryRow.order_id == event.order_id.value,
OrderSummaryRow.version < event.aggregate_version,
)
.values(
status=event.status,
confirmed_at=event.confirmed_at,
version=event.aggregate_version,
)
)
if result.rowcount == 0:
logger.debug("skip stale or duplicate: event_id=%s", event.event_id)
Дополнительная таблица не нужна. Подходит, когда события одного агрегата идут в одной Kafka-партиции (по aggregate_id как ключу).
Восстановление read-модели после сбоя
Новая read-модель или потеря данных после сбоя — не повод ждать, пока все события проиграются из Kafka заново (история может быть большой). Проще взять данные прямо с write-стороны:
class OrderSummaryBootstrap:
async def run_if_empty(self) -> None:
async with self._read() as session:
count = await session.scalar(select(func.count()).select_from(OrderSummaryRow))
if count:
return
await self._rebuild_all()
async def _rebuild_all(self) -> None:
last_id = 0
while True:
async with self._write() as session:
rows = (await session.scalars(
select(OrderRow)
.where(OrderRow.id > last_id)
.order_by(OrderRow.id)
.limit(500)
)).all()
if not rows:
break
async with self._read() as session:
async with session.begin():
for row in rows:
await session.merge(to_summary(row))
last_id = rows[-1].id
Запускается в lifespan FastAPI до начала приёма трафика:
@asynccontextmanager
async def lifespan(app: FastAPI):
await order_summary_bootstrap.run_if_empty()
async with kafka_relay_task():
yield
Как объяснить клиенту, что данные могут отставать
Read-проекция обновляется не мгновенно. Клиент, который только что создал заказ и сразу запросил его из read-модели, может получить устаревшие данные или вообще 404. Это нормально — но об этом нужно сказать явно в документации API.
В FastAPI это делается прямо в docstring endpoint'а:
@router.get(
"/orders/{order_id}/summary",
response_model=OrderSummaryResponse,
)
async def get_order_summary(order_id: UUID, ...) -> OrderSummaryResponse:
"""
Возвращает read-проекцию заказа из таблицы order_summary.
**Eventual consistency**: возможна задержка до 1 секунды между записью
и появлением обновления в этой проекции.
Для немедленной согласованности используйте GET /orders/{order_id} —
полный агрегат из write-store.
"""
...
Swagger UI покажет это описание автоматически — не нужно объяснять в отдельной документации.
Read-your-writes: когда клиенту нужно увидеть свои же данные сразу
Иногда требование чёткое: после записи клиент должен сразу получить актуальные данные. Три варианта по возрастанию надёжности:
Sticky session в gateway. Запросы одного клиента направляются на один под. Работает только если read-модель — локальный кэш этого же сервиса. Хрупко, не масштабируется кросс-сервисно.
Polling в обработчике команды. После коммита ждём, пока запись появится в read-проекции:
async def handle(self, cmd: ConfirmOrder) -> OrderId:
async with self._uow:
order = await self._orders.by_id(cmd.order_id)
order.confirm(self._clock)
await self._orders.save(order)
await self._uow.commit()
await self._poll_until_visible(order.id, timeout=2.0)
return order.id
async def _poll_until_visible(self, order_id: OrderId, timeout: float) -> None:
deadline = monotonic() + timeout
while monotonic() < deadline:
if await self._summary_view.exists(order_id):
return
await asyncio.sleep(0.05)
Минус: реальная задержка POST-запроса становится p99 = 2 секунды. Не подходит для частых операций.
Отдельный endpoint из write-store. Самое простое и честное решение: для сценариев с требованием немедленной согласованности — отдельный endpoint, читающий напрямую из write-стороны.
@router.get("/orders/{order_id}/summary")
async def get_order_summary(...):
"""Eventual consistency, из read-проекции."""
...
@router.get("/orders/{order_id}")
async def get_order(...):
"""Немедленная согласованность, из write-store."""
...
Два endpoint'а явно показывают trade-off клиенту. В большинстве случаев это правильный выбор.
Частые ошибки
Синхронная запись в read-таблицу внутри command-транзакции. Кажется удобным: обновили агрегат — тут же обновили проекцию. На деле разрушает весь смысл разделения: при откате транзакции read-модель расходится, а при росте нагрузки блокировки становятся узким местом.
PG-триггер с write-таблицы на read-таблицу. Невидимая логика, которая ломается на массовых операциях и не переносится в другую базу. Явный consumer в Python всегда лучше.
Payload события — это ORM-модель. Если в событии лежит OrderRow, то любая миграция схемы таблицы сломает consumer. Событие должно быть независимым @dataclass с собственным версионированием.
Consumer без защиты от дублей. At-least-once значит дубли будут. Без idempotency-защиты read-модель испортится при первом же ретрае.
Коротко
- Write и read в CQRS связывает outbox + Kafka + consumer, не прямая запись в read-таблицу.
- Outbox-запись идёт в той же транзакции, что и изменение агрегата — так не бывает «событие есть, но данных нет» и наоборот.
- Consumer обязан быть идемпотентным: используй таблицу
processed_eventили UPDATE по версии агрегата. - Новую read-модель восстанавливают пакетным запросом из write-стороны, не из истории Kafka.
- Задержка (eventual consistency) декларируется в docstring endpoint'а — клиент не должен угадывать.
- Для сценариев «увидеть сразу свои данные» — отдельный endpoint из write-store проще, чем polling.
Что почитать дальше
- Command side — как событие регистрируется в агрегате и handler'е.
- Query side — как устроен read-только репозиторий.
- Read-model — где и в каком виде хранится проекция.
- Когда CQRS оправдан — когда стоит применять, а когда нет.