Опирается на правила: R-KFK-EVT-1R-KFK-EVT-4 и R-KFK-EVT-X1R-KFK-EVT-X4 из Kafka Style Guide → раздел 6. Event design.

Важно знать

  • Имя события — глагол в прошедшем времени: OrderConfirmed, PaymentFailed, CustomerRegistered. Не ConfirmOrder (команда), не OrderConfirmation (существительное).
  • Payload: EventID UUID v7, EventType версионированный (order.confirmed.v1), OccurredAt, AggregateID, бизнес-поля.
  • OccurredAt — момент бизнес-факта (commit в БД), не время публикации в брокер.
  • PII не в payload широковещательных топиков — только CustomerID, PII подгружается отдельно.
  • Forward-compatible schema: добавление полей — non-breaking; удаление/переименование — breaking, требует event_type: "order.confirmed.v2".
  • Событие — неизменяемая Go-структура в core/<bc>/event/, конструктор NewXxxEvent — единственная точка создания.
  • Aggregate целиком в payload ломает forward-compat — нужен snapshot с явными полями.
  • Breaking change без версии лишает consumer-ов сигнала для обновления.

Дизайн события — контракт между producer и всеми consumer-ами, текущими и будущими. Любое изменение структуры затрагивает несколько сервисов одновременно. Правила R-KFK-EVT-* делают события читаемыми в логах, стабильными для downstream-систем и безопасными для эволюции.

Имя события — past tense

R-KFK-EVT-1: глагол в прошедшем времени.

КорректноНеверноПочему
OrderConfirmedConfirmOrderкоманда — намерение; событие — свершившийся факт
PaymentFailedPaymentFailure / FailPaymentсуществительное не описывает «что произошло»
CustomerRegisteredCustomerRegistrationфакт регистрации, не процесс
ProductReservedReserveProductкоманда vs событие
OrderCancelledCancelOrderкоманда vs событие

DDD различает три концепта:

  • Command — намерение (ConfirmOrder). Адресован одному сервису, может быть отклонён.
  • Event — факт (OrderConfirmed). Прошлое нельзя отменить, только компенсировать. Адресован всем заинтересованным.
  • Query — запрос данных (GetOrderByID).

Kafka-топики несут события. Имя в прошедшем времени — сигнал «это факт, реагируйте при необходимости».

Payload — обязательные поля

R-KFK-EVT-2: метаданные + бизнес-данные.

// core/order/event/order_confirmed.go

type OrderConfirmedEvent struct {
    EventID     string    `json:"event_id"`      // UUID v7
    EventType   string    `json:"event_type"`    // "order.confirmed.v1"
    OccurredAt  time.Time `json:"occurred_at"`   // момент факта, не публикации
    AggregateID string    `json:"aggregate_id"`  // order id
    CustomerID  string    `json:"customer_id"`
    TotalAmount int64     `json:"total_amount"`  // минорные единицы (копейки), не float64
    Items       []OrderItemSnapshot `json:"items"`
}

type OrderItemSnapshot struct {
    ProductID string `json:"product_id"`
    Quantity  int    `json:"quantity"`
    Price     int64  `json:"price"`
}

func NewOrderConfirmedEvent(order Order) OrderConfirmedEvent {
    return OrderConfirmedEvent{
        EventID:     newUUIDv7(),
        EventType:   "order.confirmed.v1",
        OccurredAt:  order.ConfirmedAt,
        AggregateID: order.ID,
        CustomerID:  order.CustomerID,
        TotalAmount: order.TotalAmount,
        Items:       snapshotItems(order.Items),
    }
}
ПолеНазначение
EventIDUUID v7 — уникален, возрастает хронологически, используется для dedup на consumer-side
EventType<aggregate>.<event>.v<N> — для routing и schema-evolution
OccurredAtКогда произошёл бизнес-факт (commit в БД), не когда брокер записал сообщение
AggregateIDID агрегата — partition key + dedup + маршрутизация
Бизнес-полятолько то, что нужно consumer-у — CustomerID, TotalAmount, Items

Разница OccurredAt и Kafka-timestamp:

  • Kafka-timestamp — когда брокер принял сообщение. Может отставать от реального события на секунды, если outbox-relay лагал.
  • OccurredAtorder.ConfirmedAt — момент commit в БД, реальное бизнес-время.

Для аналитики и distributed tracing поле OccurredAt критично: именно оно отражает хронологию бизнес-фактов.

Деньги — int64 в минорных единицах, не float64. Плавающая точка накапливает погрешность при сериализации JSON; для суммы в 100 рублей используй 10000 (копейки).

Forward-compatible schema

R-KFK-EVT-3: какие изменения безопасны.

ИзменениеBreaking?Что делать
Добавить новое полеНетдобавить с json:",omitempty" — consumer игнорирует неизвестное
Удалить полеДановый event_type: "order.confirmed.v2"
Переименовать полеДановый event_type: "order.confirmed.v2"
Изменить тип поляДановый event_type: "order.confirmed.v2"
Изменить семантику без переименованияОпасноновое имя поля + новый eventType

encoding/json по умолчанию игнорирует неизвестные поля при десериализации — это позволяет добавлять поля без поломки существующих consumer-ов.

При breaking change producer публикует обе версии в один топик в течение переходного периода:

// core/order/event/order_confirmed_v2.go

