Kafka (стандарт)

Контракт работы с Kafka UCP (R-KFK-*): producer/consumer, outbox, retry и DLQ, дизайн событий. Java-биндинг (Spring Kafka) — статьи, Python (aiokafka) — скиллы ucp-py-kafka-*.

Профиль Python: статьи ниже описывают Java-биндинг этого контракта. Python-биндинг (style-guide и скиллы ucp-py-*) — в репозитории скиллов ↗.

Статья внедрена в скилл AI-агента ucp-kafka-review / ucp-kafka-design / ucp-py-kafka-review / ucp-py-kafka-design

Контракт этого раздела язык-нейтрален: правила означают одно и то же на любом стеке, меняется только реализация. Биндинги: Java/Spring — статьи этого раздела; Python/FastAPI — скиллы ucp-py-kafka-* в репозитории скиллов; Go и Node — в работе.

Свод правил работы с Kafka в Java/Spring-сервисах команды UCP: producer (idempotence, partition key), consumer (groups, offset, listener), outbox-relay для надёжной публикации, idempotent consumer с dedup, retry topic + DLQ вместо blocking retry, event design, observability lag. Каждое правило идентифицируется кодом (R-KFK-PROD-1, R-KFK-CONS-X1) — скилл ucp-kafka-review цитирует эти коды в findings.

Гайд опирается на Apache Kafka 3.x + spring-kafka (Spring Boot 3+). Не покрывает: Kafka cluster admin (broker config, replication, MirrorMaker — это инфра-уровень), Kafka Streams (R-KFK-STREAM-* — отдельная тема), ksqlDB.

Связанные стандарты:

  • PG-L-020/PG-L-021 (PG runtime) — outbox-relay через FOR UPDATE SKIP LOCKED.
  • R-RES-* (Resilience) — Kafka producer/consumer тоже outbound, имеют свои timeout/retry стратегии.
  • R-ENT-*/R-AGG-*/R-EVT-* (DDD tactical) — Domain Events как payload Kafka-сообщений.
  • AUTH-19 (Auth) — money-операции через Kafka требуют идемпотентности.
  • R-VLD-CFG-* (Validation) — KafkaSettings обязательно @Validated.

Содержание

  1. Producer — R-KFK-PROD-*
  2. Consumer — R-KFK-CONS-*
  3. Outbox publishing — R-KFK-OBX-*
  4. Idempotent consumer — R-KFK-IDEM-*
  5. Retry topic + DLQ — R-KFK-RTRY-*
  6. Event design — R-KFK-EVT-*
  7. Конфигурация — R-KFK-CFG-*
  8. Observability — R-KFK-OBS-*
  9. Security — R-KFK-SEC-*
  10. Антипаттерны — сводка R-KFK-*-X*

1. Producer

Подробно для человека: Kafka Producer — idempotence, acks=all, partition key.

1.1 Обязательно

R-KFK-PROD-1 — Producer всегда идемпотентный: enable.idempotence: true. Это автоматически: acks=all, retries=Integer.MAX_VALUE, max.in.flight.requests.per.connection ≤ 5. Гарантирует exactly-once на уровне partition (по producer-id).

spring.kafka.producer:
  properties:
    enable.idempotence: true
  acks: all

R-KFK-PROD-2 — Partition key — обязателен для всех бизнес-событий. Без ключа сообщения распределяются round-robin → теряется ordering для одного aggregate. Дефолтный ключ — aggregate id:

kafkaTemplate.send("orders.confirmed", order.getId().toString(), event);

Это гарантирует что все события одного order.id приходят на один partition в правильном порядке.

R-KFK-PROD-3 — Сериализация — JSON (JsonSerializer) по умолчанию. Для bandwidth-чувствительных топиков — Avro/Protobuf через Schema Registry (но это отдельная инфра, по умолчанию не настраивается).

R-KFK-PROD-4 — Не используй KafkaTemplate.send(...) напрямую из use-case handler-а для domain-событий. События идут через outbox pattern (R-KFK-OBX-*) — иначе при rollback транзакции события уже отправлены, появляются «несуществующие» события.

