Опирается на правила:
R-KFK-CFG-1…R-KFK-CFG-4иR-KFK-CFG-X1…R-KFK-CFG-X2из Kafka Rules → раздел 7. Конфигурация.
Важно знать
KafkaConfig— типизированная структура с тегамиenvconfig:"..." required:"true"; никаких строковыхos.Getenvпо сервису.KAFKA_BROKERS— обязательная переменная среды; хардкодlocalhost:9092недопустим даже в коде инициализации.kafka.Writer—RequiredAcks: kafka.RequireAll,Balancer: &kafka.Hash{},MaxAttempts: math.MaxInt32— в конструкторе, не в точке вызова.kafka.Reader—CommitInterval: 0(ручной commit),StartOffset: kafka.FirstOffset— явно вReaderConfig.- Десериализация событий — статический
map[string]func([]byte) (Event, error), неreflectпо строке из payload.- Проверка топиков через
ReadPartitionsна старте; отсутствие ожидаемого топика →fatal.ClientID— per-service, обязателен для ACL-идентификации (см.R-KFK-SEC-2).
В Go нет фреймворка, который собирает Kafka-клиент из YAML автоматически — вся конфигурация живёт в коде. Это даёт контроль, но и создаёт риски: нетипизированные строки, магические числа в разных файлах, молчаливый старт на несуществующем топике. Три идиомы закрывают эти риски: типизированная структура через envconfig, явный реестр десериализаторов и проверка топиков на старте.
KafkaConfig через envconfig
R-KFK-CFG-1: параметры через типизированную структуру с валидацией.
// infra/config/kafka.go
package config
import "github.com/kelseyhightower/envconfig"
type KafkaConfig struct {
Brokers []string `envconfig:"KAFKA_BROKERS" required:"true"`
ClientID string `envconfig:"KAFKA_CLIENT_ID" required:"true"` // R-KFK-SEC-2
Topics TopicsConfig
TLS TLSConfig
}
type TopicsConfig struct {
OrdersConfirmed string `envconfig:"KAFKA_TOPIC_ORDERS_CONFIRMED" required:"true"`
PaymentsFailed string `envconfig:"KAFKA_TOPIC_PAYMENTS_FAILED" required:"true"`
ProductsCreated string `envconfig:"KAFKA_TOPIC_PRODUCTS_CREATED" required:"true"`
}
type TLSConfig struct {
Enabled bool `envconfig:"KAFKA_TLS_ENABLED" default:"false"`
CertFile string `envconfig:"KAFKA_TLS_CERT_FILE"`
KeyFile string `envconfig:"KAFKA_TLS_KEY_FILE"`
CAFile string `envconfig:"KAFKA_TLS_CA_FILE"`
}
func LoadKafkaConfig() (KafkaConfig, error) {
var cfg KafkaConfig
if err := envconfig.Process("", &cfg); err != nil {
return KafkaConfig{}, fmt.Errorf("load kafka config: %w", err)
}
return cfg, nil
}
envconfig.Process проверяет все required:"true" при вызове — если KAFKA_BROKERS не задан, LoadKafkaConfig вернёт ошибку ещё в main, до создания какого-либо клиента. Это эквивалент @Validated в Java: fail-fast, не fail-at-first-message.
В main это выглядит так:
// cmd/order-service/main.go
func main() {
kafkaCfg, err := config.LoadKafkaConfig()
if err != nil {
slog.Error("invalid kafka config", "error", err)
os.Exit(1)
}
if err := kafka.ValidateTopics(ctx, kafkaCfg); err != nil {
slog.Error("kafka topics validation failed", "error", err)
os.Exit(1)
}
// ...
}
Конструктор Writer — producer-параметры
R-KFK-CFG-2: критические параметры producer зафиксированы в конструкторе, а не разбросаны по точкам вызова.
// infra/kafka/writer.go
package kafka
import (
"math"
"github.com/segmentio/kafka-go"
)
func NewOrderWriter(cfg config.KafkaConfig) *kafka.Writer {
return &kafka.Writer{
Addr: kafka.TCP(cfg.Brokers...),
Topic: cfg.Topics.OrdersConfirmed,
Balancer: &kafka.Hash{}, // R-KFK-PROD-2: партиционирование по ключу
RequiredAcks: kafka.RequireAll, // R-KFK-PROD-1/X2: acks=all
MaxAttempts: math.MaxInt32, // R-KFK-PROD-1: retries≈∞
Compression: kafka.Snappy,
}
}
func NewProductWriter(cfg config.KafkaConfig) *kafka.Writer {
return &kafka.Writer{
Addr: kafka.TCP(cfg.Brokers...),
Topic: cfg.Topics.ProductsCreated,
Balancer: &kafka.Hash{},
RequiredAcks: kafka.RequireAll,
MaxAttempts: math.MaxInt32,
}
}
kafka-go не экспонирует enable.idempotence как флаг — RequireAll + MaxAttempts: math.MaxInt32 дают эквивалентную семантику at-least-once с ordering per-partition. Если нужен exactly-once — kafka.Writer с Transport + Kafka-транзакции (отдельная тема).
Конструктор Reader — consumer-параметры
R-KFK-CFG-2: CommitInterval и StartOffset — явно в ReaderConfig.
// infra/kafka/reader.go
package kafka
import "github.com/segmentio/kafka-go"
func NewOrderConfirmedReader(cfg config.KafkaConfig) *kafka.Reader {
return kafka.NewReader(kafka.ReaderConfig{
Brokers: cfg.Brokers,
Topic: cfg.Topics.OrdersConfirmed,
GroupID: "billing-order-confirmed", // R-KFK-CONS-1: <service>-<purpose>
MinBytes: 1,
MaxBytes: 10 << 20, // 10 МБ
CommitInterval: 0, // R-KFK-CONS-2: ручной commit
StartOffset: kafka.FirstOffset, // R-KFK-CONS-4: earliest
MaxWait: 500 * time.Millisecond,
})
}
func NewPaymentFailedReader(cfg config.KafkaConfig) *kafka.Reader {
return kafka.NewReader(kafka.ReaderConfig{
Brokers: cfg.Brokers,
Topic: cfg.Topics.PaymentsFailed,
GroupID: "order-payment-failed",
CommitInterval: 0,
StartOffset: kafka.FirstOffset,
MaxWait: 500 * time.Millisecond,
})
}
CommitInterval: 0 отключает авто-commit полностью — единственный способ продвинуть offset — явный вызов reader.CommitMessages(ctx, msg) после успешной обработки. Подробнее — Consumer.
Реестр десериализаторов
R-KFK-CFG-3: вместо динамического reflect по строке из payload — статический map.
// infra/kafka/registry.go
package kafka
import (
"encoding/json"
"fmt"
"github.com/example/order-service/core/order/event"
customevent "github.com/example/order-service/core/customer/event"
)
type Event interface {
EventID() string
}
var eventRegistry = map[string]func([]byte) (Event, error){
"OrderConfirmed": unmarshal[event.OrderConfirmedEvent],
"OrderCancelled": unmarshal[event.OrderCancelledEvent],
"PaymentFailed": unmarshal[event.PaymentFailedEvent],
"CustomerCreated": unmarshal[customevent.CustomerCreatedEvent],
"ProductReserved": unmarshal[event.ProductReservedEvent],
}
func unmarshal[T Event](data []byte) (Event, error) {
var e T
if err := json.Unmarshal(data, &e); err != nil {
return nil, fmt.Errorf("unmarshal %T: %w", e, err)
}
return e, nil
}
func Decode(eventType string, data []byte) (Event, error) {
fn, ok := eventRegistry[eventType]
if !ok {
return nil, fmt.Errorf("unknown event type %q", eventType)
}
return fn(data)
}
Реестр — явный список известных типов. Строка "OrderConfirmed" из payload маппится только на event.OrderConfirmedEvent. Неизвестный eventType → ошибка-значение, которую caller направит в DLQ как poison pill — без паники, без reflect, без риска исполнить произвольный код.
В Java-стеке аналогом является spring.json.trusted.packages + use.type.headers: false. В Go нет такого фреймворкового механизма — статический реестр его заменяет.
Проверка топиков на старте
R-KFK-CFG-4: отсутствие ожидаемого топика — fatal при старте.
// infra/kafka/validate.go
package kafka
import (
"context"
"fmt"
"github.com/segmentio/kafka-go"
)
func ValidateTopics(ctx context.Context, cfg config.KafkaConfig) error {
conn, err := kafka.DialContext(ctx, "tcp", cfg.Brokers[0])
if err != nil {
return fmt.Errorf("dial kafka for topic validation: %w", err)
}
defer conn.Close()
partitions, err := conn.ReadPartitions()
if err != nil {
return fmt.Errorf("read partitions: %w", err)
}
existing := make(map[string]struct{}, len(partitions))
for _, p := range partitions {
existing[p.Topic] = struct{}{}
}
required := []string{
cfg.Topics.OrdersConfirmed,
cfg.Topics.PaymentsFailed,
cfg.Topics.ProductsCreated,
}
for _, topic := range required {
if _, ok := existing[topic]; !ok {
return fmt.Errorf("required topic %q not found in kafka cluster", topic)
}
}
return nil
}
Сценарий без этой проверки: деплой order-service с новым KAFKA_TOPIC_PRODUCTS_CREATED=products.created; SRE забыл создать топик; сервис стартует, reader молча ждёт несуществующий топик; никаких алертов; через неделю обнаруживается, что резервирование товаров не работало. С ValidateTopics → os.Exit(1) → K8s CrashLoopBackOff → SRE видит сразу.
В dev/test ValidateTopics можно пропустить через переменную KAFKA_SKIP_TOPIC_VALIDATION=true — это отдельный путь в main, не изменение логики ValidateTopics.
TLS-dialer
R-KFK-SEC-1: в проде kafka.Dialer с TLSConfig.
// infra/kafka/dialer.go
package kafka
import (
"crypto/tls"
"crypto/x509"
"os"
"time"
"github.com/segmentio/kafka-go"
)
func NewTLSDialer(cfg config.TLSConfig) (*kafka.Dialer, error) {
if !cfg.Enabled {
return kafka.DefaultDialer, nil
}
cert, err := tls.LoadX509KeyPair(cfg.CertFile, cfg.KeyFile)
if err != nil {
return nil, fmt.Errorf("load tls cert: %w", err)
}
caPEM, err := os.ReadFile(cfg.CAFile)
if err != nil {
return nil, fmt.Errorf("read ca file: %w", err)
}
pool := x509.NewCertPool()
pool.AppendCertsFromPEM(caPEM)
return &kafka.Dialer{
Timeout: 10 * time.Second,
DualStack: true,
TLS: &tls.Config{
Certificates: []tls.Certificate{cert},
RootCAs: pool,
},
}, nil
}
kafka.DefaultDialer без TLS допустим только в KAFKA_TLS_ENABLED=false (локальная разработка). В prod/staging cfg.TLS.Enabled должен быть true — это можно проверить в ValidateTopics дополнительным шагом.
Сборка в main
// cmd/order-service/main.go
func run(ctx context.Context) error {
kafkaCfg, err := config.LoadKafkaConfig()
if err != nil {
return fmt.Errorf("kafka config: %w", err)
}
if err := infrakafka.ValidateTopics(ctx, kafkaCfg); err != nil {
return fmt.Errorf("kafka topics: %w", err)
}
dialer, err := infrakafka.NewTLSDialer(kafkaCfg.TLS)
if err != nil {
return fmt.Errorf("kafka dialer: %w", err)
}
orderWriter := infrakafka.NewOrderWriterWithDialer(kafkaCfg, dialer)
defer orderWriter.Close()
orderReader := infrakafka.NewOrderConfirmedReader(kafkaCfg)
defer orderReader.Close()
// регистрируем горутины и запускаем сервер
// ...
return nil
}
Все клиенты собираются в одном месте — main или wire-граф DI. Конструкторы NewOrderWriter, NewOrderConfirmedReader не импортируют os.Getenv и не читают конфиг самостоятельно — получают KafkaConfig явным аргументом. Это делает их тестируемыми без переменных среды.
Что запрещено
| Антипаттерн | Правило | Что взамен |
|---|---|---|
brokers := []string{"localhost:9092"} хардкодом | R-KFK-CFG-X2 | KafkaConfig.Brokers через envconfig:"KAFKA_BROKERS" required:"true" |
reflect.New(registry[typeName]) по строке из payload | R-KFK-CFG-X1 | статический map[string]func([]byte) (Event, error) |
CommitInterval: 5 * time.Second (авто-commit) | R-KFK-CONS-X1 / R-KFK-CFG-2 | CommitInterval: 0 + явный CommitMessages |
RequiredAcks: kafka.RequireNone или kafka.RequireOne | R-KFK-PROD-X1 / R-KFK-CFG-2 | kafka.RequireAll |
Balancer: &kafka.RoundRobin{} для бизнес-событий | R-KFK-PROD-X2 / R-KFK-CFG-2 | &kafka.Hash{} — партиция по ключу |
| Запуск без проверки топиков | R-KFK-CFG-4 | ValidateTopics в main → os.Exit(1) |
ClientID отсутствует или общий на кластер | R-KFK-SEC-2 | envconfig:"KAFKA_CLIENT_ID" required:"true" per-service |
os.Getenv("KAFKA_BROKERS") в конструкторе клиента | R-KFK-CFG-1 | KafkaConfig через LoadKafkaConfig(), конструктор принимает структуру |
KAFKA_TLS_ENABLED=false в prod | R-KFK-SEC-X1 | TLS-dialer с cert + key + CA |
Куда дальше
- Producer —
RequireAll,&kafka.Hash{}, outbox-relay. - Consumer —
CommitInterval: 0,GroupID,StartOffset. - Security — TLS-dialer, per-service
ClientID, ACL. - Observability —
promauto,traceparentв Kafka headers. - Event design — структура payload,
EventIDUUID v7. - Outbox publishing — outbox-relay,
FOR UPDATE SKIP LOCKED. - Idempotent consumer —
processed_event, dedup в одной транзакции. - Retry topic + DLQ — retry-топики,
isTransient, DLQ-alert.