Опирается на правила: R-KFK-SEC-1R-KFK-SEC-3 и R-KFK-SEC-X1R-KFK-SEC-X2 из Kafka Style Guide → раздел 9. Security.

Важно знать

  • TLS обязателен в проде: security_protocol="SSL" или "SASL_SSL". "PLAINTEXT" — только локальная разработка.
  • ssl_check_hostname=True (дефолт aiokafka) — не отключать: без этого возможен MITM.
  • ACL per-сервис через уникальный client_id в KafkaSettings; права только на нужные топики.
  • Один service-account на весь кластер — нет blast-radius containment: компрометация одного сервиса даёт доступ ко всем топикам.
  • PII — отдельные restricted топики с узким ACL, либо «слабая ссылка»: в широком топике customer_id, PII подгружается через customer-service.
  • Credentials (ssl_password, SASL-пароль) — только через env, никогда в settings.py / pyproject.toml без pydantic-settings.
  • SASL/PLAIN over PLAINTEXT запрещено; SASL/SCRAM-SHA-512 over TLS (SASL_SSL) — допустимо когда client-сертификаты тяжело раздавать.

Kafka — общий бус между сервисами. Без security broker становится open relay: любой компонент видит все сообщения, любой может публиковать в любой topic, PII разлетается по подписчикам. UCP формулирует три слоя защиты: транспортный (TLS), авторизационный (ACL), data-classification (PII в restricted topics).

TLS обязателен

R-KFK-SEC-1: cross-network — только зашифрованно.

KafkaSettings через pydantic-settings:

from pydantic import Field
from pydantic_settings import BaseSettings

class KafkaSettings(BaseSettings):
    brokers: str = Field(..., alias="KAFKA_BROKERS")
    client_id: str = Field(..., alias="KAFKA_CLIENT_ID")
    ssl_cafile: str | None = Field(None, alias="KAFKA_SSL_CAFILE")
    ssl_certfile: str | None = Field(None, alias="KAFKA_SSL_CERTFILE")
    ssl_keyfile: str | None = Field(None, alias="KAFKA_SSL_KEYFILE")
    ssl_password: str | None = Field(None, alias="KAFKA_SSL_PASSWORD")
    sasl_mechanism: str | None = Field(None, alias="KAFKA_SASL_MECHANISM")
    sasl_plain_username: str | None = Field(None, alias="KAFKA_SASL_USERNAME")
    sasl_plain_password: str | None = Field(None, alias="KAFKA_SASL_PASSWORD")

    model_config = {"env_file": ".env", "populate_by_name": True}

Producer с mTLS (mutual TLS):

import ssl
from aiokafka import AIOKafkaProducer

def build_ssl_context(settings: KafkaSettings) -> ssl.SSLContext:
    ctx = ssl.create_default_context(ssl.Purpose.SERVER_AUTH, cafile=settings.ssl_cafile)
    if settings.ssl_certfile and settings.ssl_keyfile:
        ctx.load_cert_chain(
            certfile=settings.ssl_certfile,
            keyfile=settings.ssl_keyfile,
            password=settings.ssl_password,
        )
    return ctx

async def build_producer(settings: KafkaSettings) -> AIOKafkaProducer:
    ssl_ctx = build_ssl_context(settings)
    return AIOKafkaProducer(
        bootstrap_servers=settings.brokers,
        client_id=settings.client_id,       # identity для ACL (R-KFK-SEC-2)
        security_protocol="SSL",
        ssl_context=ssl_ctx,
        enable_idempotence=True,            # R-KFK-PROD-1
        value_serializer=lambda v: json.dumps(v).encode(),
    )

ssl_check_hostname=True — дефолт ssl.SSLContext; не отключать. Broker проверяет, что hostname в сертификате совпадает с адресом подключения. Без этого MITM возможен.

SASL/SCRAM-SHA-512 поверх TLS

Когда клиентские сертификаты тяжело раздавать — SASL_SSL с SCRAM-SHA-512:

async def build_producer_sasl(settings: KafkaSettings) -> AIOKafkaProducer:
    ssl_ctx = ssl.create_default_context(cafile=settings.ssl_cafile)
    return AIOKafkaProducer(
        bootstrap_servers=settings.brokers,
        client_id=settings.client_id,
        security_protocol="SASL_SSL",
        sasl_mechanism="SCRAM-SHA-512",
        sasl_plain_username=settings.sasl_plain_username,
        sasl_plain_password=settings.sasl_plain_password,
        ssl_context=ssl_ctx,
        enable_idempotence=True,
    )

SASL/PLAIN over TLS (SASL_SSL + PLAIN) — допустимо в test-окружении; не для prod (password в открытом виде внутри TLS-сессии).

Никогда PLAINTEXT в проде

# ЗАПРЕЩЕНО (R-KFK-SEC-X1)
producer = AIOKafkaProducer(
    bootstrap_servers="kafka:9092",
    security_protocol="PLAINTEXT",   # весь payload в сети — открытым текстом
)

В сетевом перехвате — все сообщения, headers, токены. Только локальная разработка (docker-compose, изолированная сеть).

ACL per-сервис

R-KFK-SEC-2: каждый сервис имеет отдельный client_id; ACL только на нужные топики.

service-account: order-service-prod
  ACL READ:   payment.events, inventory.events
  ACL WRITE:  orders.created, orders.confirmed, orders.cancelled

service-account: billing-service-prod
  ACL READ:   orders.confirmed
  ACL WRITE:  billing.invoice.created

