Опирается на правила: R-CQRS-QRY-1R-CQRS-QRY-4 и R-CQRS-QRY-X1R-CQRS-QRY-X3 из раздела 3. Query side.

Важно знать

  • Query — @dataclass(frozen=True), реализует маркер-Protocol Query[R]. Без побочных эффектов.
  • Query-handler читает через <X>ViewRepository, получает read-only AsyncSession без commit. Не использует основной <X>Repository с агрегатом.
  • Read-only сессия создаётся явно: async_session_factory(execution_options={"postgresql_readonly": True}) — PostgreSQL блокирует любой UPDATE/INSERT на уровне сервера.
  • Read-DTO — Pydantic-модель или @dataclass(frozen=True) в core/<bc>/port/, структура продиктована UI/API, не агрегатом. Денормализованный, с pre-computed полями.
  • Query-handler не вызывает доменные методы (order.confirm() и т. п.). Только read.
  • Запрещено: query делает write, грузит агрегат целиком и маппит в DTO, возвращает агрегат или доменные объекты наружу.
  • FastAPI dependency get_read_session изолирует read-only сессию от write — смешать невозможно структурно.

Query-side — это половина CQRS, занятая чтением. Она оптимизирована под чтение: денормализованные read-DTO, отдельный репозиторий, read-only сессия. В Python маркер реализуется через Protocol — структурная типизация, а enforcement гарантируется тем, что query-handler принимает только read-only AsyncSession, которую нельзя закоммитить.

Query — frozen dataclass с маркером Query

R-CQRS-QRY-1: query — @dataclass(frozen=True), реализует Query[R], где R — тип read-DTO.

# core/cqrs.py
from typing import Protocol, TypeVar

R = TypeVar("R", covariant=True)

class Query(Protocol[R]):
    ...

class Command(Protocol):
    ...
# core/order/query/get_order_summary.py
from dataclasses import dataclass
from core.cqrs import Query
from core.order.port.view import OrderSummary

@dataclass(frozen=True)
class GetOrderSummaryQuery:
    order_id: str

    def __class_getitem__(cls, item):
        return Query[OrderSummary]

Удобнее применять упрощённую форму без __class_getitem__, когда тип известен из handler-сигнатуры:

# core/order/query/get_order_summary.py
from dataclasses import dataclass

@dataclass(frozen=True)
class GetOrderSummaryQuery:
    order_id: str

# core/order/query/search_orders.py
from dataclasses import dataclass
from core.order.port.view import OrderStatus

@dataclass(frozen=True)
class SearchOrdersQuery:
    customer_id: str
    status: OrderStatus | None
    page: int
    page_size: int

Что важно:

  • frozen=True — query иммутабельна после создания, случайная мутация параметров невозможна.
  • Имя в форме Get…Query / Search…Query / List…Query — глагол + предметная область + суффикс Query. Соответствует REST-вербу GET.
  • Тип параметра R — read-DTO или коллекция read-DTO. Не агрегат, не ORM-модель.

Структура query-handler

R-CQRS-QRY-2: query-handler получает read-only AsyncSession, читает через <X>ViewRepository, возвращает read-DTO.

# core/order/handler/get_order_summary_handler.py
from core.order.query.get_order_summary import GetOrderSummaryQuery
from core.order.port.view import OrderSummary
from core.order.port.out.order_view_repository import OrderViewRepository
from core.error import OrderNotFoundError

class GetOrderSummaryHandler:
    def __init__(self, order_view_repo: OrderViewRepository) -> None:
        self._repo = order_view_repo

    async def handle(self, query: GetOrderSummaryQuery) -> OrderSummary:
        result = await self._repo.find_summary_by_id(query.order_id)
        if result is None:
            raise OrderNotFoundError(query.order_id)
        return result
# core/order/handler/search_orders_handler.py
from core.order.query.search_orders import SearchOrdersQuery
from core.order.port.view import OrderSummary, Page

class SearchOrdersHandler:
    def __init__(self, order_view_repo: OrderViewRepository) -> None:
        self._repo = order_view_repo

    async def handle(self, query: SearchOrdersQuery) -> Page[OrderSummary]:
        return await self._repo.search(
            customer_id=query.customer_id,
            status=query.status,
            page=query.page,
            page_size=query.page_size,
        )

