Опирается на правила: R-KFK-SEC-1R-KFK-SEC-3 и R-KFK-SEC-X1R-KFK-SEC-X2 из Kafka Rules → раздел 9. Security.

Важно знать

  • В продакшене new Kafka({ ssl: true, sasl: { mechanism: 'scram-sha-512', username, password } }) — обязательно; ssl: false даже с SASL недопустимо.
  • PLAINTEXT — только в локальном docker-compose; в CI-окружении с сетевым брокером — уже минимум TLS.
  • clientId в KafkaConfig — per-service-account; именно он идентифицирует сервис для ACL на брокере.
  • Один service-account на весь кластер: компрометация одного сервиса даёт доступ ко всем топикам — недопустимо.
  • PII-поля (email, phone, inn) — никогда в широковещательных топиках (orders.confirmed); только customerId, за full PII — HTTP-запрос к Customer-сервису.
  • Credentials (username, password, пути к сертификатам) — env / Vault; никогда в kafka.config.ts или .env-файле в репозитории.
  • ssl.endpoint.identification.algorithm kafkajs не выставляется отдельно: hostname-верификация включена по умолчанию при ssl: true; отключать не нужно.

Kafka — общий бус между сервисами. Без security broker становится open relay: любой компонент видит все сообщения, любой может публиковать в любой топик, PII разлетается по consumer'ам. UCP формулирует три слоя защиты: транспортный (TLS), авторизационный (ACL per-сервис) и data-classification (PII в restricted topics). В Node эти слои реализуются через kafkajs-опции ssl/sasl и clientId в KafkaConfig.

TLS обязателен (R-KFK-SEC-1)

В kafkajs транспортная защита задаётся при создании клиента — один раз в KafkaModule.

// infra/kafka/kafka.module.ts
import { Kafka } from 'kafkajs';
import { ConfigService } from '@nestjs/config';
import { KafkaConfig } from '../config/kafka.config';

{
  provide: Kafka,
  inject: [ConfigService],
  useFactory: (cfg: ConfigService<KafkaConfig>) => new Kafka({
    clientId: cfg.get('clientId', { infer: true }),
    brokers:  cfg.get('brokers',  { infer: true }).split(','),
    ssl:  true,
    sasl: {
      mechanism: 'scram-sha-512',
      username:  cfg.get('saslUsername', { infer: true }),
      password:  cfg.get('saslPassword', { infer: true }),
    },
  }),
}

ssl: true — kafkajs устанавливает TLS-соединение; hostname-верификация включена по умолчанию. Все сообщения, заголовки, токены, payload шифруются в транзите.

sasl.mechanism: 'scram-sha-512' — аутентификация поверх TLS. Алгоритм важен: PLAIN over TLS допустим для test, но в продакшене password оказывается в открытом виде после TLS-decrypt на брокере; SCRAM-SHA-512 хеширует credentials до передачи.

Если используются клиентские сертификаты (mTLS), kafkajs принимает ssl как объект tls.ConnectionOptions:

import { readFileSync } from 'fs';

ssl: {
  rejectUnauthorized: true,
  ca:   [readFileSync(cfg.get('sslCaPath',   { infer: true }))],
  cert:  readFileSync(cfg.get('sslCertPath', { infer: true })),
  key:   readFileSync(cfg.get('sslKeyPath',  { infer: true })),
},

При mTLS отдельного SASL не нужно — identity client-сертификата достаточно для ACL.

Никогда ssl: false в продакшене

// ПЛОХО
new Kafka({ clientId: 'order-service', brokers: ['kafka-prod:9092'] })
// ssl опущен → plaintext; весь трафик в открытом виде

В сетевом capture — все сообщения, заголовки, SASL credentials. Insider-атака возможна на любом hop в сети.

ssl: false (или отсутствие параметра) — только локальный docker-compose с single-broker в изолированной сети.

KafkaConfig с SSL/SASL-полями

// infra/config/kafka.config.ts
import { IsNotEmpty, IsString } from 'class-validator';

export class KafkaConfig {
  @IsString() @IsNotEmpty()
  clientId: string;

  @IsString() @IsNotEmpty()
  brokers: string;

  @IsString() @IsNotEmpty()
  saslUsername: string;

  @IsString() @IsNotEmpty()
  saslPassword: string;
}
# .env.production (через CI/CD secrets / Vault — никогда в репозитории)
KAFKA_CLIENT_ID=order-service-prod
KAFKA_BROKERS=kafka-prod-1:9093,kafka-prod-2:9093,kafka-prod-3:9093
KAFKA_SASL_USERNAME=order-service-prod
KAFKA_SASL_PASSWORD=<из Vault>

ACL per-сервис (R-KFK-SEC-2)

Каждый сервис имеет собственный service-account, ACL ограничивает его только необходимыми топиками.

service-account: order-service-prod
  ACL:
    READ:   payment.events, inventory.events
    WRITE:  orders.confirmed, orders.cancelled, orders.created

service-account: billing-service-prod
  ACL:
    READ:   orders.confirmed
    WRITE:  billing.invoice.created

