← назад к разделу

В системах с CQRS есть две стороны: команды меняют состояние, запросы (queries) только читают. Кажется просто, но на практике разработчики часто смешивают их — и получают лишние нагрузки, ошибки в тестах и запутанную логику. Разберём, как организовать читающую сторону в Python.

Почему нельзя читать через тот же репозиторий, что и пишущая сторона

Типичная ошибка: есть OrderRepository, который умеет сохранять и загружать агрегат Order. Разработчик использует его же для API-запросов — грузит агрегат, маппит в DTO и отдаёт.

Проблема здесь не в том, что «нарушен паттерн». Проблема практическая:

  • Агрегат грузит всё, что нужно бизнес-логике: связанные сущности, коллекции, вложенные объекты. Для отображения в UI это избыточно.
  • Запрос может случайно вызвать доменный метод и изменить состояние — в read-only сессии это вызовет ошибку, без неё — молча потеряется.
  • Трудно оптимизировать: индексы под запись и под чтение часто разные.

Читающая сторона CQRS решает это разделением: отдельный объект запроса, отдельный репозиторий, отдельная сессия и отдельная модель данных для API.

Query — это неизменяемый объект с параметрами

Запрос к системе оформляется как frozen dataclass — объект с параметрами, который нельзя изменить после создания.

# core/cqrs.py
from typing import Protocol, TypeVar

R_co = TypeVar("R_co", covariant=True)

class Query(Protocol[R_co]):
    ...
# core/order/query/get_order_summary.py
from dataclasses import dataclass

@dataclass(frozen=True)
class GetOrderSummaryQuery:
    order_id: str
# core/order/query/search_orders.py
from dataclasses import dataclass
from core.order.port.view import OrderStatus

@dataclass(frozen=True)
class SearchOrdersQuery:
    customer_id: str
    status: OrderStatus | None
    page: int
    page_size: int

frozen=True означает, что после создания объект нельзя изменить — это важно, потому что запрос описывает намерение прочитать данные, а не изменить их. Имена пишутся в форме Get…Query, Search…Query, List…Query.

Query-handler — читает, ничего не меняет

Handler принимает Query, обращается к ViewRepository и возвращает read-DTO. Никаких доменных методов, никаких commit.

# core/order/handler/get_order_summary_handler.py
from core.order.query.get_order_summary import GetOrderSummaryQuery
from core.order.port.view import OrderSummary
from core.order.port.out.order_view_repository import OrderViewRepository
from core.error import OrderNotFoundError

class GetOrderSummaryHandler:
    def __init__(self, order_view_repo: OrderViewRepository) -> None:
        self._repo = order_view_repo

    async def handle(self, query: GetOrderSummaryQuery) -> OrderSummary:
        result = await self._repo.find_by_id(query.order_id)
        if result is None:
            raise OrderNotFoundError(query.order_id)
        return result
# core/order/handler/search_orders_handler.py
from core.order.query.search_orders import SearchOrdersQuery
from core.order.port.view import OrderSummary, Page

class SearchOrdersHandler:
    def __init__(self, order_view_repo: OrderViewRepository) -> None:
        self._repo = order_view_repo

    async def handle(self, query: SearchOrdersQuery) -> Page[OrderSummary]:
        return await self._repo.search(
            customer_id=query.customer_id,
            status=query.status,
            page=query.page,
            page_size=query.page_size,
        )

В FastAPI handler подключается как зависимость:

# api/order/router.py
from fastapi import APIRouter, Depends
from core.order.query.get_order_summary import GetOrderSummaryQuery
from core.order.port.view import OrderSummary

router = APIRouter(prefix="/orders")

@router.get("/{order_id}/summary", response_model=OrderSummary)
async def get_order_summary(
    order_id: str,
    handler: GetOrderSummaryHandler = Depends(get_order_summary_handler),
) -> OrderSummary:
    return await handler.handle(GetOrderSummaryQuery(order_id=order_id))

Read-only сессия — защита на уровне базы данных

Читающая сторона получает отдельную сессию, которую нельзя закоммитить. Это делается явно через SET TRANSACTION READ ONLY — PostgreSQL на уровне сервера отклонит любой UPDATE или INSERT, не дожидаясь ошибки приложения.

# infra/db/session.py
from typing import AsyncGenerator
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker

async def get_read_session(
    session_factory: async_sessionmaker[AsyncSession],
) -> AsyncGenerator[AsyncSession, None]:
    async with session_factory() as session:
        await session.execute(text("SET TRANSACTION READ ONLY"))
        yield session
        # commit намеренно отсутствует

Эта зависимость (get_read_session) передаётся в ViewRepository через DI. Физически смешать read и write сессию становится невозможно — они разные объекты с разными функциями.

ViewRepository — отдельный Protocol только для чтения

OrderViewRepository — это Protocol без методов save, commit или delete. Только методы чтения.

# core/order/port/out/order_view_repository.py
from typing import Protocol
from core.order.port.view import OrderSummary, OrderListItem, OrderStatus, Page

class OrderViewRepository(Protocol):
    async def find_by_id(self, order_id: str) -> OrderSummary | None: ...
    async def search(
        self,
        customer_id: str,
        status: OrderStatus | None,
        page: int,
        page_size: int,
    ) -> Page[OrderSummary]: ...
    async def find_recent_by_customer(
        self, customer_id: str, limit: int
    ) -> list[OrderListItem]: ...

