← назад к разделу

Если основы Kafka — это про устройство брокера и гарантии доставки, то эта статья — про всё, с чем сталкиваешься, когда Spring-приложение с консьюмером оказывается в проде: фреймворковая обвязка, обработка ошибок, эволюция схем, мониторинг, безопасность, тюнинг производительности.

Spring Kafka — что под капотом

spring-kafka оборачивает Java-клиент Kafka и даёт декларативную модель. Два основных бина:

// Producer
@Component
@RequiredArgsConstructor
public class OrderEventPublisher {
    private final KafkaTemplate<String, OrderEvent> kafka;

    public void publish(OrderEvent event) {
        kafka.send("orders", event.orderId().toString(), event);
    }
}

// Consumer
@Component
public class OrderEventListener {
    @KafkaListener(topics = "orders", groupId = "billing-service")
    public void handle(ConsumerRecord<String, OrderEvent> record) {
        var event = record.value();
        // ...
    }
}

Под @KafkaListener живёт ConcurrentKafkaListenerContainerFactory — фабрика контейнеров, которые держат пул потоков-консьюмеров. Один listener = один или несколько потоков (concurrency = 3 в конфигурации); каждый поток обслуживает свой набор партиций.

Конфигурация фабрики — место, где задаётся 80% поведения консьюмера:

@Bean
ConcurrentKafkaListenerContainerFactory<String, OrderEvent> kafkaListenerContainerFactory(
        ConsumerFactory<String, OrderEvent> consumerFactory) {
    var factory = new ConcurrentKafkaListenerContainerFactory<String, OrderEvent>();
    factory.setConsumerFactory(consumerFactory);
    factory.setConcurrency(3);
    factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
    factory.setCommonErrorHandler(new DefaultErrorHandler(/* recoverer */));
    return factory;
}

AckMode определяет, когда коммитится offset:

  • BATCH (default) — после успешной обработки всей пачки.
  • RECORD — после каждого сообщения. Безопаснее, но медленнее.
  • MANUAL / MANUAL_IMMEDIATE — приложение явно вызывает Acknowledgment.acknowledge(). Нужен для случаев, где надо коммитить после успешной записи в БД.

Headers — трейсинг и метаданные

Каждое сообщение Kafka содержит произвольные заголовки — пары (byte[] key, byte[] value). Это правильное место для технической метаинформации, которая не должна попадать в полезную нагрузку:

  • X-Correlation-ID / traceparent — для распределённого трейсинга (OpenTelemetry, B3).
  • X-Event-Version / X-Schema-Version — версия формата.
  • X-Source-Service — откуда пришло.
  • X-Retry-Count — счётчик повторов (Spring Retry Topic его сам ведёт).
// Producer
ProducerRecord<String, OrderEvent> record = new ProducerRecord<>("orders", event.orderId().toString(), event);
record.headers()
    .add("X-Correlation-ID", correlationId.getBytes())
    .add("X-Event-Version", "2".getBytes());
kafka.send(record);

// Consumer
@KafkaListener(topics = "orders")
public void handle(
        @Payload OrderEvent event,
        @Header("X-Correlation-ID") String correlationId,
        @Header(value = "X-Event-Version", required = false) String version) {
    MDC.put("correlationId", correlationId);
    // ...
}

Правило: всё, что не часть бизнес-факта, идёт в headers. Бизнес-факт (orderId, amount, status) — в payload.

Dead Letter Queue — что делать с poison messages

Если listener бросает исключение, Spring Kafka по умолчанию ретранслирует то же сообщение бесконечно: оффсет не двигается, потребитель крутит одно и то же. На проде это means остановка всей группы. Решение — DLQ.

Старый способ: DefaultErrorHandler с FixedBackOff + recoverer

@Bean
DefaultErrorHandler errorHandler(KafkaTemplate<Object, Object> template) {
    var recoverer = new DeadLetterPublishingRecoverer(template,
        (record, ex) -> new TopicPartition("orders.DLT", record.partition()));
    return new DefaultErrorHandler(recoverer, new FixedBackOff(1000L, 3));
}
  • При исключении: 3 попытки с интервалом 1 секунда.
  • После 3 неудач: сообщение публикуется в orders.DLT, оффсет коммитится, основной consumer движется дальше.
  • В DLT-сообщении заголовки kafka_dlt-exception-message, kafka_dlt-exception-stacktrace, kafka_dlt-original-topic — для разбора.

Современный способ: @RetryableTopic

С Spring Kafka 2.7+ доступен декларативный механизм:

@RetryableTopic(
    attempts = "4",
    backoff = @Backoff(delay = 1000, multiplier = 2.0),
    autoCreateTopics = "false",
    dltStrategy = DltStrategy.FAIL_ON_ERROR
)
@KafkaListener(topics = "orders", groupId = "billing-service")
public void handle(OrderEvent event) {
    // ...
}

