Опирается на правила: R-KFK-CFG-1R-KFK-CFG-4 и R-KFK-CFG-X1R-KFK-CFG-X2 из Kafka Rules → раздел 7. Конфигурация.

Важно знать

  • KafkaConfig — типизированная структура с тегами envconfig:"..." required:"true"; никаких строковых os.Getenv по сервису.
  • KAFKA_BROKERS — обязательная переменная среды; хардкод localhost:9092 недопустим даже в коде инициализации.
  • kafka.WriterRequiredAcks: kafka.RequireAll, Balancer: &kafka.Hash{}, MaxAttempts: math.MaxInt32 — в конструкторе, не в точке вызова.
  • kafka.ReaderCommitInterval: 0 (ручной commit), StartOffset: kafka.FirstOffset — явно в ReaderConfig.
  • Десериализация событий — статический map[string]func([]byte) (Event, error), не reflect по строке из payload.
  • Проверка топиков через ReadPartitions на старте; отсутствие ожидаемого топика → fatal.
  • ClientID — per-service, обязателен для ACL-идентификации (см. R-KFK-SEC-2).

В Go нет фреймворка, который собирает Kafka-клиент из YAML автоматически — вся конфигурация живёт в коде. Это даёт контроль, но и создаёт риски: нетипизированные строки, магические числа в разных файлах, молчаливый старт на несуществующем топике. Три идиомы закрывают эти риски: типизированная структура через envconfig, явный реестр десериализаторов и проверка топиков на старте.

KafkaConfig через envconfig

R-KFK-CFG-1: параметры через типизированную структуру с валидацией.

// infra/config/kafka.go
package config

import "github.com/kelseyhightower/envconfig"

type KafkaConfig struct {
    Brokers  []string    `envconfig:"KAFKA_BROKERS"   required:"true"`
    ClientID string      `envconfig:"KAFKA_CLIENT_ID" required:"true"` // R-KFK-SEC-2
    Topics   TopicsConfig
    TLS      TLSConfig
}

type TopicsConfig struct {
    OrdersConfirmed string `envconfig:"KAFKA_TOPIC_ORDERS_CONFIRMED" required:"true"`
    PaymentsFailed  string `envconfig:"KAFKA_TOPIC_PAYMENTS_FAILED"  required:"true"`
    ProductsCreated string `envconfig:"KAFKA_TOPIC_PRODUCTS_CREATED" required:"true"`
}

type TLSConfig struct {
    Enabled  bool   `envconfig:"KAFKA_TLS_ENABLED"   default:"false"`
    CertFile string `envconfig:"KAFKA_TLS_CERT_FILE"`
    KeyFile  string `envconfig:"KAFKA_TLS_KEY_FILE"`
    CAFile   string `envconfig:"KAFKA_TLS_CA_FILE"`
}

func LoadKafkaConfig() (KafkaConfig, error) {
    var cfg KafkaConfig
    if err := envconfig.Process("", &cfg); err != nil {
        return KafkaConfig{}, fmt.Errorf("load kafka config: %w", err)
    }
    return cfg, nil
}

envconfig.Process проверяет все required:"true" при вызове — если KAFKA_BROKERS не задан, LoadKafkaConfig вернёт ошибку ещё в main, до создания какого-либо клиента. Это эквивалент @Validated в Java: fail-fast, не fail-at-first-message.

В main это выглядит так:

// cmd/order-service/main.go
func main() {
    kafkaCfg, err := config.LoadKafkaConfig()
    if err != nil {
        slog.Error("invalid kafka config", "error", err)
        os.Exit(1)
    }

    if err := kafka.ValidateTopics(ctx, kafkaCfg); err != nil {
        slog.Error("kafka topics validation failed", "error", err)
        os.Exit(1)
    }
    // ...
}

Конструктор Writer — producer-параметры

R-KFK-CFG-2: критические параметры producer зафиксированы в конструкторе, а не разбросаны по точкам вызова.

// infra/kafka/writer.go
package kafka

import (
    "math"

    "github.com/segmentio/kafka-go"
)

func NewOrderWriter(cfg config.KafkaConfig) *kafka.Writer {
    return &kafka.Writer{
        Addr:         kafka.TCP(cfg.Brokers...),
        Topic:        cfg.Topics.OrdersConfirmed,
        Balancer:     &kafka.Hash{},         // R-KFK-PROD-2: партиционирование по ключу
        RequiredAcks: kafka.RequireAll,      // R-KFK-PROD-1/X2: acks=all
        MaxAttempts:  math.MaxInt32,         // R-KFK-PROD-1: retries≈∞
        Compression:  kafka.Snappy,
    }
}

func NewProductWriter(cfg config.KafkaConfig) *kafka.Writer {
    return &kafka.Writer{
        Addr:         kafka.TCP(cfg.Brokers...),
        Topic:        cfg.Topics.ProductsCreated,
        Balancer:     &kafka.Hash{},
        RequiredAcks: kafka.RequireAll,
        MaxAttempts:  math.MaxInt32,
    }
}

