Если основы Kafka — это про устройство брокера и гарантии доставки, то эта статья — про всё, с чем сталкиваешься, когда Spring-приложение с консьюмером оказывается в проде: фреймворковая обвязка, обработка ошибок, эволюция схем, мониторинг, безопасность, тюнинг производительности.
Spring Kafka — что под капотом
spring-kafka оборачивает Java-клиент Kafka и даёт декларативную модель. Два основных бина:
// Producer
@Component
@RequiredArgsConstructor
public class OrderEventPublisher {
private final KafkaTemplate<String, OrderEvent> kafka;
public void publish(OrderEvent event) {
kafka.send("orders", event.orderId().toString(), event);
}
}
// Consumer
@Component
public class OrderEventListener {
@KafkaListener(topics = "orders", groupId = "billing-service")
public void handle(ConsumerRecord<String, OrderEvent> record) {
var event = record.value();
// ...
}
}
Под @KafkaListener живёт ConcurrentKafkaListenerContainerFactory — фабрика контейнеров, которые держат пул потоков-консьюмеров. Один listener = один или несколько потоков (concurrency = 3 в конфигурации); каждый поток обслуживает свой набор партиций.
Конфигурация фабрики — место, где задаётся 80% поведения консьюмера:
@Bean
ConcurrentKafkaListenerContainerFactory<String, OrderEvent> kafkaListenerContainerFactory(
ConsumerFactory<String, OrderEvent> consumerFactory) {
var factory = new ConcurrentKafkaListenerContainerFactory<String, OrderEvent>();
factory.setConsumerFactory(consumerFactory);
factory.setConcurrency(3);
factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
factory.setCommonErrorHandler(new DefaultErrorHandler(/* recoverer */));
return factory;
}
AckMode определяет, когда коммитится offset:
BATCH(default) — после успешной обработки всей пачки.RECORD— после каждого сообщения. Безопаснее, но медленнее.MANUAL/MANUAL_IMMEDIATE— приложение явно вызываетAcknowledgment.acknowledge(). Нужен для случаев, где надо коммитить после успешной записи в БД.
Headers — трейсинг и метаданные
Каждое сообщение Kafka содержит произвольные заголовки — пары (byte[] key, byte[] value). Это правильное место для технической метаинформации, которая не должна попадать в полезную нагрузку:
X-Correlation-ID/traceparent— для распределённого трейсинга (OpenTelemetry, B3).X-Event-Version/X-Schema-Version— версия формата.X-Source-Service— откуда пришло.X-Retry-Count— счётчик повторов (Spring Retry Topic его сам ведёт).
// Producer
ProducerRecord<String, OrderEvent> record = new ProducerRecord<>("orders", event.orderId().toString(), event);
record.headers()
.add("X-Correlation-ID", correlationId.getBytes())
.add("X-Event-Version", "2".getBytes());
kafka.send(record);
// Consumer
@KafkaListener(topics = "orders")
public void handle(
@Payload OrderEvent event,
@Header("X-Correlation-ID") String correlationId,
@Header(value = "X-Event-Version", required = false) String version) {
MDC.put("correlationId", correlationId);
// ...
}
Правило: всё, что не часть бизнес-факта, идёт в headers. Бизнес-факт (orderId, amount, status) — в payload.
Dead Letter Queue — что делать с poison messages
Если listener бросает исключение, Spring Kafka по умолчанию ретранслирует то же сообщение бесконечно: оффсет не двигается, потребитель крутит одно и то же. На проде это means остановка всей группы. Решение — DLQ.
Старый способ: DefaultErrorHandler с FixedBackOff + recoverer
@Bean
DefaultErrorHandler errorHandler(KafkaTemplate<Object, Object> template) {
var recoverer = new DeadLetterPublishingRecoverer(template,
(record, ex) -> new TopicPartition("orders.DLT", record.partition()));
return new DefaultErrorHandler(recoverer, new FixedBackOff(1000L, 3));
}
- При исключении: 3 попытки с интервалом 1 секунда.
- После 3 неудач: сообщение публикуется в
orders.DLT, оффсет коммитится, основной consumer движется дальше. - В DLT-сообщении заголовки
kafka_dlt-exception-message,kafka_dlt-exception-stacktrace,kafka_dlt-original-topic— для разбора.
Современный способ: @RetryableTopic
С Spring Kafka 2.7+ доступен декларативный механизм:
@RetryableTopic(
attempts = "4",
backoff = @Backoff(delay = 1000, multiplier = 2.0),
autoCreateTopics = "false",
dltStrategy = DltStrategy.FAIL_ON_ERROR
)
@KafkaListener(topics = "orders", groupId = "billing-service")
public void handle(OrderEvent event) {
// ...
}
@DltHandler
public void dlt(OrderEvent event, @Header("kafka_dlt-exception-message") String reason) {
log.error("Dead letter: order={} reason={}", event.orderId(), reason);
// alert, отправить в Sentry, etc.
}
Spring создаёт топики orders-retry-0 (1s), orders-retry-1 (2s), orders-retry-2 (4s), orders-dlt. Сообщения «отстаиваются» в retry-топиках, не блокируя основной поток.
Когда что брать
- Логика временной ошибки (БД недоступна, внешний API лежит) →
@RetryableTopicс exponential backoff. - Логика неустранимой ошибки (невалидное сообщение, кривой JSON, отсутствует обязательное поле) →
DefaultErrorHandlerсBackOff = ZERO(0 ретраев) → сразу в DLQ. - Микс —
DefaultErrorHandlerразличает по типу исключения:addNotRetryableExceptions(IllegalArgumentException.class)сразу шлёт в DLQ, остальное ретраит.
Schema Registry — как не сломать соседей
Когда продьюсер и консьюмер живут в разных репозиториях разных команд, изменение схемы события — главный источник инцидентов. Schema Registry решает проблему централизованным хранилищем схем с проверкой совместимости.
┌──────────┐ 1. сериализует с schema_id ┌──────────┐
│ Producer │ ───────────────────────────────────► │ Kafka │
└──────────┘ [schema_id (4 byte) | payload ] └──────────┘
│ │
▼ ▼
┌─────────────────┐ ┌──────────┐
│ Schema Registry │ ◄────── 2. читает schema │ Consumer │
└─────────────────┘ по schema_id └──────────┘
Сериализатор пишет в payload schema_id (4 байта), а саму схему хранит Schema Registry. Десериализатор получает schema_id, скачивает схему один раз (потом кэширует) и парсит.
Avro vs Protobuf vs JSON Schema
| Avro | Protobuf | JSON Schema | |
|---|---|---|---|
| Размер сообщения | Компактный (binary) | Компактный (binary) | Большой (текст) |
| Эволюция схемы | Хорошая, через резерв полей | Хорошая, через reserved теги | Хорошая, через additionalProperties |
| IDE-поддержка в Java | Через avro-maven-plugin | Через protoc плагины | Через jackson-jsonSchema |
| Читаемость в логах | Низкая | Низкая | Высокая (JSON виден глазами) |
| Сообщество | Зрелое в Kafka-экосистеме | Самый зрелый формат вне Kafka | Простейший, но громоздкий |
Стандарт в Kafka-экосистеме — Avro. Protobuf берут команды с gRPC-инфраструктурой. JSON — для отладки и pet-проектов.
Compatibility modes
Schema Registry проверяет совместимость новой схемы со старыми по выбранному режиму:
- BACKWARD (default) — новый консьюмер может читать старые сообщения. Можно: удалять необязательные поля, добавлять необязательные с default. Нельзя: добавлять обязательные.
- FORWARD — старый консьюмер может читать новые сообщения. Зеркало BACKWARD.
- FULL — оба сразу. Самый строгий.
- NONE — никаких проверок. Только для legacy-сценариев.
Практическое правило: BACKWARD для топиков, где консьюмеров много, продьюсер один (типично — событийная шина). FORWARD для топиков, где наоборот.
Consumer lag — что это и почему важно
Lag = (offset последнего сообщения в партиции) - (offset последнего закоммиченного у консьюмер-группы).
Если lag растёт — консьюмер не успевает за продьюсером. Прокси-метрика «здоровья» консьюмера в проде.
Типичные причины:
- Медленный обработчик — каждое сообщение делает синхронный HTTP-вызов или БД-операцию. Лечится: батчинг, параллелизация, асинхронные клиенты.
- Слишком много партиций на одного потребителя — увеличить
concurrencyконтейнера или добавить экземпляры сервиса. - Долгие транзакции /
max.poll.interval.ms— если обработка одной пачки идёт больше 5 минут (default), Kafka считает консьюмер мёртвым и инициирует rebalance. Решение: уменьшитьmax.poll.recordsили поднятьmax.poll.interval.ms. - GC-паузы / OOM — JVM встал на 30 секунд, lag за это время вырос на тысячи.
- Деплой / rolling restart — на время рестарта lag растёт, потом догоняет.
Мониторить: метрика kafka_consumergroup_lag (через JMX exporter или Burrow). Алерты на: lag > N + растёт N минут подряд.
Команда отладки:
kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
--group billing-service --describe
# GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG
# billing-service orders 0 124530 124530 0
# billing-service orders 1 124450 124530 80 ← отстаёт
Performance tuning — настройки, которые реально влияют
Producer
| Параметр | Default | Когда менять |
|---|---|---|
batch.size | 16 KB | Поднять до 64–256 KB при write-heavy. Producer ждёт, пока наберётся batch, потом шлёт пачкой. |
linger.ms | 0 | Поднять до 5–20 мс. Producer ждёт N мс перед отправкой даже неполного batch'а — больше throughput, чуть больше latency. |
compression.type | none | Включить lz4 или zstd. На JSON-сообщениях ~3–5× меньше трафика и места на диске. |
acks | all (с 3.0) | Не трогать. См. основы. |
enable.idempotence | true (с 3.0) | Не трогать. См. основы. |
Consumer
| Параметр | Default | Когда менять |
|---|---|---|
fetch.min.bytes | 1 байт | Поднять до 50–500 KB. Consumer ждёт, пока наберётся достаточно данных — меньше нагрузка на брокер. |
max.poll.records | 500 | Снижать до 100–50, если каждое сообщение требует долгой обработки. Иначе max.poll.interval.ms истечёт. |
max.poll.interval.ms | 5 min | Поднимать до 10–15 min, если обработка пачки реально занимает столько. |
session.timeout.ms | 45s | Не трогать без причины. Слишком маленькое = ложные rebalance, слишком большое = долго не замечается мёртвый консьюмер. |
Compression — выбор алгоритма
| Скорость | Сжатие | Когда | |
|---|---|---|---|
none | максимум | 1× | Очень редко, только для крошечных сообщений |
gzip | низкая | 4–5× | Когда CPU не узкое место, нужно минимум трафика |
snappy | высокая | 2–3× | Сбалансированный (default до 3.0) |
lz4 | очень высокая | 2–3× | Скорость важнее сжатия |
zstd | высокая | 4–5× | Лучший общий выбор с Kafka 2.1+ |
Универсально — zstd на современных версиях.
Security: SASL, SSL, ACL
Без security Kafka слушает open port — на проде так не работает. Три слоя:
- SSL/TLS — шифрование канала. Сертификаты на брокерах и клиентах, mTLS для service-to-service.
- SASL — аутентификация клиентов. Механизмы:
SASL/PLAIN(логин-пароль, только поверх SSL),SASL/SCRAM-SHA-256(challenge-response, безопаснее plain),SASL/OAUTHBEARER(OAuth2-токены, для интеграции с IdP). - ACL — авторизация: какому пользователю можно читать/писать в какой топик. Хранятся в ZooKeeper или KRaft метаданных. Управление через
kafka-acls.sh.
Типичная prod-конфигурация: TLS обязателен, SASL/SCRAM или OAUTHBEARER, ACL ограничивают доступ по принципу «один сервис = один SASL-пользователь = свой список топиков».
# spring.kafka.properties — пример
security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
username="billing-service" password="${KAFKA_PASSWORD}";
ssl.truststore.location=/etc/kafka/truststore.jks
ssl.truststore.password=${TRUSTSTORE_PASSWORD}
KRaft — Kafka без ZooKeeper
Долгое время Kafka требовала отдельного ZooKeeper-кластера для хранения метаданных (топики, партиции, ACL, smelt offsets). С 3.3 (2022) KRaft стал GA — собственный консенсус-протокол на базе Raft внутри Kafka-брокеров, без ZooKeeper.
Что важно знать:
- В Kafka 4.x (2024–25) поддержка ZooKeeper полностью убрана. Все новые кластеры — KRaft only.
- Миграция с ZK на KRaft — задача operations, не разработчика. Со стороны клиента ничего не меняется: bootstrap-servers тот же, поведение то же.
- В KRaft часть брокеров играют роль controllers (хранят метаданные, выбирают лидера через Raft). Минимум 3 controller'а для кворума, отдельно или совмещённые с broker'ами.
Для разработчика смысл такой: на собесе спросят — «знаешь, что ZooKeeper больше не нужен?». Знать ответ.
Backpressure: что делать когда consumer не успевает
Когда обработка медленнее, чем поток сообщений, варианты:
- Параллелизация на уровне партиций — увеличить
concurrencylistener'а (но не больше числа партиций топика). - Параллелизация внутри партиции — Spring Kafka 3.0+ поддерживает
@KafkaListener(...containerFactory = "...")с виртуальными потоками (Java 21+). pause-resume— программно останавливать консьюмер, когда буфер забит:@KafkaListener(id = "billing", ...) public void handle(ConsumerRecord<String, OrderEvent> record, Acknowledgment ack) { if (backlog.size() > 1000) { Objects.requireNonNull(registry.getListenerContainer("billing")).pause(); } backlog.add(record); ack.acknowledge(); }- Увеличить число партиций топика — но это требует координации с продьюсером и не помогает retroactively на старых сообщениях.
- Сменить дизайн — если CPU-bound обработка тормозит на одной партиции, надо менять не Kafka, а архитектуру (вынести тяжёлое в отдельный сервис, переиспользовать кэши, async-IO).
Что почитать дальше
- Основы Kafka — устройство брокера, гарантии доставки, replication, retention.
- Kafka Style Guide — правила работы с Kafka в Spring-сервисах (коды
R-KAFKA-*). - Распределённые паттерны — Saga, Outbox, Idempotent Consumer.
- Resilience-патерны — Circuit Breaker, Timeout для Kafka-обвязки.
- Distributed Patterns Style Guide — правила выживания между БД и Kafka.