Допустимо прямой kafkaTemplate.send(...) только для:

  • Технических событий (audit-log в дополнение к *_audit_log таблице).
  • Метрик / health-сигналов.
  • Команд другим сервисам, не имеющих транзакционного контекста (например запрос на отчёт от admin-инструмента).

1.2 Запрещено

R-KFK-PROD-X1 — enable.idempotence: false в проде. Без идемпотентности retry на стороне producer создаёт дубликаты.

R-KFK-PROD-X2 — acks: 0 или acks: 1. 0 = fire-and-forget (потеря данных при broker rebalance); 1 = ack от leader без репликации (потеря при failure leader до replication). Только acks: all.

R-KFK-PROD-X3 — Send без partition key для бизнес-событий. Round-robin = потеря порядка для aggregate.

R-KFK-PROD-X4 — KafkaTemplate.send(...) из use-case handler в одной транзакции с DB-операцией. Kafka и Postgres не могут участвовать в одной 2PC-транзакции (Kafka не поддерживает XA). При rollback БД событие в Kafka уже опубликовано — несоответствие. Используй outbox.


2. Consumer

Подробно для человека: Kafka Consumer — manual ack, group.id, concurrency и max.poll.interval.

2.1 Обязательно

R-KFK-CONS-1 — Каждый consumer имеет уникальный group.id в формате <service>-<consumer-purpose>:

spring.kafka.consumer:
  group-id: order-service-payments-listener

Один consumer-group = одна логическая роль. Не делай общий group.id: order-service для всех listener-ов сервиса — потеряется ребалансинг по конкретной задаче.

R-KFK-CONS-2 — Manual ackspring.kafka.listener.ack-mode: MANUAL_IMMEDIATE (или MANUAL). Auto-commit (enable.auto.commit: true) опасен: offset коммитится по таймеру независимо от успеха обработки → при крэше consumer теряет события или дублирует. Manual ack = коммитим только после успешной обработки.

@KafkaListener(topics = "orders.confirmed", groupId = "billing-service")
public void handle(ConsumerRecord<String, OrderConfirmedEvent> record, Acknowledgment ack) {
    try {
        processEvent(record.value());
        ack.acknowledge();   // только после успешной обработки
    } catch (RetryableException e) {
        // не ack-аем — событие повторно дойдёт после next poll
        throw e;
    } catch (NonRetryableException e) {
        ack.acknowledge();   // ack-аем чтобы не зацикливаться, но шлём в DLQ (R-KFK-RTRY-*)
        dlqProducer.send(e, record);
    }
}

R-KFK-CONS-3 — Listener-метод обязательно idempotent (см. R-KFK-IDEM-*). Сообщение может прийти 2+ раз — это норма Kafka (at-least-once); duplicate-detection — на стороне consumer.

R-KFK-CONS-4 — auto.offset.reset: earliest для critical-consumer'ов. latest (дефолт Spring) пропускает события если consumer-group новая или сильно отстал — недопустимо для денег / orders. earliest — начинать с самого старого retained-сообщения.

R-KFK-CONS-5 — Concurrency — настраивается per-listener:

@KafkaListener(topics = "orders.confirmed", groupId = "billing", concurrency = "3")

concurrency ≤ числа partition'ов топика. Иначе лишние consumer-instance бездействуют.

R-KFK-CONS-6 — max.poll.interval.ms ≥ ожидаемого времени обработки batch + buffer. Default 5 минут. Если обработка одного сообщения может занять > 5 минут — увеличить, иначе Kafka считает consumer dead и rebalance-ит.

2.2 Запрещено

R-KFK-CONS-X1 — enable.auto.commit: true в проде. Авто-коммит = offset продвигается до того, как обработка завершилась. Crash → потеря данных.

R-KFK-CONS-X2 — Listener вызывает Thread.sleep(...) или другие blocking-операции > 1s в одном цикле обработки. Блокирует poll-цикл, может привести к rebalance.

R-KFK-CONS-X3 — group.id отсутствует или одинаковый для двух разных listener-методов в одном сервисе. Без явного group.id Spring создаёт случайный — нет ребалансинга между pods.

