Опирается на правила: R-KFK-PROD-1R-KFK-PROD-4 и R-KFK-PROD-X1R-KFK-PROD-X4 из Kafka Style Guide → раздел 1. Producer.

Важно знать

  • RequiredAcks: kafka.RequireAll всегда. В kafka-go нет флага enable.idempotence; RequireAll + MaxAttempts: math.MaxInt32 дают эквивалентную семантику — broker подтверждает после репликации, producer retry-ит до победы.
  • Balancer: &kafka.Hash{} — маршрутизация по ключу. RoundRobin для бизнес-событий запрещён.
  • Partition key обязателен для всех бизнес-событий. Дефолтный ключ — aggregate id ([]byte(order.ID)).
  • JSON-сериализация (encoding/json) по умолчанию. Ключ — []byte(aggregateID).
  • writer.WriteMessages из UseCase Handler для domain events — запрещён. События идут через outbox-relay.
  • RequiredAcks: kafka.RequireNone / kafka.RequireOne — никогда в проде.
  • Kafka не XA — публикация в одной горутине с pgx.Tx без outbox приводит к расхождению при partial failure.

Kafka producer — точка, где сервис «публикует факт» во внешний мир. Ошибка здесь — потерянное событие или дубликат, который downstream не отличит от настоящего. Правила R-KFK-PROD-* формулируют требования так, чтобы producer всегда был надёжным на partition и атомарным с DB через outbox.

kafka.Writer — конструктор и обязательные поля

R-KFK-PROD-1: kafka-go не имеет флага enable.idempotence как в Java-клиенте. Эквивалентная семантика достигается сочетанием трёх полей:

// infra/kafka/writer.go
package kafka

import (
    "math"

    "github.com/segmentio/kafka-go"
)

func NewOrderWriter(cfg KafkaConfig) *kafka.Writer {
    return &kafka.Writer{
        Addr:         kafka.TCP(cfg.Brokers...),
        Topic:        cfg.Topics.OrdersConfirmed,
        Balancer:     &kafka.Hash{},        // R-KFK-PROD-2: маршрутизация по ключу
        RequiredAcks: kafka.RequireAll,     // R-KFK-PROD-1/X2: acks=all
        MaxAttempts:  math.MaxInt32,        // R-KFK-PROD-1: retries≈∞
    }
}

Что даёт каждое поле:

  • RequiredAcks: kafka.RequireAll — broker подтверждает только после репликации на min.insync.replicas фолловеров. Без этого потеря leader-а между write и репликацией = потеря данных.
  • MaxAttempts: math.MaxInt32 — producer retry-ит до успеха или отмены контекста. Без этого временная недоступность broker-а = потерянное событие.
  • Balancer: &kafka.Hash{} — маршрутизирует сообщение на partition по хешу Key. Без этого или с RoundRobin ordering нарушается.

Для топиков с разными настройками создаётся отдельный kafka.Writer через NewProductWriter, NewPaymentWriter и т. д. — один writer на topic.

Partition key — обязателен

R-KFK-PROD-2: ключ определяет, на какой partition уйдёт сообщение.

// infra/kafka/order_producer.go
package kafka

import (
    "context"
    "encoding/json"
    "fmt"

    "github.com/segmentio/kafka-go"
)

type OrderEventProducer struct {
    writer *kafka.Writer
}

func NewOrderEventProducer(w *kafka.Writer) *OrderEventProducer {
    return &OrderEventProducer{writer: w}
}

func (p *OrderEventProducer) PublishConfirmed(ctx context.Context, evt OrderConfirmedEvent) error {
    payload, err := json.Marshal(evt)
    if err != nil {
        return fmt.Errorf("marshal OrderConfirmed %s: %w", evt.OrderID, err)
    }
    msg := kafka.Message{
        Key:   []byte(evt.OrderID),  // R-KFK-PROD-2: ключ = aggregate id
        Value: payload,              // R-KFK-PROD-3: JSON
    }
    if err := p.writer.WriteMessages(ctx, msg); err != nil {
        return fmt.Errorf("publish OrderConfirmed %s: %w", evt.OrderID, err)
    }
    return nil
}

Дефолтный ключ — []byte(order.ID) (aggregate id). Это гарантирует: все события одного order.id уходят на один partition, а внутри partition Kafka сохраняет порядок.

Сценарий поломки без ключа:

  1. OrderCreated(orderID=SBR-4201) → partition 0.
  2. OrderConfirmed(orderID=SBR-4201) → partition 3 (round-robin).
  3. Consumer partition 3 обрабатывает OrderConfirmed до того, как consumer partition 0 обработал OrderCreated.
  4. Downstream пытается подтвердить несуществующий заказ — ошибка валидации.

Правило: для каждого aggregate — стабильный ключ. ProductID, CustomerID, OrderID — в зависимости от aggregate-root события.

JSON по умолчанию

R-KFK-PROD-3: сериализация через encoding/json, ключ — []byte(aggregateID).

// core/order/event/order_confirmed.go
package event

import "time"

type OrderConfirmedEvent struct {
    EventID    string    `json:"event_id"`    // UUID v7
    OccurredAt time.Time `json:"occurred_at"`
    OrderID    string    `json:"order_id"`    // aggregate id
    CustomerID string    `json:"customer_id"`
    TotalRub   int64     `json:"total_rub"`   // копейки, не float64
    EventType  string    `json:"event_type"`  // "OrderConfirmed"
}

