Опирается на правила: R-CQRS-CMD-1R-CQRS-CMD-5 и R-CQRS-CMD-X1R-CQRS-CMD-X3 из CQRS Rules → раздел 2. Command side.

Важно знать

  • Command — @dataclass(frozen=True), реализует Command[R] Protocol (R-UC-1). Без вычислений в __post_init__.
  • Один command меняет один агрегат. Если меняются два — это либо saga, либо границы агрегатов нарезаны неверно.
  • Command-handler открывает UoW, загружает агрегат через Repository, вызывает доменный метод, делает await uow.commit().
  • Возвращает минимум: id изменённой сущности, статус или None. Никаких полных read-DTO.
  • Read внутри command — только await repository.by_id(...) для загрузки агрегата. Отдельный SELECT «прочитать и решить» — нарушение R-CQRS-CMD-X1.
  • Валидация: контракт входа через Pydantic-схему на роутере (R-VLD-WHERE-1); бизнес-инварианты — метод агрегата, бросает domain exception.
  • Изменение нескольких агрегатов в одном UoW без саги — нарушение R-CQRS-CMD-X3.

Command — это намерение изменить состояние. В CQRS это пишущая половина: всё, что меняет данные, едет через command-handler; всё, что читает — через query-handler. Граница жёсткая: AsyncSession с commit открывается на handler-е через UnitOfWork, не на репозитории.

Command — frozen dataclass с маркером

R-CQRS-CMD-1: command — это @dataclass(frozen=True), который реализует Command[R] Protocol из usecase_pattern.

# core/order/command/confirm_order.py
from dataclasses import dataclass
from usecase_pattern import Command
from core.order.domain.value_objects import OrderId

@dataclass(frozen=True)
class ConfirmOrder:  # реализует Command[OrderId] через Protocol
    order_id: OrderId
    idempotency_key: str

Что важно:

  • frozen=True — иммутабельность из коробки. Python запрещает присваивание атрибутов после __init__. hash рассчитывается автоматически по полям — объект безопасно кладётся в set или dict.
  • Никаких вычислений в __post_init__. Command — данные, не логика. Mapping из Pydantic-DTO в command происходит в роутере, не здесь. __post_init__ допустим только для приведения примитивных типов к value object-ам.
  • Command[R] — Protocol, не базовый класс. Python не требует явного implements; структурная типизация через Protocol достаточна. Статический анализ (mypy) проверяет соответствие по сигнатуре.
  • idempotency_key — стандартное поле для неидемпотентных операций. Router берёт его из заголовка Idempotency-Key.

Command меняет один агрегат

R-CQRS-CMD-2: одна команда — одно изменение одного агрегата. Если бизнес-логика затрагивает два агрегата, ситуация одна из двух:

  • Это saga: распределённая последовательность из нескольких локальных команд с компенсациями.
  • Границы агрегатов нарезаны неверно: два объекта, которые всегда меняются вместе, — это один агрегат.
# ПЛОХО — два агрегата в одном UoW
async def handle(self, cmd: CreateOrder) -> OrderId:
    async with self._uow:
        customer = await self._customers.by_id(cmd.customer_id)
        customer.increment_order_count()           # мутация Customer
        await self._customers.save(customer)

        order = Order.create(cmd.customer_id, cmd.items)
        await self._orders.save(order)
        await self._uow.commit()
    return order.id

Что не так: AsyncSession держит строки обоих агрегатов в грязном состоянии, при flush идут два UPDATE. При конкурентном изменении Customer из другого хэндлера возникает StaleDataError SQLAlchemy. Customer и Order живут разными жизнями и имеют разный rate изменений.

# ХОРОШО — один агрегат меняется, событие летит дальше
async def handle(self, cmd: CreateOrder) -> OrderId:
    async with self._uow:
        order = Order.create(cmd.customer_id, cmd.items)
        await self._orders.save(order)
        await self._uow.commit()
        # OrderCreated зарегистрирован внутри агрегата;
        # outbox-relay опубликует событие, Customer-сервис
        # обновит счётчик асинхронно
    return order.id

Customer.order_count обновляется через event-driven, не через transactional coupling. Eventual consistency между агрегатами — норма DDD.

Структура command-handler-а

R-CQRS-CMD-3: классический command-handler — четыре шага в строгом порядке.

# core/order/handler/confirm_order_handler.py
from usecase_pattern import Handler
from core.order.command.confirm_order import ConfirmOrder
from core.order.domain.port import OrderRepository
from core.order.domain.value_objects import OrderId
from core.shared.uow import UnitOfWork
from core.shared.clock import Clock


