← назад к разделу

Представьте сервис заказов. Запись заказа — строгая: проверяем остаток, блокируем позицию, сохраняем транзакционно. Чтение — другое: собираем сводку из шести таблиц, отдаём в JSON. Это две совершенно разные задачи, но зачастую их обслуживает один и тот же класс и одна и та же сессия базы данных.

CQRS (Command Query Responsibility Segregation) говорит: разделите модель записи и модель чтения. В простом виде — два типа handler-ов с разными настройками транзакции. В сложном — физически разные хранилища с синхронизацией через события. Между этими крайностями — спектр, и переходить по нему стоит только тогда, когда есть реальная причина.

Три уровня CQRS

Паттерн не бинарный. Есть три точки на шкале, и начинать нужно с самой простой:

УровеньЧто разделеноКогда имеет смысл
МаркерыCommand / Query — разные типы, разные сессииВсегда, как только появляются handler-ы
Отдельная read-таблицаДенормализованная таблица в той же БД, отдельный репозиторийТяжёлые join-ы, сложные агрегации
Раздельные хранилищаPostgreSQL для записи + Redis или ElasticSearch для чтенияread:write ≥ 10:1, задачи поиска/аналитики

Движение всегда снизу вверх — сначала маркеры, потом отдельная таблица, потом отдельное хранилище. Прыгать сразу на третий уровень «на вырост» — распространённая ошибка.

Уровень первый: маркеры без лишней инфраструктуры

Самый простой вид CQRS — просто обозначить тип операции через отдельный класс. Command меняет состояние, Query только читает.

В Python это @dataclass(frozen=True) — иммутабельный объект, который описывает намерение:

from dataclasses import dataclass
from app.core.order.domain.model import OrderId

@dataclass(frozen=True)
class ConfirmOrder:       # это команда — меняет состояние
    order_id: OrderId
    idempotency_key: str

@dataclass(frozen=True)
class GetOrderSummary:    # это запрос — только читает
    order_id: OrderId

Важный момент: query-handler должен получать сессию базы данных без права на запись. Тогда случайный session.add() просто не сохранится:

class GetOrderSummaryHandler:
    def __init__(self, orders: OrderRepository) -> None:
        self._orders = orders

    async def handle(self, query: GetOrderSummary) -> OrderSummaryView:
        return await self._orders.by_id_readonly(query.order_id)

Что это даёт на практике:

  • Mypy и Pyright видят разницу между Command и Query на уровне типов — ошибка перепутать их.
  • Read-only сессия защищает от случайных изменений в query-handler.
  • Метрики читаются по смыслу: app_command_total{name="ConfirmOrder"} и app_query_total{name="GetOrderSummary"} — разные счётчики.
  • Валидация разная: для команды — строгая (Pydantic с frozen=True), для запроса — лёгкая (проверка диапазонов page/size).

При этом и read, и write ходят в одну и ту же базу данных. Никакой дополнительной инфраструктуры.

Уровень второй: отдельная таблица для чтения

Когда query-handler начинает собирать данные из пяти-шести таблиц при каждом запросе, появляется реальная боль: медленные join-ы, высокая нагрузка на CPU, сложные запросы, которые трудно оптимизировать.

Пример: OrderSummaryView собирается из order, order_item, product, customer, payment, shipment. При загрузке истории заказов — каждый раз все шесть join-ов.

Решение — денормализованная таблица order_summary в той же базе данных. Не новое хранилище, просто отдельная таблица с уже готовой структурой для API:

@dataclass(frozen=True)
class OrderSummaryView:
    order_id: int
    customer_name: str    # денормализовано из customer
    status: str
    item_count: int       # заранее посчитано
    total_amount: Decimal
    created_at: datetime

Репозиторий для чтения объявляется отдельным Protocol:

from typing import Protocol
from app.core.order.domain.model import OrderId, CustomerId

class OrderViewRepository(Protocol):
    async def by_id(self, order_id: OrderId) -> OrderSummaryView: ...
    async def by_customer(
        self,
        customer_id: CustomerId,
        limit: int,
        offset: int,
    ) -> list[OrderSummaryView]: ...

