Опирается на правила: R-EVT-1R-EVT-5 и R-EVT-X1R-EVT-X4 из DDD Tactical Style Guide → раздел 4. Domain Event.

Важно знать

  • Domain Event — факт, который уже произошёл в домене. Имя — глагол в прошедшем времени: OrderConfirmed, CustomerRegistered, не ConfirmOrder и не OrderEvent.
  • Идиома Python: событие наследует DomainEvent и помечено @dataclass(frozen=True). Иммутабельность бесплатно.
  • В полях — примитивы и VO, не агрегаты. UUID, Decimal, str, datetime — да. Order order — нет (R-EVT-X2).
  • Корень регистрирует события через self._register_event(...). Репозиторий/UoW забирает их через aggregate.pull_events() после сохранения, затем публикует.
  • asyncio.create_task(publish_event(...)) после commit — аналог Java AFTER_COMMIT. Для критичных эффектов (деньги, склад) — только Outbox (R-EVT-X4).
  • Суффикс Event не добавляется: OrderConfirmed, не OrderConfirmedEvent.
  • Базовый DomainEvent несёт event_id: UUID, occurred_at: datetime, aggregate_id: UUID — базовые поля для дедупликации и маршрутизации.

Domain Event — механизм, через который агрегат сообщает миру «у меня изменилось состояние», не зная заранее, кто будет слушать. Без событий все взаимодействия между агрегатами — прямые вызовы, которые порождают жёсткие связи. Раскрытие раздела 4 гайда.

Базовый класс и структура события

R-EVT-1: событие наследует DomainEvent. R-EVT-3: иммутабельно — @dataclass(frozen=True).

# core/shared/building_blocks.py
from dataclasses import dataclass
from datetime import datetime
from uuid import UUID


@dataclass(frozen=True)
class DomainEvent:
    event_id: UUID
    occurred_at: datetime
    aggregate_id: UUID

Что даёт базовый класс:

  • event_id — уникальный UUID события. Подписчики используют для дедупликации (at-least-once delivery).
  • occurred_at — момент создания события в домене.
  • aggregate_id — id агрегата, сгенерировавшего событие. Outbox-relay и подписчики маршрутизируют по нему.

Конкретное событие добавляет бизнес-контекст:

# core/order/event/order_confirmed.py
from dataclasses import dataclass
from datetime import datetime
from decimal import Decimal
from uuid import UUID

from core.shared.building_blocks import DomainEvent


@dataclass(frozen=True)
class OrderConfirmed(DomainEvent):
    customer_id: UUID
    total: Decimal
    currency: str
    confirmed_at: datetime

Имя — глагол в прошедшем времени

R-EVT-2: OrderConfirmed, CustomerRegistered, PaymentReceived. Не ConfirmOrder (команда), не OrderEvent (тавтология).

Почему:

  • Событие — факт в прошлом. Императивное имя (ConfirmOrder) — команда, не факт.
  • Читается как лог. OrderCreated → OrderConfirmed → PaymentReceived → OrderShipped — история заказа.
  • Подписчик понимает семантику. OrderConfirmed — уже подтверждён, нужно отреагировать.

Суффикс Event не добавляем:

# ПЛОХО
class OrderConfirmedEvent(DomainEvent): ...   # тавтология
class ConfirmOrderEvent(DomainEvent): ...     # звучит как команда

# ХОРОШО
class OrderConfirmed(DomainEvent): ...
class CustomerRegistered(DomainEvent): ...
class PaymentReceived(DomainEvent): ...

В полях — примитивы и VO, не агрегаты

R-EVT-4, R-EVT-X2: событие несёт бизнес-контекст значениями, не ссылку на изменяемый агрегат.

# ПЛОХО — ссылка на агрегат
@dataclass(frozen=True)
class OrderConfirmed(DomainEvent):
    order: "Order"   # объект агрегата — R-EVT-X2

# ХОРОШО — плоские примитивы и VO
@dataclass(frozen=True)
class OrderConfirmed(DomainEvent):
    customer_id: UUID       # ID, не объект Customer
    total: Decimal
    currency: str
    confirmed_at: datetime

