← назад к разделу

CQRS — это не переключатель «включено/выключено». Это шкала: можно взять самый простой вариант и постепенно усложнять по мере роста системы. Начинать с максимума — дорого и бессмысленно.

Уровень 1 — просто FastAPI без CQRS

Небольшой сервис: несколько роутеров, сервисный слой, один AsyncSession на всё. Чтение и запись идут через один класс, никаких специальных маркеров нет.

# routers/orders.py
@router.post("/orders", response_model=OrderResponse)
async def create_order(
    payload: CreateOrderRequest,
    session: AsyncSession = Depends(get_session),
) -> OrderResponse:
    return await OrderService(session).create(payload)

@router.get("/orders/{order_id}", response_model=OrderResponse)
async def get_order(
    order_id: UUID,
    session: AsyncSession = Depends(get_session),
) -> OrderResponse:
    return await OrderService(session).get(order_id)

Это нормально для внутренних утилит и простых CRUD-сервисов. Добавлять маркеры Command/Query сюда без реального разделения — лишняя сложность без выгоды.

Уровень 2 — лёгкое CQRS с маркерами

Сервис вырос: появилась настоящая бизнес-логика, команды и запросы начинают расходиться. Вводят Use Case Pattern — отдельные классы-команды и классы-запросы с явными маркерами.

# core/order/usecase/create_order.py
@dataclass(frozen=True)
class CreateOrder:           # это команда — меняет состояние
    customer_id: CustomerId
    items: tuple[OrderItemDto, ...]

class CreateOrderHandler:
    def __init__(self, orders: OrderRepository, uow: UnitOfWork) -> None:
        self._orders = orders
        self._uow = uow

    async def handle(self, cmd: CreateOrder) -> OrderId:
        async with self._uow:
            order = Order.place(cmd.customer_id, cmd.items)
            await self._orders.save(order)
            await self._uow.commit()
        return order.id
# core/order/usecase/get_order.py
@dataclass(frozen=True)
class GetOrder:              # это запрос — только читает
    order_id: OrderId

class GetOrderHandler:
    def __init__(self, orders: OrderRepository) -> None:
        self._orders = orders

    async def handle(self, query: GetOrder) -> OrderJson:
        order = await self._orders.by_id_readonly(query.order_id)
        return OrderMapper.to_json(order)

Главное отличие от Уровня 1 — не просто слова «команда/запрос», а реальное разделение в поведении:

  • Read-handler работает с read-only сессией: нет commit, нет FOR UPDATE.
  • Write-handler работает через Unit of Work с явным commit.
  • Разные пути валидации: команды проверяют бизнес-правила, запросы — параметры пагинации и фильтров.

Репозиторий при этом один — OrderRepository. Разделять интерфейсы на этом уровне рано.

Уровень 3 split — отдельный репозиторий для чтения

Когда read-сторона начинает жить своей жизнью: UI хочет проекций, аналитики, сводок — всего, что не совпадает с формой агрегата. Тогда появляется второй протокол — OrderViewRepository.

# core/order/port/out/order_repository.py
class OrderRepository(Protocol):
    async def by_id(self, order_id: OrderId) -> Order: ...     # FOR UPDATE
    async def save(self, order: Order) -> None: ...

# core/order/port/out/order_view_repository.py
class OrderViewRepository(Protocol):
    async def find_by_id(self, order_id: OrderId) -> OrderSummary | None: ...
    async def search(
        self,
        customer_id: CustomerId,
        status: OrderStatus | None,
        page: int,
        size: int,
    ) -> Page[OrderSummary]: ...
# core/order/port/view/order_summary.py
@dataclass(frozen=True)
class OrderSummary:
    order_id: OrderId
    customer_name: str
    total_amount: Decimal
    status: OrderStatus
    created_at: datetime

Что меняется:

  • Write-side (OrderRepository) возвращает агрегат и блокирует строку для изменений.
  • Read-side (OrderViewRepository) возвращает лёгкий read-DTO, заточенный под конкретный экран или API.
  • Query-handler идёт только в OrderViewRepository — обращаться в OrderRepository за данными для чтения стало неправильным.

Физически всё ещё один PostgreSQL. Разделение пока только на уровне кода, не инфраструктуры. Подробнее о read-стороне — в статье Query side.

Уровень 3 event-driven — отдельное хранилище для read-model

