Опирается на правила:
R-KFK-SEC-1,R-KFK-SEC-2,R-KFK-SEC-3,R-KFK-SEC-X1,R-KFK-SEC-X2из Kafka Style Guide → раздел 9. Security.
Важно знать
- TLS в проде обязателен —
kafka.Dialerс заполненнымTLS *tls.Config; без него —PLAINTEXT.ssl.endpoint.identification.algorithmаналог вkafka-go—tls.Config.ServerName; без него MITM возможен.- per-service
ClientID— единственный способ идентификации сервиса в ACLs; общийClientIDломает изоляцию.- PII — в широком топике только
customer_id; email и phone — через restricted-топик или HTTP к Customer-сервису.PLAINTEXTдопустим только вdocker-composeлокальной разработки.- Один
ClientIDна кластер — компрометация любого сервиса открывает все топики.KafkaConfig.TLSзагружается из env (KAFKA_TLS_CERT_FILE,KAFKA_TLS_KEY_FILE,KAFKA_TLS_CA_FILE); credentials не в коде.- mTLS даёт автоматическую identity-привязку для ACLs через
CNclient-сертификата.
Kafka — общий бус между сервисами. Без security broker становится open relay: любой компонент видит все сообщения, любой может публиковать в любой топик, PII разлетается по всем подписчикам. UCP формулирует три слоя защиты: транспортный (TLS через kafka.Dialer), авторизационный (ACLs, ClientID), data-classification (PII в restricted топиках).
TLS через kafka.Dialer
R-KFK-SEC-1 — в проде весь трафик к брокеру — зашифрованно.
В kafka-go TLS настраивается через kafka.Dialer, который передаётся как в Writer, так и в Reader:
// infra/config/kafka.go
type TLSConfig struct {
CertFile string `envconfig:"KAFKA_TLS_CERT_FILE"`
KeyFile string `envconfig:"KAFKA_TLS_KEY_FILE"`
CAFile string `envconfig:"KAFKA_TLS_CA_FILE"`
}
func (c TLSConfig) Build() (*tls.Config, error) {
if c.CertFile == "" {
return nil, nil // локальная разработка без TLS
}
cert, err := tls.LoadX509KeyPair(c.CertFile, c.KeyFile)
if err != nil {
return nil, fmt.Errorf("load client cert: %w", err)
}
pool := x509.NewCertPool()
ca, err := os.ReadFile(c.CAFile)
if err != nil {
return nil, fmt.Errorf("read CA: %w", err)
}
if !pool.AppendCertsFromPEM(ca) {
return nil, errors.New("failed to parse CA cert")
}
return &tls.Config{
Certificates: []tls.Certificate{cert},
RootCAs: pool,
MinVersion: tls.VersionTLS12,
}, nil
}
// infra/kafka/dialer.go
func NewDialer(cfg KafkaConfig) (*kafka.Dialer, error) {
tlsCfg, err := cfg.TLS.Build()
if err != nil {
return nil, fmt.Errorf("build TLS: %w", err)
}
return &kafka.Dialer{
Timeout: 10 * time.Second,
DualStack: true,
TLS: tlsCfg, // nil = PLAINTEXT (только локально)
}, nil
}
Dialer передаётся в Writer и Reader при конструировании:
// infra/kafka/writer.go
func NewOrderWriter(cfg KafkaConfig, dialer *kafka.Dialer) *kafka.Writer {
return kafka.NewWriter(kafka.WriterConfig{
Brokers: cfg.Brokers,
Topic: cfg.Topics.OrdersConfirmed,
Dialer: dialer,
RequiredAcks: int(kafka.RequireAll),
Balancer: &kafka.Hash{},
})
}
// infra/kafka/reader.go
func NewOrderConfirmedReader(cfg KafkaConfig, dialer *kafka.Dialer) *kafka.Reader {
return kafka.NewReader(kafka.ReaderConfig{
Brokers: cfg.Brokers,
Topic: cfg.Topics.OrdersConfirmed,
GroupID: "billing-order-confirmed",
Dialer: dialer,
CommitInterval: 0,
StartOffset: kafka.FirstOffset,
})
}
mTLS — Certificates в tls.Config даёт автоматическую identity-привязку: брокер видит CN=order-service-prod в client-сертификате и применяет ACL именно для этого principal.
Альтернатива — SASL/SCRAM-SHA-512 поверх TLS. В kafka-go — через kafka.Dialer.SASLMechanism:
mechanism, err := scram.Mechanism(scram.SHA512, username, password)
if err != nil {
return nil, fmt.Errorf("scram mechanism: %w", err)
}
dialer := &kafka.Dialer{
TLS: tlsCfg,
SASLMechanism: mechanism,
}
Никогда PLAINTEXT в проде
// КАТАСТРОФА: dialer без TLS в проде
dialer := &kafka.Dialer{
Timeout: 10 * time.Second,
// TLS: nil — всё в открытом виде
}
В сетевом capture — все сообщения, headers, traceparent, payload заказов — в открытом виде. PLAINTEXT допустим только в docker-compose при локальной разработке с изолированной сетью.
per-service ClientID
R-KFK-SEC-2 — каждый сервис идентифицируется отдельным ClientID; ACLs назначаются по нему.
// infra/config/kafka.go
type KafkaConfig struct {
Brokers []string `envconfig:"KAFKA_BROKERS" required:"true"`
ClientID string `envconfig:"KAFKA_CLIENT_ID" required:"true"` // R-KFK-SEC-2
Topics TopicsConfig
TLS TLSConfig
}
// infra/kafka/writer.go
func NewOrderWriter(cfg KafkaConfig, dialer *kafka.Dialer) *kafka.Writer {
return kafka.NewWriter(kafka.WriterConfig{
Brokers: cfg.Brokers,
Topic: cfg.Topics.OrdersConfirmed,
Dialer: dialer,
// ClientID идёт через Dialer.ClientID или WriterConfig в зависимости от версии kafka-go;
// если поле недоступно напрямую — через кастомный Transport:
})
}
В среде order-service-prod переменная KAFKA_CLIENT_ID=order-service-prod. Брокер видит этот идентификатор в каждом запросе и применяет соответствующий ACL:
Principal: order-service-prod
READ: payment.events, inventory.events
WRITE: order-service.order.confirmed, order-service.order.created
billing-service-prod не может писать в order-service.order.confirmed — нет ACL. Даже если bug в коде или инъекция попытается это сделать — брокер отклонит запрос.
ACLs описываются в IaC (Terraform, Pulumi), не руками. Проектирование — DevOps/SRE; в коде Go — только ClientID из KafkaConfig.
PII в restricted топиках
R-KFK-SEC-3 — два паттерна изоляции персональных данных.
Паттерн 1: restricted топик
customer-service публикует:
- customer.events ← широкий (customer_id only)
- customer.events.full ← restricted (email, phone, address)
ACL для customer.events.full:
READ: notification-service, customer-support-service
Go-структуры событий явно разделены:
// core/customer/event/customer_registered.go — широкий топик
type CustomerRegistered struct {
EventID string `json:"event_id"`
OccurredAt time.Time `json:"occurred_at"`
CustomerID string `json:"customer_id"` // только id — без PII
EventType string `json:"event_type"`
}
// core/customer/event/customer_registered_full.go — restricted топик
type CustomerRegisteredFull struct {
EventID string `json:"event_id"`
OccurredAt time.Time `json:"occurred_at"`
CustomerID string `json:"customer_id"`
Email string `json:"email"`
Phone string `json:"phone"`
EventType string `json:"event_type"`
}
Outbox-relay направляет по отдельным топикам через маппинг eventType → topic.
Паттерн 2: «слабая ссылка»
PII никогда не попадает в Kafka. Подписчик получает только customer_id и подгружает нужные поля через HTTP:
// adapters/in/kafka/order_confirmed_consumer.go
func (c *OrderConfirmedConsumer) handleMessage(ctx context.Context, evt OrderConfirmedEvent) error {
// evt содержит только order_id, customer_id, total_amount
customer, err := c.customerClient.GetEmail(ctx, evt.CustomerID)
if err != nil {
return fmt.Errorf("get customer email %s: %w", evt.CustomerID, err)
}
return c.notifications.SendOrderConfirmed(ctx, customer.Email, evt.OrderID)
}
// adapters/out/http/customer_client.go
type CustomerClient struct {
base string
client *http.Client
}
func (c *CustomerClient) GetEmail(ctx context.Context, customerID string) (*CustomerEmail, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodGet,
c.base+"/internal/customers/"+customerID+"/email", nil)
if err != nil {
return nil, fmt.Errorf("build request: %w", err)
}
resp, err := c.client.Do(req)
if err != nil {
return nil, fmt.Errorf("customer email request: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("customer service: status %d", resp.StatusCode)
}
var result CustomerEmail
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
return nil, fmt.Errorf("decode customer email: %w", err)
}
return &result, nil
}
Цена — HTTP-нагрузка на Customer-сервис при каждом событии. Преимущество — email и phone никогда не попадают в Kafka backup, в DLQ, в slog-логи. Для большинства случаев паттерн «слабая ссылка» предпочтительнее: проще, меньше точек утечки.
Что запрещено
| Антипаттерн | Правило | Что взамен |
|---|---|---|
kafka.Dialer без TLS в проде | R-KFK-SEC-X1 | TLS: tlsCfg через cfg.TLS.Build() |
PLAINTEXT broker URL в проде | R-KFK-SEC-X1 | TLS / SASL_SSL |
Один ClientID для всех сервисов | R-KFK-SEC-X2 | KAFKA_CLIENT_ID per-сервис из env |
| PII в широковещательных топиках | R-KFK-SEC-3 | restricted-топик или «слабая ссылка» |
tls.Config без MinVersion: tls.VersionTLS12 | R-KFK-SEC-1 | явный MinVersion |
Credentials (KAFKA_TLS_KEY_FILE) в коде | R-KFK-SEC-2 | env + Vault / Secret Manager |
Пустой ServerName в tls.Config при MITM-риске | R-KFK-SEC-1 | явный ServerName или проверка по CA |
Куда дальше
- Конфигурация —
KafkaConfigсTLSConfigчерезenvconfig;ClientID required:"true". - Producer — передача
DialerвWriter;RequiredAcks: RequireAll. - Consumer — передача
DialerвReader;GroupIDper-сервис. - Event design — структуры событий без PII в payload широкого топика.
- Observability —
traceparentв Kafka headers; audit через slog. - Outbox publishing — маппинг
eventType → topicдля restricted-топиков. - Idempotent consumer — ACLs не заменяют идемпотентность.
- Retry topic + DLQ — PII не попадает в DLQ при паттерне «слабая ссылка».