Read-only сессия создаётся в DI-слое и передаётся в репозиторий через dependency injection:

# infra/db/session.py
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker

async def get_read_session(
    session_factory: async_sessionmaker[AsyncSession],
) -> AsyncGenerator[AsyncSession, None]:
    async with session_factory() as session:
        await session.execute(text("SET TRANSACTION READ ONLY"))
        yield session
        # commit намеренно отсутствует — read-only
# api/order/router.py
from fastapi import APIRouter, Depends
from core.order.query.get_order_summary import GetOrderSummaryQuery
from core.order.port.view import OrderSummary

router = APIRouter(prefix="/orders")

@router.get("/{order_id}/summary", response_model=OrderSummary)
async def get_order_summary(
    order_id: str,
    handler: GetOrderSummaryHandler = Depends(get_order_summary_handler),
) -> OrderSummary:
    return await handler.handle(GetOrderSummaryQuery(order_id=order_id))

Что важно:

  • Read-only сессияSET TRANSACTION READ ONLY или execution_options={"postgresql_readonly": True} при создании сессии. PostgreSQL отклоняет любой UPDATE/INSERT на уровне сервера, не ждёт ошибки приложения.
  • OrderViewRepository — отдельный Protocol. Не основной OrderRepository, который работает с агрегатом и предполагает commit.
  • Возвращает read-DTO, не ORM-объект. Read-DTO — Pydantic-модель или frozen dataclass.

Read-DTO — денормализованная Pydantic-модель

R-CQRS-QRY-3: read-DTO в core/<bc>/port/view/, структура под API, не под агрегат.

# core/order/port/view.py
from __future__ import annotations
from datetime import datetime
from decimal import Decimal
from enum import StrEnum
from pydantic import BaseModel

class OrderStatus(StrEnum):
    PENDING = "PENDING"
    CONFIRMED = "CONFIRMED"
    SHIPPED = "SHIPPED"
    DELIVERED = "DELIVERED"
    CANCELLED = "CANCELLED"

class OrderSummary(BaseModel):
    model_config = {"frozen": True}

    order_id: str
    status: OrderStatus
    customer_name: str        # денормализовано — без JOIN к customer
    total_amount: Decimal
    item_count: int           # pre-computed — не List[OrderItem]
    created_at: datetime
    updated_at: datetime

class OrderListItem(BaseModel):
    model_config = {"frozen": True}

    order_id: str
    status: OrderStatus
    total_amount: Decimal
    created_at: datetime

Что хорошего:

  • customer_name денормализован. При использовании агрегата пришлось бы грузить Customer отдельно или через JOIN. Здесь — одно поле, один запрос.
  • item_count, а не list[OrderItem]. Для UI-списка нужно показать «5 позиций», не сами позиции. int на порядок дешевле выборки коллекции.
  • model_config = {"frozen": True} — Pydantic v2 делает экземпляр неизменяемым. Эквивалент record в Java.
  • StrEnum — сериализуется в строку прозрачно, не содержит поведения, безопасно отдаётся наружу.

Расположение:

core/
└── order/
    ├── domain/
    │   ├── order.py               # агрегат
    │   └── order_item.py          # внутренняя Entity
    ├── port/
    │   ├── out/
    │   │   ├── order_repository.py       # write-side (агрегат)
    │   │   └── order_view_repository.py  # read-side
    │   └── view.py                       # read-DTO
    └── handler/
        ├── confirm_order_handler.py      # command-handler
        └── get_order_summary_handler.py  # query-handler

ViewRepository — отдельный Protocol

R-CQRS-QRY-2 следует из: query идёт через <X>ViewRepository, отдельный Protocol без save/commit.

# core/order/port/out/order_view_repository.py
from typing import Protocol
from core.order.port.view import OrderSummary, OrderListItem, OrderStatus, Page

class OrderViewRepository(Protocol):
    async def find_summary_by_id(self, order_id: str) -> OrderSummary | None: ...
    async def search(
        self,
        customer_id: str,
        status: OrderStatus | None,
        page: int,
        page_size: int,
    ) -> Page[OrderSummary]: ...
    async def find_recent_by_customer(
        self, customer_id: str, limit: int
    ) -> list[OrderListItem]: ...

Реализация в infra/persistence/:

