← назад к разделу

Если вы уже знаете, что такое топик, партиция и группа потребителей — эта статья про следующий шаг: как подключить Kafka к Spring Boot-приложению и что нужно настроить, чтобы оно работало надёжно в production.

Spring Kafka: KafkaTemplate и @KafkaListener

Библиотека spring-kafka оборачивает стандартный Java-клиент Kafka и даёт более удобный API. Для отправки сообщений используют KafkaTemplate, для приёма — аннотацию @KafkaListener.

// Отправка
@Component
@RequiredArgsConstructor
public class OrderEventPublisher {
    private final KafkaTemplate<String, OrderEvent> kafka;

    public void publish(OrderEvent event) {
        kafka.send("orders", event.orderId().toString(), event);
    }
}

// Приём
@Component
public class OrderEventListener {
    @KafkaListener(topics = "orders", groupId = "billing-service")
    public void handle(ConsumerRecord<String, OrderEvent> record) {
        var event = record.value();
        // обработка
    }
}

Под @KafkaListener живёт ConcurrentKafkaListenerContainerFactory — он держит пул потоков-потребителей. По умолчанию один поток, но можно указать больше (concurrency = 3) — тогда каждый поток обслуживает свой набор партиций.

AckMode: когда Kafka «знает», что сообщение обработано

Kafka следит за тем, до какого сообщения дошёл потребитель, через offset commit. AckMode определяет, когда этот коммит происходит:

  • BATCH — после успешной обработки всей пачки сообщений (значение по умолчанию).
  • RECORD — после каждого сообщения. Безопаснее при сбоях, но медленнее.
  • MANUAL / MANUAL_IMMEDIATE — приложение само вызывает Acknowledgment.acknowledge(). Нужен, когда важно закоммитить оффсет только после успешной записи в базу.
@Bean
ConcurrentKafkaListenerContainerFactory<String, OrderEvent> kafkaListenerContainerFactory(
        ConsumerFactory<String, OrderEvent> consumerFactory) {
    var factory = new ConcurrentKafkaListenerContainerFactory<String, OrderEvent>();
    factory.setConsumerFactory(consumerFactory);
    factory.setConcurrency(3);
    factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
    return factory;
}

Заголовки сообщений

Каждое сообщение Kafka может содержать заголовки — пары (ключ, значение) в байтах, отдельно от полезной нагрузки. Сюда кладут техническую метаинформацию, которой не место в бизнес-данных:

  • X-Correlation-ID / traceparent — для распределённого трейсинга.
  • X-Event-Version — версия формата события.
  • X-Source-Service — откуда пришло сообщение.
// При отправке
ProducerRecord<String, OrderEvent> record = new ProducerRecord<>("orders", key, event);
record.headers().add("X-Correlation-ID", correlationId.getBytes());
kafka.send(record);

// При получении
@KafkaListener(topics = "orders")
public void handle(
        @Payload OrderEvent event,
        @Header("X-Correlation-ID") String correlationId) {
    MDC.put("correlationId", correlationId);
    // обработка
}

Бизнес-данные (orderId, amount, status) — в payload. Всё техническое — в headers.

Dead Letter Queue: что делать с проблемными сообщениями

Если обработчик бросает исключение, Spring Kafka по умолчанию снова и снова пытается обработать то же самое сообщение — оффсет не двигается, потребитель встаёт. На production это означает полную остановку группы.

Решение — Dead Letter Queue (DLQ): после нескольких неудачных попыток сообщение перекладывается в отдельный топик (обычно с суффиксом .DLT), оффсет коммитится, основной потребитель продолжает работу.

Простой способ: DefaultErrorHandler

@Bean
DefaultErrorHandler errorHandler(KafkaTemplate<Object, Object> template) {
    var recoverer = new DeadLetterPublishingRecoverer(template,
        (record, ex) -> new TopicPartition("orders.DLT", record.partition()));
    return new DefaultErrorHandler(recoverer, new FixedBackOff(1000L, 3));
}

Здесь: 3 попытки с интервалом 1 секунда, после третьей неудачи — сообщение уходит в orders.DLT. В DLT-сообщении автоматически добавляются заголовки с текстом ошибки и именем оригинального топика.

Современный способ: @RetryableTopic

С Spring Kafka 2.7+ можно сделать то же самое декларативно:

