В системах с CQRS есть две стороны: команды меняют состояние, запросы (queries) только читают. Кажется просто, но на практике разработчики часто смешивают их — и получают лишние нагрузки, ошибки в тестах и запутанную логику. Разберём, как организовать читающую сторону в Python.
Почему нельзя читать через тот же репозиторий, что и пишущая сторона
Типичная ошибка: есть OrderRepository, который умеет сохранять и загружать агрегат Order. Разработчик использует его же для API-запросов — грузит агрегат, маппит в DTO и отдаёт.
Проблема здесь не в том, что «нарушен паттерн». Проблема практическая:
- Агрегат грузит всё, что нужно бизнес-логике: связанные сущности, коллекции, вложенные объекты. Для отображения в UI это избыточно.
- Запрос может случайно вызвать доменный метод и изменить состояние — в read-only сессии это вызовет ошибку, без неё — молча потеряется.
- Трудно оптимизировать: индексы под запись и под чтение часто разные.
Читающая сторона CQRS решает это разделением: отдельный объект запроса, отдельный репозиторий, отдельная сессия и отдельная модель данных для API.
Query — это неизменяемый объект с параметрами
Запрос к системе оформляется как frozen dataclass — объект с параметрами, который нельзя изменить после создания.
# core/cqrs.py
from typing import Protocol, TypeVar
R_co = TypeVar("R_co", covariant=True)
class Query(Protocol[R_co]):
...
# core/order/query/get_order_summary.py
from dataclasses import dataclass
@dataclass(frozen=True)
class GetOrderSummaryQuery:
order_id: str
# core/order/query/search_orders.py
from dataclasses import dataclass
from core.order.port.view import OrderStatus
@dataclass(frozen=True)
class SearchOrdersQuery:
customer_id: str
status: OrderStatus | None
page: int
page_size: int
frozen=True означает, что после создания объект нельзя изменить — это важно, потому что запрос описывает намерение прочитать данные, а не изменить их. Имена пишутся в форме Get…Query, Search…Query, List…Query.
Query-handler — читает, ничего не меняет
Handler принимает Query, обращается к ViewRepository и возвращает read-DTO. Никаких доменных методов, никаких commit.
# core/order/handler/get_order_summary_handler.py
from core.order.query.get_order_summary import GetOrderSummaryQuery
from core.order.port.view import OrderSummary
from core.order.port.out.order_view_repository import OrderViewRepository
from core.error import OrderNotFoundError
class GetOrderSummaryHandler:
def __init__(self, order_view_repo: OrderViewRepository) -> None:
self._repo = order_view_repo
async def handle(self, query: GetOrderSummaryQuery) -> OrderSummary:
result = await self._repo.find_by_id(query.order_id)
if result is None:
raise OrderNotFoundError(query.order_id)
return result
# core/order/handler/search_orders_handler.py
from core.order.query.search_orders import SearchOrdersQuery
from core.order.port.view import OrderSummary, Page
class SearchOrdersHandler:
def __init__(self, order_view_repo: OrderViewRepository) -> None:
self._repo = order_view_repo
async def handle(self, query: SearchOrdersQuery) -> Page[OrderSummary]:
return await self._repo.search(
customer_id=query.customer_id,
status=query.status,
page=query.page,
page_size=query.page_size,
)
В FastAPI handler подключается как зависимость:
# api/order/router.py
from fastapi import APIRouter, Depends
from core.order.query.get_order_summary import GetOrderSummaryQuery
from core.order.port.view import OrderSummary
router = APIRouter(prefix="/orders")
@router.get("/{order_id}/summary", response_model=OrderSummary)
async def get_order_summary(
order_id: str,
handler: GetOrderSummaryHandler = Depends(get_order_summary_handler),
) -> OrderSummary:
return await handler.handle(GetOrderSummaryQuery(order_id=order_id))
Read-only сессия — защита на уровне базы данных
Читающая сторона получает отдельную сессию, которую нельзя закоммитить. Это делается явно через SET TRANSACTION READ ONLY — PostgreSQL на уровне сервера отклонит любой UPDATE или INSERT, не дожидаясь ошибки приложения.
# infra/db/session.py
from typing import AsyncGenerator
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
async def get_read_session(
session_factory: async_sessionmaker[AsyncSession],
) -> AsyncGenerator[AsyncSession, None]:
async with session_factory() as session:
await session.execute(text("SET TRANSACTION READ ONLY"))
yield session
# commit намеренно отсутствует
Эта зависимость (get_read_session) передаётся в ViewRepository через DI. Физически смешать read и write сессию становится невозможно — они разные объекты с разными функциями.
ViewRepository — отдельный Protocol только для чтения
OrderViewRepository — это Protocol без методов save, commit или delete. Только методы чтения.
# core/order/port/out/order_view_repository.py
from typing import Protocol
from core.order.port.view import OrderSummary, OrderListItem, OrderStatus, Page
class OrderViewRepository(Protocol):
async def find_by_id(self, order_id: str) -> OrderSummary | None: ...
async def search(
self,
customer_id: str,
status: OrderStatus | None,
page: int,
page_size: int,
) -> Page[OrderSummary]: ...
async def find_recent_by_customer(
self, customer_id: str, limit: int
) -> list[OrderListItem]: ...
Реализация в infra/persistence/ строит SQL-запросы напрямую:
# infra/persistence/order_view_repository_impl.py
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from core.order.port.view import OrderSummary, OrderStatus
class SqlAlchemyOrderViewRepository:
def __init__(self, session: AsyncSession) -> None:
self._session = session
async def find_by_id(self, order_id: str) -> OrderSummary | None:
from infra.persistence.models import OrderModel, CustomerModel
stmt = (
select(
OrderModel.id,
OrderModel.status,
CustomerModel.name.label("customer_name"),
OrderModel.total_amount,
OrderModel.item_count,
OrderModel.created_at,
OrderModel.updated_at,
)
.join(CustomerModel, CustomerModel.id == OrderModel.customer_id)
.where(OrderModel.id == order_id)
)
row = (await self._session.execute(stmt)).one_or_none()
if row is None:
return None
return OrderSummary(
order_id=str(row.id),
status=OrderStatus(row.status),
customer_name=row.customer_name,
total_amount=row.total_amount,
item_count=row.item_count,
created_at=row.created_at,
updated_at=row.updated_at,
)
Если есть денормализованная таблица order_summary (заполняется из событий), запрос становится ещё проще — без JOIN.
Read-DTO — модель данных под API, не под агрегат
Read-DTO — это Pydantic-модель, структура которой диктуется тем, что нужно UI или API, а не тем, как устроен агрегат.
# core/order/port/view.py
from __future__ import annotations
from datetime import datetime
from decimal import Decimal
from enum import StrEnum
from pydantic import BaseModel
class OrderStatus(StrEnum):
PENDING = "PENDING"
CONFIRMED = "CONFIRMED"
SHIPPED = "SHIPPED"
DELIVERED = "DELIVERED"
CANCELLED = "CANCELLED"
class OrderSummary(BaseModel):
model_config = {"frozen": True}
order_id: str
status: OrderStatus
customer_name: str # денормализовано — без отдельного JOIN к customer
total_amount: Decimal
item_count: int # число позиций, а не список объектов
created_at: datetime
updated_at: datetime
class OrderListItem(BaseModel):
model_config = {"frozen": True}
order_id: str
status: OrderStatus
total_amount: Decimal
created_at: datetime
Несколько важных решений здесь:
customer_nameденормализован — имя клиента хранится прямо в строке заказа, не нужен отдельный запрос к таблице клиентов.item_countвместоlist[OrderItem]— для отображения «5 позиций» в списке достаточно числа; грузить все позиции заказа дороже.model_config = {"frozen": True}— Pydantic v2, экземпляр нельзя изменить после создания.StrEnum— значения перечисления сериализуются как строки, что удобно для JSON API.
Частая ошибка: вызов доменных методов в query-handler
# Неправильно — query вызывает доменный метод
async def handle(self, query: GetOrderSummaryQuery) -> OrderSummary:
order = await self._order_repo.by_id(query.order_id) # грузим агрегат
if order.should_be_archived(): # доменный метод в read
await order.archive() # мутация в read-only сессии
return OrderSummary(order_id=str(order.id), ...)
Что здесь не так:
SET TRANSACTION READ ONLYвызовет ошибку от PostgreSQL при попыткеINSERT/UPDATE. Без read-only сессии мутация молча потеряется — нетcommit.- Читающая сторона не должна менять состояние. Если нужно архивировать просроченные заказы — это отдельная команда (
ArchiveStaleOrdersCommand) с собственным handler.
Правильно: query-handler обращается только к ViewRepository и возвращает read-DTO. Никакого агрегата, никаких доменных методов.
Как выглядит структура файлов
core/
└── order/
├── domain/
│ ├── order.py # агрегат
│ └── order_item.py
├── port/
│ ├── out/
│ │ ├── order_repository.py # write-side (агрегат)
│ │ └── order_view_repository.py # read-side
│ └── view.py # read-DTO
└── handler/
├── confirm_order_handler.py # command-handler
└── get_order_summary_handler.py # query-handler
Коротко
- Query —
@dataclass(frozen=True)с параметрами чтения. Не меняет состояние. - Query-handler обращается только к
ViewRepository, возвращает read-DTO. Никакого агрегата. - Read-only сессия (
SET TRANSACTION READ ONLY) — PostgreSQL блокируетUPDATE/INSERTна уровне сервера. ViewRepository— отдельный Protocol безsave/commit. Только методы чтения.- Read-DTO (Pydantic
BaseModelсfrozen=True) — структура под API: денормализованные поля, числа вместо коллекций. - Нельзя грузить агрегат в query-handler и маппить его в DTO — это нивелирует выгоду от разделения.
- Нельзя вызывать доменные методы в query-handler — read-only, побочных эффектов быть не должно.
Что почитать дальше
- Command side — пишущая половина: handler через агрегат и Unit of Work.
- Read-model — где и в каком виде хранить read-данные.
- Sync через события — как read-таблица заполняется из событий write-side.
- Когда CQRS оправдан — когда простое разделение достаточно, когда нужен полный split.