R-KFK-CONS-X4 — HTTP-вызов к внешней системе из listener без CB/Bulkhead (R-RES-WHERE-1). Если внешняя система лежит, listener зависает → rebalance → дубликаты.


3. Outbox publishing

Подробно для человека: Outbox publishing — атомарная публикация Kafka-событий из write-handler.

Транзакционная публикация событий: запись в БД и публикация в Kafka — атомарны. Реализуется через outbox pattern (см. ucp-pg-runtime-designoutbox-relay сценарий).

3.1 Обязательно

R-KFK-OBX-1 — Domain events публикуются через outbox-relay, не напрямую kafkaTemplate.send(...) из handler. В handler:

@Transactional
public Order handle(ConfirmOrderCommand cmd) {
    var order = orderRepository.findById(cmd.orderId(), SelectMode.FOR_UPDATE).orElseThrow();
    order.confirm();
    orderRepository.save(order);

    // Запись в outbox в той же транзакции:
    outboxRepository.append(OutboxEvent.builder()
        .aggregateType("Order")
        .aggregateId(order.getId())
        .eventType("OrderConfirmedEvent")
        .payload(jsonbHelper.serialize(new OrderConfirmedEvent(order)))
        .build());

    return order;
}

Атомарность: либо обе записи коммитятся (БД commit), либо обе откатываются (БД rollback). Kafka сам публикуется отдельно — outbox-relay (см. R-KFK-OBX-2).

R-KFK-OBX-2 — Outbox-relay — отдельный @Component с @Scheduled, который читает unpublished events с FOR UPDATE SKIP LOCKED (см. PG-L-021), публикует в Kafka через KafkaTemplate.send(...), помечает published_at. Реализация — ucp-pg-runtime-design (outbox сценарий).

R-KFK-OBX-3 — Topic name в outbox derives от eventType или aggregateType:

  • Дефолт: <service>.<aggregate-type>.<event-name>order-service.order.confirmed.
  • Альтернативно: один topic на aggregate с разными event-types в payload — нагляднее для consumer-side filtering.

R-KFK-OBX-4 — Outbox-relay обрабатывает batch (10–50 events за раз), не по одному. Это снижает overhead на DB-poll и Kafka-roundtrip. См. пример в pg-runtime-style-guide.md.

3.2 Запрещено

R-KFK-OBX-X1 — kafkaTemplate.send(...) из @Transactional-метода, особенно где есть DB-операция. Kafka commit не откатывается с DB rollback → inconsistent published events.

R-KFK-OBX-X2 — @TransactionalEventListener для отправки в Kafka без outbox. Обработчик срабатывает после commit → если процесс упал между commit и publish, событие потерялось.

R-KFK-OBX-X3 — Outbox без published_at колонки или без partial-индекса WHERE published_at IS NULL. Полный scan таблицы — тормоза.


4. Idempotent consumer

Подробно для человека: Idempotent consumer — processed_event таблица и dedup по eventId.

Kafka гарантирует at-least-once delivery. Consumer должен сам справляться с дубликатами.

4.1 Обязательно

R-KFK-IDEM-1 — Каждое событие имеет уникальный eventId (UUID v7) в payload или header'е. Consumer проверяет, обрабатывалось ли уже:

@KafkaListener(...)
public void handle(OrderConfirmedEvent event, Acknowledgment ack) {
    if (processedEventRepository.exists(event.eventId())) {
        ack.acknowledge();
        return;       // duplicate, skip
    }
    processOrder(event);
    processedEventRepository.markProcessed(event.eventId(), Instant.now());
    ack.acknowledge();
}

R-KFK-IDEM-2 — processed_event таблица — DDL под PG-T-*:

CREATE TABLE processed_event (
    event_id      uuid PRIMARY KEY,
    consumer_group text NOT NULL,
    processed_at  timestamptz NOT NULL DEFAULT now()
);

PRIMARY KEY на event_id — UNIQUE constraint обеспечивает дедупликацию даже под race conditions. TTL через partition + drop_old (для долгоживущих топиков) или background-job, удаляющий старые записи.