Реализация в infra/persistence/ строит SQL-запросы напрямую:

# infra/persistence/order_view_repository_impl.py
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from core.order.port.view import OrderSummary, OrderStatus

class SqlAlchemyOrderViewRepository:
    def __init__(self, session: AsyncSession) -> None:
        self._session = session

    async def find_by_id(self, order_id: str) -> OrderSummary | None:
        from infra.persistence.models import OrderModel, CustomerModel
        stmt = (
            select(
                OrderModel.id,
                OrderModel.status,
                CustomerModel.name.label("customer_name"),
                OrderModel.total_amount,
                OrderModel.item_count,
                OrderModel.created_at,
                OrderModel.updated_at,
            )
            .join(CustomerModel, CustomerModel.id == OrderModel.customer_id)
            .where(OrderModel.id == order_id)
        )
        row = (await self._session.execute(stmt)).one_or_none()
        if row is None:
            return None
        return OrderSummary(
            order_id=str(row.id),
            status=OrderStatus(row.status),
            customer_name=row.customer_name,
            total_amount=row.total_amount,
            item_count=row.item_count,
            created_at=row.created_at,
            updated_at=row.updated_at,
        )

Если есть денормализованная таблица order_summary (заполняется из событий), запрос становится ещё проще — без JOIN.

Read-DTO — модель данных под API, не под агрегат

Read-DTO — это Pydantic-модель, структура которой диктуется тем, что нужно UI или API, а не тем, как устроен агрегат.

# core/order/port/view.py
from __future__ import annotations
from datetime import datetime
from decimal import Decimal
from enum import StrEnum
from pydantic import BaseModel

class OrderStatus(StrEnum):
    PENDING = "PENDING"
    CONFIRMED = "CONFIRMED"
    SHIPPED = "SHIPPED"
    DELIVERED = "DELIVERED"
    CANCELLED = "CANCELLED"

class OrderSummary(BaseModel):
    model_config = {"frozen": True}

    order_id: str
    status: OrderStatus
    customer_name: str        # денормализовано — без отдельного JOIN к customer
    total_amount: Decimal
    item_count: int           # число позиций, а не список объектов
    created_at: datetime
    updated_at: datetime

class OrderListItem(BaseModel):
    model_config = {"frozen": True}

    order_id: str
    status: OrderStatus
    total_amount: Decimal
    created_at: datetime

Несколько важных решений здесь:

  • customer_name денормализован — имя клиента хранится прямо в строке заказа, не нужен отдельный запрос к таблице клиентов.
  • item_count вместо list[OrderItem] — для отображения «5 позиций» в списке достаточно числа; грузить все позиции заказа дороже.
  • model_config = {"frozen": True} — Pydantic v2, экземпляр нельзя изменить после создания.
  • StrEnum — значения перечисления сериализуются как строки, что удобно для JSON API.

Частая ошибка: вызов доменных методов в query-handler

# Неправильно — query вызывает доменный метод
async def handle(self, query: GetOrderSummaryQuery) -> OrderSummary:
    order = await self._order_repo.by_id(query.order_id)  # грузим агрегат
    if order.should_be_archived():          # доменный метод в read
        await order.archive()               # мутация в read-only сессии
    return OrderSummary(order_id=str(order.id), ...)

Что здесь не так:

  • SET TRANSACTION READ ONLY вызовет ошибку от PostgreSQL при попытке INSERT/UPDATE. Без read-only сессии мутация молча потеряется — нет commit.
  • Читающая сторона не должна менять состояние. Если нужно архивировать просроченные заказы — это отдельная команда (ArchiveStaleOrdersCommand) с собственным handler.

Правильно: query-handler обращается только к ViewRepository и возвращает read-DTO. Никакого агрегата, никаких доменных методов.

Как выглядит структура файлов

core/
└── order/
    ├── domain/
    │   ├── order.py               # агрегат
    │   └── order_item.py
    ├── port/
    │   ├── out/
    │   │   ├── order_repository.py       # write-side (агрегат)
    │   │   └── order_view_repository.py  # read-side
    │   └── view.py                       # read-DTO
    └── handler/
        ├── confirm_order_handler.py      # command-handler
        └── get_order_summary_handler.py  # query-handler

Коротко

  • Query — @dataclass(frozen=True) с параметрами чтения. Не меняет состояние.
  • Query-handler обращается только к ViewRepository, возвращает read-DTO. Никакого агрегата.
  • Read-only сессия (SET TRANSACTION READ ONLY) — PostgreSQL блокирует UPDATE/INSERT на уровне сервера.
  • ViewRepository — отдельный Protocol без save/commit. Только методы чтения.
  • Read-DTO (Pydantic BaseModel с frozen=True) — структура под API: денормализованные поля, числа вместо коллекций.
  • Нельзя грузить агрегат в query-handler и маппить его в DTO — это нивелирует выгоду от разделения.
  • Нельзя вызывать доменные методы в query-handler — read-only, побочных эффектов быть не должно.

Что почитать дальше

  • Command side — пишущая половина: handler через агрегат и Unit of Work.
  • Read-model — где и в каком виде хранить read-данные.
  • Sync через события — как read-таблица заполняется из событий write-side.
  • Когда CQRS оправдан — когда простое разделение достаточно, когда нужен полный split.