func NewOrderConfirmedEvent(order Order) OrderConfirmedEvent {
    return OrderConfirmedEvent{
        EventID:    newUUIDv7(),
        OccurredAt: time.Now().UTC(),
        OrderID:    order.ID,
        CustomerID: order.CustomerID,
        TotalRub:   order.TotalKopecks,
        EventType:  "OrderConfirmed",
    }
}

JSON прост в дебаге (kafka-console-consumer показывает читаемый payload), не требует Schema Registry. Avro/Protobuf — для high-throughput топиков с миллиардами событий в сутки; требует Schema Registry и дополнительной инфры.

Деньги — int64 (минорные единицы), никогда float64. float64 теряет точность на больших суммах — классический источник расхождений в финансовых системах.

Не WriteMessages из UseCase Handler

R-KFK-PROD-4: domain events публикуются через outbox-relay, не прямым writer.WriteMessages.

Неправильно — Kafka и DB не атомарны:

// core/order/handler.go — ОШИБКА
func (h *ConfirmOrderHandler) Handle(ctx context.Context, cmd ConfirmOrderCommand) error {
    tx, err := h.db.Begin(ctx)
    if err != nil {
        return fmt.Errorf("begin: %w", err)
    }
    defer tx.Rollback(ctx)

    order, err := h.orders.GetForUpdate(ctx, tx, cmd.OrderID)
    if err != nil {
        return err
    }
    order.Confirm()

    if err := h.orders.Save(ctx, tx, order); err != nil {
        return err
    }

    if err := tx.Commit(ctx); err != nil {
        return fmt.Errorf("commit: %w", err)
    }

    // ОШИБКА: publish ПОСЛЕ commit — если здесь упадём, событие потеряно
    evt := NewOrderConfirmedEvent(order)
    return h.producer.PublishConfirmed(ctx, evt)
}

Сценарии поломки:

  1. Commit прошёл, процесс упал до PublishConfirmed — заказ подтверждён, downstream ничего не знает.
  2. PublishConfirmed прошёл, Commit упал с deadlock — событие опубликовано, в БД заказ не подтверждён.

Правильно — через outbox в той же транзакции:

// core/order/handler.go
func (h *ConfirmOrderHandler) Handle(ctx context.Context, cmd ConfirmOrderCommand) error {
    tx, err := h.db.Begin(ctx)
    if err != nil {
        return fmt.Errorf("begin: %w", err)
    }
    defer tx.Rollback(ctx)

    order, err := h.orders.GetForUpdate(ctx, tx, cmd.OrderID)
    if err != nil {
        return err
    }
    order.Confirm()

    if err := h.orders.Save(ctx, tx, order); err != nil {
        return err
    }

    // outbox-запись в той же транзакции — атомарность гарантирована PG
    evt := NewOrderConfirmedEvent(order)
    payload, err := json.Marshal(evt)
    if err != nil {
        return fmt.Errorf("marshal event: %w", err)
    }
    if err := h.outbox.Store(ctx, tx, OutboxEvent{
        AggregateID: order.ID,
        EventType:   "OrderConfirmed",
        Payload:     payload,
    }); err != nil {
        return fmt.Errorf("outbox store: %w", err)
    }

    return tx.Commit(ctx)
}

Запись в outbox идёт в той же pgx.Tx, что Save. Атомарность гарантирована PostgreSQL. Отдельный OutboxRelay читает unpublished events через FOR UPDATE SKIP LOCKED и публикует через kafka.Writer. Подробнее — Outbox publishing.

Допустимый прямой WriteMessages

Прямой writer.WriteMessages без outbox допустим только когда нет транзакционного контекста:

  • Технические audit-events (в дополнение к audit_log-таблице).
  • Сигналы мониторинга / метрики.
  • Команды другим сервисам из admin-инструмента без DB-операции.

Что запрещено

АнтипаттернПравилоЧто взамен
RequiredAcks: kafka.RequireNone или kafka.RequireOneR-KFK-PROD-X2kafka.RequireAll всегда
Balancer: &kafka.RoundRobin{} для бизнес-событийR-KFK-PROD-X2&kafka.Hash{}
Key: nil в kafka.Message для business eventsR-KFK-PROD-X3[]byte(aggregateID)
writer.WriteMessages из Handler совместно с pgx.TxR-KFK-PROD-X4outbox pattern
Publish сразу после tx.Commit без outboxR-KFK-OBX-X2outbox-relay
MaxAttempts: 0 (дефолт — 10) в продеR-KFK-PROD-1math.MaxInt32
Aggregate-структура целиком в ValueR-KFK-EVT-X2структура с явными полями, конструктор

Куда дальше

  • Конфигурация — KafkaConfig через envconfig, проверка топиков на старте.
  • Outbox publishing — relay-горутина, FOR UPDATE SKIP LOCKED, batch 10–50.
  • Event design — payload format, EventID (UUID v7), forward-compat.
  • Consumer — CommitInterval: 0, manual commit, StartOffset: kafka.FirstOffset.
  • Idempotent consumer — processed_event, dedup в одной транзакции.
  • Retry topic + DLQ — retry-топики вне poll-цикла, isTransient, DLQ.
  • Observability — promauto, traceparent в headers, consumer lag alerts.
  • Security — TLS-dialer, per-service ClientID, PII restricted-топики.