R-KFK-IDEM-3 — Записи в processed_event и бизнес-результат — в одной транзакции. Если процесс упал между бизнес-update и mark-processed → следующий poll увидит «не processed» и обработает повторно (что ОК, потому что бизнес-операция была идемпотентной по event_id).

Альтернатива: использовать event_id как Idempotency-Key для downstream-команды (см. AUTH-19).

R-KFK-IDEM-4 — Для money-операций — двойная защита: event_id + Idempotency-Key на downstream HTTP вызовах. Любая retry-петля не должна привести к двойному списанию.

4.2 Запрещено

R-KFK-IDEM-X1 — Listener без проверки eventId для consumer'ов, где duplicate приведёт к проблеме. Default Kafka — at-least-once; полагаться на «обычно срабатывает один раз» опасно.

R-KFK-IDEM-X2 — Использовать Kafka offset как dedup-ключ. Offset зависит от consumer-group; при добавлении нового consumer-group все события приходят как «впервые».


5. Retry topic + DLQ

Подробно для человека: Retry topic + DLQ — non-blocking retry с возрастающим backoff.

Blocking-retry в listener (Thread.sleep(...) + retry в цикле) — антипаттерн. Используй non-blocking retry topic + DLQ.

5.1 Обязательно

R-KFK-RTRY-1 — Retry topics — отдельные топики с возрастающим delay:

  • orders.confirmed — основной.
  • orders.confirmed.retry-1m — retry через 1 мин.
  • orders.confirmed.retry-10m — retry через 10 мин.
  • orders.confirmed.dlq — окончательный fail после N попыток.

Spring Kafka supports это через @RetryableTopic:

@RetryableTopic(
    attempts = "4",
    backoff = @Backoff(delay = 60_000, multiplier = 10),
    autoCreateTopics = "false",      // топики создаются явно через Liquibase-аналог или admin
    include = {RetryableException.class},
    exclude = {NonRetryableException.class}
)
@KafkaListener(topics = "orders.confirmed")
public void handle(OrderConfirmedEvent event, Acknowledgment ack) { ... }

R-KFK-RTRY-2 — Retry только для transient failures:

  • Сетевые ошибки (IOException, ConnectException).
  • 5xx от downstream HTTP.
  • Database timeout.

Не retry:

  • 4xx от downstream (контрактная ошибка).
  • IllegalArgumentException / NullPointerException — баг, retry не поможет.
  • Validation failures.

R-KFK-RTRY-3 — DLQ-monitoring — alert если в DLQ за час > N сообщений. Без алерта DLQ становится свалкой, и проблемы не замечают.

R-KFK-RTRY-4 — Replay из DLQ — отдельная админская операция (manual review + re-publish в основной топик). Не автоматическая (могут быть genuine bug).

5.2 Запрещено

R-KFK-RTRY-X1 — Blocking retry в listener через Thread.sleep(N) или @Retryable Spring-Retry с большой задержкой. Блокирует poll-цикл.

R-KFK-RTRY-X2 — Игнорирование исключения в listener (try { ... } catch (Exception e) { log.error(...); ack.acknowledge(); }). Событие потеряно, никто не узнает что не обработали.

R-KFK-RTRY-X3 — Retry topic без max-attempts. Бесконечный retry = lock-step с проблемной системой.

R-KFK-RTRY-X4 — DLQ без monitoring. Alert на размер очереди — обязательно.


6. Event design

Подробно для человека: Event design — past tense, eventId UUID v7 и forward-compatible schema.

6.1 Обязательно

R-KFK-EVT-1 — Имя события — глагол в прошедшем времени: OrderConfirmed, PaymentFailed, UserRegistered. Не ConfirmOrder (это команда), не OrderConfirmation (это noun).

R-KFK-EVT-2 — Payload содержит:

  • eventId — UUID v7, уникален.
  • eventType — string id, версионированный (order.confirmed.v1).
  • occurredAtOffsetDateTime (когда произошло событие, не когда опубликовано).
  • aggregateType, aggregateId — для маршрутизации/дедупа.
  • Бизнес-данные — необходимые consumer-у для обработки.
  • Не PII в payload для топиков с broad consumer-base. Если PII — отдельный «full event» topic с restricted access (см. R-KFK-SEC-2).

