Опирается на правила:
R-KFK-SEC-1…R-KFK-SEC-3иR-KFK-SEC-X1…R-KFK-SEC-X2из Kafka Style Guide → раздел 9. Security.
Важно знать
- TLS обязателен в проде:
security_protocol="SSL"или"SASL_SSL"."PLAINTEXT"— только локальная разработка.ssl_check_hostname=True(дефолт aiokafka) — не отключать: без этого возможен MITM.- ACL per-сервис через уникальный
client_idвKafkaSettings; права только на нужные топики.- Один service-account на весь кластер — нет blast-radius containment: компрометация одного сервиса даёт доступ ко всем топикам.
- PII — отдельные restricted топики с узким ACL, либо «слабая ссылка»: в широком топике
customer_id, PII подгружается через customer-service.- Credentials (
ssl_password, SASL-пароль) — только через env, никогда вsettings.py/pyproject.tomlбезpydantic-settings.- SASL/PLAIN over PLAINTEXT запрещено; SASL/SCRAM-SHA-512 over TLS (
SASL_SSL) — допустимо когда client-сертификаты тяжело раздавать.
Kafka — общий бус между сервисами. Без security broker становится open relay: любой компонент видит все сообщения, любой может публиковать в любой topic, PII разлетается по подписчикам. UCP формулирует три слоя защиты: транспортный (TLS), авторизационный (ACL), data-classification (PII в restricted topics).
TLS обязателен
R-KFK-SEC-1: cross-network — только зашифрованно.
KafkaSettings через pydantic-settings:
from pydantic import Field
from pydantic_settings import BaseSettings
class KafkaSettings(BaseSettings):
brokers: str = Field(..., alias="KAFKA_BROKERS")
client_id: str = Field(..., alias="KAFKA_CLIENT_ID")
ssl_cafile: str | None = Field(None, alias="KAFKA_SSL_CAFILE")
ssl_certfile: str | None = Field(None, alias="KAFKA_SSL_CERTFILE")
ssl_keyfile: str | None = Field(None, alias="KAFKA_SSL_KEYFILE")
ssl_password: str | None = Field(None, alias="KAFKA_SSL_PASSWORD")
sasl_mechanism: str | None = Field(None, alias="KAFKA_SASL_MECHANISM")
sasl_plain_username: str | None = Field(None, alias="KAFKA_SASL_USERNAME")
sasl_plain_password: str | None = Field(None, alias="KAFKA_SASL_PASSWORD")
model_config = {"env_file": ".env", "populate_by_name": True}
Producer с mTLS (mutual TLS):
import ssl
from aiokafka import AIOKafkaProducer
def build_ssl_context(settings: KafkaSettings) -> ssl.SSLContext:
ctx = ssl.create_default_context(ssl.Purpose.SERVER_AUTH, cafile=settings.ssl_cafile)
if settings.ssl_certfile and settings.ssl_keyfile:
ctx.load_cert_chain(
certfile=settings.ssl_certfile,
keyfile=settings.ssl_keyfile,
password=settings.ssl_password,
)
return ctx
async def build_producer(settings: KafkaSettings) -> AIOKafkaProducer:
ssl_ctx = build_ssl_context(settings)
return AIOKafkaProducer(
bootstrap_servers=settings.brokers,
client_id=settings.client_id, # identity для ACL (R-KFK-SEC-2)
security_protocol="SSL",
ssl_context=ssl_ctx,
enable_idempotence=True, # R-KFK-PROD-1
value_serializer=lambda v: json.dumps(v).encode(),
)
ssl_check_hostname=True — дефолт ssl.SSLContext; не отключать. Broker проверяет, что hostname в сертификате совпадает с адресом подключения. Без этого MITM возможен.
SASL/SCRAM-SHA-512 поверх TLS
Когда клиентские сертификаты тяжело раздавать — SASL_SSL с SCRAM-SHA-512:
async def build_producer_sasl(settings: KafkaSettings) -> AIOKafkaProducer:
ssl_ctx = ssl.create_default_context(cafile=settings.ssl_cafile)
return AIOKafkaProducer(
bootstrap_servers=settings.brokers,
client_id=settings.client_id,
security_protocol="SASL_SSL",
sasl_mechanism="SCRAM-SHA-512",
sasl_plain_username=settings.sasl_plain_username,
sasl_plain_password=settings.sasl_plain_password,
ssl_context=ssl_ctx,
enable_idempotence=True,
)
SASL/PLAIN over TLS (SASL_SSL + PLAIN) — допустимо в test-окружении; не для prod (password в открытом виде внутри TLS-сессии).
Никогда PLAINTEXT в проде
# ЗАПРЕЩЕНО (R-KFK-SEC-X1)
producer = AIOKafkaProducer(
bootstrap_servers="kafka:9092",
security_protocol="PLAINTEXT", # весь payload в сети — открытым текстом
)
В сетевом перехвате — все сообщения, headers, токены. Только локальная разработка (docker-compose, изолированная сеть).
ACL per-сервис
R-KFK-SEC-2: каждый сервис имеет отдельный client_id; ACL только на нужные топики.
service-account: order-service-prod
ACL READ: payment.events, inventory.events
ACL WRITE: orders.created, orders.confirmed, orders.cancelled
service-account: billing-service-prod
ACL READ: orders.confirmed
ACL WRITE: billing.invoice.created
order-service-prod не может писать в payment.events — только payment-service-prod. Идентификация через CN в mTLS-сертификате (CN=order-service-prod) или SASL-username.
client_id в KafkaSettings — это идентификатор, который application передаёт в Kafka. Он же используется при заведении ACL в IaC (Terraform/Pulumi). Application не управляет ACL — это DevOps/SRE; application лишь называет себя правильно.
# Пример: разные KafkaSettings для разных сервисов
# .env (order-service)
# KAFKA_CLIENT_ID=order-service-prod
# .env (billing-service)
# KAFKA_CLIENT_ID=billing-service-prod
Никогда один service-account
# ЗАПРЕЩЕНО (R-KFK-SEC-X2)
# Все сервисы с одним kafka_username="app" и ACL READ+WRITE *
# Компрометация billing-service → attacker читает payment.events
Что ломается при одном аккаунте:
- Blast radius — компрометация одного сервиса даёт доступ ко всем топикам.
- Audit — невозможно понять, кто опубликовал spurious event.
- Случайные ошибки — bug в
order-serviceпишет вpayment.events, downstream обрабатывает «лишнее».
PII в restricted topics
R-KFK-SEC-3: два паттерна.
Паттерн 1 — restricted topic
customer-service публикует:
customer.events ← широкий, только customer_id
customer.events.pii ← restricted, email/phone/address
ACL для customer.events.pii:
READ: notification-service, customer-support-service
# CustomerRegistered — широкий топик, без PII
@dataclass(frozen=True)
class CustomerRegistered:
event_id: UUID
occurred_at: datetime
customer_id: UUID # только id, не email/phone (R-KFK-EVT-X3)
tier: str
# Публикация в широкий топик — любой сервис может читать
await producer.send_and_wait(
topic="customer.events",
key=str(event.customer_id).encode(),
value=dataclasses.asdict(event),
)
# CustomerPIIUpdated — restricted топик
@dataclass(frozen=True)
class CustomerPIIUpdated:
event_id: UUID
occurred_at: datetime
customer_id: UUID
email: str
phone: str | None
# Публикация в restricted топик — узкий ACL
await producer.send_and_wait(
topic="customer.events.pii",
key=str(event.customer_id).encode(),
value=dataclasses.asdict(event),
)
Паттерн 2 — слабая ссылка
PII никогда не появляется в Kafka. Когда notification-service нужен email — отдельный HTTP-запрос:
async def handle_order_confirmed(
event: OrderConfirmed,
customer_client: CustomerServiceClient,
notifier: EmailNotifier,
) -> None:
# В широком топике только customer_id (R-KFK-SEC-3)
contact = await customer_client.get_contact(event.customer_id)
await notifier.send_order_confirmed(contact.email, event.order_id, event.total)
Каждый запрос PII — отдельный audit-log на стороне customer-service. PII не попадает в Kafka backup, DLQ, logs.
Цена — дополнительная HTTP-нагрузка. Для большинства случаев слабая ссылка предпочтительнее: меньше поверхность атаки, проще выполнение требований GDPR.
Credentials через env
Никогда в коде:
# ЗАПРЕЩЕНО
producer = AIOKafkaProducer(
sasl_plain_password="supersecret", # в git-истории навсегда
)
Только через pydantic-settings из env / Vault:
# .env (не коммитить) / Kubernetes Secret / Vault Agent
KAFKA_BROKERS=kafka.internal:9093
KAFKA_CLIENT_ID=order-service-prod
KAFKA_SSL_CAFILE=/etc/ssl/kafka/ca.crt
KAFKA_SSL_CERTFILE=/etc/ssl/kafka/order-service.crt
KAFKA_SSL_KEYFILE=/etc/ssl/kafka/order-service.key
settings = KafkaSettings() # читает из env; если поле пустое — ValidationError на старте
pydantic-settings с Field(..., alias="KAFKA_SSL_PASSWORD") — если переменная не задана, приложение падает при старте (аналог R-KFK-CFG-4), а не в рантайме при попытке подключения.
Что запрещено
| Антипаттерн | Правило | Что взамен |
|---|---|---|
security_protocol="PLAINTEXT" в проде | R-KFK-SEC-X1 | "SSL" или "SASL_SSL" |
Один client_id/service-account на весь кластер | R-KFK-SEC-X2 | per-сервис client_id + per-сервис ACL |
| PII в широковещательных топиках | R-KFK-SEC-3 | restricted topic или слабая ссылка (customer_id) |
ssl_check_hostname=False | R-KFK-SEC-1 | дефолт True; отключать только в тестах |
| SASL/PLAIN over PLAINTEXT | R-KFK-SEC-1 | минимум SASL_SSL |
Credentials в коде / settings.py без Field(alias=...) из env | R-KFK-SEC-2 | pydantic-settings + env / Vault |
ACL только на READ, без WRITE | R-KFK-SEC-2 | права на обе операции + --operation Create при создании топиков |
Куда дальше
- Конфигурация —
KafkaSettingsчерезpydantic-settings, credentials через env. - Event design — PII не в payload broadcast-топиков (
R-KFK-EVT-X3). - Producer — идемпотентный producer, partition key, JSON-сериализация.
- Consumer — manual commit,
group_id,auto_offset_reset. - Idempotent consumer — дедупликация по
event_id. - Observability —
traceparentв headers, lag alerts, DLQ monitoring. - Outbox publishing — domain-события через outbox, не прямой
send. - Retry topic + DLQ — retry-топики, DLQ, max-attempts.