Самый сложный вариант. Нужен, когда read-нагрузка начинает мешать write-стороне, или когда проекции требуют принципиально другой структуры данных — например, полнотекстового поиска или аналитических сводок.

write-side:                       read-side:
  PostgreSQL                        order_summary (отдельная таблица)
  ├── order (агрегат)               ├── денормализованная схема
  └── outbox                        └── индексы под конкретные запросы
       ↓
  outbox-relay (asyncio задача)
       ↓
  Kafka (order.events)
       ↓
  read-side consumer (aiokafka)
       ↓
  UPSERT order_summary

Write-side пишет в агрегатную таблицу и кладёт событие в outbox. Relay-задача вычитывает outbox и отправляет в Kafka. Consumer получает событие и обновляет read-таблицу:

# adapter/messaging/order_summary_consumer.py
async def handle_order_confirmed(event: OrderConfirmed, session: AsyncSession) -> None:
    await session.execute(
        insert(OrderSummaryRow)
        .values(order_id=event.order_id, status="CONFIRMED", version=event.aggregate_version, ...)
        .on_conflict_do_update(
            index_elements=["order_id"],
            set_={"status": "CONFIRMED", "version": event.aggregate_version},
            where=OrderSummaryRow.version < event.aggregate_version,
        )
    )
    await session.commit()

on_conflict_do_update с проверкой version — защита от применения устаревшего события поверх более свежего.

Что добавляется к предыдущему уровню:

  • Outbox-таблица в write-базе данных.
  • Asyncio-задача для вычитывания outbox (с SKIP LOCKED).
  • Kafka-топик с событиями домена.
  • Consumer для обновления read-таблицы.
  • Механизм восстановления read-store при первом запуске или после сбоя.

Цена этой архитектуры: данные на read-стороне немного отстают от write (обычно 100–500 мс). Появляются новые точки отказа: отставание consumer, зависший outbox, рассинхронизация. До того, как боль станет реальной — read-replica и кеш решат дешевле.

Как правильно двигаться по уровням

Путь только в одну сторону: 1 → 2 → 3-split → 3-event-driven. Перепрыгивать уровни не стоит — каждый следующий уровень решает конкретную боль, которую вы ещё не почувствовали на предыдущем.

Типичная история сервиса:

  1. Стартовали как простой CRUD — Уровень 1.
  2. Появилась настоящая доменная логика — перешли на Уровень 2 с UseCase/Handler и маркерами.
  3. Read и write стали расходиться по форме — перешли на Уровень 3 split с OrderViewRepository.
  4. Запросы на чтение начали мешать записи, или понадобился полнотекстовый поиск — перешли на Уровень 3 event-driven с отдельной таблицей.

Возврат назад бывает, но редко: объединили два сервиса, убрали сложные проекции. Обычно это говорит о том, что с самого начала взяли уровень выше нужного.

Частые ошибки:

  • Добавить маркеры Command/Query на Уровне 1 без read-only сессии и отдельных путей — красивые слова без эффекта.
  • На Уровне 3 event-driven оставить один OrderRepository для чтения и записи — write и read имеют разную инфраструктуру, значит и интерфейсы должны быть разными.
  • Запустить event-driven read-model без механизма восстановления — при сбое consumer данные рассинхронизируются навсегда.

Коротко

  • CQRS — шкала из четырёх позиций: плоский сервис, лёгкое CQRS с маркерами, split-репозитории, event-driven read-model.
  • На каждом уровне берут ровно столько инфраструктуры, сколько реально нужно при текущей нагрузке.
  • Уровень 2: маркеры Command/Query обязательны вместе с read-only сессией на query-handler-ах — без этого маркер декорация.
  • Уровень 3 split: OrderViewRepository возвращает read-DTO, OrderRepository — агрегат; пути не пересекаются.
  • Уровень 3 event-driven: read-model обновляется через outbox → Kafka → consumer; данные немного отстают, зато write-side свободна от read-нагрузки.
  • Двигаться по уровням только вперёд, только по реальной боли — не потому что «так принято».

Что почитать дальше

  • Command side — write-handler, UnitOfWork, команды как frozen dataclass.
  • Query side — read-handler с ViewRepository, read-only сессия.
  • Read-model — где хранить отдельную проекцию на event-driven уровне.
  • Sync через события — outbox и aiokafka шаг за шагом.
  • Когда CQRS оправдан — пороги перехода между уровнями.