Опирается на правила: R-KFK-SEC-1, R-KFK-SEC-2, R-KFK-SEC-3, R-KFK-SEC-X1, R-KFK-SEC-X2 из Kafka Style Guide → раздел 9. Security.

Важно знать

  • TLS в проде обязателенkafka.Dialer с заполненным TLS *tls.Config; без него — PLAINTEXT.
  • ssl.endpoint.identification.algorithm аналог в kafka-gotls.Config.ServerName; без него MITM возможен.
  • per-service ClientID — единственный способ идентификации сервиса в ACLs; общий ClientID ломает изоляцию.
  • PII — в широком топике только customer_id; email и phone — через restricted-топик или HTTP к Customer-сервису.
  • PLAINTEXT допустим только в docker-compose локальной разработки.
  • Один ClientID на кластер — компрометация любого сервиса открывает все топики.
  • KafkaConfig.TLS загружается из env (KAFKA_TLS_CERT_FILE, KAFKA_TLS_KEY_FILE, KAFKA_TLS_CA_FILE); credentials не в коде.
  • mTLS даёт автоматическую identity-привязку для ACLs через CN client-сертификата.

Kafka — общий бус между сервисами. Без security broker становится open relay: любой компонент видит все сообщения, любой может публиковать в любой топик, PII разлетается по всем подписчикам. UCP формулирует три слоя защиты: транспортный (TLS через kafka.Dialer), авторизационный (ACLs, ClientID), data-classification (PII в restricted топиках).

TLS через kafka.Dialer

R-KFK-SEC-1 — в проде весь трафик к брокеру — зашифрованно.

В kafka-go TLS настраивается через kafka.Dialer, который передаётся как в Writer, так и в Reader:

// infra/config/kafka.go
type TLSConfig struct {
    CertFile string `envconfig:"KAFKA_TLS_CERT_FILE"`
    KeyFile  string `envconfig:"KAFKA_TLS_KEY_FILE"`
    CAFile   string `envconfig:"KAFKA_TLS_CA_FILE"`
}

func (c TLSConfig) Build() (*tls.Config, error) {
    if c.CertFile == "" {
        return nil, nil // локальная разработка без TLS
    }
    cert, err := tls.LoadX509KeyPair(c.CertFile, c.KeyFile)
    if err != nil {
        return nil, fmt.Errorf("load client cert: %w", err)
    }
    pool := x509.NewCertPool()
    ca, err := os.ReadFile(c.CAFile)
    if err != nil {
        return nil, fmt.Errorf("read CA: %w", err)
    }
    if !pool.AppendCertsFromPEM(ca) {
        return nil, errors.New("failed to parse CA cert")
    }
    return &tls.Config{
        Certificates: []tls.Certificate{cert},
        RootCAs:      pool,
        MinVersion:   tls.VersionTLS12,
    }, nil
}
// infra/kafka/dialer.go
func NewDialer(cfg KafkaConfig) (*kafka.Dialer, error) {
    tlsCfg, err := cfg.TLS.Build()
    if err != nil {
        return nil, fmt.Errorf("build TLS: %w", err)
    }
    return &kafka.Dialer{
        Timeout:   10 * time.Second,
        DualStack: true,
        TLS:       tlsCfg, // nil = PLAINTEXT (только локально)
    }, nil
}

Dialer передаётся в Writer и Reader при конструировании:

// infra/kafka/writer.go
func NewOrderWriter(cfg KafkaConfig, dialer *kafka.Dialer) *kafka.Writer {
    return kafka.NewWriter(kafka.WriterConfig{
        Brokers:      cfg.Brokers,
        Topic:        cfg.Topics.OrdersConfirmed,
        Dialer:       dialer,
        RequiredAcks: int(kafka.RequireAll),
        Balancer:     &kafka.Hash{},
    })
}

// infra/kafka/reader.go
func NewOrderConfirmedReader(cfg KafkaConfig, dialer *kafka.Dialer) *kafka.Reader {
    return kafka.NewReader(kafka.ReaderConfig{
        Brokers:        cfg.Brokers,
        Topic:          cfg.Topics.OrdersConfirmed,
        GroupID:        "billing-order-confirmed",
        Dialer:         dialer,
        CommitInterval: 0,
        StartOffset:    kafka.FirstOffset,
    })
}

mTLS — Certificates в tls.Config даёт автоматическую identity-привязку: брокер видит CN=order-service-prod в client-сертификате и применяет ACL именно для этого principal.

Альтернатива — SASL/SCRAM-SHA-512 поверх TLS. В kafka-go — через kafka.Dialer.SASLMechanism:

mechanism, err := scram.Mechanism(scram.SHA512, username, password)
if err != nil {
    return nil, fmt.Errorf("scram mechanism: %w", err)
}
dialer := &kafka.Dialer{
    TLS:           tlsCfg,
    SASLMechanism: mechanism,
}

Никогда PLAINTEXT в проде

// КАТАСТРОФА: dialer без TLS в проде
dialer := &kafka.Dialer{
    Timeout: 10 * time.Second,
    // TLS: nil — всё в открытом виде
}

В сетевом capture — все сообщения, headers, traceparent, payload заказов — в открытом виде. PLAINTEXT допустим только в docker-compose при локальной разработке с изолированной сетью.

per-service ClientID

R-KFK-SEC-2 — каждый сервис идентифицируется отдельным ClientID; ACLs назначаются по нему.