@DltHandler
public void dlt(OrderEvent event, @Header("kafka_dlt-exception-message") String reason) {
    log.error("Dead letter: order={} reason={}", event.orderId(), reason);
    // alert, отправить в Sentry, etc.
}

Spring создаёт топики orders-retry-0 (1s), orders-retry-1 (2s), orders-retry-2 (4s), orders-dlt. Сообщения «отстаиваются» в retry-топиках, не блокируя основной поток.

Когда что брать

  • Логика временной ошибки (БД недоступна, внешний API лежит) → @RetryableTopic с exponential backoff.
  • Логика неустранимой ошибки (невалидное сообщение, кривой JSON, отсутствует обязательное поле) → DefaultErrorHandler с BackOff = ZERO (0 ретраев) → сразу в DLQ.
  • МиксDefaultErrorHandler различает по типу исключения: addNotRetryableExceptions(IllegalArgumentException.class) сразу шлёт в DLQ, остальное ретраит.

Schema Registry — как не сломать соседей

Когда продьюсер и консьюмер живут в разных репозиториях разных команд, изменение схемы события — главный источник инцидентов. Schema Registry решает проблему централизованным хранилищем схем с проверкой совместимости.

┌──────────┐    1. сериализует с schema_id           ┌──────────┐
│ Producer │ ───────────────────────────────────►   │  Kafka   │
└──────────┘    [schema_id (4 byte) | payload ]      └──────────┘
     │                                                     │
     ▼                                                     ▼
┌─────────────────┐                            ┌──────────┐
│ Schema Registry │ ◄────── 2. читает schema   │ Consumer │
└─────────────────┘         по schema_id       └──────────┘

Сериализатор пишет в payload schema_id (4 байта), а саму схему хранит Schema Registry. Десериализатор получает schema_id, скачивает схему один раз (потом кэширует) и парсит.

Avro vs Protobuf vs JSON Schema

AvroProtobufJSON Schema
Размер сообщенияКомпактный (binary)Компактный (binary)Большой (текст)
Эволюция схемыХорошая, через резерв полейХорошая, через reserved тегиХорошая, через additionalProperties
IDE-поддержка в JavaЧерез avro-maven-pluginЧерез protoc плагиныЧерез jackson-jsonSchema
Читаемость в логахНизкаяНизкаяВысокая (JSON виден глазами)
СообществоЗрелое в Kafka-экосистемеСамый зрелый формат вне KafkaПростейший, но громоздкий

Стандарт в Kafka-экосистеме — Avro. Protobuf берут команды с gRPC-инфраструктурой. JSON — для отладки и pet-проектов.

Compatibility modes

Schema Registry проверяет совместимость новой схемы со старыми по выбранному режиму:

  • BACKWARD (default) — новый консьюмер может читать старые сообщения. Можно: удалять необязательные поля, добавлять необязательные с default. Нельзя: добавлять обязательные.
  • FORWARD — старый консьюмер может читать новые сообщения. Зеркало BACKWARD.
  • FULL — оба сразу. Самый строгий.
  • NONE — никаких проверок. Только для legacy-сценариев.

Практическое правило: BACKWARD для топиков, где консьюмеров много, продьюсер один (типично — событийная шина). FORWARD для топиков, где наоборот.

Consumer lag — что это и почему важно

Lag = (offset последнего сообщения в партиции) - (offset последнего закоммиченного у консьюмер-группы).

Если lag растёт — консьюмер не успевает за продьюсером. Прокси-метрика «здоровья» консьюмера в проде.

Типичные причины:

  1. Медленный обработчик — каждое сообщение делает синхронный HTTP-вызов или БД-операцию. Лечится: батчинг, параллелизация, асинхронные клиенты.
  2. Слишком много партиций на одного потребителя — увеличить concurrency контейнера или добавить экземпляры сервиса.
  3. Долгие транзакции / max.poll.interval.ms — если обработка одной пачки идёт больше 5 минут (default), Kafka считает консьюмер мёртвым и инициирует rebalance. Решение: уменьшить max.poll.records или поднять max.poll.interval.ms.
  4. GC-паузы / OOM — JVM встал на 30 секунд, lag за это время вырос на тысячи.
  5. Деплой / rolling restart — на время рестарта lag растёт, потом догоняет.

Мониторить: метрика kafka_consumergroup_lag (через JMX exporter или Burrow). Алерты на: lag > N + растёт N минут подряд.

Команда отладки:

kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
    --group billing-service --describe
# GROUP            TOPIC   PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG
# billing-service  orders  0          124530          124530          0
# billing-service  orders  1          124450          124530          80   ← отстаёт

Performance tuning — настройки, которые реально влияют

Producer

ПараметрDefaultКогда менять
batch.size16 KBПоднять до 64–256 KB при write-heavy. Producer ждёт, пока наберётся batch, потом шлёт пачкой.
linger.ms0Поднять до 5–20 мс. Producer ждёт N мс перед отправкой даже неполного batch'а — больше throughput, чуть больше latency.
compression.typenoneВключить lz4 или zstd. На JSON-сообщениях ~3–5× меньше трафика и места на диске.
acksall (с 3.0)Не трогать. См. основы.
enable.idempotencetrue (с 3.0)Не трогать. См. основы.