Что не так со ссылкой на агрегат:

  • Снимок или текущее состояние? К моменту обработки события агрегат мог измениться. «Факт в прошлом» становится «состояние сейчас».
  • Сериализация. Сериализовать весь Order — десятки полей, внутренние Entity, рекурсивные ссылки. Decimal + UUID + datetime — три поля.
  • Версионирование схемы. Когда Order изменит структуру, payload изменится для всех подписчиков. С плоскими полями — стабильный контракт.

Регистрация в корне, публикация через pull_events

R-EVT-5, R-AGG-X4: корень регистрирует события через self._register_event(...) в момент изменения состояния. Репозиторий/UoW забирает через aggregate.pull_events() после save.

Корень агрегата:

# core/order/aggregate/order.py
def confirm(self, clock: Clock) -> None:
    if not self._lines:
        raise ValueError("cannot confirm empty order")
    self._status = OrderStatus.CONFIRMED
    self._register_event(
        OrderConfirmed(
            event_id=uuid4(),
            occurred_at=clock.now(),
            aggregate_id=self.id.value,
            customer_id=self._customer_id.value,
            total=self._total().amount,
            currency=self._total().currency,
            confirmed_at=clock.now(),
        )
    )

Репозиторий после сохранения:

# adapters/out/persistence/sqlalchemy_order_repository.py
class SqlAlchemyOrderRepository:
    async def save(self, order: Order) -> None:
        async with self._session.begin():
            # 1. сохраняем агрегат
            await self._upsert(order)
            # 2. забираем события и пишем в outbox (в той же транзакции)
            events = order.pull_events()
            await self._outbox.write_all(events)
        # pull_events() уже очистил список — повторный save не дублирует

pull_events() возвращает список и очищает внутренний список событий. Если транзакция откатилась, следующий вызов save снова запишет те же события (они всё ещё в агрегате, если pull_events не вызывался до коммита).

asyncio.create_task после commit — антипаттерн для критичных эффектов

R-EVT-X4: асинхронный вызов после коммита — Python-аналог Java AFTER_COMMIT. Для критичных эффектов запрещён.

# ПЛОХО — asyncio.create_task после commit для критичного эффекта
async def save(self, order: Order) -> None:
    await self._session.commit()
    events = order.pull_events()
    asyncio.create_task(self._publish(events))   # теряется при падении процесса

# ХОРОШО — Outbox в той же транзакции
async def save(self, order: Order) -> None:
    async with self._session.begin():
        await self._upsert(order)
        events = order.pull_events()
        await self._outbox.write_all(events)     # атомарно с агрегатом

Что может пойти не так с create_task:

  • Процесс упал между commit и publish. Агрегат сохранён, событие потеряно. Склад не уменьшен, деньги не списаны.
  • Нет ретраев. create_task не персистентная очередь — при ошибке событие исчезает.

Outbox pattern: событие пишется в outbox-таблицу в той же транзакции. Отдельный relay-процесс читает outbox и публикует в Kafka/RabbitMQ с гарантией at-least-once.

asyncio.create_task остаётся приемлемым только для некритичных эффектов: обновить кэш, отправить уведомление (ок если потеряем).

Что запрещено

АнтипаттернПравилоЧто взамен
Изменять поля события после созданияR-EVT-X1@dataclass(frozen=True), новое событие при изменении
Ссылка на агрегат/Entity в событииR-EVT-X2Только UUID, Decimal, str, datetime
Регистрировать событие в Handler/репозиторииR-EVT-X3_register_event только в методах корня
asyncio.create_task после commit для критичных эффектовR-EVT-X4Outbox pattern + relay
Имя-императив (ConfirmOrder, CreateCustomer)R-EVT-2Прошедшее время: OrderConfirmed, CustomerCreated

Куда дальше

  • DDD Tactical → раздел 4. Domain Event — нормативные формулировки R-EVT-*.
  • python/aggregate-root.md — где _register_event вызывается и почему только в корне.
  • python/repository.md — где pull_events() + публикация через Outbox.
  • python/value-object.md — VO в полях событий, @dataclass(frozen=True).