Опирается на правила: R-KFK-EVT-1R-KFK-EVT-4 и R-KFK-EVT-X1R-KFK-EVT-X4 из Kafka Style Guide → раздел 6. Event design.

Важно знать

  • Имя события — глагол в прошедшем времени: OrderConfirmed, PaymentFailed, UserRegistered. Не ConfirmOrder (команда), не OrderConfirmation (noun).
  • Payload: event_id UUID v7, event_type версионированный (order.confirmed.v1), occurred_at, aggregate_type + aggregate_id, бизнес-данные.
  • occurred_at — когда произошло событие (commit в БД), не когда опубликовано в Kafka.
  • PII не в payload для широковещательных топиков — только customer_id, full PII по запросу.
  • Forward-compatible schema: добавление полей — non-breaking; удаление/переименование → event_type.v2.
  • Domain event@dataclass(frozen=True) в core/<bc>/domain/event/; Pydantic-модель — на границе consumer для десериализации.
  • Внутренние объекты (Aggregate, Entity целиком) в payload ломают forward-compat.
  • Breaking change без версии — старые consumer'ы перестанут работать.

Дизайн события — контракт между producer и всеми consumer-ами (текущими и будущими). Любое изменение влияет на несколько систем. Правила UCP обеспечивают события читаемыми (в логах), стабильными (для downstream) и версионированными (для эволюции без поломок).

Имя события — past tense

R-KFK-EVT-1: глагол в прошедшем времени.

КорректноНеверноПочему
OrderConfirmedConfirmOrderКоманда — намерение; событие — свершившийся факт
PaymentFailedPaymentFailure / FailPaymentnoun не описывает «что произошло»
UserRegisteredUserRegistrationфакт регистрации, не процесс
ProductPublishedPublishProductкоманда vs событие
OrderCancelledCancelOrderкоманда vs событие

DDD разделяет три концепта:

  • Command — намерение что-то сделать (ConfirmOrder). Адресована конкретному сервису, может быть отклонена.
  • Event — факт того, что произошло (OrderConfirmed). Прошлое, его нельзя отменить. Адресован всем заинтересованным.
  • Query — запрос данных (GetOrderById).

Kafka topics несут именно events. Имя в прошедшем времени — сигнал «это факт, реагируйте если нужно».

Payload — обязательные поля

R-KFK-EVT-2: метаданные + бизнес-данные.

В Python событие живёт в двух формах:

  • @dataclass(frozen=True) в core/ — pure domain object без зависимостей на aiokafka/Pydantic.
  • Pydantic-модель на границе consumer — десериализация + валидация входящего JSON.
# core/order/domain/event/order_confirmed.py
from dataclasses import dataclass
from datetime import datetime
from decimal import Decimal
from uuid import UUID


@dataclass(frozen=True)
class OrderItemSnapshot:
    product_id: int
    quantity: int
    price: Decimal


@dataclass(frozen=True)
class OrderConfirmedEvent:
    event_id: UUID
    event_type: str
    occurred_at: datetime
    aggregate_type: str
    aggregate_id: int
    order_id: int
    customer_id: int
    total_amount: Decimal
    items: tuple[OrderItemSnapshot, ...]

    @classmethod
    def from_order(cls, order: "Order") -> "OrderConfirmedEvent":
        return cls(
            event_id=uuid7(),
            event_type="order.confirmed.v1",
            occurred_at=order.confirmed_at,
            aggregate_type="Order",
            aggregate_id=order.id,
            order_id=order.id,
            customer_id=order.customer_id,
            total_amount=order.total_amount,
            items=tuple(OrderItemSnapshot(
                product_id=item.product_id,
                quantity=item.quantity,
                price=item.price,
            ) for item in order.items),
        )
ПолеНазначение
event_idUUID v7, уникальный, для dedup на consumer-side (R-KFK-IDEM-1)
event_type<aggregate>.<event>.v<N>, для routing и schema-evolution
occurred_atКогда событие произошло (commit в БД), не когда опубликовано в Kafka
aggregate_typeТип агрегата (Order), для маршрутизации
aggregate_idID агрегата, для dedup + partition key
Бизнес-поляorder_id, customer_id, total_amount, items — что нужно consumer-у

Разница occurred_at vs Kafka-timestamp:

  • Kafka-timestamp — когда broker записал сообщение (outbox-relay может лагать на секунды).
  • occurred_at — когда бизнес-факт случился: order.confirmed_at — момент commit в БД.