type OrderConfirmedEventV2 struct {
    EventID     string    `json:"event_id"`
    EventType   string    `json:"event_type"`    // "order.confirmed.v2"
    OccurredAt  time.Time `json:"occurred_at"`
    AggregateID string    `json:"aggregate_id"`
    CustomerID  string    `json:"customer_id"`
    GrossAmount int64     `json:"gross_amount"`  // переименовано из total_amount
    NetAmount   int64     `json:"net_amount"`    // новое поле
    TaxAmount   int64     `json:"tax_amount"`    // новое поле
    Items       []OrderItemSnapshot `json:"items"`
}
// infra/kafka/outbox_relay.go — parallel publish во время transition

func (r *OutboxRelay) buildMessages(e OutboxEvent) []kafka.Message {
    msgs := []kafka.Message{
        {Topic: topicFor(e.EventType), Key: []byte(e.AggregateID), Value: e.Payload},
    }
    // transition period: публикуем и v2 пока все consumer не переключились
    if e.EventType == "order.confirmed.v1" {
        v2payload := upgradeToV2(e.Payload)
        msgs = append(msgs, kafka.Message{
            Topic: topicFor("order.confirmed.v2"),
            Key:   []byte(e.AggregateID),
            Value: v2payload,
        })
    }
    return msgs
}

Consumer-ы переключаются с v1 на v2 постепенно; после миграции всех — producer перестаёт публиковать v1.

Структура события в core/

R-KFK-EVT-4: событие — неизменяемая Go-структура в core/<bc>/event/.

core/
  order/
    order.go
    event/
      order_created.go
      order_confirmed.go
      order_cancelled.go
  product/
    event/
      product_reserved.go
      product_released.go
  customer/
    event/
      customer_registered.go
      customer_blocked.go

Пакет core/<bc>/event/ не импортирует kafka-go, pgx, chi или любой другой инфраструктурный пакет — только стандартная библиотека и time. Инфраструктурные слои (infra/kafka/, adapters/in/kafka/) импортируют события для сериализации и десериализации.

Конструктор NewXxxEvent — единственная точка создания. Это гарантирует, что EventID всегда заполнен, EventType всегда корректен, OccurredAt всегда указывает на реальный момент факта.

// core/customer/event/customer_registered.go

type CustomerRegisteredEvent struct {
    EventID    string    `json:"event_id"`
    EventType  string    `json:"event_type"`
    OccurredAt time.Time `json:"occurred_at"`
    CustomerID string    `json:"customer_id"`
    Segment    string    `json:"segment"`
}

func NewCustomerRegisteredEvent(c Customer) CustomerRegisteredEvent {
    return CustomerRegisteredEvent{
        EventID:    newUUIDv7(),
        EventType:  "customer.registered.v1",
        OccurredAt: c.RegisteredAt,
        CustomerID: c.ID,
        Segment:    c.Segment,
    }
}

PII в широковещательных топиках

R-KFK-EVT-X3: топик orders.confirmed читают billing, notifications, analytics, fraud-detection. Если в payload customer_email и customer_phone — все четыре сервиса получают PII.

// ПЛОХО — PII в широком топике
type OrderConfirmedEvent struct {
    OrderID       string `json:"order_id"`
    CustomerEmail string `json:"customer_email"` // видят все consumer-ы
    CustomerPhone string `json:"customer_phone"`
    TotalAmount   int64  `json:"total_amount"`
}

// ХОРОШО — только CustomerID; PII подгружается через customer-сервис
type OrderConfirmedEvent struct {
    EventID     string `json:"event_id"`
    EventType   string `json:"event_type"`
    OccurredAt  time.Time `json:"occurred_at"`
    OrderID     string `json:"order_id"`
    CustomerID  string `json:"customer_id"`
    TotalAmount int64  `json:"total_amount"`
}

Notification-сервису, которому нужен email для письма, делает HTTP-вызов в customer-сервис: GET /customers/{id}/email. Это даёт точечный доступ и audit-лог вместо массового чтения PII из топика.

Альтернатива — отдельный restricted топик customer.pii с ACL только для notification-сервиса. Подробнее — Security.

Что запрещено

АнтипаттернПравилоЧто взамен
Имя — команда (ConfirmOrder как событие)R-KFK-EVT-X1past tense (OrderConfirmed)
Aggregate целиком в payload (Order struct)R-KFK-EVT-X2snapshot-структура с явными полями
PII (email, phone) в широких топикахR-KFK-EVT-X3только CustomerID, PII по запросу
Breaking change без .v2 в EventTypeR-KFK-EVT-X4новый eventType + параллельная публикация
OccurredAt = время публикации в KafkaR-KFK-EVT-2commit в БД (бизнес-время)
Событие без EventIDR-KFK-EVT-2UUID v7 обязателен
Деньги как float64 в payloadR-KFK-EVT-2int64 в минорных единицах
Событие создаётся вне конструктора NewXxxEventR-KFK-EVT-4конструктор — единственная точка создания

Куда дальше

  • Producer — как событие попадает в Kafka через kafka.Writer.
  • Outbox publishing — event_type и payload в outbox-таблице, relay-горутина.
  • Idempotent consumer — dedup по EventID через processed_event.
  • Consumer — kafka.Reader, manual commit, goroutine-per-consumer идиома.
  • Retry topic + DLQ — retry-топики и DLQ вне основного poll-цикла.
  • Security — PII в restricted топиках, TLS-dialer, ACL.
  • Конфигурация — KafkaConfig через envconfig, статический реестр событий.
  • Observability — traceparent в Kafka headers, promauto-метрики.