R-KFK-EVT-3 — Forward-compatible schema: добавление новых полей — non-breaking. Удаление / переименование — breaking, требует нового eventType.v2. См. R-VER-5 (REST forward-compat).

R-KFK-EVT-4 — Domain event как Java record в core/<bc>/domain/event/:

public record OrderConfirmedEvent(
    UUID eventId,
    String eventType,
    OffsetDateTime occurredAt,
    Long orderId,
    Long customerId,
    Money totalAmount
) {
    public static OrderConfirmedEvent from(Order order) {
        return new OrderConfirmedEvent(
            UuidV7.generate(),
            "order.confirmed.v1",
            order.getConfirmedAt(),
            order.getId(),
            order.getCustomerId(),
            order.getTotalAmount()
        );
    }
}

6.2 Запрещено

R-KFK-EVT-X1 — Имя события — команда (ConfirmOrder вместо OrderConfirmed). Команды и события — разные концепты в DDD.

R-KFK-EVT-X2 — Внутренние объекты в payload (Aggregate, Entity целиком). Сериализация может включить нестабильные внутренние поля, ломает forward-compat.

R-KFK-EVT-X3 — PII в широковещательных топиках (orders.confirmed — все consumer'ы видят email, phone). Нужен отдельный restricted-topic или по customerId подгрузка PII через сервис.

R-KFK-EVT-X4 — Breaking change без версии в eventType. Старые consumer'ы при regenerate схемы перестанут работать.


7. Конфигурация

Подробно для человека: Конфигурация Kafka — application.yml, trusted-packages и missing-topics-fatal.

7.1 Обязательно

R-KFK-CFG-1 — Через @ConfigurationProperties + @Validated (R-VLD-CFG-*):

@ConfigurationProperties("kafka")
@Validated
public record KafkaSettings(
    @NotBlank String bootstrapServers,
    @NotNull Producer producer,
    @NotEmpty Map<String, ConsumerConfig> consumers
) {
    public record Producer(
        @NotNull Duration requestTimeout,
        @Min(0) int retryBackoffMs
    ) {}

    public record ConsumerConfig(
        @NotBlank String groupId,
        @NotEmpty List<@NotBlank String> topics,
        @Min(1) int concurrency,
        @NotNull Duration maxPollInterval
    ) {}
}

R-KFK-CFG-2 — application.yml:

spring.kafka:
  bootstrap-servers: ${KAFKA_BROKERS:localhost:9092}
  producer:
    acks: all
    properties:
      enable.idempotence: true
    key-serializer: org.apache.kafka.common.serialization.StringSerializer
    value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
  consumer:
    auto-offset-reset: earliest
    enable-auto-commit: false
    key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
    properties:
      spring.json.trusted.packages: ru.example.events.*
  listener:
    ack-mode: MANUAL_IMMEDIATE
    missing-topics-fatal: true       # fail-fast если топика нет

R-KFK-CFG-3 — spring.json.trusted.packages — explicit allow-list. По умолчанию Spring блокирует deserialization (security). Указывай только пакеты с event-records.

R-KFK-CFG-4 — missing-topics-fatal: true в проде. Сервис не должен стартовать если ожидаемый топик не существует — ловим конфигурационные ошибки на старте.

7.2 Запрещено

R-KFK-CFG-X1 — spring.json.trusted.packages: '*' — security risk (deserialization gadgets из произвольных классов).

R-KFK-CFG-X2 — bootstrap-servers hard-coded в коде или yml без env-substitution. Нельзя катить разные кластеры (test/prod).


8. Observability

Подробно для человека: Observability Kafka — consumer lag alerts, traceparent и DLQ-size.

8.1 Обязательно

R-KFK-OBS-1 — Spring Kafka автоматически экспортирует через Micrometer:

  • kafka_consumer_records_consumed_total{client_id,topic}
  • kafka_consumer_lag (records behind) — главный health-сигнал.
  • kafka_consumer_records_lag_max{topic,partition}
  • kafka_producer_record_send_total{topic}
  • kafka_producer_record_error_total{topic}

R-KFK-OBS-2 — Alert на consumer lag: если kafka_consumer_lag > N для критичных topic'ов в течение 5 минут → инцидент. Threshold зависит от пропускной способности (для money-events — 1000; для analytics — 100000).

R-KFK-OBS-3 — Tracing через traceparent (см. R-HDR-4 REST). Producer кладёт current traceparent в Kafka headers; consumer извлекает и продолжает trace. Spring Kafka + OTel автоконфиг это делает.

R-KFK-OBS-4 — DLQ-size alert (см. R-KFK-RTRY-3).

8.2 Запрещено

R-KFK-OBS-X1 — Отсутствие consumer-lag alerts. Без них «пропадание» сообщений замечается через жалобы пользователей.


9. Security

Подробно для человека: Kafka Security — TLS, ACLs per-сервис и restricted PII topics.

9.1 Обязательно

R-KFK-SEC-1 — В прод-кластере TLS (security.protocol: SSL) — обязательно для cross-network communication. SASL/PLAIN over plaintext запрещено.

R-KFK-SEC-2 — ACL'ы на топики — каждый сервис имеет ACL на чтение/запись только тех топиков, что ему нужны. Проектирование — DevOps/SRE, использование — clientId per-сервис в KafkaSettings.

R-KFK-SEC-3 — PII-данные — отдельные топики с restricted ACL, либо паттерн «слабая ссылка»: в широком топике только customerId, full PII consumer запрашивает у Customer-сервиса.

9.2 Запрещено

R-KFK-SEC-X1 — PLAINTEXT в проде. Только в локальной разработке.

R-KFK-SEC-X2 — Один service-account на весь кластер. ACL'ы по сервисам — для blast-radius containment.


10. Антипаттерны

АнтипаттернПравилоКорректно
enable.idempotence: falseR-KFK-PROD-X1true всегда
acks: 0 или acks: 1R-KFK-PROD-X2acks: all
Send без partition keyR-KFK-PROD-X3aggregate id как key
kafkaTemplate.send в @Transactional с DB-операциейR-KFK-PROD-X4, R-KFK-OBX-X1через outbox
@TransactionalEventListener для KafkaR-KFK-OBX-X2outbox-relay
Outbox без partial-indexR-KFK-OBX-X3WHERE published_at IS NULL
enable.auto.commit: trueR-KFK-CONS-X1MANUAL_IMMEDIATE ack
Thread.sleep(N) в listenerR-KFK-CONS-X2, R-KFK-RTRY-X1retry topic
group.id отсутствует/общийR-KFK-CONS-X3<service>-<purpose>
HTTP к внешней системе из listener без CBR-KFK-CONS-X4@CircuitBreaker
Listener без проверки eventIdR-KFK-IDEM-X1dedup через processed_event
Offset как dedup-ключR-KFK-IDEM-X2eventId UUID v7
Retry topic без max-attemptsR-KFK-RTRY-X33-5 attempts → DLQ
DLQ без monitoringR-KFK-RTRY-X4, R-KFK-OBS-X1alert на размер
Игнорирование exception в listenerR-KFK-RTRY-X2DLQ + alert
Имя события — команда (ConfirmOrder)R-KFK-EVT-X1OrderConfirmed
Aggregate целиком в payloadR-KFK-EVT-X2бизнес-поля + IDs
PII в широковещательных топикахR-KFK-EVT-X3, R-KFK-SEC-3restricted topic / по ID
Breaking change без версииR-KFK-EVT-X4eventType.v2
spring.json.trusted.packages: '*'R-KFK-CFG-X1explicit allow-list
Hard-coded bootstrap-serversR-KFK-CFG-X2env-substitution
PLAINTEXT в продеR-KFK-SEC-X1TLS / SASL
Один service-account на кластерR-KFK-SEC-X2per-сервис ACLs

Финальная сводка: правил «Обязательно» — около 30, «Запрещено» — около 25.