Опирается на правила:
R-CQRS-CMD-1…R-CQRS-CMD-5иR-CQRS-CMD-X1…R-CQRS-CMD-X3из CQRS Rules → раздел 2. Command side.
Важно знать
- Command —
@dataclass(frozen=True), реализуетCommand[R]Protocol (R-UC-1). Без вычислений в__post_init__.- Один command меняет один агрегат. Если меняются два — это либо saga, либо границы агрегатов нарезаны неверно.
- Command-handler открывает UoW, загружает агрегат через
Repository, вызывает доменный метод, делаетawait uow.commit().- Возвращает минимум: id изменённой сущности, статус или
None. Никаких полных read-DTO.- Read внутри command — только
await repository.by_id(...)для загрузки агрегата. ОтдельныйSELECT«прочитать и решить» — нарушениеR-CQRS-CMD-X1.- Валидация: контракт входа через Pydantic-схему на роутере (
R-VLD-WHERE-1); бизнес-инварианты — метод агрегата, бросает domain exception.- Изменение нескольких агрегатов в одном UoW без саги — нарушение
R-CQRS-CMD-X3.
Command — это намерение изменить состояние. В CQRS это пишущая половина: всё, что меняет данные, едет через command-handler; всё, что читает — через query-handler. Граница жёсткая: AsyncSession с commit открывается на handler-е через UnitOfWork, не на репозитории.
Command — frozen dataclass с маркером
R-CQRS-CMD-1: command — это @dataclass(frozen=True), который реализует Command[R] Protocol из usecase_pattern.
# core/order/command/confirm_order.py
from dataclasses import dataclass
from usecase_pattern import Command
from core.order.domain.value_objects import OrderId
@dataclass(frozen=True)
class ConfirmOrder: # реализует Command[OrderId] через Protocol
order_id: OrderId
idempotency_key: str
Что важно:
frozen=True— иммутабельность из коробки. Python запрещает присваивание атрибутов после__init__.hashрассчитывается автоматически по полям — объект безопасно кладётся в set или dict.- Никаких вычислений в
__post_init__. Command — данные, не логика. Mapping из Pydantic-DTO в command происходит в роутере, не здесь.__post_init__допустим только для приведения примитивных типов к value object-ам. Command[R]— Protocol, не базовый класс. Python не требует явногоimplements; структурная типизация через Protocol достаточна. Статический анализ (mypy) проверяет соответствие по сигнатуре.idempotency_key— стандартное поле для неидемпотентных операций. Router берёт его из заголовкаIdempotency-Key.
Command меняет один агрегат
R-CQRS-CMD-2: одна команда — одно изменение одного агрегата. Если бизнес-логика затрагивает два агрегата, ситуация одна из двух:
- Это saga: распределённая последовательность из нескольких локальных команд с компенсациями.
- Границы агрегатов нарезаны неверно: два объекта, которые всегда меняются вместе, — это один агрегат.
# ПЛОХО — два агрегата в одном UoW
async def handle(self, cmd: CreateOrder) -> OrderId:
async with self._uow:
customer = await self._customers.by_id(cmd.customer_id)
customer.increment_order_count() # мутация Customer
await self._customers.save(customer)
order = Order.create(cmd.customer_id, cmd.items)
await self._orders.save(order)
await self._uow.commit()
return order.id
Что не так: AsyncSession держит строки обоих агрегатов в грязном состоянии, при flush идут два UPDATE. При конкурентном изменении Customer из другого хэндлера возникает StaleDataError SQLAlchemy. Customer и Order живут разными жизнями и имеют разный rate изменений.
# ХОРОШО — один агрегат меняется, событие летит дальше
async def handle(self, cmd: CreateOrder) -> OrderId:
async with self._uow:
order = Order.create(cmd.customer_id, cmd.items)
await self._orders.save(order)
await self._uow.commit()
# OrderCreated зарегистрирован внутри агрегата;
# outbox-relay опубликует событие, Customer-сервис
# обновит счётчик асинхронно
return order.id
Customer.order_count обновляется через event-driven, не через transactional coupling. Eventual consistency между агрегатами — норма DDD.
Структура command-handler-а
R-CQRS-CMD-3: классический command-handler — четыре шага в строгом порядке.
# core/order/handler/confirm_order_handler.py
from usecase_pattern import Handler
from core.order.command.confirm_order import ConfirmOrder
from core.order.domain.port import OrderRepository
from core.order.domain.value_objects import OrderId
from core.shared.uow import UnitOfWork
from core.shared.clock import Clock
class ConfirmOrderHandler:
def __init__(
self,
orders: OrderRepository,
uow: UnitOfWork,
clock: Clock,
) -> None:
self._orders = orders
self._uow = uow
self._clock = clock
async def handle(self, cmd: ConfirmOrder) -> OrderId:
async with self._uow:
# 1. Загрузить агрегат — сессия открыта в RW-режиме
order = await self._orders.by_id(cmd.order_id)
if order is None:
raise OrderNotFound(cmd.order_id)
# 2. Вызвать доменный метод — он же проверяет инварианты
order.confirm(self._clock)
# 3. Сохранить (OrderConfirmed зарегистрирован внутри
# order.confirm() через aggregate.register_event)
await self._orders.save(order)
# 4. Закоммитить — только здесь, не в репозитории
await self._uow.commit()
return order.id
Что важно в каждом шаге:
async with self._uow— context manager открываетAsyncSessionв RW-режиме и гарантирует rollback при исключении.commit()вызывается явно в handler-е, не внутри репозитория.by_idбез read-only флага — write-сессия, SQLAlchemy не добавляетFOR UPDATEавтоматически. Если сервис конкурентный, передай явныйwith_lock=Trueв порт-интерфейс, который транслируется в.with_for_update()в адаптере.- Доменный метод проверяет инварианты.
order.confirm()сам броситOrderAlreadyConfirmed, если статус не подходит. Handler не делаетif order.status != Status.NEW: raise. register_event— внутри агрегата.order.confirm()вызываетself._events.append(OrderConfirmed(...)). Репозитория приsaveвычитывает накопленные события и пишет в outbox. Без этого CQRS-синхронизация невозможна.
Command возвращает минимум
R-CQRS-CMD-4: возвращаемое значение command — это id новой или изменённой сущности, статус, либо None. Не полный read-DTO.
# OK — возвращает id созданной сущности
@dataclass(frozen=True)
class CreateProduct:
name: str
price: Decimal
sku: str
class CreateProductHandler:
async def handle(self, cmd: CreateProduct) -> ProductId:
async with self._uow:
product = Product.create(cmd.name, cmd.price, cmd.sku)
await self._products.save(product)
await self._uow.commit()
return product.id
# OK — возвращает None для fire-and-forget операции
@dataclass(frozen=True)
class CancelOrder:
order_id: OrderId
class CancelOrderHandler:
async def handle(self, cmd: CancelOrder) -> None: ...
# ПЛОХО — возвращает полный read-DTO
@dataclass(frozen=True)
class CreateOrder:
customer_id: CustomerId
items: list[OrderItemInput]
class CreateOrderHandler:
async def handle(self, cmd: CreateOrder) -> OrderSummarySchema: # нарушение R-CQRS-CMD-X2
...
Почему не полный read-DTO (R-CQRS-CMD-X2):
- Смешение обязанностей. Command-handler — это write. Если он начинает собирать read-проекцию, в нём появляются join-запросы, маппинги, вложенные DTO — это работа query-handler-а.
- При eventual consistency данные могут быть рассинхронизированы. Write-handler видит
order.status == CONFIRMEDпрямо сейчас; query-handler через 100 мс тоже увидит. Если клиенту нужна полная проекция после write — два раздельных вызова с понятным контрактом надёжнее одного с двумя обязанностями. - Router делает второй вызов сам.
POST /orders→ 201 сLocation: /orders/{id}и минимумом полей в body. Клиент при необходимости делаетGET /orders/{id}/summary.
# core/order/router.py — роутер делает второй вызов сам при необходимости
@router.post("/orders", status_code=201)
async def create_order(
body: CreateOrderBody,
handler: CreateOrderHandler = Depends(),
) -> CreatedOrderResponse:
order_id = await handler.handle(
CreateOrder(customer_id=body.customer_id, items=body.items)
)
return CreatedOrderResponse(id=order_id)
# Если UI нужен полный view — отдельный GET /orders/{id}/summary
Валидация: контракт vs инвариант
R-CQRS-CMD-5: валидация в command-side происходит в двух местах, и их нельзя путать.
# 1. Контракт входа — Pydantic-схема на роутере
class ConfirmOrderBody(BaseModel):
idempotency_key: str = Field(min_length=1, max_length=64)
@router.post("/orders/{order_id}/confirm")
async def confirm_order(
order_id: UUID,
body: ConfirmOrderBody,
handler: ConfirmOrderHandler = Depends(),
) -> None:
await handler.handle(
ConfirmOrder(
order_id=OrderId(order_id),
idempotency_key=body.idempotency_key,
)
)
# 2. Бизнес-инвариант — метод агрегата, бросает domain exception
class Order:
def confirm(self, clock: Clock) -> None:
if self.status != OrderStatus.NEW:
raise OrderAlreadyConfirmed(self.id, self.status)
if not self.items:
raise EmptyOrderCannotBeConfirmed(self.id)
self.status = OrderStatus.CONFIRMED
self._register_event(OrderConfirmed(self.id, clock.now()))
Pydantic проверяет форму и тип входа: обязательные поля, длины строк, форматы. Агрегат проверяет бизнес-правила: можно ли подтвердить заказ в текущем состоянии. Эти слои независимы: Pydantic-ошибка — 422 Unprocessable Entity, domain exception — 409 Conflict или 400 Bad Request через exception handler.
Что запрещено
Отдельный SELECT «прочитать и решить» в command
R-CQRS-CMD-X1: внутри command-handler нет отдельного read-запроса для «посмотреть и подумать». Всё, что нужно для решения, — это агрегат, загруженный одним by_id.
# ПЛОХО — отдельный SELECT в command
async def handle(self, cmd: ConfirmOrder) -> OrderId:
async with self._uow:
has_payment = await self._payments.exists_by_order(cmd.order_id) # отдельный read
if not has_payment:
raise PaymentRequired(cmd.order_id)
order = await self._orders.by_id(cmd.order_id)
order.confirm(self._clock)
await self._orders.save(order)
await self._uow.commit()
return order.id
# ХОРОШО — payment_status хранится на агрегате Order
async def handle(self, cmd: ConfirmOrder) -> OrderId:
async with self._uow:
order = await self._orders.by_id(cmd.order_id)
order.confirm(self._clock) # сам проверит payment_status
await self._orders.save(order)
await self._uow.commit()
return order.id
Что не так с первым вариантом:
- Read-логика просочилась в write-handler. Если завтра правило изменится — «ещё проверить лимит клиента» — command-handler разрастётся.
- Race condition: между
exists_by_orderиby_idплатёж может появиться или исчезнуть. Без пессимистичной блокировки проверка не консистентна.
Корректно: либо payment_status — поле агрегата Order, и order.confirm() сам проверяет его, либо это разные bounded context-ы, и проверка идёт через explicit query-handler, а не склеена сбоку в command.
Command возвращает полный read-DTO
R-CQRS-CMD-X2 — см. выше раздел «Command возвращает минимум». Read-проекция — работа query-handler-а.
Несколько агрегатов в одном UoW без саги
R-CQRS-CMD-X3: если команда меняет два независимых агрегата — это saga, не один handler.
# ПЛОХО — два агрегата в одной сессии
async def handle(self, cmd: TransferFunds) -> None:
async with self._uow:
sender = await self._accounts.by_id(cmd.sender_id)
receiver = await self._accounts.by_id(cmd.receiver_id)
sender.debit(cmd.amount)
receiver.credit(cmd.amount)
await self._accounts.save(sender)
await self._accounts.save(receiver)
await self._uow.commit()
# ХОРОШО — orchestration saga
# 1. Command DebitAccount → publishes AccountDebited
# 2. Event AccountDebited → saga orchestrator → Command CreditAccount
# 3. На любой failure → компенсирующий RefundAccount
Почему: при конкурентном встречном переводе StaleDataError SQLAlchemy на sender или receiver откатит всю транзакцию. Saga даёт явную state-machine с компенсациями и не держит длинную сессию.
Что запрещено — таблица
| Антипаттерн | Правило | Что взамен |
|---|---|---|
Command-handler делает отдельный SELECT для принятия решения | R-CQRS-CMD-X1 | Загрузка агрегата one-shot; проверки внутри метода агрегата |
Command возвращает полный read-DTO (OrderSummarySchema) | R-CQRS-CMD-X2 | id / None + отдельный query-handler |
Два агрегата мутируются в одном async with uow | R-CQRS-CMD-X3 | Saga с локальными командами и компенсациями |
Загрузка агрегата без явного with_lock=True в конкурентном write | R-CQRS-CMD-3 | .with_for_update() в адаптере репозитория |
if order.status != Status.NEW: raise прямо в handler-е | R-CQRS-CMD-5 | Инвариант в методе агрегата + domain exception |
await uow.commit() внутри репозитория, не handler-а | R-CQRS-CMD-3 | Commit только на уровне handler-а через UnitOfWork |
Куда дальше
- CQRS → раздел 2. Command side — нормативные формулировки
R-CQRS-CMD-*. - Query side — read-handler через
ViewRepositoryи read-onlyAsyncSession. - Sync через события — как outbox-событие из command-handler доходит до read-model.
- Read-model — денормализованная проекция, rebuild-скрипт, восстановимость.
- Уровень и эволюция — Уровень 2 (lightweight) → Уровень 3 (ViewRepository) → event-driven.
- Когда CQRS оправдан — когда lightweight, когда полный CQRS, когда не нужен вообще.