@RetryableTopic(
    attempts = "4",
    backoff = @Backoff(delay = 1000, multiplier = 2.0),
    autoCreateTopics = "false",
    dltStrategy = DltStrategy.FAIL_ON_ERROR
)
@KafkaListener(topics = "orders", groupId = "billing-service")
public void handle(OrderEvent event) {
    // обработка
}

@DltHandler
public void handleDlt(OrderEvent event, @Header("kafka_dlt-exception-message") String reason) {
    log.error("Не удалось обработать: order={} причина={}", event.orderId(), reason);
}

Spring автоматически создаёт топики orders-retry-0, orders-retry-1, orders-retry-2, orders-dlt. Сообщения «отстаиваются» в retry-топиках, не блокируя основной поток.

Когда что использовать

  • Временная проблема (база недоступна, внешний сервис не отвечает) → @RetryableTopic с нарастающей задержкой.
  • Постоянная проблема (невалидное сообщение, неверный формат) → DefaultErrorHandler без повторов, сразу в DLQ.
  • Нужно различать типы ошибок → DefaultErrorHandler.addNotRetryableExceptions(IllegalArgumentException.class) — для указанных исключений сразу DLQ, для остальных — повторы.

Schema Registry: как не сломать соседей при изменении схемы

Когда продьюсер и потребитель — разные сервисы разных команд, любое изменение структуры события потенциально ломает потребитель. Schema Registry решает это централизованным хранилищем схем с проверкой совместимости.

Как это работает:

  1. Продьюсер регистрирует схему в Schema Registry и получает числовой идентификатор (schema_id).
  2. В каждое сообщение он пишет 4 байта с этим идентификатором, а потом — сжатый payload.
  3. Потребитель читает schema_id, один раз скачивает схему из Schema Registry (потом кэширует) и десериализует payload.

Avro, Protobuf или JSON Schema

Стандарт в Kafka-экосистеме — Avro: компактный бинарный формат с хорошей поддержкой эволюции схем через avro-maven-plugin. Protobuf берут команды с gRPC-инфраструктурой. JSON Schema читаем глазами, но занимает в разы больше места — подходит для отладки.

Режимы совместимости

Schema Registry проверяет, не сломает ли новая версия схемы уже работающих потребителей:

  • BACKWARD (по умолчанию) — новый потребитель может читать старые сообщения. Можно удалять необязательные поля и добавлять необязательные с default-значением. Нельзя добавлять обязательные поля.
  • FORWARD — старый потребитель может читать новые сообщения. Зеркало BACKWARD.
  • FULL — оба режима одновременно. Самый строгий.
  • NONE — без проверок. Только для особых случаев.

Практическое правило: BACKWARD для топиков, где потребителей много, а продьюсер один — это типичная событийная шина.

Consumer lag: почему потребитель отстаёт

Lag — разница между последним сообщением в партиции и тем, до которого дошёл потребитель. Если lag растёт — потребитель не успевает за продьюсером. Это главная метрика здоровья потребительа в production.

Типичные причины:

  1. Медленная обработка — каждое сообщение делает синхронный запрос к базе или внешнему сервису. Решение: уменьшить max.poll.records, параллелизировать обработку, перейти на пакетные запросы.
  2. Мало партиций на группу — увеличить concurrency контейнера или добавить экземпляры сервиса (но не больше числа партиций).
  3. Истекает max.poll.interval.ms — если обработка одной пачки занимает больше 5 минут (значение по умолчанию), Kafka считает потребительа мёртвым и запускает перераспределение партиций. Решение: уменьшить max.poll.records или поднять max.poll.interval.ms.
  4. Паузы JVM / GC — виртуальная машина встала на 30 секунд, lag вырос на тысячи сообщений.

Посмотреть lag вручную:

kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
    --group billing-service --describe
# TOPIC   PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
# orders  0          124530          124530          0
# orders  1          124450          124530          80   ← отстаёт

Метрика для мониторинга: kafka_consumergroup_lag. Алерт: lag больше N и растёт N минут подряд.

Тюнинг производительности

Kafka работает хорошо из коробки, но несколько параметров стоит знать.

Продьюсер

