Опирается на правила:
R-KFK-EVT-1…R-KFK-EVT-4иR-KFK-EVT-X1…R-KFK-EVT-X4из Kafka Style Guide → раздел 6. Event design.
Важно знать
- Имя события — глагол в прошедшем времени:
OrderConfirmed,PaymentFailed,CustomerRegistered. НеConfirmOrder(команда), неOrderConfirmation(существительное).- Payload:
EventIDUUID v7,EventTypeверсионированный (order.confirmed.v1),OccurredAt,AggregateID, бизнес-поля.OccurredAt— момент бизнес-факта (commit в БД), не время публикации в брокер.- PII не в payload широковещательных топиков — только
CustomerID, PII подгружается отдельно.- Forward-compatible schema: добавление полей — non-breaking; удаление/переименование — breaking, требует
event_type: "order.confirmed.v2".- Событие — неизменяемая Go-структура в
core/<bc>/event/, конструкторNewXxxEvent— единственная точка создания.- Aggregate целиком в payload ломает forward-compat — нужен snapshot с явными полями.
- Breaking change без версии лишает consumer-ов сигнала для обновления.
Дизайн события — контракт между producer и всеми consumer-ами, текущими и будущими. Любое изменение структуры затрагивает несколько сервисов одновременно. Правила R-KFK-EVT-* делают события читаемыми в логах, стабильными для downstream-систем и безопасными для эволюции.
Имя события — past tense
R-KFK-EVT-1: глагол в прошедшем времени.
| Корректно | Неверно | Почему |
|---|---|---|
OrderConfirmed | ConfirmOrder | команда — намерение; событие — свершившийся факт |
PaymentFailed | PaymentFailure / FailPayment | существительное не описывает «что произошло» |
CustomerRegistered | CustomerRegistration | факт регистрации, не процесс |
ProductReserved | ReserveProduct | команда vs событие |
OrderCancelled | CancelOrder | команда vs событие |
DDD различает три концепта:
- Command — намерение (
ConfirmOrder). Адресован одному сервису, может быть отклонён. - Event — факт (
OrderConfirmed). Прошлое нельзя отменить, только компенсировать. Адресован всем заинтересованным. - Query — запрос данных (
GetOrderByID).
Kafka-топики несут события. Имя в прошедшем времени — сигнал «это факт, реагируйте при необходимости».
Payload — обязательные поля
R-KFK-EVT-2: метаданные + бизнес-данные.
// core/order/event/order_confirmed.go
type OrderConfirmedEvent struct {
EventID string `json:"event_id"` // UUID v7
EventType string `json:"event_type"` // "order.confirmed.v1"
OccurredAt time.Time `json:"occurred_at"` // момент факта, не публикации
AggregateID string `json:"aggregate_id"` // order id
CustomerID string `json:"customer_id"`
TotalAmount int64 `json:"total_amount"` // минорные единицы (копейки), не float64
Items []OrderItemSnapshot `json:"items"`
}
type OrderItemSnapshot struct {
ProductID string `json:"product_id"`
Quantity int `json:"quantity"`
Price int64 `json:"price"`
}
func NewOrderConfirmedEvent(order Order) OrderConfirmedEvent {
return OrderConfirmedEvent{
EventID: newUUIDv7(),
EventType: "order.confirmed.v1",
OccurredAt: order.ConfirmedAt,
AggregateID: order.ID,
CustomerID: order.CustomerID,
TotalAmount: order.TotalAmount,
Items: snapshotItems(order.Items),
}
}
| Поле | Назначение |
|---|---|
EventID | UUID v7 — уникален, возрастает хронологически, используется для dedup на consumer-side |
EventType | <aggregate>.<event>.v<N> — для routing и schema-evolution |
OccurredAt | Когда произошёл бизнес-факт (commit в БД), не когда брокер записал сообщение |
AggregateID | ID агрегата — partition key + dedup + маршрутизация |
| Бизнес-поля | только то, что нужно consumer-у — CustomerID, TotalAmount, Items |
Разница OccurredAt и Kafka-timestamp:
- Kafka-timestamp — когда брокер принял сообщение. Может отставать от реального события на секунды, если outbox-relay лагал.
OccurredAt—order.ConfirmedAt— момент commit в БД, реальное бизнес-время.
Для аналитики и distributed tracing поле OccurredAt критично: именно оно отражает хронологию бизнес-фактов.
Деньги — int64 в минорных единицах, не float64. Плавающая точка накапливает погрешность при сериализации JSON; для суммы в 100 рублей используй 10000 (копейки).
Forward-compatible schema
R-KFK-EVT-3: какие изменения безопасны.
| Изменение | Breaking? | Что делать |
|---|---|---|
| Добавить новое поле | Нет | добавить с json:",omitempty" — consumer игнорирует неизвестное |
| Удалить поле | Да | новый event_type: "order.confirmed.v2" |
| Переименовать поле | Да | новый event_type: "order.confirmed.v2" |
| Изменить тип поля | Да | новый event_type: "order.confirmed.v2" |
| Изменить семантику без переименования | Опасно | новое имя поля + новый eventType |
encoding/json по умолчанию игнорирует неизвестные поля при десериализации — это позволяет добавлять поля без поломки существующих consumer-ов.
При breaking change producer публикует обе версии в один топик в течение переходного периода:
// core/order/event/order_confirmed_v2.go
type OrderConfirmedEventV2 struct {
EventID string `json:"event_id"`
EventType string `json:"event_type"` // "order.confirmed.v2"
OccurredAt time.Time `json:"occurred_at"`
AggregateID string `json:"aggregate_id"`
CustomerID string `json:"customer_id"`
GrossAmount int64 `json:"gross_amount"` // переименовано из total_amount
NetAmount int64 `json:"net_amount"` // новое поле
TaxAmount int64 `json:"tax_amount"` // новое поле
Items []OrderItemSnapshot `json:"items"`
}
// infra/kafka/outbox_relay.go — parallel publish во время transition
func (r *OutboxRelay) buildMessages(e OutboxEvent) []kafka.Message {
msgs := []kafka.Message{
{Topic: topicFor(e.EventType), Key: []byte(e.AggregateID), Value: e.Payload},
}
// transition period: публикуем и v2 пока все consumer не переключились
if e.EventType == "order.confirmed.v1" {
v2payload := upgradeToV2(e.Payload)
msgs = append(msgs, kafka.Message{
Topic: topicFor("order.confirmed.v2"),
Key: []byte(e.AggregateID),
Value: v2payload,
})
}
return msgs
}
Consumer-ы переключаются с v1 на v2 постепенно; после миграции всех — producer перестаёт публиковать v1.
Структура события в core/
R-KFK-EVT-4: событие — неизменяемая Go-структура в core/<bc>/event/.
core/
order/
order.go
event/
order_created.go
order_confirmed.go
order_cancelled.go
product/
event/
product_reserved.go
product_released.go
customer/
event/
customer_registered.go
customer_blocked.go
Пакет core/<bc>/event/ не импортирует kafka-go, pgx, chi или любой другой инфраструктурный пакет — только стандартная библиотека и time. Инфраструктурные слои (infra/kafka/, adapters/in/kafka/) импортируют события для сериализации и десериализации.
Конструктор NewXxxEvent — единственная точка создания. Это гарантирует, что EventID всегда заполнен, EventType всегда корректен, OccurredAt всегда указывает на реальный момент факта.
// core/customer/event/customer_registered.go
type CustomerRegisteredEvent struct {
EventID string `json:"event_id"`
EventType string `json:"event_type"`
OccurredAt time.Time `json:"occurred_at"`
CustomerID string `json:"customer_id"`
Segment string `json:"segment"`
}
func NewCustomerRegisteredEvent(c Customer) CustomerRegisteredEvent {
return CustomerRegisteredEvent{
EventID: newUUIDv7(),
EventType: "customer.registered.v1",
OccurredAt: c.RegisteredAt,
CustomerID: c.ID,
Segment: c.Segment,
}
}
PII в широковещательных топиках
R-KFK-EVT-X3: топик orders.confirmed читают billing, notifications, analytics, fraud-detection. Если в payload customer_email и customer_phone — все четыре сервиса получают PII.
// ПЛОХО — PII в широком топике
type OrderConfirmedEvent struct {
OrderID string `json:"order_id"`
CustomerEmail string `json:"customer_email"` // видят все consumer-ы
CustomerPhone string `json:"customer_phone"`
TotalAmount int64 `json:"total_amount"`
}
// ХОРОШО — только CustomerID; PII подгружается через customer-сервис
type OrderConfirmedEvent struct {
EventID string `json:"event_id"`
EventType string `json:"event_type"`
OccurredAt time.Time `json:"occurred_at"`
OrderID string `json:"order_id"`
CustomerID string `json:"customer_id"`
TotalAmount int64 `json:"total_amount"`
}
Notification-сервису, которому нужен email для письма, делает HTTP-вызов в customer-сервис: GET /customers/{id}/email. Это даёт точечный доступ и audit-лог вместо массового чтения PII из топика.
Альтернатива — отдельный restricted топик customer.pii с ACL только для notification-сервиса. Подробнее — Security.
Что запрещено
| Антипаттерн | Правило | Что взамен |
|---|---|---|
Имя — команда (ConfirmOrder как событие) | R-KFK-EVT-X1 | past tense (OrderConfirmed) |
Aggregate целиком в payload (Order struct) | R-KFK-EVT-X2 | snapshot-структура с явными полями |
| PII (email, phone) в широких топиках | R-KFK-EVT-X3 | только CustomerID, PII по запросу |
Breaking change без .v2 в EventType | R-KFK-EVT-X4 | новый eventType + параллельная публикация |
OccurredAt = время публикации в Kafka | R-KFK-EVT-2 | commit в БД (бизнес-время) |
Событие без EventID | R-KFK-EVT-2 | UUID v7 обязателен |
Деньги как float64 в payload | R-KFK-EVT-2 | int64 в минорных единицах |
Событие создаётся вне конструктора NewXxxEvent | R-KFK-EVT-4 | конструктор — единственная точка создания |
Куда дальше
- Producer — как событие попадает в Kafka через
kafka.Writer. - Outbox publishing —
event_typeи payload в outbox-таблице, relay-горутина. - Idempotent consumer — dedup по
EventIDчерезprocessed_event. - Consumer —
kafka.Reader, manual commit, goroutine-per-consumer идиома. - Retry topic + DLQ — retry-топики и DLQ вне основного poll-цикла.
- Security — PII в restricted топиках, TLS-dialer, ACL.
- Конфигурация —
KafkaConfigчерезenvconfig, статический реестр событий. - Observability —
traceparentв Kafka headers, promauto-метрики.