Представьте сервис заказов. Запись заказа — строгая: проверяем остаток, блокируем позицию, сохраняем транзакционно. Чтение — другое: собираем сводку из шести таблиц, отдаём в JSON. Это две совершенно разные задачи, но зачастую их обслуживает один и тот же класс и одна и та же сессия базы данных.
CQRS (Command Query Responsibility Segregation) говорит: разделите модель записи и модель чтения. В простом виде — два типа handler-ов с разными настройками транзакции. В сложном — физически разные хранилища с синхронизацией через события. Между этими крайностями — спектр, и переходить по нему стоит только тогда, когда есть реальная причина.
Три уровня CQRS
Паттерн не бинарный. Есть три точки на шкале, и начинать нужно с самой простой:
| Уровень | Что разделено | Когда имеет смысл |
|---|---|---|
| Маркеры | Command / Query — разные типы, разные сессии | Всегда, как только появляются handler-ы |
| Отдельная read-таблица | Денормализованная таблица в той же БД, отдельный репозиторий | Тяжёлые join-ы, сложные агрегации |
| Раздельные хранилища | PostgreSQL для записи + Redis или ElasticSearch для чтения | read:write ≥ 10:1, задачи поиска/аналитики |
Движение всегда снизу вверх — сначала маркеры, потом отдельная таблица, потом отдельное хранилище. Прыгать сразу на третий уровень «на вырост» — распространённая ошибка.
Уровень первый: маркеры без лишней инфраструктуры
Самый простой вид CQRS — просто обозначить тип операции через отдельный класс. Command меняет состояние, Query только читает.
В Python это @dataclass(frozen=True) — иммутабельный объект, который описывает намерение:
from dataclasses import dataclass
from app.core.order.domain.model import OrderId
@dataclass(frozen=True)
class ConfirmOrder: # это команда — меняет состояние
order_id: OrderId
idempotency_key: str
@dataclass(frozen=True)
class GetOrderSummary: # это запрос — только читает
order_id: OrderId
Важный момент: query-handler должен получать сессию базы данных без права на запись. Тогда случайный session.add() просто не сохранится:
class GetOrderSummaryHandler:
def __init__(self, orders: OrderRepository) -> None:
self._orders = orders
async def handle(self, query: GetOrderSummary) -> OrderSummaryView:
return await self._orders.by_id_readonly(query.order_id)
Что это даёт на практике:
- Mypy и Pyright видят разницу между
CommandиQueryна уровне типов — ошибка перепутать их. - Read-only сессия защищает от случайных изменений в query-handler.
- Метрики читаются по смыслу:
app_command_total{name="ConfirmOrder"}иapp_query_total{name="GetOrderSummary"}— разные счётчики. - Валидация разная: для команды — строгая (Pydantic с
frozen=True), для запроса — лёгкая (проверка диапазонов page/size).
При этом и read, и write ходят в одну и ту же базу данных. Никакой дополнительной инфраструктуры.
Уровень второй: отдельная таблица для чтения
Когда query-handler начинает собирать данные из пяти-шести таблиц при каждом запросе, появляется реальная боль: медленные join-ы, высокая нагрузка на CPU, сложные запросы, которые трудно оптимизировать.
Пример: OrderSummaryView собирается из order, order_item, product, customer, payment, shipment. При загрузке истории заказов — каждый раз все шесть join-ов.
Решение — денормализованная таблица order_summary в той же базе данных. Не новое хранилище, просто отдельная таблица с уже готовой структурой для API:
@dataclass(frozen=True)
class OrderSummaryView:
order_id: int
customer_name: str # денормализовано из customer
status: str
item_count: int # заранее посчитано
total_amount: Decimal
created_at: datetime
Репозиторий для чтения объявляется отдельным Protocol:
from typing import Protocol
from app.core.order.domain.model import OrderId, CustomerId
class OrderViewRepository(Protocol):
async def by_id(self, order_id: OrderId) -> OrderSummaryView: ...
async def by_customer(
self,
customer_id: CustomerId,
limit: int,
offset: int,
) -> list[OrderSummaryView]: ...
Запрос GET /customers/{id}/orders теперь — один SELECT по индексу, без join-ов. Синхронизация между write-таблицами и read-таблицей идёт через outbox-паттерн внутри того же сервиса.
Типичные сигналы для перехода на этот уровень:
- Типичный query собирает 5 и более join-ов.
- Тяжёлые
GROUP BYпо миллионам строк для дашборда или аналитики. - UI хочет
customer_name,total_items,last_status_change— а в нормализованной схеме это собирается заново при каждом запросе.
Уровень третий: разные хранилища — только при измеренной боли
Полное разделение хранилищ оправдано только в конкретных ситуациях с реальными цифрами:
Соотношение чтения и записи 10:1 и выше. Типичный интернет-магазин: один заказ создаётся, но карточка показывается десять раз в разных сценариях (история, поиск, аналитика, уведомления). При таком соотношении read-нагрузка диктует архитектуру.
Принципиально разная структура данных. Полнотекстовый поиск по описаниям с фильтрами по двадцати параметрам и ранжированием по релевантности — это не реляционная задача. ElasticSearch или OpenSearch с инвертированным индексом решают её на порядок эффективнее PostgreSQL.
Read-нагрузка превышает возможности write-базы. PostgreSQL выдерживает 5 тысяч операций записи в секунду, но 50 тысяч операций чтения того же объёма уже требуют кеширования или отдельного хранилища.
Пример инфраструктуры для каталога товаров:
- PostgreSQL с агрегатом
Productи outbox-таблицей для событий. - Kafka-relay публикует
ProductPublished,ProductPriceChanged. - ElasticSearch index
productsс денормализованной структурой; consumer обновляет документ при каждом событии. - FastAPI:
POST /products→ command-handler в PostgreSQL;GET /products?q=...→ query-handler в ElasticSearch.
class SearchProductsHandler:
def __init__(self, products: ProductSearchRepository) -> None:
self._products = products
async def handle(self, query: SearchProducts) -> ProductSearchPage:
return await self._products.search(
text=query.text,
filters=query.filters,
page=query.page,
size=query.size,
)
ProductSearchRepository — это Protocol, за которым стоит ElasticSearch-адаптер. FastAPI-контроллер не знает о конкретном хранилище.
Это дорогая инфраструктура: два хранилища, мониторинг обоих, данные могут временно расходиться (eventual consistency), нужны скрипты восстановления при сбоях. Окупается только тогда, когда read-реплика и кеш уже не справляются с измеренной нагрузкой.
Частые ошибки
Полное разделение хранилищ с самого начала. Новый сервис без измеренной нагрузки — не повод сразу строить два хранилища. Начните с маркеров, эволюционируйте по метрикам.
Маркеры есть, но read-handler может писать в базу. Если query-handler получает обычную сессию с правом на commit, маркеры — просто названия классов без реальной защиты. Сессия для query-handler должна быть read-only.
Разделение хранилищ без явной причины. Eventual consistency, синхронизация, recovery-сценарии — всё это реальная сложность. Должна быть конкретная измеренная боль, не абстрактная «масштабируемость».
Коротко
- CQRS — три уровня: маркеры → отдельная read-таблица → раздельные хранилища. Начинайте снизу.
- Маркеры (
Command/Queryкак@dataclass(frozen=True)) — минимальная стоимость, максимальная польза для типов и метрик. - Query-handler работает с read-only сессией — случайная запись просто не сохранится.
- Отдельная read-таблица в той же БД — решение для тяжёлых join-ов без дополнительной инфраструктуры.
- Раздельные хранилища — только при read:write ≥ 10:1, задачах поиска/аналитики или измеренном превышении throughput.
- Полное CQRS «на вырост» для нового сервиса — усложнение без выгоды.
Что почитать дальше
- Command side в Python — write-handler с Command Protocol и Unit of Work.
- Query side в Python — read-handler с Query Protocol и ViewRepository.
- Read-model — где хранить и как обновлять денормализованную проекцию.