CQRS — это не переключатель «включено/выключено». Это шкала: можно взять самый простой вариант и постепенно усложнять по мере роста системы. Начинать с максимума — дорого и бессмысленно.
Уровень 1 — просто FastAPI без CQRS
Небольшой сервис: несколько роутеров, сервисный слой, один AsyncSession на всё. Чтение и запись идут через один класс, никаких специальных маркеров нет.
# routers/orders.py
@router.post("/orders", response_model=OrderResponse)
async def create_order(
payload: CreateOrderRequest,
session: AsyncSession = Depends(get_session),
) -> OrderResponse:
return await OrderService(session).create(payload)
@router.get("/orders/{order_id}", response_model=OrderResponse)
async def get_order(
order_id: UUID,
session: AsyncSession = Depends(get_session),
) -> OrderResponse:
return await OrderService(session).get(order_id)
Это нормально для внутренних утилит и простых CRUD-сервисов. Добавлять маркеры Command/Query сюда без реального разделения — лишняя сложность без выгоды.
Уровень 2 — лёгкое CQRS с маркерами
Сервис вырос: появилась настоящая бизнес-логика, команды и запросы начинают расходиться. Вводят Use Case Pattern — отдельные классы-команды и классы-запросы с явными маркерами.
# core/order/usecase/create_order.py
@dataclass(frozen=True)
class CreateOrder: # это команда — меняет состояние
customer_id: CustomerId
items: tuple[OrderItemDto, ...]
class CreateOrderHandler:
def __init__(self, orders: OrderRepository, uow: UnitOfWork) -> None:
self._orders = orders
self._uow = uow
async def handle(self, cmd: CreateOrder) -> OrderId:
async with self._uow:
order = Order.place(cmd.customer_id, cmd.items)
await self._orders.save(order)
await self._uow.commit()
return order.id
# core/order/usecase/get_order.py
@dataclass(frozen=True)
class GetOrder: # это запрос — только читает
order_id: OrderId
class GetOrderHandler:
def __init__(self, orders: OrderRepository) -> None:
self._orders = orders
async def handle(self, query: GetOrder) -> OrderJson:
order = await self._orders.by_id_readonly(query.order_id)
return OrderMapper.to_json(order)
Главное отличие от Уровня 1 — не просто слова «команда/запрос», а реальное разделение в поведении:
- Read-handler работает с read-only сессией: нет
commit, нетFOR UPDATE. - Write-handler работает через Unit of Work с явным
commit. - Разные пути валидации: команды проверяют бизнес-правила, запросы — параметры пагинации и фильтров.
Репозиторий при этом один — OrderRepository. Разделять интерфейсы на этом уровне рано.
Уровень 3 split — отдельный репозиторий для чтения
Когда read-сторона начинает жить своей жизнью: UI хочет проекций, аналитики, сводок — всего, что не совпадает с формой агрегата. Тогда появляется второй протокол — OrderViewRepository.
# core/order/port/out/order_repository.py
class OrderRepository(Protocol):
async def by_id(self, order_id: OrderId) -> Order: ... # FOR UPDATE
async def save(self, order: Order) -> None: ...
# core/order/port/out/order_view_repository.py
class OrderViewRepository(Protocol):
async def find_by_id(self, order_id: OrderId) -> OrderSummary | None: ...
async def search(
self,
customer_id: CustomerId,
status: OrderStatus | None,
page: int,
size: int,
) -> Page[OrderSummary]: ...
# core/order/port/view/order_summary.py
@dataclass(frozen=True)
class OrderSummary:
order_id: OrderId
customer_name: str
total_amount: Decimal
status: OrderStatus
created_at: datetime
Что меняется:
- Write-side (
OrderRepository) возвращает агрегат и блокирует строку для изменений. - Read-side (
OrderViewRepository) возвращает лёгкий read-DTO, заточенный под конкретный экран или API. - Query-handler идёт только в
OrderViewRepository— обращаться вOrderRepositoryза данными для чтения стало неправильным.
Физически всё ещё один PostgreSQL. Разделение пока только на уровне кода, не инфраструктуры. Подробнее о read-стороне — в статье Query side.
Уровень 3 event-driven — отдельное хранилище для read-model
Самый сложный вариант. Нужен, когда read-нагрузка начинает мешать write-стороне, или когда проекции требуют принципиально другой структуры данных — например, полнотекстового поиска или аналитических сводок.
write-side: read-side:
PostgreSQL order_summary (отдельная таблица)
├── order (агрегат) ├── денормализованная схема
└── outbox └── индексы под конкретные запросы
↓
outbox-relay (asyncio задача)
↓
Kafka (order.events)
↓
read-side consumer (aiokafka)
↓
UPSERT order_summary
Write-side пишет в агрегатную таблицу и кладёт событие в outbox. Relay-задача вычитывает outbox и отправляет в Kafka. Consumer получает событие и обновляет read-таблицу:
# adapter/messaging/order_summary_consumer.py
async def handle_order_confirmed(event: OrderConfirmed, session: AsyncSession) -> None:
await session.execute(
insert(OrderSummaryRow)
.values(order_id=event.order_id, status="CONFIRMED", version=event.aggregate_version, ...)
.on_conflict_do_update(
index_elements=["order_id"],
set_={"status": "CONFIRMED", "version": event.aggregate_version},
where=OrderSummaryRow.version < event.aggregate_version,
)
)
await session.commit()
on_conflict_do_update с проверкой version — защита от применения устаревшего события поверх более свежего.
Что добавляется к предыдущему уровню:
- Outbox-таблица в write-базе данных.
- Asyncio-задача для вычитывания outbox (с
SKIP LOCKED). - Kafka-топик с событиями домена.
- Consumer для обновления read-таблицы.
- Механизм восстановления read-store при первом запуске или после сбоя.
Цена этой архитектуры: данные на read-стороне немного отстают от write (обычно 100–500 мс). Появляются новые точки отказа: отставание consumer, зависший outbox, рассинхронизация. До того, как боль станет реальной — read-replica и кеш решат дешевле.
Как правильно двигаться по уровням
Путь только в одну сторону: 1 → 2 → 3-split → 3-event-driven. Перепрыгивать уровни не стоит — каждый следующий уровень решает конкретную боль, которую вы ещё не почувствовали на предыдущем.
Типичная история сервиса:
- Стартовали как простой CRUD — Уровень 1.
- Появилась настоящая доменная логика — перешли на Уровень 2 с UseCase/Handler и маркерами.
- Read и write стали расходиться по форме — перешли на Уровень 3 split с
OrderViewRepository. - Запросы на чтение начали мешать записи, или понадобился полнотекстовый поиск — перешли на Уровень 3 event-driven с отдельной таблицей.
Возврат назад бывает, но редко: объединили два сервиса, убрали сложные проекции. Обычно это говорит о том, что с самого начала взяли уровень выше нужного.
Частые ошибки:
- Добавить маркеры
Command/Queryна Уровне 1 без read-only сессии и отдельных путей — красивые слова без эффекта. - На Уровне 3 event-driven оставить один
OrderRepositoryдля чтения и записи — write и read имеют разную инфраструктуру, значит и интерфейсы должны быть разными. - Запустить event-driven read-model без механизма восстановления — при сбое consumer данные рассинхронизируются навсегда.
Коротко
- CQRS — шкала из четырёх позиций: плоский сервис, лёгкое CQRS с маркерами, split-репозитории, event-driven read-model.
- На каждом уровне берут ровно столько инфраструктуры, сколько реально нужно при текущей нагрузке.
- Уровень 2: маркеры
Command/Queryобязательны вместе с read-only сессией на query-handler-ах — без этого маркер декорация. - Уровень 3 split:
OrderViewRepositoryвозвращает read-DTO,OrderRepository— агрегат; пути не пересекаются. - Уровень 3 event-driven: read-model обновляется через outbox → Kafka → consumer; данные немного отстают, зато write-side свободна от read-нагрузки.
- Двигаться по уровням только вперёд, только по реальной боли — не потому что «так принято».
Что почитать дальше
- Command side — write-handler, UnitOfWork, команды как frozen dataclass.
- Query side — read-handler с
ViewRepository, read-only сессия. - Read-model — где хранить отдельную проекцию на event-driven уровне.
- Sync через события — outbox и aiokafka шаг за шагом.
- Когда CQRS оправдан — пороги перехода между уровнями.