---
title: "Kafka Security в Node — SSL/SASL, ACL per-сервис и restricted PII topics"
nav_title: "Security"
excerpt: "kafkajs ssl+sasl обязательны в проде, PLAINTEXT — только локально. Per-service ACL ограничивает blast radius, PII — restricted topic."
keywords: "kafkajs SSL SASL NestJS, Kafka ACL per-service, PII restricted topic kafkajs, R-KFK-SEC NestJS, scram-sha-512 kafkajs, blast radius Kafka, clientId Kafka ACL"
focus_keyword: "kafkajs SSL SASL ACL security"
tags: ["kafka", "node", "typescript", "nestjs", "security", "R-KFK-SEC"]
---

# Kafka Security в Node — SSL/SASL, ACL per-сервис и restricted PII topics

> **Опирается на правила:** `R-KFK-SEC-1` … `R-KFK-SEC-3` и `R-KFK-SEC-X1` … `R-KFK-SEC-X2` из Kafka Rules → [раздел 9. Security](/standards/backend/kafka/#9-security).

> **Важно знать**
> - В продакшене `new Kafka({ ssl: true, sasl: { mechanism: 'scram-sha-512', username, password } })` — обязательно; `ssl: false` даже с SASL недопустимо.
> - `PLAINTEXT` — только в локальном `docker-compose`; в CI-окружении с сетевым брокером — уже минимум TLS.
> - `clientId` в `KafkaConfig` — per-service-account; именно он идентифицирует сервис для ACL на брокере.
> - Один service-account на весь кластер: компрометация одного сервиса даёт доступ ко всем топикам — недопустимо.
> - PII-поля (`email`, `phone`, `inn`) — никогда в широковещательных топиках (`orders.confirmed`); только `customerId`, за full PII — HTTP-запрос к Customer-сервису.
> - Credentials (`username`, `password`, пути к сертификатам) — env / Vault; никогда в `kafka.config.ts` или `.env`-файле в репозитории.
> - `ssl.endpoint.identification.algorithm` kafkajs не выставляется отдельно: hostname-верификация включена по умолчанию при `ssl: true`; отключать не нужно.

Kafka — общий бус между сервисами. Без security broker становится open relay: любой компонент видит все сообщения, любой может публиковать в любой топик, PII разлетается по consumer'ам. UCP формулирует три слоя защиты: транспортный (TLS), авторизационный (ACL per-сервис) и data-classification (PII в restricted topics). В Node эти слои реализуются через kafkajs-опции `ssl`/`sasl` и `clientId` в `KafkaConfig`.

## TLS обязателен (`R-KFK-SEC-1`)

В kafkajs транспортная защита задаётся при создании клиента — один раз в `KafkaModule`.

```ts
// infra/kafka/kafka.module.ts
import { Kafka } from 'kafkajs';
import { ConfigService } from '@nestjs/config';
import { KafkaConfig } from '../config/kafka.config';

{
  provide: Kafka,
  inject: [ConfigService],
  useFactory: (cfg: ConfigService<KafkaConfig>) => new Kafka({
    clientId: cfg.get('clientId', { infer: true }),
    brokers:  cfg.get('brokers',  { infer: true }).split(','),
    ssl:  true,
    sasl: {
      mechanism: 'scram-sha-512',
      username:  cfg.get('saslUsername', { infer: true }),
      password:  cfg.get('saslPassword', { infer: true }),
    },
  }),
}
```

`ssl: true` — kafkajs устанавливает TLS-соединение; hostname-верификация включена по умолчанию. Все сообщения, заголовки, токены, payload шифруются в транзите.

`sasl.mechanism: 'scram-sha-512'` — аутентификация поверх TLS. Алгоритм важен: `PLAIN` over TLS допустим для test, но в продакшене password оказывается в открытом виде после TLS-decrypt на брокере; `SCRAM-SHA-512` хеширует credentials до передачи.

Если используются клиентские сертификаты (mTLS), kafkajs принимает `ssl` как объект `tls.ConnectionOptions`:

```ts
import { readFileSync } from 'fs';

ssl: {
  rejectUnauthorized: true,
  ca:   [readFileSync(cfg.get('sslCaPath',   { infer: true }))],
  cert:  readFileSync(cfg.get('sslCertPath', { infer: true })),
  key:   readFileSync(cfg.get('sslKeyPath',  { infer: true })),
},
```

При mTLS отдельного SASL не нужно — identity client-сертификата достаточно для ACL.

### Никогда `ssl: false` в продакшене

```ts
// ПЛОХО
new Kafka({ clientId: 'order-service', brokers: ['kafka-prod:9092'] })
// ssl опущен → plaintext; весь трафик в открытом виде
```

В сетевом capture — все сообщения, заголовки, SASL credentials. Insider-атака возможна на любом hop в сети.

`ssl: false` (или отсутствие параметра) — только локальный `docker-compose` с single-broker в изолированной сети.

### `KafkaConfig` с SSL/SASL-полями

```ts
// infra/config/kafka.config.ts
import { IsNotEmpty, IsString } from 'class-validator';

export class KafkaConfig {
  @IsString() @IsNotEmpty()
  clientId: string;

  @IsString() @IsNotEmpty()
  brokers: string;

  @IsString() @IsNotEmpty()
  saslUsername: string;

  @IsString() @IsNotEmpty()
  saslPassword: string;
}
```

```dotenv
# .env.production (через CI/CD secrets / Vault — никогда в репозитории)
KAFKA_CLIENT_ID=order-service-prod
KAFKA_BROKERS=kafka-prod-1:9093,kafka-prod-2:9093,kafka-prod-3:9093
KAFKA_SASL_USERNAME=order-service-prod
KAFKA_SASL_PASSWORD=<из Vault>
```

## ACL per-сервис (`R-KFK-SEC-2`)

Каждый сервис имеет собственный service-account, ACL ограничивает его только необходимыми топиками.

```
service-account: order-service-prod
  ACL:
    READ:   payment.events, inventory.events
    WRITE:  orders.confirmed, orders.cancelled, orders.created

service-account: billing-service-prod
  ACL:
    READ:   orders.confirmed
    WRITE:  billing.invoice.created
```

`order-service-prod` не может писать в `payment.events` — это публикует `payment-service-prod`. Если в `order-service` есть уязвимость или ошибка в коде — blast radius ограничен его ACL.

Идентификация client → service-account идёт через SASL-username или CN в mTLS-сертификате. В kafkajs `clientId` — это identity в логах брокера; для ACL важен именно SASL-username (они обычно совпадают по соглашению).

ACL управляются через `kafka-acls.sh` или IaC (Terraform, Strimzi operator). Проектирование ACL — DevOps/SRE; приложение использует `clientId`/credentials из `KafkaConfig` — не знает о деталях ACL.

```bash
# пример Strimzi KafkaUser
kubectl apply -f - <<EOF
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaUser
metadata:
  name: order-service-prod
spec:
  authentication:
    type: scram-sha-512
  authorization:
    type: simple
    acls:
      - resource: { type: topic, name: orders.confirmed }
        operations: [Write, Describe]
      - resource: { type: topic, name: payment.events }
        operations: [Read, Describe]
EOF
```

### Один service-account на весь кластер — недопустимо

```ts
// ПЛОХО: все сервисы используют одни credentials
new Kafka({ clientId: 'app', sasl: { username: 'app', password: 'secret' } })
```

Последствия:
- **Blast radius**: компрометация `order-service` (через RCE, утечку secrets) даёт attacker'у доступ ко всем топикам, включая `payment.events` и `customers.pii`.
- **Audit gap**: брокер видит действия от `app` — непонятно, какой именно сервис опубликовал spurious event.
- **Ошибки кода**: bug в `order-service` может случайно писать в топики `billing-service` — потребитель получит «лишние» события.

## PII в restricted topics (`R-KFK-SEC-3`)

`email`, `phone`, `inn`, адрес — никогда не появляются в широковещательных топиках. Два паттерна.

### Паттерн 1: отдельный restricted topic

`customer-service` публикует два топика:

```
customers.events          ← широкий, только customerId и non-PII метаданные
customers.events.pii      ← restricted, с полным профилем
```

ACL для `customers.events.pii`:
```
READ: notification-service-prod, customer-support-service-prod
```

`order-service`, `billing-service`, `analytics-service` подписаны на `customers.events` — видят только `customerId`. Сервисы с обоснованной потребностью в PII получают ACL на restricted топик.

```ts
// customer-service: payload широкого топика
interface CustomerRegisteredEvent {
  eventId:       string;
  eventType:     'CustomerRegistered';
  occurredAt:    string;
  aggregateId:   string;   // customerId
  tier:          'standard' | 'premium';
  // email, phone — отсутствуют намеренно
}

// payload restricted топика
interface CustomerRegisteredPiiEvent extends CustomerRegisteredEvent {
  email:   string;
  phone:   string;
}
```

### Паттерн 2: слабая ссылка

Самый распространённый и рекомендуемый подход. Широкий топик содержит только `customerId`; сервис, которому нужен PII, запрашивает его через HTTP.

```ts
// notification-service: consumer orders.confirmed
eachMessage: async ({ message }) => {
  const event = orderConfirmedSchema.parse(JSON.parse(message.value!.toString()));
  // event.customerId — есть, email — нет

  const customer = await this.customerClient.getContactInfo(event.customerId);
  // HTTP GET /customers/{customerId}/contact → { email, phone }

  await this.emailSender.send({
    to:      customer.email,
    subject: `Заказ #${event.aggregateId} подтверждён`,
    amount:  event.totalAmount,
  });
},
```

PII никогда не оседает в Kafka: не в DLQ, не в retention-логах, не в backup брокера. Каждый запрос PII — отдельный audit-логируемый HTTP-вызов к Customer-сервису.

Цена — дополнительная нагрузка на Customer-сервис. Для большинства случаев это приемлемо; если объём велик — кеш с коротким TTL в notification-service.

## Что запрещено

| Антипаттерн | Правило | Что взамен |
|---|---|---|
| `ssl: false` / отсутствие `ssl` в продакшене | `R-KFK-SEC-X1` | `ssl: true` + `sasl` в `KafkaConfig` |
| Один service-account (`username: 'app'`) на весь кластер | `R-KFK-SEC-X2` | per-service SASL-username + ACL |
| PII (`email`, `phone`) в широковещательных топиках | `R-KFK-SEC-3` | restricted topic или слабая ссылка через `customerId` |
| `sasl.mechanism: 'plain'` в продакшене | `R-KFK-SEC-1` | `'scram-sha-512'` или mTLS |
| Credentials в `kafka.config.ts` или `.env` в репозитории | `R-KFK-SEC-2` | env через CI/CD secrets / Vault |
| `clientId` одинаковый у двух разных сервисов | `R-KFK-SEC-2` | уникальный `clientId` per-сервис, совпадает с SASL-username |
| ACL только на `Read`, без `Write` на свои топики | `R-KFK-SEC-2` | явный `Write` + `Describe` на исходящие топики |

## Куда дальше

- [Конфигурация](/standards/backend/kafka/node/configuration.md) — `KafkaConfig` с `ssl`/`sasl`-полями через `class-validator`, fail-fast на старте.
- [Producer](/standards/backend/kafka/node/producer.md) — `idempotent: true`, `acks: -1`, partition key.
- [Consumer](/standards/backend/kafka/node/consumer.md) — `autoCommit: false`, `groupId`, `fromBeginning`.
- [Event design](/standards/backend/kafka/node/event-design.md) — payload без PII и внутренних агрегатов.
- [Idempotent consumer](/standards/backend/kafka/node/idempotent-consumer.md) — `processed_event`, dedup по `eventId`.
- [Observability](/standards/backend/kafka/node/observability.md) — `prom-client`, lag-алерты, `traceparent` в headers.
- [Outbox publishing](/standards/backend/kafka/node/outbox-publishing.md) — relay через `@Interval`, `setLock`.
- [Retry topic + DLQ](/standards/backend/kafka/node/retry-and-dlq.md) — retry-топики, `x-attempt`, DLQ без проглатывания.
