Опирается на правила:
R-KFK-SEC-1…R-KFK-SEC-3иR-KFK-SEC-X1…R-KFK-SEC-X2из Kafka Rules → раздел 9. Security.
Важно знать
- В продакшене
new Kafka({ ssl: true, sasl: { mechanism: 'scram-sha-512', username, password } })— обязательно;ssl: falseдаже с SASL недопустимо.PLAINTEXT— только в локальномdocker-compose; в CI-окружении с сетевым брокером — уже минимум TLS.clientIdвKafkaConfig— per-service-account; именно он идентифицирует сервис для ACL на брокере.- Один service-account на весь кластер: компрометация одного сервиса даёт доступ ко всем топикам — недопустимо.
- PII-поля (
phone,inn) — никогда в широковещательных топиках (orders.confirmed); толькоcustomerId, за full PII — HTTP-запрос к Customer-сервису.- Credentials (
username,password, пути к сертификатам) — env / Vault; никогда вkafka.config.tsили.env-файле в репозитории.ssl.endpoint.identification.algorithmkafkajs не выставляется отдельно: hostname-верификация включена по умолчанию приssl: true; отключать не нужно.
Kafka — общий бус между сервисами. Без security broker становится open relay: любой компонент видит все сообщения, любой может публиковать в любой топик, PII разлетается по consumer'ам. UCP формулирует три слоя защиты: транспортный (TLS), авторизационный (ACL per-сервис) и data-classification (PII в restricted topics). В Node эти слои реализуются через kafkajs-опции ssl/sasl и clientId в KafkaConfig.
TLS обязателен (R-KFK-SEC-1)
В kafkajs транспортная защита задаётся при создании клиента — один раз в KafkaModule.
// infra/kafka/kafka.module.ts
import { Kafka } from 'kafkajs';
import { ConfigService } from '@nestjs/config';
import { KafkaConfig } from '../config/kafka.config';
{
provide: Kafka,
inject: [ConfigService],
useFactory: (cfg: ConfigService<KafkaConfig>) => new Kafka({
clientId: cfg.get('clientId', { infer: true }),
brokers: cfg.get('brokers', { infer: true }).split(','),
ssl: true,
sasl: {
mechanism: 'scram-sha-512',
username: cfg.get('saslUsername', { infer: true }),
password: cfg.get('saslPassword', { infer: true }),
},
}),
}
ssl: true — kafkajs устанавливает TLS-соединение; hostname-верификация включена по умолчанию. Все сообщения, заголовки, токены, payload шифруются в транзите.
sasl.mechanism: 'scram-sha-512' — аутентификация поверх TLS. Алгоритм важен: PLAIN over TLS допустим для test, но в продакшене password оказывается в открытом виде после TLS-decrypt на брокере; SCRAM-SHA-512 хеширует credentials до передачи.
Если используются клиентские сертификаты (mTLS), kafkajs принимает ssl как объект tls.ConnectionOptions:
import { readFileSync } from 'fs';
ssl: {
rejectUnauthorized: true,
ca: [readFileSync(cfg.get('sslCaPath', { infer: true }))],
cert: readFileSync(cfg.get('sslCertPath', { infer: true })),
key: readFileSync(cfg.get('sslKeyPath', { infer: true })),
},
При mTLS отдельного SASL не нужно — identity client-сертификата достаточно для ACL.
Никогда ssl: false в продакшене
// ПЛОХО
new Kafka({ clientId: 'order-service', brokers: ['kafka-prod:9092'] })
// ssl опущен → plaintext; весь трафик в открытом виде
В сетевом capture — все сообщения, заголовки, SASL credentials. Insider-атака возможна на любом hop в сети.
ssl: false (или отсутствие параметра) — только локальный docker-compose с single-broker в изолированной сети.
KafkaConfig с SSL/SASL-полями
// infra/config/kafka.config.ts
import { IsNotEmpty, IsString } from 'class-validator';
export class KafkaConfig {
@IsString() @IsNotEmpty()
clientId: string;
@IsString() @IsNotEmpty()
brokers: string;
@IsString() @IsNotEmpty()
saslUsername: string;
@IsString() @IsNotEmpty()
saslPassword: string;
}
# .env.production (через CI/CD secrets / Vault — никогда в репозитории)
KAFKA_CLIENT_ID=order-service-prod
KAFKA_BROKERS=kafka-prod-1:9093,kafka-prod-2:9093,kafka-prod-3:9093
KAFKA_SASL_USERNAME=order-service-prod
KAFKA_SASL_PASSWORD=<из Vault>
ACL per-сервис (R-KFK-SEC-2)
Каждый сервис имеет собственный service-account, ACL ограничивает его только необходимыми топиками.
service-account: order-service-prod
ACL:
READ: payment.events, inventory.events
WRITE: orders.confirmed, orders.cancelled, orders.created
service-account: billing-service-prod
ACL:
READ: orders.confirmed
WRITE: billing.invoice.created
order-service-prod не может писать в payment.events — это публикует payment-service-prod. Если в order-service есть уязвимость или ошибка в коде — blast radius ограничен его ACL.
Идентификация client → service-account идёт через SASL-username или CN в mTLS-сертификате. В kafkajs clientId — это identity в логах брокера; для ACL важен именно SASL-username (они обычно совпадают по соглашению).
ACL управляются через kafka-acls.sh или IaC (Terraform, Strimzi operator). Проектирование ACL — DevOps/SRE; приложение использует clientId/credentials из KafkaConfig — не знает о деталях ACL.
# пример Strimzi KafkaUser
kubectl apply -f - <<EOF
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaUser
metadata:
name: order-service-prod
spec:
authentication:
type: scram-sha-512
authorization:
type: simple
acls:
- resource: { type: topic, name: orders.confirmed }
operations: [Write, Describe]
- resource: { type: topic, name: payment.events }
operations: [Read, Describe]
EOF
Один service-account на весь кластер — недопустимо
// ПЛОХО: все сервисы используют одни credentials
new Kafka({ clientId: 'app', sasl: { username: 'app', password: 'secret' } })
Последствия:
- Blast radius: компрометация
order-service(через RCE, утечку secrets) даёт attacker'у доступ ко всем топикам, включаяpayment.eventsиcustomers.pii. - Audit gap: брокер видит действия от
app— непонятно, какой именно сервис опубликовал spurious event. - Ошибки кода: bug в
order-serviceможет случайно писать в топикиbilling-service— потребитель получит «лишние» события.
PII в restricted topics (R-KFK-SEC-3)
email, phone, inn, адрес — никогда не появляются в широковещательных топиках. Два паттерна.
Паттерн 1: отдельный restricted topic
customer-service публикует два топика:
customers.events ← широкий, только customerId и non-PII метаданные
customers.events.pii ← restricted, с полным профилем
ACL для customers.events.pii:
READ: notification-service-prod, customer-support-service-prod
order-service, billing-service, analytics-service подписаны на customers.events — видят только customerId. Сервисы с обоснованной потребностью в PII получают ACL на restricted топик.
// customer-service: payload широкого топика
interface CustomerRegisteredEvent {
eventId: string;
eventType: 'CustomerRegistered';
occurredAt: string;
aggregateId: string; // customerId
tier: 'standard' | 'premium';
// email, phone — отсутствуют намеренно
}
// payload restricted топика
interface CustomerRegisteredPiiEvent extends CustomerRegisteredEvent {
email: string;
phone: string;
}
Паттерн 2: слабая ссылка
Самый распространённый и рекомендуемый подход. Широкий топик содержит только customerId; сервис, которому нужен PII, запрашивает его через HTTP.
// notification-service: consumer orders.confirmed
eachMessage: async ({ message }) => {
const event = orderConfirmedSchema.parse(JSON.parse(message.value!.toString()));
// event.customerId — есть, email — нет
const customer = await this.customerClient.getContactInfo(event.customerId);
// HTTP GET /customers/{customerId}/contact → { email, phone }
await this.emailSender.send({
to: customer.email,
subject: `Заказ #${event.aggregateId} подтверждён`,
amount: event.totalAmount,
});
},
PII никогда не оседает в Kafka: не в DLQ, не в retention-логах, не в backup брокера. Каждый запрос PII — отдельный audit-логируемый HTTP-вызов к Customer-сервису.
Цена — дополнительная нагрузка на Customer-сервис. Для большинства случаев это приемлемо; если объём велик — кеш с коротким TTL в notification-service.
Что запрещено
| Антипаттерн | Правило | Что взамен |
|---|---|---|
ssl: false / отсутствие ssl в продакшене | R-KFK-SEC-X1 | ssl: true + sasl в KafkaConfig |
Один service-account (username: 'app') на весь кластер | R-KFK-SEC-X2 | per-service SASL-username + ACL |
PII (email, phone) в широковещательных топиках | R-KFK-SEC-3 | restricted topic или слабая ссылка через customerId |
sasl.mechanism: 'plain' в продакшене | R-KFK-SEC-1 | 'scram-sha-512' или mTLS |
Credentials в kafka.config.ts или .env в репозитории | R-KFK-SEC-2 | env через CI/CD secrets / Vault |
clientId одинаковый у двух разных сервисов | R-KFK-SEC-2 | уникальный clientId per-сервис, совпадает с SASL-username |
ACL только на Read, без Write на свои топики | R-KFK-SEC-2 | явный Write + Describe на исходящие топики |
Куда дальше
- Конфигурация —
KafkaConfigсssl/sasl-полями черезclass-validator, fail-fast на старте. - Producer —
idempotent: true,acks: -1, partition key. - Consumer —
autoCommit: false,groupId,fromBeginning. - Event design — payload без PII и внутренних агрегатов.
- Idempotent consumer —
processed_event, dedup поeventId. - Observability —
prom-client, lag-алерты,traceparentв headers. - Outbox publishing — relay через
@Interval,setLock. - Retry topic + DLQ — retry-топики,
x-attempt, DLQ без проглатывания.