Опирается на правила:
R-EVT-1…R-EVT-5иR-EVT-X1…R-EVT-X4из DDD Tactical Style Guide → раздел 4. Domain Event.
Важно знать
- Domain Event — факт, который уже произошёл в домене. Имя — глагол в прошедшем времени:
OrderConfirmed,CustomerRegistered, неConfirmOrderи неOrderEvent.- Идиома Python: событие наследует
DomainEventи помечено@dataclass(frozen=True). Иммутабельность бесплатно.- В полях — примитивы и VO, не агрегаты.
UUID,Decimal,str,datetime— да.Order order— нет (R-EVT-X2).- Корень регистрирует события через
self._register_event(...). Репозиторий/UoW забирает их черезaggregate.pull_events()после сохранения, затем публикует.asyncio.create_task(publish_event(...))после commit — аналог JavaAFTER_COMMIT. Для критичных эффектов (деньги, склад) — только Outbox (R-EVT-X4).- Суффикс
Eventне добавляется:OrderConfirmed, неOrderConfirmedEvent.- Базовый
DomainEventнесётevent_id: UUID,occurred_at: datetime,aggregate_id: UUID— базовые поля для дедупликации и маршрутизации.
Domain Event — механизм, через который агрегат сообщает миру «у меня изменилось состояние», не зная заранее, кто будет слушать. Без событий все взаимодействия между агрегатами — прямые вызовы, которые порождают жёсткие связи. Раскрытие раздела 4 гайда.
Базовый класс и структура события
R-EVT-1: событие наследует DomainEvent. R-EVT-3: иммутабельно — @dataclass(frozen=True).
# core/shared/building_blocks.py
from dataclasses import dataclass
from datetime import datetime
from uuid import UUID
@dataclass(frozen=True)
class DomainEvent:
event_id: UUID
occurred_at: datetime
aggregate_id: UUID
Что даёт базовый класс:
event_id— уникальный UUID события. Подписчики используют для дедупликации (at-least-once delivery).occurred_at— момент создания события в домене.aggregate_id— id агрегата, сгенерировавшего событие. Outbox-relay и подписчики маршрутизируют по нему.
Конкретное событие добавляет бизнес-контекст:
# core/order/event/order_confirmed.py
from dataclasses import dataclass
from datetime import datetime
from decimal import Decimal
from uuid import UUID
from core.shared.building_blocks import DomainEvent
@dataclass(frozen=True)
class OrderConfirmed(DomainEvent):
customer_id: UUID
total: Decimal
currency: str
confirmed_at: datetime
Имя — глагол в прошедшем времени
R-EVT-2: OrderConfirmed, CustomerRegistered, PaymentReceived. Не ConfirmOrder (команда), не OrderEvent (тавтология).
Почему:
- Событие — факт в прошлом. Императивное имя (
ConfirmOrder) — команда, не факт. - Читается как лог.
OrderCreated → OrderConfirmed → PaymentReceived → OrderShipped— история заказа. - Подписчик понимает семантику.
OrderConfirmed— уже подтверждён, нужно отреагировать.
Суффикс Event не добавляем:
# ПЛОХО
class OrderConfirmedEvent(DomainEvent): ... # тавтология
class ConfirmOrderEvent(DomainEvent): ... # звучит как команда
# ХОРОШО
class OrderConfirmed(DomainEvent): ...
class CustomerRegistered(DomainEvent): ...
class PaymentReceived(DomainEvent): ...
В полях — примитивы и VO, не агрегаты
R-EVT-4, R-EVT-X2: событие несёт бизнес-контекст значениями, не ссылку на изменяемый агрегат.
# ПЛОХО — ссылка на агрегат
@dataclass(frozen=True)
class OrderConfirmed(DomainEvent):
order: "Order" # объект агрегата — R-EVT-X2
# ХОРОШО — плоские примитивы и VO
@dataclass(frozen=True)
class OrderConfirmed(DomainEvent):
customer_id: UUID # ID, не объект Customer
total: Decimal
currency: str
confirmed_at: datetime
Что не так со ссылкой на агрегат:
- Снимок или текущее состояние? К моменту обработки события агрегат мог измениться. «Факт в прошлом» становится «состояние сейчас».
- Сериализация. Сериализовать весь
Order— десятки полей, внутренние Entity, рекурсивные ссылки.Decimal + UUID + datetime— три поля. - Версионирование схемы. Когда
Orderизменит структуру, payload изменится для всех подписчиков. С плоскими полями — стабильный контракт.
Регистрация в корне, публикация через pull_events
R-EVT-5, R-AGG-X4: корень регистрирует события через self._register_event(...) в момент изменения состояния. Репозиторий/UoW забирает через aggregate.pull_events() после save.
Корень агрегата:
# core/order/aggregate/order.py
def confirm(self, clock: Clock) -> None:
if not self._lines:
raise ValueError("cannot confirm empty order")
self._status = OrderStatus.CONFIRMED
self._register_event(
OrderConfirmed(
event_id=uuid4(),
occurred_at=clock.now(),
aggregate_id=self.id.value,
customer_id=self._customer_id.value,
total=self._total().amount,
currency=self._total().currency,
confirmed_at=clock.now(),
)
)
Репозиторий после сохранения:
# adapters/out/persistence/sqlalchemy_order_repository.py
class SqlAlchemyOrderRepository:
async def save(self, order: Order) -> None:
async with self._session.begin():
# 1. сохраняем агрегат
await self._upsert(order)
# 2. забираем события и пишем в outbox (в той же транзакции)
events = order.pull_events()
await self._outbox.write_all(events)
# pull_events() уже очистил список — повторный save не дублирует
pull_events() возвращает список и очищает внутренний список событий. Если транзакция откатилась, следующий вызов save снова запишет те же события (они всё ещё в агрегате, если pull_events не вызывался до коммита).
asyncio.create_task после commit — антипаттерн для критичных эффектов
R-EVT-X4: асинхронный вызов после коммита — Python-аналог Java AFTER_COMMIT. Для критичных эффектов запрещён.
# ПЛОХО — asyncio.create_task после commit для критичного эффекта
async def save(self, order: Order) -> None:
await self._session.commit()
events = order.pull_events()
asyncio.create_task(self._publish(events)) # теряется при падении процесса
# ХОРОШО — Outbox в той же транзакции
async def save(self, order: Order) -> None:
async with self._session.begin():
await self._upsert(order)
events = order.pull_events()
await self._outbox.write_all(events) # атомарно с агрегатом
Что может пойти не так с create_task:
- Процесс упал между commit и publish. Агрегат сохранён, событие потеряно. Склад не уменьшен, деньги не списаны.
- Нет ретраев.
create_taskне персистентная очередь — при ошибке событие исчезает.
Outbox pattern: событие пишется в outbox-таблицу в той же транзакции. Отдельный relay-процесс читает outbox и публикует в Kafka/RabbitMQ с гарантией at-least-once.
asyncio.create_task остаётся приемлемым только для некритичных эффектов: обновить кэш, отправить уведомление (ок если потеряем).
Что запрещено
| Антипаттерн | Правило | Что взамен |
|---|---|---|
| Изменять поля события после создания | R-EVT-X1 | @dataclass(frozen=True), новое событие при изменении |
| Ссылка на агрегат/Entity в событии | R-EVT-X2 | Только UUID, Decimal, str, datetime |
| Регистрировать событие в Handler/репозитории | R-EVT-X3 | _register_event только в методах корня |
asyncio.create_task после commit для критичных эффектов | R-EVT-X4 | Outbox pattern + relay |
Имя-императив (ConfirmOrder, CreateCustomer) | R-EVT-2 | Прошедшее время: OrderConfirmed, CustomerCreated |
Куда дальше
- DDD Tactical → раздел 4. Domain Event — нормативные формулировки
R-EVT-*. - python/aggregate-root.md — где
_register_eventвызывается и почему только в корне. - python/repository.md — где
pull_events()+ публикация через Outbox. - python/value-object.md — VO в полях событий,
@dataclass(frozen=True).