Для аналитики и distributed tracing occurred_at критичен: он отражает реальное время бизнес-события.

Pydantic-модель на границе consumer

R-KFK-CFG-3: десериализация через явный реестр event_type → Pydantic-модель. Не динамический импорт по строке из payload.

# adapter/in/kafka/schema/order_confirmed_schema.py
from datetime import datetime
from decimal import Decimal
from uuid import UUID
from pydantic import BaseModel


class OrderItemSchema(BaseModel):
    product_id: int
    quantity: int
    price: Decimal


class OrderConfirmedSchema(BaseModel):
    event_id: UUID
    event_type: str
    occurred_at: datetime
    aggregate_type: str
    aggregate_id: int
    order_id: int
    customer_id: int
    total_amount: Decimal
    items: list[OrderItemSchema]

    model_config = {"extra": "ignore"}   # forward-compat: unknown fields ignored

extra = "ignore" (аналог Jackson default) — новые поля от producer-а игнорируются без ошибки, что обеспечивает forward-compat при additive-изменениях.

Реестр десериализации:

# adapter/in/kafka/event_registry.py
from pydantic import BaseModel
from .schema.order_confirmed_schema import OrderConfirmedSchema
from .schema.payment_failed_schema import PaymentFailedSchema

EVENT_REGISTRY: dict[str, type[BaseModel]] = {
    "order.confirmed.v1": OrderConfirmedSchema,
    "payment.failed.v1": PaymentFailedSchema,
}


def deserialize(raw: dict) -> BaseModel:
    event_type = raw.get("event_type")
    schema_cls = EVENT_REGISTRY.get(event_type)
    if schema_cls is None:
        raise ValueError(f"Unknown event_type: {event_type}")
    return schema_cls.model_validate(raw)

Статический реестр — единственный допустимый способ. importlib.import_module по строке из payload — RCE-риск (аналог trusted.packages: '*' в Java).

Forward-compatible schema

R-KFK-EVT-3: какие изменения safe.

ИзменениеBreaking?Что делать
Добавить новое полеНетДобавить, extra="ignore" защитит старых consumer'ов
Сделать optional поле requiredДаНовый event_type.v2
Удалить полеДаНовый event_type.v2
Переименовать полеДаНовый event_type.v2
Изменить тип поляДаНовый event_type.v2
Изменить семантику без переименованияОпасноНовое имя поля + новый event_type

При breaking change — две версии события параллельно:

# core/order/domain/event/order_confirmed.py

@dataclass(frozen=True)
class OrderConfirmedEventV1:
    event_id: UUID
    event_type: str          # "order.confirmed.v1"
    occurred_at: datetime
    order_id: int
    customer_id: int
    total_amount: Decimal    # v1: единая сумма


@dataclass(frozen=True)
class OrderConfirmedEventV2:
    event_id: UUID
    event_type: str          # "order.confirmed.v2"
    occurred_at: datetime
    order_id: int
    customer_id: int
    gross_amount: Decimal    # v2: разбивка суммы
    net_amount: Decimal
    tax_amount: Decimal

Producer пишет обе версии в outbox в одной транзакции:

async def confirm_order(self, order_id: int, uow: UnitOfWork) -> None:
    async with uow:
        order = await uow.orders.get(order_id)
        order.confirm()
        uow.outbox.append(OrderConfirmedEventV1.from_order(order))
        uow.outbox.append(OrderConfirmedEventV2.from_order(order))

Consumer'ы постепенно переключаются с v1 на v2. После миграции всех — producer перестаёт писать v1.

Domain event в core/

R-KFK-EVT-4: размещение в core без зависимостей на инфраструктуру.

core/
  order/
    domain/
      order.py
      order_item.py
      event/
        order_created.py
        order_confirmed.py
        order_cancelled.py
  product/
    domain/
      event/
        product_published.py
        product_archived.py
  customer/
    domain/
      event/
        customer_registered.py

core/ — без зависимостей на aiokafka, Pydantic, SQLAlchemy. Чистый Python + DDD building blocks.

  • Outbox-relay импортирует @dataclass для сериализации в JSON перед записью в БД.
  • Consumer-adapter импортирует Pydantic-схему с границы для валидации входящего payload.
  • Mapper преобразует Pydantic-схему в @dataclass перед вызовом domain handler.

