Опирается на правила:
R-CQRS-QRY-1…R-CQRS-QRY-4иR-CQRS-QRY-X1…R-CQRS-QRY-X3из раздела 3. Query side.
Важно знать
- Query —
@dataclass(frozen=True), реализует маркер-ProtocolQuery[R]. Без побочных эффектов.- Query-handler читает через
<X>ViewRepository, получает read-onlyAsyncSessionбезcommit. Не использует основной<X>Repositoryс агрегатом.- Read-only сессия создаётся явно:
async_session_factory(execution_options={"postgresql_readonly": True})— PostgreSQL блокирует любойUPDATE/INSERTна уровне сервера.- Read-DTO — Pydantic-модель или
@dataclass(frozen=True)вcore/<bc>/port/, структура продиктована UI/API, не агрегатом. Денормализованный, с pre-computed полями.- Query-handler не вызывает доменные методы (
order.confirm()и т. п.). Только read.- Запрещено: query делает write, грузит агрегат целиком и маппит в DTO, возвращает агрегат или доменные объекты наружу.
- FastAPI dependency
get_read_sessionизолирует read-only сессию от write — смешать невозможно структурно.
Query-side — это половина CQRS, занятая чтением. Она оптимизирована под чтение: денормализованные read-DTO, отдельный репозиторий, read-only сессия. В Python маркер реализуется через Protocol — структурная типизация, а enforcement гарантируется тем, что query-handler принимает только read-only AsyncSession, которую нельзя закоммитить.
Query — frozen dataclass с маркером Query
R-CQRS-QRY-1: query — @dataclass(frozen=True), реализует Query[R], где R — тип read-DTO.
# core/cqrs.py
from typing import Protocol, TypeVar
R = TypeVar("R", covariant=True)
class Query(Protocol[R]):
...
class Command(Protocol):
...
# core/order/query/get_order_summary.py
from dataclasses import dataclass
from core.cqrs import Query
from core.order.port.view import OrderSummary
@dataclass(frozen=True)
class GetOrderSummaryQuery:
order_id: str
def __class_getitem__(cls, item):
return Query[OrderSummary]
Удобнее применять упрощённую форму без __class_getitem__, когда тип известен из handler-сигнатуры:
# core/order/query/get_order_summary.py
from dataclasses import dataclass
@dataclass(frozen=True)
class GetOrderSummaryQuery:
order_id: str
# core/order/query/search_orders.py
from dataclasses import dataclass
from core.order.port.view import OrderStatus
@dataclass(frozen=True)
class SearchOrdersQuery:
customer_id: str
status: OrderStatus | None
page: int
page_size: int
Что важно:
frozen=True— query иммутабельна после создания, случайная мутация параметров невозможна.- Имя в форме
Get…Query/Search…Query/List…Query— глагол + предметная область + суффиксQuery. Соответствует REST-вербу GET. - Тип параметра
R— read-DTO или коллекция read-DTO. Не агрегат, не ORM-модель.
Структура query-handler
R-CQRS-QRY-2: query-handler получает read-only AsyncSession, читает через <X>ViewRepository, возвращает read-DTO.
# core/order/handler/get_order_summary_handler.py
from core.order.query.get_order_summary import GetOrderSummaryQuery
from core.order.port.view import OrderSummary
from core.order.port.out.order_view_repository import OrderViewRepository
from core.error import OrderNotFoundError
class GetOrderSummaryHandler:
def __init__(self, order_view_repo: OrderViewRepository) -> None:
self._repo = order_view_repo
async def handle(self, query: GetOrderSummaryQuery) -> OrderSummary:
result = await self._repo.find_summary_by_id(query.order_id)
if result is None:
raise OrderNotFoundError(query.order_id)
return result
# core/order/handler/search_orders_handler.py
from core.order.query.search_orders import SearchOrdersQuery
from core.order.port.view import OrderSummary, Page
class SearchOrdersHandler:
def __init__(self, order_view_repo: OrderViewRepository) -> None:
self._repo = order_view_repo
async def handle(self, query: SearchOrdersQuery) -> Page[OrderSummary]:
return await self._repo.search(
customer_id=query.customer_id,
status=query.status,
page=query.page,
page_size=query.page_size,
)
Read-only сессия создаётся в DI-слое и передаётся в репозиторий через dependency injection:
# infra/db/session.py
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
async def get_read_session(
session_factory: async_sessionmaker[AsyncSession],
) -> AsyncGenerator[AsyncSession, None]:
async with session_factory() as session:
await session.execute(text("SET TRANSACTION READ ONLY"))
yield session
# commit намеренно отсутствует — read-only
# api/order/router.py
from fastapi import APIRouter, Depends
from core.order.query.get_order_summary import GetOrderSummaryQuery
from core.order.port.view import OrderSummary
router = APIRouter(prefix="/orders")
@router.get("/{order_id}/summary", response_model=OrderSummary)
async def get_order_summary(
order_id: str,
handler: GetOrderSummaryHandler = Depends(get_order_summary_handler),
) -> OrderSummary:
return await handler.handle(GetOrderSummaryQuery(order_id=order_id))
Что важно:
- Read-only сессия —
SET TRANSACTION READ ONLYилиexecution_options={"postgresql_readonly": True}при создании сессии. PostgreSQL отклоняет любойUPDATE/INSERTна уровне сервера, не ждёт ошибки приложения. OrderViewRepository— отдельный Protocol. Не основнойOrderRepository, который работает с агрегатом и предполагаетcommit.- Возвращает read-DTO, не ORM-объект. Read-DTO — Pydantic-модель или frozen dataclass.
Read-DTO — денормализованная Pydantic-модель
R-CQRS-QRY-3: read-DTO в core/<bc>/port/view/, структура под API, не под агрегат.
# core/order/port/view.py
from __future__ import annotations
from datetime import datetime
from decimal import Decimal
from enum import StrEnum
from pydantic import BaseModel
class OrderStatus(StrEnum):
PENDING = "PENDING"
CONFIRMED = "CONFIRMED"
SHIPPED = "SHIPPED"
DELIVERED = "DELIVERED"
CANCELLED = "CANCELLED"
class OrderSummary(BaseModel):
model_config = {"frozen": True}
order_id: str
status: OrderStatus
customer_name: str # денормализовано — без JOIN к customer
total_amount: Decimal
item_count: int # pre-computed — не List[OrderItem]
created_at: datetime
updated_at: datetime
class OrderListItem(BaseModel):
model_config = {"frozen": True}
order_id: str
status: OrderStatus
total_amount: Decimal
created_at: datetime
Что хорошего:
customer_nameденормализован. При использовании агрегата пришлось бы грузитьCustomerотдельно или через JOIN. Здесь — одно поле, один запрос.item_count, а неlist[OrderItem]. Для UI-списка нужно показать «5 позиций», не сами позиции.intна порядок дешевле выборки коллекции.model_config = {"frozen": True}— Pydantic v2 делает экземпляр неизменяемым. Эквивалентrecordв Java.StrEnum— сериализуется в строку прозрачно, не содержит поведения, безопасно отдаётся наружу.
Расположение:
core/
└── order/
├── domain/
│ ├── order.py # агрегат
│ └── order_item.py # внутренняя Entity
├── port/
│ ├── out/
│ │ ├── order_repository.py # write-side (агрегат)
│ │ └── order_view_repository.py # read-side
│ └── view.py # read-DTO
└── handler/
├── confirm_order_handler.py # command-handler
└── get_order_summary_handler.py # query-handler
ViewRepository — отдельный Protocol
R-CQRS-QRY-2 следует из: query идёт через <X>ViewRepository, отдельный Protocol без save/commit.
# core/order/port/out/order_view_repository.py
from typing import Protocol
from core.order.port.view import OrderSummary, OrderListItem, OrderStatus, Page
class OrderViewRepository(Protocol):
async def find_summary_by_id(self, order_id: str) -> OrderSummary | None: ...
async def search(
self,
customer_id: str,
status: OrderStatus | None,
page: int,
page_size: int,
) -> Page[OrderSummary]: ...
async def find_recent_by_customer(
self, customer_id: str, limit: int
) -> list[OrderListItem]: ...
Реализация в infra/persistence/:
# infra/persistence/order_view_repository_impl.py
from sqlalchemy import select, func, text
from sqlalchemy.ext.asyncio import AsyncSession
from core.order.port.out.order_view_repository import OrderViewRepository
from core.order.port.view import OrderSummary, OrderListItem, OrderStatus, Page
from infra.persistence.models import OrderModel, CustomerModel
class SqlAlchemyOrderViewRepository:
def __init__(self, session: AsyncSession) -> None:
self._session = session
async def find_summary_by_id(self, order_id: str) -> OrderSummary | None:
stmt = (
select(
OrderModel.id,
OrderModel.status,
CustomerModel.name.label("customer_name"),
OrderModel.total_amount,
OrderModel.item_count,
OrderModel.created_at,
OrderModel.updated_at,
)
.join(CustomerModel, CustomerModel.id == OrderModel.customer_id)
.where(OrderModel.id == order_id)
)
row = (await self._session.execute(stmt)).one_or_none()
if row is None:
return None
return OrderSummary(
order_id=str(row.id),
status=OrderStatus(row.status),
customer_name=row.customer_name,
total_amount=row.total_amount,
item_count=row.item_count,
created_at=row.created_at,
updated_at=row.updated_at,
)
Если уже есть денормализованная read-таблица order_summary (см. Read-model), запрос становится тривиальным:
async def find_summary_by_id(self, order_id: str) -> OrderSummary | None:
stmt = select(OrderSummaryModel).where(OrderSummaryModel.order_id == order_id)
row = (await self._session.execute(stmt)).scalar_one_or_none()
if row is None:
return None
return OrderSummary(
order_id=str(row.order_id),
status=OrderStatus(row.status),
customer_name=row.customer_name,
total_amount=row.total_amount,
item_count=row.item_count,
created_at=row.created_at,
updated_at=row.updated_at,
)
Query не вызывает доменные методы
R-CQRS-QRY-4: внутри query-handler нет вызова бизнес-методов агрегата.
# ПЛОХО — query вызывает доменный метод
async def handle(self, query: GetOrderSummaryQuery) -> OrderSummary:
order = await self._order_repo.by_id(query.order_id) # грузим агрегат
if order.should_be_archived(): # ← доменный метод в read
await order.archive() # ← мутация без commit невидима
return OrderSummary(
order_id=str(order.id),
status=order.status,
...
)
Что не так:
- Мутация в read-only сессии —
SET TRANSACTION READ ONLYвызовет ошибку от PostgreSQL при попыткеINSERT/UPDATE; если сессия не помечена read-only, мутация просто потеряется безcommit. - Принцип CQRS нарушен — query с побочным эффектом.
- Логика «когда архивировать» должна быть в scheduled command-handler (
ArchiveStaleOrdersCommand), не приклеена к UI-чтению.
Корректно: query-handler читает через <X>ViewRepository и возвращает read-DTO. Никакого агрегата, никакой бизнес-логики.
Что запрещено
| Антипаттерн | Правило | Что взамен |
|---|---|---|
Query-handler делает UPDATE/INSERT/DELETE | R-CQRS-QRY-X1 | Перенести в отдельный command-handler |
Query грузит агрегат через основной <X>Repository и маппит в DTO | R-CQRS-QRY-X2 | <X>ViewRepository с минимально нужным набором полей |
| Query возвращает агрегат / ORM-модель наружу | R-CQRS-QRY-X3 | Read-DTO (Pydantic BaseModel или frozen dataclass) |
Query вызывает доменный метод (order.confirm()) | R-CQRS-QRY-4 | Отдельный scheduled command-handler |
Query-handler получает write-AsyncSession (с commit) | R-CQRS-QRY-2 | Отдельная read-only сессия через get_read_session |
| Read-DTO с полями 1-в-1 из ORM-модели агрегата | R-CQRS-QRY-3 | Денормализация, pre-computed поля |
Куда дальше
- CQRS → раздел 3. Query side — нормативные формулировки
R-CQRS-QRY-*. - Command side — пишущая половина: handler через агрегат и UoW.
- Read-model — где и в каком виде хранить read-данные.
- Sync через события — как read-таблица заполняется из событий write-side.
- Уровень и эволюция — уровни зрелости и эволюция CQRS в Python-сервисе.
- Когда CQRS оправдан — когда lightweight CQRS достаточно, а когда нужен full split.