class ConfirmOrderHandler:
    def __init__(
        self,
        orders: OrderRepository,
        uow: UnitOfWork,
        clock: Clock,
    ) -> None:
        self._orders = orders
        self._uow = uow
        self._clock = clock

    async def handle(self, cmd: ConfirmOrder) -> OrderId:
        async with self._uow:
            # 1. Загрузить агрегат — сессия открыта в RW-режиме
            order = await self._orders.by_id(cmd.order_id)
            if order is None:
                raise OrderNotFound(cmd.order_id)

            # 2. Вызвать доменный метод — он же проверяет инварианты
            order.confirm(self._clock)

            # 3. Сохранить (OrderConfirmed зарегистрирован внутри
            #    order.confirm() через aggregate.register_event)
            await self._orders.save(order)

            # 4. Закоммитить — только здесь, не в репозитории
            await self._uow.commit()

        return order.id

Что важно в каждом шаге:

  • async with self._uow — context manager открывает AsyncSession в RW-режиме и гарантирует rollback при исключении. commit() вызывается явно в handler-е, не внутри репозитория.
  • by_id без read-only флага — write-сессия, SQLAlchemy не добавляет FOR UPDATE автоматически. Если сервис конкурентный, передай явный with_lock=True в порт-интерфейс, который транслируется в .with_for_update() в адаптере.
  • Доменный метод проверяет инварианты. order.confirm() сам бросит OrderAlreadyConfirmed, если статус не подходит. Handler не делает if order.status != Status.NEW: raise.
  • register_event — внутри агрегата. order.confirm() вызывает self._events.append(OrderConfirmed(...)). Репозитория при save вычитывает накопленные события и пишет в outbox. Без этого CQRS-синхронизация невозможна.

Command возвращает минимум

R-CQRS-CMD-4: возвращаемое значение command — это id новой или изменённой сущности, статус, либо None. Не полный read-DTO.

# OK — возвращает id созданной сущности
@dataclass(frozen=True)
class CreateProduct:
    name: str
    price: Decimal
    sku: str

class CreateProductHandler:
    async def handle(self, cmd: CreateProduct) -> ProductId:
        async with self._uow:
            product = Product.create(cmd.name, cmd.price, cmd.sku)
            await self._products.save(product)
            await self._uow.commit()
        return product.id

# OK — возвращает None для fire-and-forget операции
@dataclass(frozen=True)
class CancelOrder:
    order_id: OrderId

class CancelOrderHandler:
    async def handle(self, cmd: CancelOrder) -> None: ...

# ПЛОХО — возвращает полный read-DTO
@dataclass(frozen=True)
class CreateOrder:
    customer_id: CustomerId
    items: list[OrderItemInput]

class CreateOrderHandler:
    async def handle(self, cmd: CreateOrder) -> OrderSummarySchema:  # нарушение R-CQRS-CMD-X2
        ...

Почему не полный read-DTO (R-CQRS-CMD-X2):

  • Смешение обязанностей. Command-handler — это write. Если он начинает собирать read-проекцию, в нём появляются join-запросы, маппинги, вложенные DTO — это работа query-handler-а.
  • При eventual consistency данные могут быть рассинхронизированы. Write-handler видит order.status == CONFIRMED прямо сейчас; query-handler через 100 мс тоже увидит. Если клиенту нужна полная проекция после write — два раздельных вызова с понятным контрактом надёжнее одного с двумя обязанностями.
  • Router делает второй вызов сам. POST /orders → 201 с Location: /orders/{id} и минимумом полей в body. Клиент при необходимости делает GET /orders/{id}/summary.
# core/order/router.py — роутер делает второй вызов сам при необходимости
@router.post("/orders", status_code=201)
async def create_order(
    body: CreateOrderBody,
    handler: CreateOrderHandler = Depends(),
) -> CreatedOrderResponse:
    order_id = await handler.handle(
        CreateOrder(customer_id=body.customer_id, items=body.items)
    )
    return CreatedOrderResponse(id=order_id)
    # Если UI нужен полный view — отдельный GET /orders/{id}/summary

Валидация: контракт vs инвариант

R-CQRS-CMD-5: валидация в command-side происходит в двух местах, и их нельзя путать.

# 1. Контракт входа — Pydantic-схема на роутере
class ConfirmOrderBody(BaseModel):
    idempotency_key: str = Field(min_length=1, max_length=64)

@router.post("/orders/{order_id}/confirm")
async def confirm_order(
    order_id: UUID,
    body: ConfirmOrderBody,
    handler: ConfirmOrderHandler = Depends(),
) -> None:
    await handler.handle(
        ConfirmOrder(
            order_id=OrderId(order_id),
            idempotency_key=body.idempotency_key,
        )
    )
