← назад к разделу

В CQRS всё, что меняет данные, идёт через команды. Всё, что читает — через запросы. Граница жёсткая и не смешивается. Эта статья — про пишущую половину: что такое команда, как выглядит обработчик и где расставить проверки.

Зачем вообще разделять чтение и запись

Представь обычный сервис: один метод get_order() читает данные, другой confirm_order() их меняет. Пока нагрузка небольшая — всё нормально. Но с ростом системы чтение и запись начинают мешать друг другу. Запросы на чтение хотят денормализованных данных и кэша, запросы на запись — строгих транзакций и проверки бизнес-правил.

CQRS решает это разделением: команды (Commands) меняют состояние через агрегат и UnitOfWork; запросы (Queries) читают через отдельные read-модели. Каждый путь оптимизирован под свою задачу.

Что такое команда

Команда — это намерение изменить состояние. Не запрос информации, а просьба что-то сделать: «подтверди заказ», «создай продукт», «отмени платёж».

В Python команды делают через @dataclass(frozen=True):

# core/order/command/confirm_order.py
from dataclasses import dataclass
from core.order.domain.value_objects import OrderId

@dataclass(frozen=True)
class ConfirmOrder:
    order_id: OrderId
    idempotency_key: str

frozen=True делает объект неизменяемым: после создания поменять поля нельзя. Это важно — команда описывает конкретное намерение, которое не должно меняться в процессе выполнения.

Несколько правил для команд:

  • Никакой логики внутри. Команда — данные, не поведение. Маппинг из входящего запроса API происходит в роутере, а не здесь.
  • __post_init__ допустим только для приведения простых типов к value object-ам.
  • idempotency_key — стандартное поле для операций, которые нельзя случайно выполнить дважды. Роутер берёт его из заголовка Idempotency-Key.

Структура обработчика

Обработчик команды (handler) выполняет одно действие в четыре шага: загружает агрегат, вызывает доменный метод, сохраняет, фиксирует транзакцию.

# core/order/handler/confirm_order_handler.py
from core.order.command.confirm_order import ConfirmOrder
from core.order.domain.port import OrderRepository
from core.order.domain.value_objects import OrderId
from core.shared.uow import UnitOfWork
from core.shared.clock import Clock


class ConfirmOrderHandler:
    def __init__(
        self,
        orders: OrderRepository,
        uow: UnitOfWork,
        clock: Clock,
    ) -> None:
        self._orders = orders
        self._uow = uow
        self._clock = clock

    async def handle(self, cmd: ConfirmOrder) -> OrderId:
        async with self._uow:
            # 1. Загрузить агрегат
            order = await self._orders.by_id(cmd.order_id)
            if order is None:
                raise OrderNotFound(cmd.order_id)

            # 2. Вызвать доменный метод — он проверяет бизнес-правила
            order.confirm(self._clock)

            # 3. Сохранить агрегат
            await self._orders.save(order)

            # 4. Зафиксировать транзакцию
            await self._uow.commit()

        return order.id

Несколько деталей, которые важны:

async with self._uow — контекстный менеджер открывает сессию SQLAlchemy и при исключении автоматически делает откат. commit() вызывается явно в обработчике, не где-то внутри репозитория.

Доменный метод проверяет инварианты. order.confirm() сам бросит OrderAlreadyConfirmed, если заказ уже подтверждён. Обработчику не нужно дублировать эту проверку.

События регистрируются внутри агрегата. Метод order.confirm() вызывает внутри себя self._events.append(OrderConfirmed(...)). Репозиторий при save читает накопленные события и записывает их в outbox для последующей публикации.

Одна команда — один агрегат

Важное ограничение: одна команда меняет ровно один агрегат. Если бизнес-логика требует изменить два — это сигнал, что что-то не так.

Плохой вариант — менять два агрегата в одной транзакции:

# Так делать не нужно — два агрегата в одном UoW
async def handle(self, cmd: CreateOrder) -> OrderId:
    async with self._uow:
        customer = await self._customers.by_id(cmd.customer_id)
        customer.increment_order_count()
        await self._customers.save(customer)

        order = Order.create(cmd.customer_id, cmd.items)
        await self._orders.save(order)
        await self._uow.commit()
    return order.id

Проблема: при конкурентном изменении Customer из другого обработчика SQLAlchemy выбросит StaleDataError. Customer и Order живут по-разному — у них разная частота изменений и разные причины меняться.

Правильный вариант — один агрегат, событие летит дальше:

# Один агрегат, изменения в другие распространяются через события
async def handle(self, cmd: CreateOrder) -> OrderId:
    async with self._uow:
        order = Order.create(cmd.customer_id, cmd.items)
        await self._orders.save(order)
        await self._uow.commit()
        # OrderCreated зарегистрирован внутри агрегата;
        # обработчик события обновит Customer асинхронно
    return order.id