kafka-go не экспонирует enable.idempotence как флаг — RequireAll + MaxAttempts: math.MaxInt32 дают эквивалентную семантику at-least-once с ordering per-partition. Если нужен exactly-once — kafka.Writer с Transport + Kafka-транзакции (отдельная тема).

Конструктор Reader — consumer-параметры

R-KFK-CFG-2: CommitInterval и StartOffset — явно в ReaderConfig.

// infra/kafka/reader.go
package kafka

import "github.com/segmentio/kafka-go"

func NewOrderConfirmedReader(cfg config.KafkaConfig) *kafka.Reader {
    return kafka.NewReader(kafka.ReaderConfig{
        Brokers:        cfg.Brokers,
        Topic:          cfg.Topics.OrdersConfirmed,
        GroupID:        "billing-order-confirmed",  // R-KFK-CONS-1: <service>-<purpose>
        MinBytes:       1,
        MaxBytes:       10 << 20,                   // 10 МБ
        CommitInterval: 0,                          // R-KFK-CONS-2: ручной commit
        StartOffset:    kafka.FirstOffset,           // R-KFK-CONS-4: earliest
        MaxWait:        500 * time.Millisecond,
    })
}

func NewPaymentFailedReader(cfg config.KafkaConfig) *kafka.Reader {
    return kafka.NewReader(kafka.ReaderConfig{
        Brokers:        cfg.Brokers,
        Topic:          cfg.Topics.PaymentsFailed,
        GroupID:        "order-payment-failed",
        CommitInterval: 0,
        StartOffset:    kafka.FirstOffset,
        MaxWait:        500 * time.Millisecond,
    })
}

CommitInterval: 0 отключает авто-commit полностью — единственный способ продвинуть offset — явный вызов reader.CommitMessages(ctx, msg) после успешной обработки. Подробнее — Consumer.

Реестр десериализаторов

R-KFK-CFG-3: вместо динамического reflect по строке из payload — статический map.

// infra/kafka/registry.go
package kafka

import (
    "encoding/json"
    "fmt"

    "github.com/example/order-service/core/order/event"
    customevent "github.com/example/order-service/core/customer/event"
)

type Event interface {
    EventID() string
}

var eventRegistry = map[string]func([]byte) (Event, error){
    "OrderConfirmed":   unmarshal[event.OrderConfirmedEvent],
    "OrderCancelled":   unmarshal[event.OrderCancelledEvent],
    "PaymentFailed":    unmarshal[event.PaymentFailedEvent],
    "CustomerCreated":  unmarshal[customevent.CustomerCreatedEvent],
    "ProductReserved":  unmarshal[event.ProductReservedEvent],
}

func unmarshal[T Event](data []byte) (Event, error) {
    var e T
    if err := json.Unmarshal(data, &e); err != nil {
        return nil, fmt.Errorf("unmarshal %T: %w", e, err)
    }
    return e, nil
}

func Decode(eventType string, data []byte) (Event, error) {
    fn, ok := eventRegistry[eventType]
    if !ok {
        return nil, fmt.Errorf("unknown event type %q", eventType)
    }
    return fn(data)
}

Реестр — явный список известных типов. Строка "OrderConfirmed" из payload маппится только на event.OrderConfirmedEvent. Неизвестный eventType → ошибка-значение, которую caller направит в DLQ как poison pill — без паники, без reflect, без риска исполнить произвольный код.

В Java-стеке аналогом является spring.json.trusted.packages + use.type.headers: false. В Go нет такого фреймворкового механизма — статический реестр его заменяет.

Проверка топиков на старте

R-KFK-CFG-4: отсутствие ожидаемого топика — fatal при старте.

// infra/kafka/validate.go
package kafka

import (
    "context"
    "fmt"

    "github.com/segmentio/kafka-go"
)

func ValidateTopics(ctx context.Context, cfg config.KafkaConfig) error {
    conn, err := kafka.DialContext(ctx, "tcp", cfg.Brokers[0])
    if err != nil {
        return fmt.Errorf("dial kafka for topic validation: %w", err)
    }
    defer conn.Close()

    partitions, err := conn.ReadPartitions()
    if err != nil {
        return fmt.Errorf("read partitions: %w", err)
    }

    existing := make(map[string]struct{}, len(partitions))
    for _, p := range partitions {
        existing[p.Topic] = struct{}{}
    }

    required := []string{
        cfg.Topics.OrdersConfirmed,
        cfg.Topics.PaymentsFailed,
        cfg.Topics.ProductsCreated,
    }
    for _, topic := range required {
        if _, ok := existing[topic]; !ok {
            return fmt.Errorf("required topic %q not found in kafka cluster", topic)
        }
    }
    return nil
}

