Опирается на правила:
R-CQRS-TIER-1…R-CQRS-TIER-5иR-CQRS-TIER-X1…R-CQRS-TIER-X2из CQRS Style Guide → раздел 6. Уровень и эволюция.
Важно знать
- Уровень 1 (плоский FastAPI): CQRS не применяется. Роутеры вызывают сервисы напрямую, маркеров нет.
- Уровень 2 (Use Case Pattern): lightweight CQRS — маркеры
Command/Query(Protocol) обязательны. Один<X>Repository, read через read-onlyAsyncSessionбез commit.- Уровень 3 split (DDD + Hexagonal): появляется
<X>ViewRepository-протокол с read-DTO; write — через<X>Repositoryс агрегатом иWITH FOR UPDATE.- Уровень 3 event-driven: read-model в отдельной таблице / Redis / ElasticSearch, sync через outbox + Kafka.
- Эволюция всегда снизу вверх: 1 → 2 → 3-split → 3-event-driven. Возврат — редкость и обычно говорит об ошибке планирования.
- Маркеры
Command/Queryна Уровне 1 без enforcement (read-only сессия, отдельный ViewRepository) — карго-культ.- Event-driven read-model с одним
<X>Repositoryдля R+W — несостыковка слоёв. Если read-инфра отдельная, интерфейс тоже отдельный.
CQRS — не «есть или нет», а шкала зрелости. На каждом уровне берётся ровно столько, сколько даёт пользу при текущем размере и нагрузке. Лишняя CQRS-инфраструктура на маленьком FastAPI-сервисе обходится дороже, чем lightweight на старте и эволюция по метрикам.
Уровень 1 — CQRS не применяется
R-CQRS-TIER-1: на Уровне 1 CQRS не используется. Роутеры вызывают сервисный слой напрямую, один AsyncSession на все операции, маркеров нет.
# routers/orders.py
@router.post("/orders", response_model=OrderResponse)
async def create_order(
payload: CreateOrderRequest,
session: AsyncSession = Depends(get_session),
) -> OrderResponse:
return await OrderService(session).create(payload)
@router.get("/orders/{order_id}", response_model=OrderResponse)
async def get_order(
order_id: UUID,
session: AsyncSession = Depends(get_session),
) -> OrderResponse:
return await OrderService(session).get(order_id)
Уровень 1 — внутренние утилиты, CRUD-микросервисы, тонкие прокси. Применять маркеры Command/Query здесь смысла нет: одна модель данных, простая структура, паттерны SQLAlchemy покрывают всё.
Уровень 2 — lightweight CQRS обязателен
R-CQRS-TIER-2: на Уровне 2 (Use Case Pattern) маркеры обязательны на всех use-case-классах. Read и write идут через один и тот же <X>Repository, но с разными режимами сессии.
# core/order/usecase/create_order.py
@dataclass(frozen=True)
class CreateOrder: # Command[OrderId]
customer_id: CustomerId
items: tuple[OrderItemDto, ...]
class CreateOrderHandler:
def __init__(self, orders: OrderRepository, uow: UnitOfWork) -> None:
self._orders = orders
self._uow = uow
async def handle(self, cmd: CreateOrder) -> OrderId:
async with self._uow:
order = Order.place(cmd.customer_id, cmd.items)
await self._orders.save(order)
await self._uow.commit()
return order.id
# core/order/usecase/get_order.py
@dataclass(frozen=True)
class GetOrder: # Query[OrderJson]
order_id: OrderId
class GetOrderHandler:
def __init__(self, orders: OrderRepository) -> None:
self._orders = orders
async def handle(self, query: GetOrder) -> OrderJson:
# read-only AsyncSession — без commit, без FOR UPDATE
order = await self._orders.by_id_readonly(query.order_id)
return OrderMapper.to_json(order)
Enforcement маркеров:
- Read-only
AsyncSessionна query-handler-ах: сессия открывается безautocommit, методby_id_readonlyявно не вызываетcommit. Это enforcement — без него маркерQueryлишь декорация. WITH FOR UPDATEтолько на write-стороне через отдельный методby_id_for_update.- Разная валидация: command —
Annotated[..., Field(...)]на Pydantic-DTO; query —ge=1/le=100на page/size. - Метрики раздельные —
app_command_totalvsapp_query_totalчерез middleware или декоратор.
Read и write используют один OrderRepository — никакого отдельного OrderViewRepository. Это упрощение Уровня 2: разделять интерфейсы рано, выгоды мало.
Уровень 3 split — отдельный ViewRepository
R-CQRS-TIER-3: на Уровне 3 (DDD + Hexagonal) появляется явное разделение на интерфейсы. <X>Repository для записи (агрегат + FOR UPDATE), <X>ViewRepository для чтения (read-DTO).
# core/order/port/out/order_repository.py
class OrderRepository(Protocol):
async def by_id(self, order_id: OrderId) -> Order: ... # FOR UPDATE
async def save(self, order: Order) -> None: ...
# core/order/port/out/order_view_repository.py
class OrderViewRepository(Protocol):
async def summary_by_id(self, order_id: OrderId) -> OrderSummary: ...
async def search(
self,
customer_id: CustomerId,
status: OrderStatus | None,
page: int,
size: int,
) -> Page[OrderSummary]: ...
# core/order/port/view/order_summary.py
@dataclass(frozen=True)
class OrderSummary:
order_id: OrderId
customer_name: str
total_amount: Decimal
status: OrderStatus
created_at: datetime
Что меняется:
- Два протокола в
core/.OrderRepositoryвозвращает агрегат,OrderViewRepositoryвозвращает read-DTO. Никакого пересечения. - Реализации — в
adapter/persistence/. Один SQLAlchemy-класс может реализовывать оба протокола или быть по одному на каждый — это деталь persistence-слоя. - Read-DTO как первоклассные frozen dataclass-ы. Лежат в
core/<bc>/port/view/. Структура подчинена API/UI, не агрегату. - Запрос read-stuff через write-Repository — code smell. Если query-handler хочет получить
OrderSummary, он идёт вOrderViewRepository, не вOrderRepository.by_id.
Read и write по-прежнему используют одно физическое хранилище — PostgreSQL. Разделение пока только на уровне типов и протоколов, не инфраструктуры.
Подробно — Query side.
Уровень 3 event-driven — отдельное хранилище
R-CQRS-TIER-4: эволюция от split-варианта, когда нагрузки или паттерны чтения требуют отдельной инфраструктуры. Read-model переезжает в отдельную таблицу той же БД, в Redis, в ElasticSearch.
write-side: read-side:
PostgreSQL order_summary (PG-таблица)
├── order (агрегат) ├── денормализованная схема
└── outbox └── индексы под query-сторону
↓
outbox-relay (asyncio task)
↓
Kafka (order.events)
↓
read-side consumer (aiokafka)
↓
UPSERT order_summary
# adapter/persistence/order_view_repository_impl.py
class OrderViewRepositoryImpl:
def __init__(self, session: AsyncSession) -> None:
self._session = session
async def summary_by_id(self, order_id: OrderId) -> OrderSummary:
# читает из order_summary — денормализованная read-таблица
row = await self._session.execute(
select(OrderSummaryRow).where(OrderSummaryRow.order_id == order_id)
)
return _to_summary(row.scalar_one())
# adapter/messaging/order_summary_consumer.py
async def handle_order_confirmed(event: OrderConfirmed, session: AsyncSession) -> None:
await session.execute(
insert(OrderSummaryRow)
.values(order_id=event.order_id, status="CONFIRMED", ...)
.on_conflict_do_update(
index_elements=["order_id"],
set_={"status": "CONFIRMED", "version": OrderSummaryRow.version + 1},
)
)
await session.commit()
Что добавляется:
- Outbox-таблица в write-БД (см. PG Runtime → outbox).
- Outbox-relay — asyncio-задача с
SKIP LOCKEDциклом. - Kafka-топик для событий домена.
- Read-side consumer в том же сервисе или в отдельном.
- Idempotency-защита на consumer:
on_conflict_do_updateсversion-проверкой или таблицаprocessed_event. - Bootstrap-rebuilder для нового read-store и disaster recovery.
Стоимость:
- Eventual consistency (100ms–1s в норме, больше при деградации).
- Дополнительные runtime-компоненты для мониторинга.
- Новые failure modes: lag consumer, stuck outbox, рассинхронизация.
Окупается, когда write-сторона страдает от read-нагрузки или когда read-проекция фундаментально другая (full-text search, аналитические сводки). До этого порога — read-replica + кеш могут решить дешевле.
Эволюция всегда снизу вверх
R-CQRS-TIER-5: движение по уровням — строго 1 → 2 → 3-split → 3-event-driven. Возврат назад — редкое и обычно показывает, что начали слишком высоко.
Типичный путь сервиса:
- Уровень 1 — стартовали как утилитный FastAPI-микросервис без явной бизнес-домены.
- Появился реальный бизнес-домен, перешли на Уровень 2 — ввели UseCase + Handler, маркеры
Command/Query. - Read-сторона стала отличаться от write (UI хочет проекций, аналитики) — перешли к Уровню 3 split с
OrderViewRepository. - p95 latency query пробил SLA или появился full-text search — перешли к Уровню 3 event-driven с отдельной read-таблицей или хранилищем.
Каждый переход — бизнес-обоснован: метриками, новыми фичами, фактом боли. Не «перешли на Уровень 3 потому что это принято».
Возврат назад случается:
- Слили два сервиса обратно в один, и event-driven read-model в нём перестала иметь смысл — собрали обратно в один PG.
- Поменялся продукт, не нужны больше сложные read-проекции — упростили до 3-split.
Это редкость. Чаще возврат — это «переписать заново», что обычно решается рефакторингом без потери функциональности.
Что запрещено
| Антипаттерн | Правило | Что взамен |
|---|---|---|
Command/Query-маркеры на Уровне 1 без read-only сессии и отдельных путей | R-CQRS-TIER-X1 | Либо полный переход на Уровень 2, либо убрать маркеры |
Event-driven read-model с одним <X>Repository для R+W | R-CQRS-TIER-X2 | Отдельный <X>ViewRepository-протокол |
| Прыжок Уровень 1 → Уровень 3 event-driven без промежуточных шагов | R-CQRS-TIER-5 | Эволюция по метрикам, не по моде |
Отсутствие read-only AsyncSession на query-handler-е | R-CQRS-TIER-2 | Read-only сессия (без commit) обязательна |
Read-методы в основном <X>Repository на Уровне 3 | R-CQRS-QRY-X2 | Отдельный <X>ViewRepository |
Куда дальше
- Command side — write-handler, UnitOfWork,
frozen=Truecommand. - Query side — read-handler с
<X>ViewRepository, read-only сессия. - Read-model — где хранить отдельную проекцию на Уровне 3 event-driven.
- Sync через события — outbox + aiokafka для event-driven.
- Когда CQRS оправдан — пороги перехода между уровнями.