Опирается на правила:
R-KFK-PROD-1…R-KFK-PROD-4иR-KFK-PROD-X1…R-KFK-PROD-X4из Kafka Style Guide → раздел 1. Producer.
Важно знать
RequiredAcks: kafka.RequireAllвсегда. Вkafka-goнет флагаenable.idempotence;RequireAll+MaxAttempts: math.MaxInt32дают эквивалентную семантику — broker подтверждает после репликации, producer retry-ит до победы.Balancer: &kafka.Hash{}— маршрутизация по ключу.RoundRobinдля бизнес-событий запрещён.- Partition key обязателен для всех бизнес-событий. Дефолтный ключ — aggregate id (
[]byte(order.ID)).- JSON-сериализация (
encoding/json) по умолчанию. Ключ —[]byte(aggregateID).writer.WriteMessagesиз UseCase Handler для domain events — запрещён. События идут через outbox-relay.RequiredAcks: kafka.RequireNone/kafka.RequireOne— никогда в проде.- Kafka не XA — публикация в одной горутине с
pgx.Txбез outbox приводит к расхождению при partial failure.
Kafka producer — точка, где сервис «публикует факт» во внешний мир. Ошибка здесь — потерянное событие или дубликат, который downstream не отличит от настоящего. Правила R-KFK-PROD-* формулируют требования так, чтобы producer всегда был надёжным на partition и атомарным с DB через outbox.
kafka.Writer — конструктор и обязательные поля
R-KFK-PROD-1: kafka-go не имеет флага enable.idempotence как в Java-клиенте. Эквивалентная семантика достигается сочетанием трёх полей:
// infra/kafka/writer.go
package kafka
import (
"math"
"github.com/segmentio/kafka-go"
)
func NewOrderWriter(cfg KafkaConfig) *kafka.Writer {
return &kafka.Writer{
Addr: kafka.TCP(cfg.Brokers...),
Topic: cfg.Topics.OrdersConfirmed,
Balancer: &kafka.Hash{}, // R-KFK-PROD-2: маршрутизация по ключу
RequiredAcks: kafka.RequireAll, // R-KFK-PROD-1/X2: acks=all
MaxAttempts: math.MaxInt32, // R-KFK-PROD-1: retries≈∞
}
}
Что даёт каждое поле:
RequiredAcks: kafka.RequireAll— broker подтверждает только после репликации наmin.insync.replicasфолловеров. Без этого потеря leader-а между write и репликацией = потеря данных.MaxAttempts: math.MaxInt32— producer retry-ит до успеха или отмены контекста. Без этого временная недоступность broker-а = потерянное событие.Balancer: &kafka.Hash{}— маршрутизирует сообщение на partition по хешуKey. Без этого или сRoundRobinordering нарушается.
Для топиков с разными настройками создаётся отдельный kafka.Writer через NewProductWriter, NewPaymentWriter и т. д. — один writer на topic.
Partition key — обязателен
R-KFK-PROD-2: ключ определяет, на какой partition уйдёт сообщение.
// infra/kafka/order_producer.go
package kafka
import (
"context"
"encoding/json"
"fmt"
"github.com/segmentio/kafka-go"
)
type OrderEventProducer struct {
writer *kafka.Writer
}
func NewOrderEventProducer(w *kafka.Writer) *OrderEventProducer {
return &OrderEventProducer{writer: w}
}
func (p *OrderEventProducer) PublishConfirmed(ctx context.Context, evt OrderConfirmedEvent) error {
payload, err := json.Marshal(evt)
if err != nil {
return fmt.Errorf("marshal OrderConfirmed %s: %w", evt.OrderID, err)
}
msg := kafka.Message{
Key: []byte(evt.OrderID), // R-KFK-PROD-2: ключ = aggregate id
Value: payload, // R-KFK-PROD-3: JSON
}
if err := p.writer.WriteMessages(ctx, msg); err != nil {
return fmt.Errorf("publish OrderConfirmed %s: %w", evt.OrderID, err)
}
return nil
}
Дефолтный ключ — []byte(order.ID) (aggregate id). Это гарантирует: все события одного order.id уходят на один partition, а внутри partition Kafka сохраняет порядок.
Сценарий поломки без ключа:
OrderCreated(orderID=SBR-4201)→ partition 0.OrderConfirmed(orderID=SBR-4201)→ partition 3 (round-robin).- Consumer partition 3 обрабатывает
OrderConfirmedдо того, как consumer partition 0 обработалOrderCreated. - Downstream пытается подтвердить несуществующий заказ — ошибка валидации.
Правило: для каждого aggregate — стабильный ключ. ProductID, CustomerID, OrderID — в зависимости от aggregate-root события.
JSON по умолчанию
R-KFK-PROD-3: сериализация через encoding/json, ключ — []byte(aggregateID).
// core/order/event/order_confirmed.go
package event
import "time"
type OrderConfirmedEvent struct {
EventID string `json:"event_id"` // UUID v7
OccurredAt time.Time `json:"occurred_at"`
OrderID string `json:"order_id"` // aggregate id
CustomerID string `json:"customer_id"`
TotalRub int64 `json:"total_rub"` // копейки, не float64
EventType string `json:"event_type"` // "OrderConfirmed"
}
func NewOrderConfirmedEvent(order Order) OrderConfirmedEvent {
return OrderConfirmedEvent{
EventID: newUUIDv7(),
OccurredAt: time.Now().UTC(),
OrderID: order.ID,
CustomerID: order.CustomerID,
TotalRub: order.TotalKopecks,
EventType: "OrderConfirmed",
}
}
JSON прост в дебаге (kafka-console-consumer показывает читаемый payload), не требует Schema Registry. Avro/Protobuf — для high-throughput топиков с миллиардами событий в сутки; требует Schema Registry и дополнительной инфры.
Деньги — int64 (минорные единицы), никогда float64. float64 теряет точность на больших суммах — классический источник расхождений в финансовых системах.
Не WriteMessages из UseCase Handler
R-KFK-PROD-4: domain events публикуются через outbox-relay, не прямым writer.WriteMessages.
Неправильно — Kafka и DB не атомарны:
// core/order/handler.go — ОШИБКА
func (h *ConfirmOrderHandler) Handle(ctx context.Context, cmd ConfirmOrderCommand) error {
tx, err := h.db.Begin(ctx)
if err != nil {
return fmt.Errorf("begin: %w", err)
}
defer tx.Rollback(ctx)
order, err := h.orders.GetForUpdate(ctx, tx, cmd.OrderID)
if err != nil {
return err
}
order.Confirm()
if err := h.orders.Save(ctx, tx, order); err != nil {
return err
}
if err := tx.Commit(ctx); err != nil {
return fmt.Errorf("commit: %w", err)
}
// ОШИБКА: publish ПОСЛЕ commit — если здесь упадём, событие потеряно
evt := NewOrderConfirmedEvent(order)
return h.producer.PublishConfirmed(ctx, evt)
}
Сценарии поломки:
Commitпрошёл, процесс упал доPublishConfirmed— заказ подтверждён, downstream ничего не знает.PublishConfirmedпрошёл,Commitупал сdeadlock— событие опубликовано, в БД заказ не подтверждён.
Правильно — через outbox в той же транзакции:
// core/order/handler.go
func (h *ConfirmOrderHandler) Handle(ctx context.Context, cmd ConfirmOrderCommand) error {
tx, err := h.db.Begin(ctx)
if err != nil {
return fmt.Errorf("begin: %w", err)
}
defer tx.Rollback(ctx)
order, err := h.orders.GetForUpdate(ctx, tx, cmd.OrderID)
if err != nil {
return err
}
order.Confirm()
if err := h.orders.Save(ctx, tx, order); err != nil {
return err
}
// outbox-запись в той же транзакции — атомарность гарантирована PG
evt := NewOrderConfirmedEvent(order)
payload, err := json.Marshal(evt)
if err != nil {
return fmt.Errorf("marshal event: %w", err)
}
if err := h.outbox.Store(ctx, tx, OutboxEvent{
AggregateID: order.ID,
EventType: "OrderConfirmed",
Payload: payload,
}); err != nil {
return fmt.Errorf("outbox store: %w", err)
}
return tx.Commit(ctx)
}
Запись в outbox идёт в той же pgx.Tx, что Save. Атомарность гарантирована PostgreSQL. Отдельный OutboxRelay читает unpublished events через FOR UPDATE SKIP LOCKED и публикует через kafka.Writer. Подробнее — Outbox publishing.
Допустимый прямой WriteMessages
Прямой writer.WriteMessages без outbox допустим только когда нет транзакционного контекста:
- Технические audit-events (в дополнение к
audit_log-таблице). - Сигналы мониторинга / метрики.
- Команды другим сервисам из admin-инструмента без DB-операции.
Что запрещено
| Антипаттерн | Правило | Что взамен |
|---|---|---|
RequiredAcks: kafka.RequireNone или kafka.RequireOne | R-KFK-PROD-X2 | kafka.RequireAll всегда |
Balancer: &kafka.RoundRobin{} для бизнес-событий | R-KFK-PROD-X2 | &kafka.Hash{} |
Key: nil в kafka.Message для business events | R-KFK-PROD-X3 | []byte(aggregateID) |
writer.WriteMessages из Handler совместно с pgx.Tx | R-KFK-PROD-X4 | outbox pattern |
Publish сразу после tx.Commit без outbox | R-KFK-OBX-X2 | outbox-relay |
MaxAttempts: 0 (дефолт — 10) в проде | R-KFK-PROD-1 | math.MaxInt32 |
Aggregate-структура целиком в Value | R-KFK-EVT-X2 | структура с явными полями, конструктор |
Куда дальше
- Конфигурация —
KafkaConfigчерезenvconfig, проверка топиков на старте. - Outbox publishing — relay-горутина,
FOR UPDATE SKIP LOCKED, batch 10–50. - Event design — payload format,
EventID(UUID v7), forward-compat. - Consumer —
CommitInterval: 0, manual commit,StartOffset: kafka.FirstOffset. - Idempotent consumer —
processed_event, dedup в одной транзакции. - Retry topic + DLQ — retry-топики вне poll-цикла,
isTransient, DLQ. - Observability —
promauto,traceparentв headers, consumer lag alerts. - Security — TLS-dialer, per-service
ClientID, PII restricted-топики.