order-service-prod не может писать в payment.events — только payment-service-prod. Идентификация через CN в mTLS-сертификате (CN=order-service-prod) или SASL-username.

client_id в KafkaSettings — это идентификатор, который application передаёт в Kafka. Он же используется при заведении ACL в IaC (Terraform/Pulumi). Application не управляет ACL — это DevOps/SRE; application лишь называет себя правильно.

# Пример: разные KafkaSettings для разных сервисов
# .env (order-service)
# KAFKA_CLIENT_ID=order-service-prod

# .env (billing-service)
# KAFKA_CLIENT_ID=billing-service-prod

Никогда один service-account

# ЗАПРЕЩЕНО (R-KFK-SEC-X2)
# Все сервисы с одним kafka_username="app" и ACL READ+WRITE *
# Компрометация billing-service → attacker читает payment.events

Что ломается при одном аккаунте:

  • Blast radius — компрометация одного сервиса даёт доступ ко всем топикам.
  • Audit — невозможно понять, кто опубликовал spurious event.
  • Случайные ошибки — bug в order-service пишет в payment.events, downstream обрабатывает «лишнее».

PII в restricted topics

R-KFK-SEC-3: два паттерна.

Паттерн 1 — restricted topic

customer-service публикует:
  customer.events           ← широкий, только customer_id
  customer.events.pii       ← restricted, email/phone/address

ACL для customer.events.pii:
  READ: notification-service, customer-support-service
# CustomerRegistered — широкий топик, без PII
@dataclass(frozen=True)
class CustomerRegistered:
    event_id: UUID
    occurred_at: datetime
    customer_id: UUID          # только id, не email/phone (R-KFK-EVT-X3)
    tier: str

# Публикация в широкий топик — любой сервис может читать
await producer.send_and_wait(
    topic="customer.events",
    key=str(event.customer_id).encode(),
    value=dataclasses.asdict(event),
)
# CustomerPIIUpdated — restricted топик
@dataclass(frozen=True)
class CustomerPIIUpdated:
    event_id: UUID
    occurred_at: datetime
    customer_id: UUID
    email: str
    phone: str | None

# Публикация в restricted топик — узкий ACL
await producer.send_and_wait(
    topic="customer.events.pii",
    key=str(event.customer_id).encode(),
    value=dataclasses.asdict(event),
)

Паттерн 2 — слабая ссылка

PII никогда не появляется в Kafka. Когда notification-service нужен email — отдельный HTTP-запрос:

async def handle_order_confirmed(
    event: OrderConfirmed,
    customer_client: CustomerServiceClient,
    notifier: EmailNotifier,
) -> None:
    # В широком топике только customer_id (R-KFK-SEC-3)
    contact = await customer_client.get_contact(event.customer_id)
    await notifier.send_order_confirmed(contact.email, event.order_id, event.total)

Каждый запрос PII — отдельный audit-log на стороне customer-service. PII не попадает в Kafka backup, DLQ, logs.

Цена — дополнительная HTTP-нагрузка. Для большинства случаев слабая ссылка предпочтительнее: меньше поверхность атаки, проще выполнение требований GDPR.

Credentials через env

Никогда в коде:

# ЗАПРЕЩЕНО
producer = AIOKafkaProducer(
    sasl_plain_password="supersecret",  # в git-истории навсегда
)

Только через pydantic-settings из env / Vault:

# .env (не коммитить) / Kubernetes Secret / Vault Agent
KAFKA_BROKERS=kafka.internal:9093
KAFKA_CLIENT_ID=order-service-prod
KAFKA_SSL_CAFILE=/etc/ssl/kafka/ca.crt
KAFKA_SSL_CERTFILE=/etc/ssl/kafka/order-service.crt
KAFKA_SSL_KEYFILE=/etc/ssl/kafka/order-service.key
settings = KafkaSettings()   # читает из env; если поле пустое — ValidationError на старте

pydantic-settings с Field(..., alias="KAFKA_SSL_PASSWORD") — если переменная не задана, приложение падает при старте (аналог R-KFK-CFG-4), а не в рантайме при попытке подключения.

Что запрещено

АнтипаттернПравилоЧто взамен
security_protocol="PLAINTEXT" в продеR-KFK-SEC-X1"SSL" или "SASL_SSL"
Один client_id/service-account на весь кластерR-KFK-SEC-X2per-сервис client_id + per-сервис ACL
PII в широковещательных топикахR-KFK-SEC-3restricted topic или слабая ссылка (customer_id)
ssl_check_hostname=FalseR-KFK-SEC-1дефолт True; отключать только в тестах
SASL/PLAIN over PLAINTEXTR-KFK-SEC-1минимум SASL_SSL
Credentials в коде / settings.py без Field(alias=...) из envR-KFK-SEC-2pydantic-settings + env / Vault
ACL только на READ, без WRITER-KFK-SEC-2права на обе операции + --operation Create при создании топиков

Куда дальше

  • Конфигурация — KafkaSettings через pydantic-settings, credentials через env.
  • Event design — PII не в payload broadcast-топиков (R-KFK-EVT-X3).
  • Producer — идемпотентный producer, partition key, JSON-сериализация.
  • Consumer — manual commit, group_id, auto_offset_reset.
  • Idempotent consumer — дедупликация по event_id.
  • Observability — traceparent в headers, lag alerts, DLQ monitoring.
  • Outbox publishing — domain-события через outbox, не прямой send.
  • Retry topic + DLQ — retry-топики, DLQ, max-attempts.