В CQRS всё, что меняет данные, идёт через команды. Всё, что читает — через запросы. Граница жёсткая и не смешивается. Эта статья — про пишущую половину: что такое команда, как выглядит обработчик и где расставить проверки.
Зачем вообще разделять чтение и запись
Представь обычный сервис: один метод get_order() читает данные, другой confirm_order() их меняет. Пока нагрузка небольшая — всё нормально. Но с ростом системы чтение и запись начинают мешать друг другу. Запросы на чтение хотят денормализованных данных и кэша, запросы на запись — строгих транзакций и проверки бизнес-правил.
CQRS решает это разделением: команды (Commands) меняют состояние через агрегат и UnitOfWork; запросы (Queries) читают через отдельные read-модели. Каждый путь оптимизирован под свою задачу.
Что такое команда
Команда — это намерение изменить состояние. Не запрос информации, а просьба что-то сделать: «подтверди заказ», «создай продукт», «отмени платёж».
В Python команды делают через @dataclass(frozen=True):
# core/order/command/confirm_order.py
from dataclasses import dataclass
from core.order.domain.value_objects import OrderId
@dataclass(frozen=True)
class ConfirmOrder:
order_id: OrderId
idempotency_key: str
frozen=True делает объект неизменяемым: после создания поменять поля нельзя. Это важно — команда описывает конкретное намерение, которое не должно меняться в процессе выполнения.
Несколько правил для команд:
- Никакой логики внутри. Команда — данные, не поведение. Маппинг из входящего запроса API происходит в роутере, а не здесь.
__post_init__допустим только для приведения простых типов к value object-ам.idempotency_key— стандартное поле для операций, которые нельзя случайно выполнить дважды. Роутер берёт его из заголовкаIdempotency-Key.
Структура обработчика
Обработчик команды (handler) выполняет одно действие в четыре шага: загружает агрегат, вызывает доменный метод, сохраняет, фиксирует транзакцию.
# core/order/handler/confirm_order_handler.py
from core.order.command.confirm_order import ConfirmOrder
from core.order.domain.port import OrderRepository
from core.order.domain.value_objects import OrderId
from core.shared.uow import UnitOfWork
from core.shared.clock import Clock
class ConfirmOrderHandler:
def __init__(
self,
orders: OrderRepository,
uow: UnitOfWork,
clock: Clock,
) -> None:
self._orders = orders
self._uow = uow
self._clock = clock
async def handle(self, cmd: ConfirmOrder) -> OrderId:
async with self._uow:
# 1. Загрузить агрегат
order = await self._orders.by_id(cmd.order_id)
if order is None:
raise OrderNotFound(cmd.order_id)
# 2. Вызвать доменный метод — он проверяет бизнес-правила
order.confirm(self._clock)
# 3. Сохранить агрегат
await self._orders.save(order)
# 4. Зафиксировать транзакцию
await self._uow.commit()
return order.id
Несколько деталей, которые важны:
async with self._uow — контекстный менеджер открывает сессию SQLAlchemy и при исключении автоматически делает откат. commit() вызывается явно в обработчике, не где-то внутри репозитория.
Доменный метод проверяет инварианты. order.confirm() сам бросит OrderAlreadyConfirmed, если заказ уже подтверждён. Обработчику не нужно дублировать эту проверку.
События регистрируются внутри агрегата. Метод order.confirm() вызывает внутри себя self._events.append(OrderConfirmed(...)). Репозиторий при save читает накопленные события и записывает их в outbox для последующей публикации.
Одна команда — один агрегат
Важное ограничение: одна команда меняет ровно один агрегат. Если бизнес-логика требует изменить два — это сигнал, что что-то не так.
Плохой вариант — менять два агрегата в одной транзакции:
# Так делать не нужно — два агрегата в одном UoW
async def handle(self, cmd: CreateOrder) -> OrderId:
async with self._uow:
customer = await self._customers.by_id(cmd.customer_id)
customer.increment_order_count()
await self._customers.save(customer)
order = Order.create(cmd.customer_id, cmd.items)
await self._orders.save(order)
await self._uow.commit()
return order.id
Проблема: при конкурентном изменении Customer из другого обработчика SQLAlchemy выбросит StaleDataError. Customer и Order живут по-разному — у них разная частота изменений и разные причины меняться.
Правильный вариант — один агрегат, событие летит дальше:
# Один агрегат, изменения в другие распространяются через события
async def handle(self, cmd: CreateOrder) -> OrderId:
async with self._uow:
order = Order.create(cmd.customer_id, cmd.items)
await self._orders.save(order)
await self._uow.commit()
# OrderCreated зарегистрирован внутри агрегата;
# обработчик события обновит Customer асинхронно
return order.id
Если два объекта должны меняться всегда вместе, возможно, они один агрегат. Если меняются независимо — используют событийную цепочку или сагу.
Что возвращает обработчик
Обработчик команды возвращает минимум: идентификатор созданной или изменённой сущности, статус, либо ничего (None).
# Возвращает id созданной сущности — нормально
class CreateProductHandler:
async def handle(self, cmd: CreateProduct) -> ProductId:
async with self._uow:
product = Product.create(cmd.name, cmd.price, cmd.sku)
await self._products.save(product)
await self._uow.commit()
return product.id
# Возвращает None для простых операций — тоже нормально
class CancelOrderHandler:
async def handle(self, cmd: CancelOrder) -> None: ...
Полный read-DTO (например OrderSummarySchema со всеми полями заказа) из обработчика команды — типичная ошибка. Обработчик команды отвечает только за запись; читать данные — задача отдельного запроса (query-handler).
Роутер после команды возвращает 201 Created с Location: /orders/{id}. Если клиенту нужен полный вид — он делает отдельный GET-запрос:
# core/order/router.py
@router.post("/orders", status_code=201)
async def create_order(
body: CreateOrderBody,
handler: CreateOrderHandler = Depends(),
) -> CreatedOrderResponse:
order_id = await handler.handle(
CreateOrder(customer_id=body.customer_id, items=body.items)
)
return CreatedOrderResponse(id=order_id)
Где валидировать
Проверки делятся на два уровня, и их нельзя смешивать.
Первый уровень — форма входящего запроса. Pydantic-схема на роутере проверяет обязательные поля, типы, длины строк. Ошибка здесь — 422 Unprocessable Entity.
class ConfirmOrderBody(BaseModel):
idempotency_key: str = Field(min_length=1, max_length=64)
@router.post("/orders/{order_id}/confirm")
async def confirm_order(
order_id: UUID,
body: ConfirmOrderBody,
handler: ConfirmOrderHandler = Depends(),
) -> None:
await handler.handle(
ConfirmOrder(
order_id=OrderId(order_id),
idempotency_key=body.idempotency_key,
)
)
Второй уровень — бизнес-правила. Метод агрегата проверяет, можно ли выполнить операцию в текущем состоянии. Ошибка здесь — доменное исключение, которое превращается в 409 Conflict или 400 Bad Request.
class Order:
def confirm(self, clock: Clock) -> None:
if self.status != OrderStatus.NEW:
raise OrderAlreadyConfirmed(self.id, self.status)
if not self.items:
raise EmptyOrderCannotBeConfirmed(self.id)
self.status = OrderStatus.CONFIRMED
self._register_event(OrderConfirmed(self.id, clock.now()))
Эти два слоя независимы. Pydantic не знает про бизнес-состояние агрегата, агрегат не знает про HTTP.
Частые ошибки
Отдельный SELECT для принятия решения. Иногда пишут так: «сначала проверю, есть ли платёж, потом загружу заказ». Это проблема: между двумя запросами данные могут измениться, а read-логика просочилась в write-путь. Если агрегату нужно знать про платёж — payment_status должно быть полем агрегата, и order.confirm() проверяет его сам.
Два агрегата в одной транзакции — разобрано выше. Используй сагу или событийную цепочку.
commit() внутри репозитория. Репозиторий занимается только сохранением агрегата. Транзакцию фиксирует обработчик через UnitOfWork.
Проверка инварианта в обработчике вместо агрегата. if order.status != Status.NEW: raise в handler-е — это логика, которая принадлежит агрегату. Если это правило изменится, его нужно будет найти и поменять в каждом обработчике.
Коротко
- Команда —
@dataclass(frozen=True)без логики внутри. Описывает намерение изменить состояние. - Обработчик следует четырём шагам: загрузить агрегат → вызвать доменный метод → сохранить → зафиксировать транзакцию.
commit()вызывается в обработчике, не в репозитории.- Одна команда меняет один агрегат. Два агрегата — это сага или неверно нарезанные границы.
- Обработчик возвращает
idилиNone, но не полный read-DTO. - Pydantic проверяет форму входа, агрегат проверяет бизнес-правила. Эти слои независимы.
- Доменные события регистрируются внутри агрегата, не снаружи.
Что почитать дальше
- Query side — read-handler через ViewRepository и read-only сессию.
- Sync через события — как событие из команды доходит до read-модели через outbox.
- Read-model — денормализованная проекция и её восстановление.
- Когда CQRS оправдан — когда стоит применять, а когда достаточно простого CRUD.