Что запрещено

Имя — команда

R-KFK-EVT-X1: ConfirmOrder в топике orders.confirmed создаёт путаницу.

Consumer видит ConfirmOrder и думает «это команда, кто-то просит подтвердить». Событие же означает «уже подтверждён — реагируй». Реакция будет неверной.

Aggregate целиком в payload

R-KFK-EVT-X2:

# ПЛОХО — внутренняя структура Order ломает forward-compat
@dataclass(frozen=True)
class OrderConfirmedEvent:
    event_id: UUID
    order: Order    # целиком aggregate

Что ломается:

  • Любое изменение Order — breaking change для всех consumer'ов.
  • Order может содержать sensitive поля (customer с email, phone).
  • ORM-объект с lazy-relations: сериализация потянет лишние данные или упадёт вне сессии.

Корректно — snapshot:

@dataclass(frozen=True)
class OrderConfirmedEvent:
    event_id: UUID
    order_id: int
    customer_id: int
    total_amount: Decimal
    items: tuple[OrderItemSnapshot, ...]


@dataclass(frozen=True)
class OrderItemSnapshot:
    product_id: int
    quantity: int
    price: Decimal

Payload — это проекция агрегата, специально для consumer-а. Только то, что нужно.

PII в широковещательных топиках

R-KFK-EVT-X3: orders.confirmed слушают billing, notification, analytics, fraud-detection. Если в payload customer_email, customer_phone — все четыре consumer'а видят PII.

# ПЛОХО — email в широком топике
@dataclass(frozen=True)
class OrderConfirmedEvent:
    order_id: int
    customer_email: str    # PII
    customer_phone: str    # PII
    total_amount: Decimal
# ХОРОШО — только customer_id, PII подгружается через customer-service по необходимости
@dataclass(frozen=True)
class OrderConfirmedEvent:
    order_id: int
    customer_id: int
    total_amount: Decimal

Notification-service, которому нужен email для отправки письма, делает HTTP-вызов в customer-service: GET /customers/{id}/email. Это даёт точечный доступ и audit log.

Альтернатива — отдельный restricted топик customer.pii с ACL только для notification-service. См. Security.

Breaking change без версии

R-KFK-EVT-X4: переименовали поле, не сменили event_type.

# БЫЛО
@dataclass(frozen=True)
class OrderConfirmedEvent:
    order_id: int
    total_amount: Decimal   # v1

# СТАЛО — breaking, но event_type остался "order.confirmed.v1"
@dataclass(frozen=True)
class OrderConfirmedEvent:
    order_id: int
    gross_amount: Decimal   # breaking rename

Старые consumer'ы маппят total_amount — поле отсутствует → NoneValidationError или неверный расчёт. Без смены event_type нет сигнала «обновитесь».

Любой breaking change: новый event_type (order.confirmed.v2), параллельная публикация v1 и v2, миграция consumer'ов, удаление v1.

Что запрещено — таблица

АнтипаттернПравилоЧто взамен
Имя — команда (ConfirmOrder как event)R-KFK-EVT-X1past tense (OrderConfirmed)
Aggregate целиком в payloadR-KFK-EVT-X2snapshot @dataclass с явными полями
PII (email, phone) в широких топикахR-KFK-EVT-X3только customer_id, PII по запросу
Breaking change без .v2R-KFK-EVT-X4новый event_type + parallel publish
occurred_at = время публикации в KafkaR-KFK-EVT-2commit в БД (business time)
Событие без event_idR-KFK-EVT-2UUID v7 обязательно
Event как dict или mutable dataclassR-KFK-EVT-4@dataclass(frozen=True)
Десериализация через importlib по строкеR-KFK-CFG-X1статический EVENT_REGISTRY
Event в adapter/, не в core/R-KFK-EVT-4core/<bc>/domain/event/

Куда дальше

  • Kafka → раздел 6. Event design — нормативные формулировки правил.
  • Producer — как event попадает в Kafka через aiokafka.
  • Outbox publishing — event_type и payload в outbox-таблице.
  • Idempotent consumer — dedup по event_id.
  • Consumer — manual commit и обработка в asyncio.
  • Retry topic + DLQ — retry-топики и DLQ без blocking retry.
  • Observability — traceparent в Kafka headers.
  • Security — PII в restricted топиках, TLS, ACL.
  • Конфигурация — KafkaSettings через pydantic-settings.