Опирается на правила: R-CQRS-TIER-1R-CQRS-TIER-5 и R-CQRS-TIER-X1R-CQRS-TIER-X2 из CQRS Style Guide → раздел 6. Уровень и эволюция.

Важно знать

  • Уровень 1 (плоский FastAPI): CQRS не применяется. Роутеры вызывают сервисы напрямую, маркеров нет.
  • Уровень 2 (Use Case Pattern): lightweight CQRS — маркеры Command/Query (Protocol) обязательны. Один <X>Repository, read через read-only AsyncSession без commit.
  • Уровень 3 split (DDD + Hexagonal): появляется <X>ViewRepository-протокол с read-DTO; write — через <X>Repository с агрегатом и WITH FOR UPDATE.
  • Уровень 3 event-driven: read-model в отдельной таблице / Redis / ElasticSearch, sync через outbox + Kafka.
  • Эволюция всегда снизу вверх: 1 → 2 → 3-split → 3-event-driven. Возврат — редкость и обычно говорит об ошибке планирования.
  • Маркеры Command/Query на Уровне 1 без enforcement (read-only сессия, отдельный ViewRepository) — карго-культ.
  • Event-driven read-model с одним <X>Repository для R+W — несостыковка слоёв. Если read-инфра отдельная, интерфейс тоже отдельный.

CQRS — не «есть или нет», а шкала зрелости. На каждом уровне берётся ровно столько, сколько даёт пользу при текущем размере и нагрузке. Лишняя CQRS-инфраструктура на маленьком FastAPI-сервисе обходится дороже, чем lightweight на старте и эволюция по метрикам.

Уровень 1 — CQRS не применяется

R-CQRS-TIER-1: на Уровне 1 CQRS не используется. Роутеры вызывают сервисный слой напрямую, один AsyncSession на все операции, маркеров нет.

# routers/orders.py
@router.post("/orders", response_model=OrderResponse)
async def create_order(
    payload: CreateOrderRequest,
    session: AsyncSession = Depends(get_session),
) -> OrderResponse:
    return await OrderService(session).create(payload)

@router.get("/orders/{order_id}", response_model=OrderResponse)
async def get_order(
    order_id: UUID,
    session: AsyncSession = Depends(get_session),
) -> OrderResponse:
    return await OrderService(session).get(order_id)

Уровень 1 — внутренние утилиты, CRUD-микросервисы, тонкие прокси. Применять маркеры Command/Query здесь смысла нет: одна модель данных, простая структура, паттерны SQLAlchemy покрывают всё.

Уровень 2 — lightweight CQRS обязателен

R-CQRS-TIER-2: на Уровне 2 (Use Case Pattern) маркеры обязательны на всех use-case-классах. Read и write идут через один и тот же <X>Repository, но с разными режимами сессии.

# core/order/usecase/create_order.py
@dataclass(frozen=True)
class CreateOrder:                           # Command[OrderId]
    customer_id: CustomerId
    items: tuple[OrderItemDto, ...]

class CreateOrderHandler:
    def __init__(self, orders: OrderRepository, uow: UnitOfWork) -> None:
        self._orders = orders
        self._uow = uow

    async def handle(self, cmd: CreateOrder) -> OrderId:
        async with self._uow:
            order = Order.place(cmd.customer_id, cmd.items)
            await self._orders.save(order)
            await self._uow.commit()
        return order.id
# core/order/usecase/get_order.py
@dataclass(frozen=True)
class GetOrder:                              # Query[OrderJson]
    order_id: OrderId

class GetOrderHandler:
    def __init__(self, orders: OrderRepository) -> None:
        self._orders = orders

    async def handle(self, query: GetOrder) -> OrderJson:
        # read-only AsyncSession — без commit, без FOR UPDATE
        order = await self._orders.by_id_readonly(query.order_id)
        return OrderMapper.to_json(order)

Enforcement маркеров:

  • Read-only AsyncSession на query-handler-ах: сессия открывается без autocommit, метод by_id_readonly явно не вызывает commit. Это enforcement — без него маркер Query лишь декорация.
  • WITH FOR UPDATE только на write-стороне через отдельный метод by_id_for_update.
  • Разная валидация: command — Annotated[..., Field(...)] на Pydantic-DTO; query — ge=1/le=100 на page/size.
  • Метрики раздельныеapp_command_total vs app_query_total через middleware или декоратор.

Read и write используют один OrderRepository — никакого отдельного OrderViewRepository. Это упрощение Уровня 2: разделять интерфейсы рано, выгоды мало.

Уровень 3 split — отдельный ViewRepository

R-CQRS-TIER-3: на Уровне 3 (DDD + Hexagonal) появляется явное разделение на интерфейсы. <X>Repository для записи (агрегат + FOR UPDATE), <X>ViewRepository для чтения (read-DTO).

# core/order/port/out/order_repository.py
class OrderRepository(Protocol):
    async def by_id(self, order_id: OrderId) -> Order: ...           # FOR UPDATE
    async def save(self, order: Order) -> None: ...

# core/order/port/out/order_view_repository.py
class OrderViewRepository(Protocol):
    async def summary_by_id(self, order_id: OrderId) -> OrderSummary: ...
    async def search(
        self,
        customer_id: CustomerId,
        status: OrderStatus | None,
        page: int,
        size: int,
    ) -> Page[OrderSummary]: ...
# core/order/port/view/order_summary.py
@dataclass(frozen=True)
class OrderSummary:
    order_id: OrderId
    customer_name: str
    total_amount: Decimal
    status: OrderStatus
    created_at: datetime