// infra/config/kafka.go
type KafkaConfig struct {
    Brokers  []string `envconfig:"KAFKA_BROKERS"    required:"true"`
    ClientID string   `envconfig:"KAFKA_CLIENT_ID"  required:"true"` // R-KFK-SEC-2
    Topics   TopicsConfig
    TLS      TLSConfig
}
// infra/kafka/writer.go
func NewOrderWriter(cfg KafkaConfig, dialer *kafka.Dialer) *kafka.Writer {
    return kafka.NewWriter(kafka.WriterConfig{
        Brokers:  cfg.Brokers,
        Topic:    cfg.Topics.OrdersConfirmed,
        Dialer:   dialer,
        // ClientID идёт через Dialer.ClientID или WriterConfig в зависимости от версии kafka-go;
        // если поле недоступно напрямую — через кастомный Transport:
    })
}

В среде order-service-prod переменная KAFKA_CLIENT_ID=order-service-prod. Брокер видит этот идентификатор в каждом запросе и применяет соответствующий ACL:

Principal: order-service-prod
  READ:  payment.events, inventory.events
  WRITE: order-service.order.confirmed, order-service.order.created

billing-service-prod не может писать в order-service.order.confirmed — нет ACL. Даже если bug в коде или инъекция попытается это сделать — брокер отклонит запрос.

ACLs описываются в IaC (Terraform, Pulumi), не руками. Проектирование — DevOps/SRE; в коде Go — только ClientID из KafkaConfig.

PII в restricted топиках

R-KFK-SEC-3 — два паттерна изоляции персональных данных.

Паттерн 1: restricted топик

customer-service публикует:
  - customer.events           ← широкий (customer_id only)
  - customer.events.full      ← restricted (email, phone, address)

ACL для customer.events.full:
  READ: notification-service, customer-support-service

Go-структуры событий явно разделены:

// core/customer/event/customer_registered.go — широкий топик
type CustomerRegistered struct {
    EventID    string    `json:"event_id"`
    OccurredAt time.Time `json:"occurred_at"`
    CustomerID string    `json:"customer_id"` // только id — без PII
    EventType  string    `json:"event_type"`
}

// core/customer/event/customer_registered_full.go — restricted топик
type CustomerRegisteredFull struct {
    EventID    string    `json:"event_id"`
    OccurredAt time.Time `json:"occurred_at"`
    CustomerID string    `json:"customer_id"`
    Email      string    `json:"email"`
    Phone      string    `json:"phone"`
    EventType  string    `json:"event_type"`
}

Outbox-relay направляет по отдельным топикам через маппинг eventType → topic.

Паттерн 2: «слабая ссылка»

PII никогда не попадает в Kafka. Подписчик получает только customer_id и подгружает нужные поля через HTTP:

// adapters/in/kafka/order_confirmed_consumer.go
func (c *OrderConfirmedConsumer) handleMessage(ctx context.Context, evt OrderConfirmedEvent) error {
    // evt содержит только order_id, customer_id, total_amount
    customer, err := c.customerClient.GetEmail(ctx, evt.CustomerID)
    if err != nil {
        return fmt.Errorf("get customer email %s: %w", evt.CustomerID, err)
    }
    return c.notifications.SendOrderConfirmed(ctx, customer.Email, evt.OrderID)
}
// adapters/out/http/customer_client.go
type CustomerClient struct {
    base   string
    client *http.Client
}

func (c *CustomerClient) GetEmail(ctx context.Context, customerID string) (*CustomerEmail, error) {
    req, err := http.NewRequestWithContext(ctx, http.MethodGet,
        c.base+"/internal/customers/"+customerID+"/email", nil)
    if err != nil {
        return nil, fmt.Errorf("build request: %w", err)
    }
    resp, err := c.client.Do(req)
    if err != nil {
        return nil, fmt.Errorf("customer email request: %w", err)
    }
    defer resp.Body.Close()

    if resp.StatusCode != http.StatusOK {
        return nil, fmt.Errorf("customer service: status %d", resp.StatusCode)
    }
    var result CustomerEmail
    if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
        return nil, fmt.Errorf("decode customer email: %w", err)
    }
    return &result, nil
}

Цена — HTTP-нагрузка на Customer-сервис при каждом событии. Преимущество — email и phone никогда не попадают в Kafka backup, в DLQ, в slog-логи. Для большинства случаев паттерн «слабая ссылка» предпочтительнее: проще, меньше точек утечки.

Что запрещено

АнтипаттернПравилоЧто взамен
kafka.Dialer без TLS в продеR-KFK-SEC-X1TLS: tlsCfg через cfg.TLS.Build()
PLAINTEXT broker URL в продеR-KFK-SEC-X1TLS / SASL_SSL
Один ClientID для всех сервисовR-KFK-SEC-X2KAFKA_CLIENT_ID per-сервис из env
PII в широковещательных топикахR-KFK-SEC-3restricted-топик или «слабая ссылка»
tls.Config без MinVersion: tls.VersionTLS12R-KFK-SEC-1явный MinVersion
Credentials (KAFKA_TLS_KEY_FILE) в кодеR-KFK-SEC-2env + Vault / Secret Manager
Пустой ServerName в tls.Config при MITM-рискеR-KFK-SEC-1явный ServerName или проверка по CA

Куда дальше

  • Конфигурация — KafkaConfig с TLSConfig через envconfig; ClientID required:"true".
  • Producer — передача Dialer в Writer; RequiredAcks: RequireAll.
  • Consumer — передача Dialer в Reader; GroupID per-сервис.
  • Event design — структуры событий без PII в payload широкого топика.
  • Observability — traceparent в Kafka headers; audit через slog.
  • Outbox publishing — маппинг eventType → topic для restricted-топиков.
  • Idempotent consumer — ACLs не заменяют идемпотентность.
  • Retry topic + DLQ — PII не попадает в DLQ при паттерне «слабая ссылка».