Сценарий без этой проверки: деплой order-service с новым KAFKA_TOPIC_PRODUCTS_CREATED=products.created; SRE забыл создать топик; сервис стартует, reader молча ждёт несуществующий топик; никаких алертов; через неделю обнаруживается, что резервирование товаров не работало. С ValidateTopicsos.Exit(1) → K8s CrashLoopBackOff → SRE видит сразу.

В dev/test ValidateTopics можно пропустить через переменную KAFKA_SKIP_TOPIC_VALIDATION=true — это отдельный путь в main, не изменение логики ValidateTopics.

TLS-dialer

R-KFK-SEC-1: в проде kafka.Dialer с TLSConfig.

// infra/kafka/dialer.go
package kafka

import (
    "crypto/tls"
    "crypto/x509"
    "os"
    "time"

    "github.com/segmentio/kafka-go"
)

func NewTLSDialer(cfg config.TLSConfig) (*kafka.Dialer, error) {
    if !cfg.Enabled {
        return kafka.DefaultDialer, nil
    }

    cert, err := tls.LoadX509KeyPair(cfg.CertFile, cfg.KeyFile)
    if err != nil {
        return nil, fmt.Errorf("load tls cert: %w", err)
    }

    caPEM, err := os.ReadFile(cfg.CAFile)
    if err != nil {
        return nil, fmt.Errorf("read ca file: %w", err)
    }
    pool := x509.NewCertPool()
    pool.AppendCertsFromPEM(caPEM)

    return &kafka.Dialer{
        Timeout:   10 * time.Second,
        DualStack: true,
        TLS: &tls.Config{
            Certificates: []tls.Certificate{cert},
            RootCAs:      pool,
        },
    }, nil
}

kafka.DefaultDialer без TLS допустим только в KAFKA_TLS_ENABLED=false (локальная разработка). В prod/staging cfg.TLS.Enabled должен быть true — это можно проверить в ValidateTopics дополнительным шагом.

Сборка в main

// cmd/order-service/main.go
func run(ctx context.Context) error {
    kafkaCfg, err := config.LoadKafkaConfig()
    if err != nil {
        return fmt.Errorf("kafka config: %w", err)
    }

    if err := infrakafka.ValidateTopics(ctx, kafkaCfg); err != nil {
        return fmt.Errorf("kafka topics: %w", err)
    }

    dialer, err := infrakafka.NewTLSDialer(kafkaCfg.TLS)
    if err != nil {
        return fmt.Errorf("kafka dialer: %w", err)
    }

    orderWriter := infrakafka.NewOrderWriterWithDialer(kafkaCfg, dialer)
    defer orderWriter.Close()

    orderReader := infrakafka.NewOrderConfirmedReader(kafkaCfg)
    defer orderReader.Close()

    // регистрируем горутины и запускаем сервер
    // ...
    return nil
}

Все клиенты собираются в одном месте — main или wire-граф DI. Конструкторы NewOrderWriter, NewOrderConfirmedReader не импортируют os.Getenv и не читают конфиг самостоятельно — получают KafkaConfig явным аргументом. Это делает их тестируемыми без переменных среды.

Что запрещено

АнтипаттернПравилоЧто взамен
brokers := []string{"localhost:9092"} хардкодомR-KFK-CFG-X2KafkaConfig.Brokers через envconfig:"KAFKA_BROKERS" required:"true"
reflect.New(registry[typeName]) по строке из payloadR-KFK-CFG-X1статический map[string]func([]byte) (Event, error)
CommitInterval: 5 * time.Second (авто-commit)R-KFK-CONS-X1 / R-KFK-CFG-2CommitInterval: 0 + явный CommitMessages
RequiredAcks: kafka.RequireNone или kafka.RequireOneR-KFK-PROD-X1 / R-KFK-CFG-2kafka.RequireAll
Balancer: &kafka.RoundRobin{} для бизнес-событийR-KFK-PROD-X2 / R-KFK-CFG-2&kafka.Hash{} — партиция по ключу
Запуск без проверки топиковR-KFK-CFG-4ValidateTopics в mainos.Exit(1)
ClientID отсутствует или общий на кластерR-KFK-SEC-2envconfig:"KAFKA_CLIENT_ID" required:"true" per-service
os.Getenv("KAFKA_BROKERS") в конструкторе клиентаR-KFK-CFG-1KafkaConfig через LoadKafkaConfig(), конструктор принимает структуру
KAFKA_TLS_ENABLED=false в prodR-KFK-SEC-X1TLS-dialer с cert + key + CA

Куда дальше

  • Producer — RequireAll, &kafka.Hash{}, outbox-relay.
  • Consumer — CommitInterval: 0, GroupID, StartOffset.
  • Security — TLS-dialer, per-service ClientID, ACL.
  • Observability — promauto, traceparent в Kafka headers.
  • Event design — структура payload, EventID UUID v7.
  • Outbox publishing — outbox-relay, FOR UPDATE SKIP LOCKED.
  • Idempotent consumer — processed_event, dedup в одной транзакции.
  • Retry topic + DLQ — retry-топики, isTransient, DLQ-alert.