order-service-prod не может писать в payment.events — это публикует payment-service-prod. Если в order-service есть уязвимость или ошибка в коде — blast radius ограничен его ACL.

Идентификация client → service-account идёт через SASL-username или CN в mTLS-сертификате. В kafkajs clientId — это identity в логах брокера; для ACL важен именно SASL-username (они обычно совпадают по соглашению).

ACL управляются через kafka-acls.sh или IaC (Terraform, Strimzi operator). Проектирование ACL — DevOps/SRE; приложение использует clientId/credentials из KafkaConfig — не знает о деталях ACL.

# пример Strimzi KafkaUser
kubectl apply -f - <<EOF
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaUser
metadata:
  name: order-service-prod
spec:
  authentication:
    type: scram-sha-512
  authorization:
    type: simple
    acls:
      - resource: { type: topic, name: orders.confirmed }
        operations: [Write, Describe]
      - resource: { type: topic, name: payment.events }
        operations: [Read, Describe]
EOF

Один service-account на весь кластер — недопустимо

// ПЛОХО: все сервисы используют одни credentials
new Kafka({ clientId: 'app', sasl: { username: 'app', password: 'secret' } })

Последствия:

  • Blast radius: компрометация order-service (через RCE, утечку secrets) даёт attacker'у доступ ко всем топикам, включая payment.events и customers.pii.
  • Audit gap: брокер видит действия от app — непонятно, какой именно сервис опубликовал spurious event.
  • Ошибки кода: bug в order-service может случайно писать в топики billing-service — потребитель получит «лишние» события.

PII в restricted topics (R-KFK-SEC-3)

email, phone, inn, адрес — никогда не появляются в широковещательных топиках. Два паттерна.

Паттерн 1: отдельный restricted topic

customer-service публикует два топика:

customers.events          ← широкий, только customerId и non-PII метаданные
customers.events.pii      ← restricted, с полным профилем

ACL для customers.events.pii:

READ: notification-service-prod, customer-support-service-prod

order-service, billing-service, analytics-service подписаны на customers.events — видят только customerId. Сервисы с обоснованной потребностью в PII получают ACL на restricted топик.

// customer-service: payload широкого топика
interface CustomerRegisteredEvent {
  eventId:       string;
  eventType:     'CustomerRegistered';
  occurredAt:    string;
  aggregateId:   string;   // customerId
  tier:          'standard' | 'premium';
  // email, phone — отсутствуют намеренно
}

// payload restricted топика
interface CustomerRegisteredPiiEvent extends CustomerRegisteredEvent {
  email:   string;
  phone:   string;
}

Паттерн 2: слабая ссылка

Самый распространённый и рекомендуемый подход. Широкий топик содержит только customerId; сервис, которому нужен PII, запрашивает его через HTTP.

// notification-service: consumer orders.confirmed
eachMessage: async ({ message }) => {
  const event = orderConfirmedSchema.parse(JSON.parse(message.value!.toString()));
  // event.customerId — есть, email — нет

  const customer = await this.customerClient.getContactInfo(event.customerId);
  // HTTP GET /customers/{customerId}/contact → { email, phone }

  await this.emailSender.send({
    to:      customer.email,
    subject: `Заказ #${event.aggregateId} подтверждён`,
    amount:  event.totalAmount,
  });
},

PII никогда не оседает в Kafka: не в DLQ, не в retention-логах, не в backup брокера. Каждый запрос PII — отдельный audit-логируемый HTTP-вызов к Customer-сервису.

Цена — дополнительная нагрузка на Customer-сервис. Для большинства случаев это приемлемо; если объём велик — кеш с коротким TTL в notification-service.

Что запрещено

АнтипаттернПравилоЧто взамен
ssl: false / отсутствие ssl в продакшенеR-KFK-SEC-X1ssl: true + sasl в KafkaConfig
Один service-account (username: 'app') на весь кластерR-KFK-SEC-X2per-service SASL-username + ACL
PII (email, phone) в широковещательных топикахR-KFK-SEC-3restricted topic или слабая ссылка через customerId
sasl.mechanism: 'plain' в продакшенеR-KFK-SEC-1'scram-sha-512' или mTLS
Credentials в kafka.config.ts или .env в репозиторииR-KFK-SEC-2env через CI/CD secrets / Vault
clientId одинаковый у двух разных сервисовR-KFK-SEC-2уникальный clientId per-сервис, совпадает с SASL-username
ACL только на Read, без Write на свои топикиR-KFK-SEC-2явный Write + Describe на исходящие топики

Куда дальше

  • КонфигурацияKafkaConfig с ssl/sasl-полями через class-validator, fail-fast на старте.
  • Produceridempotent: true, acks: -1, partition key.
  • ConsumerautoCommit: false, groupId, fromBeginning.
  • Event design — payload без PII и внутренних агрегатов.
  • Idempotent consumerprocessed_event, dedup по eventId.
  • Observabilityprom-client, lag-алерты, traceparent в headers.
  • Outbox publishing — relay через @Interval, setLock.
  • Retry topic + DLQ — retry-топики, x-attempt, DLQ без проглатывания.