Если два объекта должны меняться всегда вместе, возможно, они один агрегат. Если меняются независимо — используют событийную цепочку или сагу.

Что возвращает обработчик

Обработчик команды возвращает минимум: идентификатор созданной или изменённой сущности, статус, либо ничего (None).

# Возвращает id созданной сущности — нормально
class CreateProductHandler:
    async def handle(self, cmd: CreateProduct) -> ProductId:
        async with self._uow:
            product = Product.create(cmd.name, cmd.price, cmd.sku)
            await self._products.save(product)
            await self._uow.commit()
        return product.id

# Возвращает None для простых операций — тоже нормально
class CancelOrderHandler:
    async def handle(self, cmd: CancelOrder) -> None: ...

Полный read-DTO (например OrderSummarySchema со всеми полями заказа) из обработчика команды — типичная ошибка. Обработчик команды отвечает только за запись; читать данные — задача отдельного запроса (query-handler).

Роутер после команды возвращает 201 Created с Location: /orders/{id}. Если клиенту нужен полный вид — он делает отдельный GET-запрос:

# core/order/router.py
@router.post("/orders", status_code=201)
async def create_order(
    body: CreateOrderBody,
    handler: CreateOrderHandler = Depends(),
) -> CreatedOrderResponse:
    order_id = await handler.handle(
        CreateOrder(customer_id=body.customer_id, items=body.items)
    )
    return CreatedOrderResponse(id=order_id)

Где валидировать

Проверки делятся на два уровня, и их нельзя смешивать.

Первый уровень — форма входящего запроса. Pydantic-схема на роутере проверяет обязательные поля, типы, длины строк. Ошибка здесь — 422 Unprocessable Entity.

class ConfirmOrderBody(BaseModel):
    idempotency_key: str = Field(min_length=1, max_length=64)

@router.post("/orders/{order_id}/confirm")
async def confirm_order(
    order_id: UUID,
    body: ConfirmOrderBody,
    handler: ConfirmOrderHandler = Depends(),
) -> None:
    await handler.handle(
        ConfirmOrder(
            order_id=OrderId(order_id),
            idempotency_key=body.idempotency_key,
        )
    )

Второй уровень — бизнес-правила. Метод агрегата проверяет, можно ли выполнить операцию в текущем состоянии. Ошибка здесь — доменное исключение, которое превращается в 409 Conflict или 400 Bad Request.

class Order:
    def confirm(self, clock: Clock) -> None:
        if self.status != OrderStatus.NEW:
            raise OrderAlreadyConfirmed(self.id, self.status)
        if not self.items:
            raise EmptyOrderCannotBeConfirmed(self.id)
        self.status = OrderStatus.CONFIRMED
        self._register_event(OrderConfirmed(self.id, clock.now()))

Эти два слоя независимы. Pydantic не знает про бизнес-состояние агрегата, агрегат не знает про HTTP.

Частые ошибки

Отдельный SELECT для принятия решения. Иногда пишут так: «сначала проверю, есть ли платёж, потом загружу заказ». Это проблема: между двумя запросами данные могут измениться, а read-логика просочилась в write-путь. Если агрегату нужно знать про платёж — payment_status должно быть полем агрегата, и order.confirm() проверяет его сам.

Два агрегата в одной транзакции — разобрано выше. Используй сагу или событийную цепочку.

commit() внутри репозитория. Репозиторий занимается только сохранением агрегата. Транзакцию фиксирует обработчик через UnitOfWork.

Проверка инварианта в обработчике вместо агрегата. if order.status != Status.NEW: raise в handler-е — это логика, которая принадлежит агрегату. Если это правило изменится, его нужно будет найти и поменять в каждом обработчике.

Коротко

  • Команда — @dataclass(frozen=True) без логики внутри. Описывает намерение изменить состояние.
  • Обработчик следует четырём шагам: загрузить агрегат → вызвать доменный метод → сохранить → зафиксировать транзакцию.
  • commit() вызывается в обработчике, не в репозитории.
  • Одна команда меняет один агрегат. Два агрегата — это сага или неверно нарезанные границы.
  • Обработчик возвращает id или None, но не полный read-DTO.
  • Pydantic проверяет форму входа, агрегат проверяет бизнес-правила. Эти слои независимы.
  • Доменные события регистрируются внутри агрегата, не снаружи.

Что почитать дальше

  • Query side — read-handler через ViewRepository и read-only сессию.
  • Sync через события — как событие из команды доходит до read-модели через outbox.
  • Read-model — денормализованная проекция и её восстановление.
  • Когда CQRS оправдан — когда стоит применять, а когда достаточно простого CRUD.