Запрос GET /customers/{id}/orders теперь — один SELECT по индексу, без join-ов. Синхронизация между write-таблицами и read-таблицей идёт через outbox-паттерн внутри того же сервиса.

Типичные сигналы для перехода на этот уровень:

  • Типичный query собирает 5 и более join-ов.
  • Тяжёлые GROUP BY по миллионам строк для дашборда или аналитики.
  • UI хочет customer_name, total_items, last_status_change — а в нормализованной схеме это собирается заново при каждом запросе.

Уровень третий: разные хранилища — только при измеренной боли

Полное разделение хранилищ оправдано только в конкретных ситуациях с реальными цифрами:

Соотношение чтения и записи 10:1 и выше. Типичный интернет-магазин: один заказ создаётся, но карточка показывается десять раз в разных сценариях (история, поиск, аналитика, уведомления). При таком соотношении read-нагрузка диктует архитектуру.

Принципиально разная структура данных. Полнотекстовый поиск по описаниям с фильтрами по двадцати параметрам и ранжированием по релевантности — это не реляционная задача. ElasticSearch или OpenSearch с инвертированным индексом решают её на порядок эффективнее PostgreSQL.

Read-нагрузка превышает возможности write-базы. PostgreSQL выдерживает 5 тысяч операций записи в секунду, но 50 тысяч операций чтения того же объёма уже требуют кеширования или отдельного хранилища.

Пример инфраструктуры для каталога товаров:

  • PostgreSQL с агрегатом Product и outbox-таблицей для событий.
  • Kafka-relay публикует ProductPublished, ProductPriceChanged.
  • ElasticSearch index products с денормализованной структурой; consumer обновляет документ при каждом событии.
  • FastAPI: POST /products → command-handler в PostgreSQL; GET /products?q=... → query-handler в ElasticSearch.
class SearchProductsHandler:
    def __init__(self, products: ProductSearchRepository) -> None:
        self._products = products

    async def handle(self, query: SearchProducts) -> ProductSearchPage:
        return await self._products.search(
            text=query.text,
            filters=query.filters,
            page=query.page,
            size=query.size,
        )

ProductSearchRepository — это Protocol, за которым стоит ElasticSearch-адаптер. FastAPI-контроллер не знает о конкретном хранилище.

Это дорогая инфраструктура: два хранилища, мониторинг обоих, данные могут временно расходиться (eventual consistency), нужны скрипты восстановления при сбоях. Окупается только тогда, когда read-реплика и кеш уже не справляются с измеренной нагрузкой.

Частые ошибки

Полное разделение хранилищ с самого начала. Новый сервис без измеренной нагрузки — не повод сразу строить два хранилища. Начните с маркеров, эволюционируйте по метрикам.

Маркеры есть, но read-handler может писать в базу. Если query-handler получает обычную сессию с правом на commit, маркеры — просто названия классов без реальной защиты. Сессия для query-handler должна быть read-only.

Разделение хранилищ без явной причины. Eventual consistency, синхронизация, recovery-сценарии — всё это реальная сложность. Должна быть конкретная измеренная боль, не абстрактная «масштабируемость».

Коротко

  • CQRS — три уровня: маркеры → отдельная read-таблица → раздельные хранилища. Начинайте снизу.
  • Маркеры (Command / Query как @dataclass(frozen=True)) — минимальная стоимость, максимальная польза для типов и метрик.
  • Query-handler работает с read-only сессией — случайная запись просто не сохранится.
  • Отдельная read-таблица в той же БД — решение для тяжёлых join-ов без дополнительной инфраструктуры.
  • Раздельные хранилища — только при read:write ≥ 10:1, задачах поиска/аналитики или измеренном превышении throughput.
  • Полное CQRS «на вырост» для нового сервиса — усложнение без выгоды.

Что почитать дальше

  • Command side в Python — write-handler с Command Protocol и Unit of Work.
  • Query side в Python — read-handler с Query Protocol и ViewRepository.
  • Read-model — где хранить и как обновлять денормализованную проекцию.