ПараметрЗначение по умолчаниюКогда менять
batch.size16 KBПоднять до 64–256 KB при большом объёме записи. Продьюсер ждёт, пока наберётся пачка, и отправляет её целиком.
linger.ms0Поднять до 5–20 мс. Продьюсер ждёт N мс перед отправкой, даже если пачка неполная — больше пропускная способность, чуть больше задержка.
compression.typenoneВключить zstd или lz4. На JSON-сообщениях экономит в 3–5 раз трафик и место на диске.

Потребитель

ПараметрЗначение по умолчаниюКогда менять
fetch.min.bytes1 байтПоднять до 50–500 KB. Потребитель ждёт накопления данных — меньше нагрузка на брокер.
max.poll.records500Снизить до 100–50, если каждое сообщение требует долгой обработки.
max.poll.interval.ms5 минутПоднять, если обработка пачки реально занимает столько.

Лучший алгоритм сжатия для большинства случаев — zstd (Kafka 2.1+): сжимает почти как gzip, работает почти как lz4.

Безопасность: SASL, SSL, ACL

По умолчанию Kafka слушает открытый порт без аутентификации — в production так не оставляют. Три слоя защиты:

  • SSL/TLS — шифрование соединения. Сертификаты на брокерах и клиентах.
  • SASL — аутентификация. SASL/PLAIN — логин и пароль (только поверх SSL). SASL/SCRAM-SHA-256 — безопаснее: challenge-response, пароль по сети не передаётся. SASL/OAUTHBEARER — OAuth2-токены, для интеграции с провайдером идентификации.
  • ACL — авторизация: кому разрешено читать и писать в какой топик. Управляется через kafka-acls.sh.

Типичная схема: один SASL-пользователь на каждый сервис, ACL ограничивают его доступ строго до нужных топиков.

# application.properties
spring.kafka.properties.security.protocol=SASL_SSL
spring.kafka.properties.sasl.mechanism=SCRAM-SHA-256
spring.kafka.properties.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
    username="billing-service" password="${KAFKA_PASSWORD}";
spring.kafka.properties.ssl.truststore.location=/etc/kafka/truststore.jks
spring.kafka.properties.ssl.truststore.password=${TRUSTSTORE_PASSWORD}

KRaft: Kafka без ZooKeeper

Раньше Kafka требовала отдельного ZooKeeper-кластера для хранения метаданных: какие топики существуют, кто лидер партиции, какие ACL действуют. ZooKeeper — отдельная система, которую нужно отдельно поднимать, мониторить и чинить.

С версии 3.3 (2022) появился KRaft — собственный протокол на базе Raft внутри самих брокеров. ZooKeeper больше не нужен. В Kafka 4.x поддержка ZooKeeper убрана полностью — все новые кластеры работают только на KRaft.

Для разработчика это почти незаметно: bootstrap-servers и поведение клиента не изменились. Меняется только то, как ops-команда разворачивает и обслуживает кластер.

Коротко

  • KafkaTemplate — отправка, @KafkaListener — приём. Конфигурация пула потоков и AckMode — в ConcurrentKafkaListenerContainerFactory.
  • AckMode управляет тем, когда коммитится оффсет: BATCH (по умолчанию), RECORD, MANUAL.
  • Заголовки сообщений — место для технической метаинформации (trace-id, версия схемы). Бизнес-данные — в payload.
  • DLQ спасает от зависания потребительа на проблемном сообщении. @RetryableTopic — декларативный способ с готовыми retry-топиками.
  • Schema Registry хранит схемы и проверяет совместимость при изменениях. Режим BACKWARD — новый потребитель читает старые сообщения.
  • Consumer lag — главная метрика здоровья потребительа. Растёт → потребитель не успевает.
  • Тюнинг producer: batch.size, linger.ms, compression.type=zstd. Тюнинг consumer: max.poll.records, max.poll.interval.ms.
  • Безопасность: SSL для шифрования, SASL для аутентификации, ACL для авторизации по топикам.
  • KRaft: с Kafka 3.3+ ZooKeeper не нужен; в Kafka 4.x поддержка ZooKeeper убрана полностью.

Что почитать дальше

  • Основы Kafka — устройство брокера, партиции, гарантии доставки, retention.
  • Распределённые паттерны — Saga, Outbox, Idempotent Consumer.
  • Resilience-паттерны — Circuit Breaker и Timeout в контексте событийных систем.