# 2. Бизнес-инвариант — метод агрегата, бросает domain exception
class Order:
    def confirm(self, clock: Clock) -> None:
        if self.status != OrderStatus.NEW:
            raise OrderAlreadyConfirmed(self.id, self.status)
        if not self.items:
            raise EmptyOrderCannotBeConfirmed(self.id)
        self.status = OrderStatus.CONFIRMED
        self._register_event(OrderConfirmed(self.id, clock.now()))

Pydantic проверяет форму и тип входа: обязательные поля, длины строк, форматы. Агрегат проверяет бизнес-правила: можно ли подтвердить заказ в текущем состоянии. Эти слои независимы: Pydantic-ошибка — 422 Unprocessable Entity, domain exception — 409 Conflict или 400 Bad Request через exception handler.

Что запрещено

Отдельный SELECT «прочитать и решить» в command

R-CQRS-CMD-X1: внутри command-handler нет отдельного read-запроса для «посмотреть и подумать». Всё, что нужно для решения, — это агрегат, загруженный одним by_id.

# ПЛОХО — отдельный SELECT в command
async def handle(self, cmd: ConfirmOrder) -> OrderId:
    async with self._uow:
        has_payment = await self._payments.exists_by_order(cmd.order_id)  # отдельный read
        if not has_payment:
            raise PaymentRequired(cmd.order_id)
        order = await self._orders.by_id(cmd.order_id)
        order.confirm(self._clock)
        await self._orders.save(order)
        await self._uow.commit()
    return order.id

# ХОРОШО — payment_status хранится на агрегате Order
async def handle(self, cmd: ConfirmOrder) -> OrderId:
    async with self._uow:
        order = await self._orders.by_id(cmd.order_id)
        order.confirm(self._clock)  # сам проверит payment_status
        await self._orders.save(order)
        await self._uow.commit()
    return order.id

Что не так с первым вариантом:

  • Read-логика просочилась в write-handler. Если завтра правило изменится — «ещё проверить лимит клиента» — command-handler разрастётся.
  • Race condition: между exists_by_order и by_id платёж может появиться или исчезнуть. Без пессимистичной блокировки проверка не консистентна.

Корректно: либо payment_status — поле агрегата Order, и order.confirm() сам проверяет его, либо это разные bounded context-ы, и проверка идёт через explicit query-handler, а не склеена сбоку в command.

Command возвращает полный read-DTO

R-CQRS-CMD-X2 — см. выше раздел «Command возвращает минимум». Read-проекция — работа query-handler-а.

Несколько агрегатов в одном UoW без саги

R-CQRS-CMD-X3: если команда меняет два независимых агрегата — это saga, не один handler.

# ПЛОХО — два агрегата в одной сессии
async def handle(self, cmd: TransferFunds) -> None:
    async with self._uow:
        sender = await self._accounts.by_id(cmd.sender_id)
        receiver = await self._accounts.by_id(cmd.receiver_id)
        sender.debit(cmd.amount)
        receiver.credit(cmd.amount)
        await self._accounts.save(sender)
        await self._accounts.save(receiver)
        await self._uow.commit()

# ХОРОШО — orchestration saga
# 1. Command DebitAccount → publishes AccountDebited
# 2. Event AccountDebited → saga orchestrator → Command CreditAccount
# 3. На любой failure → компенсирующий RefundAccount

Почему: при конкурентном встречном переводе StaleDataError SQLAlchemy на sender или receiver откатит всю транзакцию. Saga даёт явную state-machine с компенсациями и не держит длинную сессию.

Что запрещено — таблица

АнтипаттернПравилоЧто взамен
Command-handler делает отдельный SELECT для принятия решенияR-CQRS-CMD-X1Загрузка агрегата one-shot; проверки внутри метода агрегата
Command возвращает полный read-DTO (OrderSummarySchema)R-CQRS-CMD-X2id / None + отдельный query-handler
Два агрегата мутируются в одном async with uowR-CQRS-CMD-X3Saga с локальными командами и компенсациями
Загрузка агрегата без явного with_lock=True в конкурентном writeR-CQRS-CMD-3.with_for_update() в адаптере репозитория
if order.status != Status.NEW: raise прямо в handler-еR-CQRS-CMD-5Инвариант в методе агрегата + domain exception
await uow.commit() внутри репозитория, не handler-аR-CQRS-CMD-3Commit только на уровне handler-а через UnitOfWork

Куда дальше

  • CQRS → раздел 2. Command side — нормативные формулировки R-CQRS-CMD-*.
  • Query side — read-handler через ViewRepository и read-only AsyncSession.
  • Sync через события — как outbox-событие из command-handler доходит до read-model.
  • Read-model — денормализованная проекция, rebuild-скрипт, восстановимость.
  • Уровень и эволюция — Уровень 2 (lightweight) → Уровень 3 (ViewRepository) → event-driven.
  • Когда CQRS оправдан — когда lightweight, когда полный CQRS, когда не нужен вообще.