Если вы уже знаете, что такое топик, партиция и группа потребителей — эта статья про следующий шаг: как подключить Kafka к Spring Boot-приложению и что нужно настроить, чтобы оно работало надёжно в production.
Spring Kafka: KafkaTemplate и @KafkaListener
Библиотека spring-kafka оборачивает стандартный Java-клиент Kafka и даёт более удобный API. Для отправки сообщений используют KafkaTemplate, для приёма — аннотацию @KafkaListener.
// Отправка
@Component
@RequiredArgsConstructor
public class OrderEventPublisher {
private final KafkaTemplate<String, OrderEvent> kafka;
public void publish(OrderEvent event) {
kafka.send("orders", event.orderId().toString(), event);
}
}
// Приём
@Component
public class OrderEventListener {
@KafkaListener(topics = "orders", groupId = "billing-service")
public void handle(ConsumerRecord<String, OrderEvent> record) {
var event = record.value();
// обработка
}
}
Под @KafkaListener живёт ConcurrentKafkaListenerContainerFactory — он держит пул потоков-потребителей. По умолчанию один поток, но можно указать больше (concurrency = 3) — тогда каждый поток обслуживает свой набор партиций.
AckMode: когда Kafka «знает», что сообщение обработано
Kafka следит за тем, до какого сообщения дошёл потребитель, через offset commit. AckMode определяет, когда этот коммит происходит:
BATCH— после успешной обработки всей пачки сообщений (значение по умолчанию).RECORD— после каждого сообщения. Безопаснее при сбоях, но медленнее.MANUAL/MANUAL_IMMEDIATE— приложение само вызываетAcknowledgment.acknowledge(). Нужен, когда важно закоммитить оффсет только после успешной записи в базу.
@Bean
ConcurrentKafkaListenerContainerFactory<String, OrderEvent> kafkaListenerContainerFactory(
ConsumerFactory<String, OrderEvent> consumerFactory) {
var factory = new ConcurrentKafkaListenerContainerFactory<String, OrderEvent>();
factory.setConsumerFactory(consumerFactory);
factory.setConcurrency(3);
factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
return factory;
}
Заголовки сообщений
Каждое сообщение Kafka может содержать заголовки — пары (ключ, значение) в байтах, отдельно от полезной нагрузки. Сюда кладут техническую метаинформацию, которой не место в бизнес-данных:
X-Correlation-ID/traceparent— для распределённого трейсинга.X-Event-Version— версия формата события.X-Source-Service— откуда пришло сообщение.
// При отправке
ProducerRecord<String, OrderEvent> record = new ProducerRecord<>("orders", key, event);
record.headers().add("X-Correlation-ID", correlationId.getBytes());
kafka.send(record);
// При получении
@KafkaListener(topics = "orders")
public void handle(
@Payload OrderEvent event,
@Header("X-Correlation-ID") String correlationId) {
MDC.put("correlationId", correlationId);
// обработка
}
Бизнес-данные (orderId, amount, status) — в payload. Всё техническое — в headers.
Dead Letter Queue: что делать с проблемными сообщениями
Если обработчик бросает исключение, Spring Kafka по умолчанию снова и снова пытается обработать то же самое сообщение — оффсет не двигается, потребитель встаёт. На production это означает полную остановку группы.
Решение — Dead Letter Queue (DLQ): после нескольких неудачных попыток сообщение перекладывается в отдельный топик (обычно с суффиксом .DLT), оффсет коммитится, основной потребитель продолжает работу.
Простой способ: DefaultErrorHandler
@Bean
DefaultErrorHandler errorHandler(KafkaTemplate<Object, Object> template) {
var recoverer = new DeadLetterPublishingRecoverer(template,
(record, ex) -> new TopicPartition("orders.DLT", record.partition()));
return new DefaultErrorHandler(recoverer, new FixedBackOff(1000L, 3));
}
Здесь: 3 попытки с интервалом 1 секунда, после третьей неудачи — сообщение уходит в orders.DLT. В DLT-сообщении автоматически добавляются заголовки с текстом ошибки и именем оригинального топика.
Современный способ: @RetryableTopic
С Spring Kafka 2.7+ можно сделать то же самое декларативно:
@RetryableTopic(
attempts = "4",
backoff = @Backoff(delay = 1000, multiplier = 2.0),
autoCreateTopics = "false",
dltStrategy = DltStrategy.FAIL_ON_ERROR
)
@KafkaListener(topics = "orders", groupId = "billing-service")
public void handle(OrderEvent event) {
// обработка
}
@DltHandler
public void handleDlt(OrderEvent event, @Header("kafka_dlt-exception-message") String reason) {
log.error("Не удалось обработать: order={} причина={}", event.orderId(), reason);
}
Spring автоматически создаёт топики orders-retry-0, orders-retry-1, orders-retry-2, orders-dlt. Сообщения «отстаиваются» в retry-топиках, не блокируя основной поток.
Когда что использовать
- Временная проблема (база недоступна, внешний сервис не отвечает) →
@RetryableTopicс нарастающей задержкой. - Постоянная проблема (невалидное сообщение, неверный формат) →
DefaultErrorHandlerбез повторов, сразу в DLQ. - Нужно различать типы ошибок →
DefaultErrorHandler.addNotRetryableExceptions(IllegalArgumentException.class)— для указанных исключений сразу DLQ, для остальных — повторы.
Schema Registry: как не сломать соседей при изменении схемы
Когда продьюсер и потребитель — разные сервисы разных команд, любое изменение структуры события потенциально ломает потребитель. Schema Registry решает это централизованным хранилищем схем с проверкой совместимости.
Как это работает:
- Продьюсер регистрирует схему в Schema Registry и получает числовой идентификатор (
schema_id). - В каждое сообщение он пишет 4 байта с этим идентификатором, а потом — сжатый payload.
- Потребитель читает
schema_id, один раз скачивает схему из Schema Registry (потом кэширует) и десериализует payload.
Avro, Protobuf или JSON Schema
Стандарт в Kafka-экосистеме — Avro: компактный бинарный формат с хорошей поддержкой эволюции схем через avro-maven-plugin. Protobuf берут команды с gRPC-инфраструктурой. JSON Schema читаем глазами, но занимает в разы больше места — подходит для отладки.
Режимы совместимости
Schema Registry проверяет, не сломает ли новая версия схемы уже работающих потребителей:
- BACKWARD (по умолчанию) — новый потребитель может читать старые сообщения. Можно удалять необязательные поля и добавлять необязательные с default-значением. Нельзя добавлять обязательные поля.
- FORWARD — старый потребитель может читать новые сообщения. Зеркало BACKWARD.
- FULL — оба режима одновременно. Самый строгий.
- NONE — без проверок. Только для особых случаев.
Практическое правило: BACKWARD для топиков, где потребителей много, а продьюсер один — это типичная событийная шина.
Consumer lag: почему потребитель отстаёт
Lag — разница между последним сообщением в партиции и тем, до которого дошёл потребитель. Если lag растёт — потребитель не успевает за продьюсером. Это главная метрика здоровья потребительа в production.
Типичные причины:
- Медленная обработка — каждое сообщение делает синхронный запрос к базе или внешнему сервису. Решение: уменьшить
max.poll.records, параллелизировать обработку, перейти на пакетные запросы. - Мало партиций на группу — увеличить
concurrencyконтейнера или добавить экземпляры сервиса (но не больше числа партиций). - Истекает
max.poll.interval.ms— если обработка одной пачки занимает больше 5 минут (значение по умолчанию), Kafka считает потребительа мёртвым и запускает перераспределение партиций. Решение: уменьшитьmax.poll.recordsили поднятьmax.poll.interval.ms. - Паузы JVM / GC — виртуальная машина встала на 30 секунд, lag вырос на тысячи сообщений.
Посмотреть lag вручную:
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--group billing-service --describe
# TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG
# orders 0 124530 124530 0
# orders 1 124450 124530 80 ← отстаёт
Метрика для мониторинга: kafka_consumergroup_lag. Алерт: lag больше N и растёт N минут подряд.
Тюнинг производительности
Kafka работает хорошо из коробки, но несколько параметров стоит знать.
Продьюсер
| Параметр | Значение по умолчанию | Когда менять |
|---|---|---|
batch.size | 16 KB | Поднять до 64–256 KB при большом объёме записи. Продьюсер ждёт, пока наберётся пачка, и отправляет её целиком. |
linger.ms | 0 | Поднять до 5–20 мс. Продьюсер ждёт N мс перед отправкой, даже если пачка неполная — больше пропускная способность, чуть больше задержка. |
compression.type | none | Включить zstd или lz4. На JSON-сообщениях экономит в 3–5 раз трафик и место на диске. |
Потребитель
| Параметр | Значение по умолчанию | Когда менять |
|---|---|---|
fetch.min.bytes | 1 байт | Поднять до 50–500 KB. Потребитель ждёт накопления данных — меньше нагрузка на брокер. |
max.poll.records | 500 | Снизить до 100–50, если каждое сообщение требует долгой обработки. |
max.poll.interval.ms | 5 минут | Поднять, если обработка пачки реально занимает столько. |
Лучший алгоритм сжатия для большинства случаев — zstd (Kafka 2.1+): сжимает почти как gzip, работает почти как lz4.
Безопасность: SASL, SSL, ACL
По умолчанию Kafka слушает открытый порт без аутентификации — в production так не оставляют. Три слоя защиты:
- SSL/TLS — шифрование соединения. Сертификаты на брокерах и клиентах.
- SASL — аутентификация.
SASL/PLAIN— логин и пароль (только поверх SSL).SASL/SCRAM-SHA-256— безопаснее: challenge-response, пароль по сети не передаётся.SASL/OAUTHBEARER— OAuth2-токены, для интеграции с провайдером идентификации. - ACL — авторизация: кому разрешено читать и писать в какой топик. Управляется через
kafka-acls.sh.
Типичная схема: один SASL-пользователь на каждый сервис, ACL ограничивают его доступ строго до нужных топиков.
# application.properties
spring.kafka.properties.security.protocol=SASL_SSL
spring.kafka.properties.sasl.mechanism=SCRAM-SHA-256
spring.kafka.properties.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
username="billing-service" password="${KAFKA_PASSWORD}";
spring.kafka.properties.ssl.truststore.location=/etc/kafka/truststore.jks
spring.kafka.properties.ssl.truststore.password=${TRUSTSTORE_PASSWORD}
KRaft: Kafka без ZooKeeper
Раньше Kafka требовала отдельного ZooKeeper-кластера для хранения метаданных: какие топики существуют, кто лидер партиции, какие ACL действуют. ZooKeeper — отдельная система, которую нужно отдельно поднимать, мониторить и чинить.
С версии 3.3 (2022) появился KRaft — собственный протокол на базе Raft внутри самих брокеров. ZooKeeper больше не нужен. В Kafka 4.x поддержка ZooKeeper убрана полностью — все новые кластеры работают только на KRaft.
Для разработчика это почти незаметно: bootstrap-servers и поведение клиента не изменились. Меняется только то, как ops-команда разворачивает и обслуживает кластер.
Коротко
KafkaTemplate— отправка,@KafkaListener— приём. Конфигурация пула потоков и AckMode — вConcurrentKafkaListenerContainerFactory.- AckMode управляет тем, когда коммитится оффсет:
BATCH(по умолчанию),RECORD,MANUAL. - Заголовки сообщений — место для технической метаинформации (trace-id, версия схемы). Бизнес-данные — в payload.
- DLQ спасает от зависания потребительа на проблемном сообщении.
@RetryableTopic— декларативный способ с готовыми retry-топиками. - Schema Registry хранит схемы и проверяет совместимость при изменениях. Режим BACKWARD — новый потребитель читает старые сообщения.
- Consumer lag — главная метрика здоровья потребительа. Растёт → потребитель не успевает.
- Тюнинг producer:
batch.size,linger.ms,compression.type=zstd. Тюнинг consumer:max.poll.records,max.poll.interval.ms. - Безопасность: SSL для шифрования, SASL для аутентификации, ACL для авторизации по топикам.
- KRaft: с Kafka 3.3+ ZooKeeper не нужен; в Kafka 4.x поддержка ZooKeeper убрана полностью.
Что почитать дальше
- Основы Kafka — устройство брокера, партиции, гарантии доставки, retention.
- Распределённые паттерны — Saga, Outbox, Idempotent Consumer.
- Resilience-паттерны — Circuit Breaker и Timeout в контексте событийных систем.