Consumer

ПараметрDefaultКогда менять
fetch.min.bytes1 байтПоднять до 50–500 KB. Consumer ждёт, пока наберётся достаточно данных — меньше нагрузка на брокер.
max.poll.records500Снижать до 100–50, если каждое сообщение требует долгой обработки. Иначе max.poll.interval.ms истечёт.
max.poll.interval.ms5 minПоднимать до 10–15 min, если обработка пачки реально занимает столько.
session.timeout.ms45sНе трогать без причины. Слишком маленькое = ложные rebalance, слишком большое = долго не замечается мёртвый консьюмер.

Compression — выбор алгоритма

СкоростьСжатиеКогда
noneмаксимумОчень редко, только для крошечных сообщений
gzipнизкая4–5×Когда CPU не узкое место, нужно минимум трафика
snappyвысокая2–3×Сбалансированный (default до 3.0)
lz4очень высокая2–3×Скорость важнее сжатия
zstdвысокая4–5×Лучший общий выбор с Kafka 2.1+

Универсально — zstd на современных версиях.

Security: SASL, SSL, ACL

Без security Kafka слушает open port — на проде так не работает. Три слоя:

  • SSL/TLS — шифрование канала. Сертификаты на брокерах и клиентах, mTLS для service-to-service.
  • SASL — аутентификация клиентов. Механизмы: SASL/PLAIN (логин-пароль, только поверх SSL), SASL/SCRAM-SHA-256 (challenge-response, безопаснее plain), SASL/OAUTHBEARER (OAuth2-токены, для интеграции с IdP).
  • ACL — авторизация: какому пользователю можно читать/писать в какой топик. Хранятся в ZooKeeper или KRaft метаданных. Управление через kafka-acls.sh.

Типичная prod-конфигурация: TLS обязателен, SASL/SCRAM или OAUTHBEARER, ACL ограничивают доступ по принципу «один сервис = один SASL-пользователь = свой список топиков».

# spring.kafka.properties — пример
security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
    username="billing-service" password="${KAFKA_PASSWORD}";
ssl.truststore.location=/etc/kafka/truststore.jks
ssl.truststore.password=${TRUSTSTORE_PASSWORD}

KRaft — Kafka без ZooKeeper

Долгое время Kafka требовала отдельного ZooKeeper-кластера для хранения метаданных (топики, партиции, ACL, smelt offsets). С 3.3 (2022) KRaft стал GA — собственный консенсус-протокол на базе Raft внутри Kafka-брокеров, без ZooKeeper.

Что важно знать:

  • В Kafka 4.x (2024–25) поддержка ZooKeeper полностью убрана. Все новые кластеры — KRaft only.
  • Миграция с ZK на KRaft — задача operations, не разработчика. Со стороны клиента ничего не меняется: bootstrap-servers тот же, поведение то же.
  • В KRaft часть брокеров играют роль controllers (хранят метаданные, выбирают лидера через Raft). Минимум 3 controller'а для кворума, отдельно или совмещённые с broker'ами.

Для разработчика смысл такой: на собесе спросят — «знаешь, что ZooKeeper больше не нужен?». Знать ответ.

Backpressure: что делать когда consumer не успевает

Когда обработка медленнее, чем поток сообщений, варианты:

  1. Параллелизация на уровне партиций — увеличить concurrency listener'а (но не больше числа партиций топика).
  2. Параллелизация внутри партиции — Spring Kafka 3.0+ поддерживает @KafkaListener(...containerFactory = "...") с виртуальными потоками (Java 21+).
  3. pause-resume — программно останавливать консьюмер, когда буфер забит:
    @KafkaListener(id = "billing", ...)
    public void handle(ConsumerRecord<String, OrderEvent> record, Acknowledgment ack) {
        if (backlog.size() > 1000) {
            Objects.requireNonNull(registry.getListenerContainer("billing")).pause();
        }
        backlog.add(record);
        ack.acknowledge();
    }
    
  4. Увеличить число партиций топика — но это требует координации с продьюсером и не помогает retroactively на старых сообщениях.
  5. Сменить дизайн — если CPU-bound обработка тормозит на одной партиции, надо менять не Kafka, а архитектуру (вынести тяжёлое в отдельный сервис, переиспользовать кэши, async-IO).

Что почитать дальше

  • Основы Kafka — устройство брокера, гарантии доставки, replication, retention.
  • Kafka Style Guide — правила работы с Kafka в Spring-сервисах (коды R-KAFKA-*).
  • Распределённые паттерны — Saga, Outbox, Idempotent Consumer.
  • Resilience-патерны — Circuit Breaker, Timeout для Kafka-обвязки.
  • Distributed Patterns Style Guide — правила выживания между БД и Kafka.