# infra/persistence/order_view_repository_impl.py
from sqlalchemy import select, func, text
from sqlalchemy.ext.asyncio import AsyncSession
from core.order.port.out.order_view_repository import OrderViewRepository
from core.order.port.view import OrderSummary, OrderListItem, OrderStatus, Page
from infra.persistence.models import OrderModel, CustomerModel

class SqlAlchemyOrderViewRepository:
    def __init__(self, session: AsyncSession) -> None:
        self._session = session

    async def find_summary_by_id(self, order_id: str) -> OrderSummary | None:
        stmt = (
            select(
                OrderModel.id,
                OrderModel.status,
                CustomerModel.name.label("customer_name"),
                OrderModel.total_amount,
                OrderModel.item_count,
                OrderModel.created_at,
                OrderModel.updated_at,
            )
            .join(CustomerModel, CustomerModel.id == OrderModel.customer_id)
            .where(OrderModel.id == order_id)
        )
        row = (await self._session.execute(stmt)).one_or_none()
        if row is None:
            return None
        return OrderSummary(
            order_id=str(row.id),
            status=OrderStatus(row.status),
            customer_name=row.customer_name,
            total_amount=row.total_amount,
            item_count=row.item_count,
            created_at=row.created_at,
            updated_at=row.updated_at,
        )

Если уже есть денормализованная read-таблица order_summary (см. Read-model), запрос становится тривиальным:

async def find_summary_by_id(self, order_id: str) -> OrderSummary | None:
    stmt = select(OrderSummaryModel).where(OrderSummaryModel.order_id == order_id)
    row = (await self._session.execute(stmt)).scalar_one_or_none()
    if row is None:
        return None
    return OrderSummary(
        order_id=str(row.order_id),
        status=OrderStatus(row.status),
        customer_name=row.customer_name,
        total_amount=row.total_amount,
        item_count=row.item_count,
        created_at=row.created_at,
        updated_at=row.updated_at,
    )

Query не вызывает доменные методы

R-CQRS-QRY-4: внутри query-handler нет вызова бизнес-методов агрегата.

# ПЛОХО — query вызывает доменный метод
async def handle(self, query: GetOrderSummaryQuery) -> OrderSummary:
    order = await self._order_repo.by_id(query.order_id)  # грузим агрегат
    if order.should_be_archived():          # ← доменный метод в read
        await order.archive()               # ← мутация без commit невидима
    return OrderSummary(
        order_id=str(order.id),
        status=order.status,
        ...
    )

Что не так:

  • Мутация в read-only сессииSET TRANSACTION READ ONLY вызовет ошибку от PostgreSQL при попытке INSERT/UPDATE; если сессия не помечена read-only, мутация просто потеряется без commit.
  • Принцип CQRS нарушен — query с побочным эффектом.
  • Логика «когда архивировать» должна быть в scheduled command-handler (ArchiveStaleOrdersCommand), не приклеена к UI-чтению.

Корректно: query-handler читает через <X>ViewRepository и возвращает read-DTO. Никакого агрегата, никакой бизнес-логики.

Что запрещено

АнтипаттернПравилоЧто взамен
Query-handler делает UPDATE/INSERT/DELETER-CQRS-QRY-X1Перенести в отдельный command-handler
Query грузит агрегат через основной <X>Repository и маппит в DTOR-CQRS-QRY-X2<X>ViewRepository с минимально нужным набором полей
Query возвращает агрегат / ORM-модель наружуR-CQRS-QRY-X3Read-DTO (Pydantic BaseModel или frozen dataclass)
Query вызывает доменный метод (order.confirm())R-CQRS-QRY-4Отдельный scheduled command-handler
Query-handler получает write-AsyncSessioncommit)R-CQRS-QRY-2Отдельная read-only сессия через get_read_session
Read-DTO с полями 1-в-1 из ORM-модели агрегатаR-CQRS-QRY-3Денормализация, pre-computed поля

Куда дальше

  • CQRS → раздел 3. Query side — нормативные формулировки R-CQRS-QRY-*.
  • Command side — пишущая половина: handler через агрегат и UoW.
  • Read-model — где и в каком виде хранить read-данные.
  • Sync через события — как read-таблица заполняется из событий write-side.
  • Уровень и эволюция — уровни зрелости и эволюция CQRS в Python-сервисе.
  • Когда CQRS оправдан — когда lightweight CQRS достаточно, а когда нужен full split.