Что меняется:

  • Два протокола в core/. OrderRepository возвращает агрегат, OrderViewRepository возвращает read-DTO. Никакого пересечения.
  • Реализации — в adapter/persistence/. Один SQLAlchemy-класс может реализовывать оба протокола или быть по одному на каждый — это деталь persistence-слоя.
  • Read-DTO как первоклассные frozen dataclass-ы. Лежат в core/<bc>/port/view/. Структура подчинена API/UI, не агрегату.
  • Запрос read-stuff через write-Repository — code smell. Если query-handler хочет получить OrderSummary, он идёт в OrderViewRepository, не в OrderRepository.by_id.

Read и write по-прежнему используют одно физическое хранилище — PostgreSQL. Разделение пока только на уровне типов и протоколов, не инфраструктуры.

Подробно — Query side.

Уровень 3 event-driven — отдельное хранилище

R-CQRS-TIER-4: эволюция от split-варианта, когда нагрузки или паттерны чтения требуют отдельной инфраструктуры. Read-model переезжает в отдельную таблицу той же БД, в Redis, в ElasticSearch.

write-side:                       read-side:
  PostgreSQL                        order_summary (PG-таблица)
  ├── order (агрегат)               ├── денормализованная схема
  └── outbox                        └── индексы под query-сторону
       ↓
  outbox-relay (asyncio task)
       ↓
  Kafka (order.events)
       ↓
  read-side consumer (aiokafka)
       ↓
  UPSERT order_summary
# adapter/persistence/order_view_repository_impl.py
class OrderViewRepositoryImpl:
    def __init__(self, session: AsyncSession) -> None:
        self._session = session

    async def summary_by_id(self, order_id: OrderId) -> OrderSummary:
        # читает из order_summary — денормализованная read-таблица
        row = await self._session.execute(
            select(OrderSummaryRow).where(OrderSummaryRow.order_id == order_id)
        )
        return _to_summary(row.scalar_one())
# adapter/messaging/order_summary_consumer.py
async def handle_order_confirmed(event: OrderConfirmed, session: AsyncSession) -> None:
    await session.execute(
        insert(OrderSummaryRow)
        .values(order_id=event.order_id, status="CONFIRMED", ...)
        .on_conflict_do_update(
            index_elements=["order_id"],
            set_={"status": "CONFIRMED", "version": OrderSummaryRow.version + 1},
        )
    )
    await session.commit()

Что добавляется:

  • Outbox-таблица в write-БД (см. PG Runtime → outbox).
  • Outbox-relay — asyncio-задача с SKIP LOCKED циклом.
  • Kafka-топик для событий домена.
  • Read-side consumer в том же сервисе или в отдельном.
  • Idempotency-защита на consumer: on_conflict_do_update с version-проверкой или таблица processed_event.
  • Bootstrap-rebuilder для нового read-store и disaster recovery.

Стоимость:

  • Eventual consistency (100ms–1s в норме, больше при деградации).
  • Дополнительные runtime-компоненты для мониторинга.
  • Новые failure modes: lag consumer, stuck outbox, рассинхронизация.

Окупается, когда write-сторона страдает от read-нагрузки или когда read-проекция фундаментально другая (full-text search, аналитические сводки). До этого порога — read-replica + кеш могут решить дешевле.

Эволюция всегда снизу вверх

R-CQRS-TIER-5: движение по уровням — строго 1 → 2 → 3-split → 3-event-driven. Возврат назад — редкое и обычно показывает, что начали слишком высоко.

Типичный путь сервиса:

  1. Уровень 1 — стартовали как утилитный FastAPI-микросервис без явной бизнес-домены.
  2. Появился реальный бизнес-домен, перешли на Уровень 2 — ввели UseCase + Handler, маркеры Command/Query.
  3. Read-сторона стала отличаться от write (UI хочет проекций, аналитики) — перешли к Уровню 3 split с OrderViewRepository.
  4. p95 latency query пробил SLA или появился full-text search — перешли к Уровню 3 event-driven с отдельной read-таблицей или хранилищем.

Каждый переход — бизнес-обоснован: метриками, новыми фичами, фактом боли. Не «перешли на Уровень 3 потому что это принято».

Возврат назад случается:

  • Слили два сервиса обратно в один, и event-driven read-model в нём перестала иметь смысл — собрали обратно в один PG.
  • Поменялся продукт, не нужны больше сложные read-проекции — упростили до 3-split.

Это редкость. Чаще возврат — это «переписать заново», что обычно решается рефакторингом без потери функциональности.

Что запрещено

АнтипаттернПравилоЧто взамен
Command/Query-маркеры на Уровне 1 без read-only сессии и отдельных путейR-CQRS-TIER-X1Либо полный переход на Уровень 2, либо убрать маркеры
Event-driven read-model с одним <X>Repository для R+WR-CQRS-TIER-X2Отдельный <X>ViewRepository-протокол
Прыжок Уровень 1 → Уровень 3 event-driven без промежуточных шаговR-CQRS-TIER-5Эволюция по метрикам, не по моде
Отсутствие read-only AsyncSession на query-handler-еR-CQRS-TIER-2Read-only сессия (без commit) обязательна
Read-методы в основном <X>Repository на Уровне 3R-CQRS-QRY-X2Отдельный <X>ViewRepository

Куда дальше

  • Command side — write-handler, UnitOfWork, frozen=True command.
  • Query side — read-handler с <X>ViewRepository, read-only сессия.
  • Read-model — где хранить отдельную проекцию на Уровне 3 event-driven.
  • Sync через события — outbox + aiokafka для event-driven.
  • Когда CQRS оправдан — пороги перехода между уровнями.