← назад к разделу

Разделить запись и чтение легко. Труднее — объяснить, как изменения, которые произошли на write-стороне, попадают в read-модель. Если связь ненадёжная, read-модель «отстаёт» навсегда: данные устарели, а клиент об этом не знает.

Здесь разберём стандартную схему для Python-стека: outbox + Kafka + idempotent consumer.

Почему нельзя просто «опубликовать событие после коммита»

Кажется, логично: сохранили агрегат, коммит прошёл — отправили событие в Kafka. Но тут два сценария поломки:

  • Коммит прошёл, Kafka в этот момент недоступна — событие потеряно, read-модель расходится навсегда.
  • Kafka получила событие, но коммит упал с ошибкой — в read-модели появились данные, которых нет на write-стороне.

Оба случая ломают систему по-разному, и оба трудно диагностировать.

Outbox-паттерн решает проблему иначе: событие записывается в ту же базу данных, в той же транзакции, что и изменение агрегата. Отдельный процесс (relay) читает эту таблицу и публикует в Kafka. Пока запись лежит в outbox — relay будет пытаться доставить её снова. Транзакция упала — ни агрегат, ни событие не сохранились. Kafka упала — событие никуда не делось, оно в базе.

Outbox: сохраняем событие вместе с агрегатом

Когда обработчик команды сохраняет агрегат, репозиторий берёт все «незафиксированные» события агрегата и записывает их в таблицу outbox той же AsyncSession:

# adapter/out/persistence/order_repository.py
class SqlAlchemyOrderRepository:
    def __init__(self, session: AsyncSession) -> None:
        self._session = session

    async def save(self, order: Order) -> None:
        orm_row = to_orm(order)
        await self._session.merge(orm_row)
        for event in order.pull_events():
            await self._session.execute(
                insert(OutboxRow).values(
                    id=uuid4(),
                    event_type=type(event).__name__,
                    payload=to_json(event),
                    aggregate_id=str(order.id.value),
                    created_at=utcnow(),
                    published=False,
                )
            )

Обработчик команды просто вызывает save и commit — о записи в outbox он ничего не знает:

# core/order/usecase/confirm_order.py
class ConfirmOrderHandler:
    def __init__(self, orders: OrderRepository, uow: UnitOfWork, clock: Clock) -> None:
        self._orders = orders
        self._uow = uow
        self._clock = clock

    async def handle(self, cmd: ConfirmOrder) -> OrderId:
        async with self._uow:
            order = await self._orders.by_id(cmd.order_id)
            order.confirm(self._clock)
            await self._orders.save(order)
            await self._uow.commit()
        return order.id

Само событие — обычный dataclass, не ORM-модель:

@dataclass(frozen=True)
class OrderConfirmed:
    event_id: UUID
    order_id: OrderId
    status: str
    confirmed_at: datetime
    aggregate_version: int

Важно: событие не должно быть привязано к ORM-классу write-схемы. Если структура таблицы orders изменится, формат события остаётся прежним — иначе любая миграция сломает читателей.

Relay: доставляем из outbox в Kafka

Relay — это asyncio-задача (или отдельный воркер), которая периодически берёт непубликованные строки из outbox и отправляет их в Kafka:

async def relay_loop(session_factory, producer):
    while True:
        async with session_factory() as session:
            async with session.begin():
                rows = (await session.scalars(
                    select(OutboxRow)
                    .where(OutboxRow.published == False)
                    .order_by(OutboxRow.created_at)
                    .limit(100)
                    .with_for_update(skip_locked=True)
                )).all()
                for row in rows:
                    await producer.send_and_wait(
                        topic="order-events",
                        key=row.aggregate_id.encode(),
                        value=row.payload.encode(),
                    )
                    row.published = True
        await asyncio.sleep(0.1)

SKIP LOCKED позволяет запускать несколько relay-воркеров параллельно без конфликтов. Kafka-продюсер настроен с enable_idempotence=True — это защита от дублей на уровне Kafka.

Idempotent consumer: что делать, когда событие пришло дважды

Kafka гарантирует доставку «хотя бы один раз» (at-least-once). На практике одно сообщение может прийти дважды: сбой сети, перебалансировка партиций. Если consumer не защищён от дублей, read-модель обновится дважды — и это может привести к некорректному состоянию.

Два способа защиты:

Таблица обработанных событий

Храним (event_id, consumer) — пара, которую уже видели. Перед обновлением read-модели проверяем:

async def on_order_confirmed(self, raw: bytes) -> None:
    event: OrderConfirmed = from_json(raw, OrderConfirmed)
    async with self._session_factory() as session:
        async with session.begin():
            already = await session.scalar(
                select(ProcessedEventRow).where(
                    ProcessedEventRow.event_id == event.event_id,
                    ProcessedEventRow.consumer == "order-summary-projector",
                )
            )
            if already:
                return
            session.add(ProcessedEventRow(
                event_id=event.event_id,
                consumer="order-summary-projector",
            ))
            await session.execute(
                update(OrderSummaryRow)
                .where(OrderSummaryRow.order_id == event.order_id.value)
                .values(status=event.status, confirmed_at=event.confirmed_at)
            )

Точная гарантия, но каждое событие добавляет строку в таблицу.

UPDATE по версии агрегата

Если в событии есть aggregate_version, можно применять UPDATE только тогда, когда событие новее текущего состояния:

async def on_order_confirmed(self, raw: bytes) -> None:
    event: OrderConfirmed = from_json(raw, OrderConfirmed)
    async with self._session_factory() as session:
        async with session.begin():
            result = await session.execute(
                update(OrderSummaryRow)
                .where(
                    OrderSummaryRow.order_id == event.order_id.value,
                    OrderSummaryRow.version < event.aggregate_version,
                )
                .values(
                    status=event.status,
                    confirmed_at=event.confirmed_at,
                    version=event.aggregate_version,
                )
            )
            if result.rowcount == 0:
                logger.debug("skip stale or duplicate: event_id=%s", event.event_id)

