Опирается на правила:
R-KFK-EVT-1…R-KFK-EVT-4иR-KFK-EVT-X1…R-KFK-EVT-X4из Kafka Style Guide → раздел 6. Event design.
Важно знать
- Имя события — глагол в прошедшем времени:
OrderConfirmed,PaymentFailed,UserRegistered. НеConfirmOrder(команда), неOrderConfirmation(noun).- Payload:
event_idUUID v7,event_typeверсионированный (order.confirmed.v1),occurred_at,aggregate_type+aggregate_id, бизнес-данные.occurred_at— когда произошло событие (commit в БД), не когда опубликовано в Kafka.- PII не в payload для широковещательных топиков — только
customer_id, full PII по запросу.- Forward-compatible schema: добавление полей — non-breaking; удаление/переименование →
event_type.v2.- Domain event —
@dataclass(frozen=True)вcore/<bc>/domain/event/; Pydantic-модель — на границе consumer для десериализации.- Внутренние объекты (Aggregate, Entity целиком) в payload ломают forward-compat.
- Breaking change без версии — старые consumer'ы перестанут работать.
Дизайн события — контракт между producer и всеми consumer-ами (текущими и будущими). Любое изменение влияет на несколько систем. Правила UCP обеспечивают события читаемыми (в логах), стабильными (для downstream) и версионированными (для эволюции без поломок).
Имя события — past tense
R-KFK-EVT-1: глагол в прошедшем времени.
| Корректно | Неверно | Почему |
|---|---|---|
OrderConfirmed | ConfirmOrder | Команда — намерение; событие — свершившийся факт |
PaymentFailed | PaymentFailure / FailPayment | noun не описывает «что произошло» |
UserRegistered | UserRegistration | факт регистрации, не процесс |
ProductPublished | PublishProduct | команда vs событие |
OrderCancelled | CancelOrder | команда vs событие |
DDD разделяет три концепта:
- Command — намерение что-то сделать (
ConfirmOrder). Адресована конкретному сервису, может быть отклонена. - Event — факт того, что произошло (
OrderConfirmed). Прошлое, его нельзя отменить. Адресован всем заинтересованным. - Query — запрос данных (
GetOrderById).
Kafka topics несут именно events. Имя в прошедшем времени — сигнал «это факт, реагируйте если нужно».
Payload — обязательные поля
R-KFK-EVT-2: метаданные + бизнес-данные.
В Python событие живёт в двух формах:
@dataclass(frozen=True)вcore/— pure domain object без зависимостей на aiokafka/Pydantic.- Pydantic-модель на границе consumer — десериализация + валидация входящего JSON.
# core/order/domain/event/order_confirmed.py
from dataclasses import dataclass
from datetime import datetime
from decimal import Decimal
from uuid import UUID
@dataclass(frozen=True)
class OrderItemSnapshot:
product_id: int
quantity: int
price: Decimal
@dataclass(frozen=True)
class OrderConfirmedEvent:
event_id: UUID
event_type: str
occurred_at: datetime
aggregate_type: str
aggregate_id: int
order_id: int
customer_id: int
total_amount: Decimal
items: tuple[OrderItemSnapshot, ...]
@classmethod
def from_order(cls, order: "Order") -> "OrderConfirmedEvent":
return cls(
event_id=uuid7(),
event_type="order.confirmed.v1",
occurred_at=order.confirmed_at,
aggregate_type="Order",
aggregate_id=order.id,
order_id=order.id,
customer_id=order.customer_id,
total_amount=order.total_amount,
items=tuple(OrderItemSnapshot(
product_id=item.product_id,
quantity=item.quantity,
price=item.price,
) for item in order.items),
)
| Поле | Назначение |
|---|---|
event_id | UUID v7, уникальный, для dedup на consumer-side (R-KFK-IDEM-1) |
event_type | <aggregate>.<event>.v<N>, для routing и schema-evolution |
occurred_at | Когда событие произошло (commit в БД), не когда опубликовано в Kafka |
aggregate_type | Тип агрегата (Order), для маршрутизации |
aggregate_id | ID агрегата, для dedup + partition key |
| Бизнес-поля | order_id, customer_id, total_amount, items — что нужно consumer-у |
Разница occurred_at vs Kafka-timestamp:
- Kafka-timestamp — когда broker записал сообщение (outbox-relay может лагать на секунды).
occurred_at— когда бизнес-факт случился:order.confirmed_at— момент commit в БД.
Для аналитики и distributed tracing occurred_at критичен: он отражает реальное время бизнес-события.
Pydantic-модель на границе consumer
R-KFK-CFG-3: десериализация через явный реестр event_type → Pydantic-модель. Не динамический импорт по строке из payload.
# adapter/in/kafka/schema/order_confirmed_schema.py
from datetime import datetime
from decimal import Decimal
from uuid import UUID
from pydantic import BaseModel
class OrderItemSchema(BaseModel):
product_id: int
quantity: int
price: Decimal
class OrderConfirmedSchema(BaseModel):
event_id: UUID
event_type: str
occurred_at: datetime
aggregate_type: str
aggregate_id: int
order_id: int
customer_id: int
total_amount: Decimal
items: list[OrderItemSchema]
model_config = {"extra": "ignore"} # forward-compat: unknown fields ignored
extra = "ignore" (аналог Jackson default) — новые поля от producer-а игнорируются без ошибки, что обеспечивает forward-compat при additive-изменениях.
Реестр десериализации:
# adapter/in/kafka/event_registry.py
from pydantic import BaseModel
from .schema.order_confirmed_schema import OrderConfirmedSchema
from .schema.payment_failed_schema import PaymentFailedSchema
EVENT_REGISTRY: dict[str, type[BaseModel]] = {
"order.confirmed.v1": OrderConfirmedSchema,
"payment.failed.v1": PaymentFailedSchema,
}
def deserialize(raw: dict) -> BaseModel:
event_type = raw.get("event_type")
schema_cls = EVENT_REGISTRY.get(event_type)
if schema_cls is None:
raise ValueError(f"Unknown event_type: {event_type}")
return schema_cls.model_validate(raw)
Статический реестр — единственный допустимый способ. importlib.import_module по строке из payload — RCE-риск (аналог trusted.packages: '*' в Java).
Forward-compatible schema
R-KFK-EVT-3: какие изменения safe.
| Изменение | Breaking? | Что делать |
|---|---|---|
| Добавить новое поле | Нет | Добавить, extra="ignore" защитит старых consumer'ов |
| Сделать optional поле required | Да | Новый event_type.v2 |
| Удалить поле | Да | Новый event_type.v2 |
| Переименовать поле | Да | Новый event_type.v2 |
| Изменить тип поля | Да | Новый event_type.v2 |
| Изменить семантику без переименования | Опасно | Новое имя поля + новый event_type |
При breaking change — две версии события параллельно:
# core/order/domain/event/order_confirmed.py
@dataclass(frozen=True)
class OrderConfirmedEventV1:
event_id: UUID
event_type: str # "order.confirmed.v1"
occurred_at: datetime
order_id: int
customer_id: int
total_amount: Decimal # v1: единая сумма
@dataclass(frozen=True)
class OrderConfirmedEventV2:
event_id: UUID
event_type: str # "order.confirmed.v2"
occurred_at: datetime
order_id: int
customer_id: int
gross_amount: Decimal # v2: разбивка суммы
net_amount: Decimal
tax_amount: Decimal
Producer пишет обе версии в outbox в одной транзакции:
async def confirm_order(self, order_id: int, uow: UnitOfWork) -> None:
async with uow:
order = await uow.orders.get(order_id)
order.confirm()
uow.outbox.append(OrderConfirmedEventV1.from_order(order))
uow.outbox.append(OrderConfirmedEventV2.from_order(order))
Consumer'ы постепенно переключаются с v1 на v2. После миграции всех — producer перестаёт писать v1.
Domain event в core/
R-KFK-EVT-4: размещение в core без зависимостей на инфраструктуру.
core/
order/
domain/
order.py
order_item.py
event/
order_created.py
order_confirmed.py
order_cancelled.py
product/
domain/
event/
product_published.py
product_archived.py
customer/
domain/
event/
customer_registered.py
core/ — без зависимостей на aiokafka, Pydantic, SQLAlchemy. Чистый Python + DDD building blocks.
- Outbox-relay импортирует
@dataclassдля сериализации в JSON перед записью в БД. - Consumer-adapter импортирует Pydantic-схему с границы для валидации входящего payload.
- Mapper преобразует Pydantic-схему в
@dataclassперед вызовом domain handler.
Что запрещено
Имя — команда
R-KFK-EVT-X1: ConfirmOrder в топике orders.confirmed создаёт путаницу.
Consumer видит ConfirmOrder и думает «это команда, кто-то просит подтвердить». Событие же означает «уже подтверждён — реагируй». Реакция будет неверной.
Aggregate целиком в payload
R-KFK-EVT-X2:
# ПЛОХО — внутренняя структура Order ломает forward-compat
@dataclass(frozen=True)
class OrderConfirmedEvent:
event_id: UUID
order: Order # целиком aggregate
Что ломается:
- Любое изменение
Order— breaking change для всех consumer'ов. Orderможет содержать sensitive поля (customerс email, phone).- ORM-объект с lazy-relations: сериализация потянет лишние данные или упадёт вне сессии.
Корректно — snapshot:
@dataclass(frozen=True)
class OrderConfirmedEvent:
event_id: UUID
order_id: int
customer_id: int
total_amount: Decimal
items: tuple[OrderItemSnapshot, ...]
@dataclass(frozen=True)
class OrderItemSnapshot:
product_id: int
quantity: int
price: Decimal
Payload — это проекция агрегата, специально для consumer-а. Только то, что нужно.
PII в широковещательных топиках
R-KFK-EVT-X3: orders.confirmed слушают billing, notification, analytics, fraud-detection. Если в payload customer_email, customer_phone — все четыре consumer'а видят PII.
# ПЛОХО — email в широком топике
@dataclass(frozen=True)
class OrderConfirmedEvent:
order_id: int
customer_email: str # PII
customer_phone: str # PII
total_amount: Decimal
# ХОРОШО — только customer_id, PII подгружается через customer-service по необходимости
@dataclass(frozen=True)
class OrderConfirmedEvent:
order_id: int
customer_id: int
total_amount: Decimal
Notification-service, которому нужен email для отправки письма, делает HTTP-вызов в customer-service: GET /customers/{id}/email. Это даёт точечный доступ и audit log.
Альтернатива — отдельный restricted топик customer.pii с ACL только для notification-service. См. Security.
Breaking change без версии
R-KFK-EVT-X4: переименовали поле, не сменили event_type.
# БЫЛО
@dataclass(frozen=True)
class OrderConfirmedEvent:
order_id: int
total_amount: Decimal # v1
# СТАЛО — breaking, но event_type остался "order.confirmed.v1"
@dataclass(frozen=True)
class OrderConfirmedEvent:
order_id: int
gross_amount: Decimal # breaking rename
Старые consumer'ы маппят total_amount — поле отсутствует → None → ValidationError или неверный расчёт. Без смены event_type нет сигнала «обновитесь».
Любой breaking change: новый event_type (order.confirmed.v2), параллельная публикация v1 и v2, миграция consumer'ов, удаление v1.
Что запрещено — таблица
| Антипаттерн | Правило | Что взамен |
|---|---|---|
Имя — команда (ConfirmOrder как event) | R-KFK-EVT-X1 | past tense (OrderConfirmed) |
| Aggregate целиком в payload | R-KFK-EVT-X2 | snapshot @dataclass с явными полями |
| PII (email, phone) в широких топиках | R-KFK-EVT-X3 | только customer_id, PII по запросу |
Breaking change без .v2 | R-KFK-EVT-X4 | новый event_type + parallel publish |
occurred_at = время публикации в Kafka | R-KFK-EVT-2 | commit в БД (business time) |
Событие без event_id | R-KFK-EVT-2 | UUID v7 обязательно |
Event как dict или mutable dataclass | R-KFK-EVT-4 | @dataclass(frozen=True) |
Десериализация через importlib по строке | R-KFK-CFG-X1 | статический EVENT_REGISTRY |
Event в adapter/, не в core/ | R-KFK-EVT-4 | core/<bc>/domain/event/ |
Куда дальше
- Kafka → раздел 6. Event design — нормативные формулировки правил.
- Producer — как event попадает в Kafka через aiokafka.
- Outbox publishing —
event_typeиpayloadв outbox-таблице. - Idempotent consumer — dedup по
event_id. - Consumer — manual commit и обработка в asyncio.
- Retry topic + DLQ — retry-топики и DLQ без blocking retry.
- Observability —
traceparentв Kafka headers. - Security — PII в restricted топиках, TLS, ACL.
- Конфигурация —
KafkaSettingsчерез pydantic-settings.