Дополнительная таблица не нужна. Подходит, когда события одного агрегата идут в одной Kafka-партиции (по aggregate_id как ключу).

Восстановление read-модели после сбоя

Новая read-модель или потеря данных после сбоя — не повод ждать, пока все события проиграются из Kafka заново (история может быть большой). Проще взять данные прямо с write-стороны:

class OrderSummaryBootstrap:
    async def run_if_empty(self) -> None:
        async with self._read() as session:
            count = await session.scalar(select(func.count()).select_from(OrderSummaryRow))
        if count:
            return
        await self._rebuild_all()

    async def _rebuild_all(self) -> None:
        last_id = 0
        while True:
            async with self._write() as session:
                rows = (await session.scalars(
                    select(OrderRow)
                    .where(OrderRow.id > last_id)
                    .order_by(OrderRow.id)
                    .limit(500)
                )).all()
            if not rows:
                break
            async with self._read() as session:
                async with session.begin():
                    for row in rows:
                        await session.merge(to_summary(row))
            last_id = rows[-1].id

Запускается в lifespan FastAPI до начала приёма трафика:

@asynccontextmanager
async def lifespan(app: FastAPI):
    await order_summary_bootstrap.run_if_empty()
    async with kafka_relay_task():
        yield

Как объяснить клиенту, что данные могут отставать

Read-проекция обновляется не мгновенно. Клиент, который только что создал заказ и сразу запросил его из read-модели, может получить устаревшие данные или вообще 404. Это нормально — но об этом нужно сказать явно в документации API.

В FastAPI это делается прямо в docstring endpoint'а:

@router.get(
    "/orders/{order_id}/summary",
    response_model=OrderSummaryResponse,
)
async def get_order_summary(order_id: UUID, ...) -> OrderSummaryResponse:
    """
    Возвращает read-проекцию заказа из таблицы order_summary.

    **Eventual consistency**: возможна задержка до 1 секунды между записью
    и появлением обновления в этой проекции.

    Для немедленной согласованности используйте GET /orders/{order_id} —
    полный агрегат из write-store.
    """
    ...

Swagger UI покажет это описание автоматически — не нужно объяснять в отдельной документации.

Read-your-writes: когда клиенту нужно увидеть свои же данные сразу

Иногда требование чёткое: после записи клиент должен сразу получить актуальные данные. Три варианта по возрастанию надёжности:

Sticky session в gateway. Запросы одного клиента направляются на один под. Работает только если read-модель — локальный кэш этого же сервиса. Хрупко, не масштабируется кросс-сервисно.

Polling в обработчике команды. После коммита ждём, пока запись появится в read-проекции:

async def handle(self, cmd: ConfirmOrder) -> OrderId:
    async with self._uow:
        order = await self._orders.by_id(cmd.order_id)
        order.confirm(self._clock)
        await self._orders.save(order)
        await self._uow.commit()
    await self._poll_until_visible(order.id, timeout=2.0)
    return order.id

async def _poll_until_visible(self, order_id: OrderId, timeout: float) -> None:
    deadline = monotonic() + timeout
    while monotonic() < deadline:
        if await self._summary_view.exists(order_id):
            return
        await asyncio.sleep(0.05)

Минус: реальная задержка POST-запроса становится p99 = 2 секунды. Не подходит для частых операций.

Отдельный endpoint из write-store. Самое простое и честное решение: для сценариев с требованием немедленной согласованности — отдельный endpoint, читающий напрямую из write-стороны.

@router.get("/orders/{order_id}/summary")
async def get_order_summary(...):
    """Eventual consistency, из read-проекции."""
    ...

@router.get("/orders/{order_id}")
async def get_order(...):
    """Немедленная согласованность, из write-store."""
    ...

Два endpoint'а явно показывают trade-off клиенту. В большинстве случаев это правильный выбор.

Частые ошибки

Синхронная запись в read-таблицу внутри command-транзакции. Кажется удобным: обновили агрегат — тут же обновили проекцию. На деле разрушает весь смысл разделения: при откате транзакции read-модель расходится, а при росте нагрузки блокировки становятся узким местом.

PG-триггер с write-таблицы на read-таблицу. Невидимая логика, которая ломается на массовых операциях и не переносится в другую базу. Явный consumer в Python всегда лучше.

Payload события — это ORM-модель. Если в событии лежит OrderRow, то любая миграция схемы таблицы сломает consumer. Событие должно быть независимым @dataclass с собственным версионированием.

Consumer без защиты от дублей. At-least-once значит дубли будут. Без idempotency-защиты read-модель испортится при первом же ретрае.

Коротко

  • Write и read в CQRS связывает outbox + Kafka + consumer, не прямая запись в read-таблицу.
  • Outbox-запись идёт в той же транзакции, что и изменение агрегата — так не бывает «событие есть, но данных нет» и наоборот.
  • Consumer обязан быть идемпотентным: используй таблицу processed_event или UPDATE по версии агрегата.
  • Новую read-модель восстанавливают пакетным запросом из write-стороны, не из истории Kafka.
  • Задержка (eventual consistency) декларируется в docstring endpoint'а — клиент не должен угадывать.
  • Для сценариев «увидеть сразу свои данные» — отдельный endpoint из write-store проще, чем polling.

Что почитать дальше

  • Command side — как событие регистрируется в агрегате и handler'е.
  • Query side — как устроен read-только репозиторий.
  • Read-model — где и в каком виде хранится проекция.
  • Когда CQRS оправдан — когда стоит применять, а когда нет.