Опирается на правила: R-KFK-CONS-1R-KFK-CONS-6 и R-KFK-CONS-X1R-KFK-CONS-X4 из Kafka Style Guide → раздел 2. Consumer.

Важно знать

  • Уникальный group.id формата <service>-<consumer-purpose>. Один group = одна логическая роль.
  • Manual ack обязателен (ack-mode: MANUAL_IMMEDIATE). Auto-commit = потеря данных при crash.
  • Listener idempotent — at-least-once delivery, duplicate-detection на стороне consumer.
  • auto.offset.reset: earliest для critical-consumer'ов. latest пропускает события.
  • Concurrency ≤ числа partition'ов — иначе лишние consumer-instance простаивают.
  • max.poll.interval.ms ≥ ожидаемого времени обработки batch + buffer.
  • Thread.sleep > 1s в listener — блокирует poll-цикл, провоцирует rebalance.
  • HTTP к внешней системе без CB/Bulkhead — listener зависает, rebalance, дубликаты.

Consumer — точка, где сервис принимает факт из внешнего мира. Главные ошибки здесь — потерянное событие (broker отдал, мы не обработали) и rebalance loop (consumer не успевает poll-нуть, Kafka считает его dead, переотдаёт partition'ы).

group.id per logical purpose

R-KFK-CONS-1: формат <service>-<consumer-purpose>.

spring:
  kafka:
    consumer:
      group-id: order-service-payments-listener

Или per-listener через аннотацию:

@KafkaListener(
    topics = "payment.events",
    groupId = "order-service-payments-listener"
)
public void onPaymentEvent(PaymentEvent event, Acknowledgment ack) { ... }

@KafkaListener(
    topics = "inventory.events",
    groupId = "order-service-inventory-listener"
)
public void onInventoryEvent(InventoryEvent event, Acknowledgment ack) { ... }

Один consumer-group = одна логическая роль. Каждый group имеет независимый offset, независимый rebalance.

Если бы был общий group.id: order-service для всех listener-ов — Kafka бы распределяла partitions между instance-ами одного group, не понимая, какой listener на каком partition. При rebalance одного listener-а ребаланс-ятся все.

Manual ack

R-KFK-CONS-2: ack-mode: MANUAL_IMMEDIATE (или MANUAL).

spring:
  kafka:
    listener:
      ack-mode: MANUAL_IMMEDIATE
    consumer:
      enable-auto-commit: false
@KafkaListener(topics = "orders.confirmed", groupId = "billing-service")
public void handle(ConsumerRecord<String, OrderConfirmedEvent> record, Acknowledgment ack) {
    try {
        processEvent(record.value());
        ack.acknowledge();
    } catch (RetryableException e) {
        throw e;
    } catch (NonRetryableException e) {
        dlqProducer.send(record, e);
        ack.acknowledge();
    }
}

Логика: ack.acknowledge() коммитит offset только после успешной обработки. Если listener бросает exception до ack — следующий poll получит то же сообщение снова. Это даёт at-least-once: ни одно событие не теряется, дубликаты возможны и обрабатываются через Idempotent consumer.

MANUAL_IMMEDIATE коммитит сразу при ack.acknowledge(). MANUAL — собирает acks и коммитит batch-ом. Для большинства случаев MANUAL_IMMEDIATE проще и предсказуемее.

Listener idempotent

R-KFK-CONS-3: сообщение может прийти 2+ раз — это норма at-least-once.

Сценарии дубликатов:

  • Producer retry с enable.idempotence: false (но в UCP всегда true).
  • Consumer rebalance до ack.acknowledge().
  • DLQ replay.
  • Schema redeploy с offset reset.

Защита — processed_event таблица в PG, проверка eventId перед обработкой. Подробнее — Idempotent consumer.

auto.offset.reset: earliest

R-KFK-CONS-4: для critical-consumer'ов.

spring:
  kafka:
    consumer:
      auto-offset-reset: earliest

Когда срабатывает: новая consumer-group без сохранённого offset, или offset устарел (вышел за retention).

  • latest (Spring default) — начать с самого нового. Пропускаем все сообщения, которые были до старта consumer-а. Для денег/orders — катастрофа.
  • earliest — начать с самого старого retained-сообщения. Прочитаем всё. Для critical — обязательно.

Для analytics или metrics, где старые данные не нужны — latest приемлемо. Но это исключение.

Concurrency ≤ partitions

R-KFK-CONS-5: настраивается per-listener.

@KafkaListener(
    topics = "orders.confirmed",
    groupId = "billing-service",
    concurrency = "3"
)
public void handle(...) { ... }

Concurrency = количество thread-ов внутри одного pod-а, обрабатывающих partitions параллельно.

Правило: concurrency × pods ≤ partitions. Иначе лишние threads простаивают (Kafka назначает partition только одному consumer в группе).

Если у топика 10 partitions, у нас 2 pods, и хотим максимальный параллелизм — concurrency=5 на каждом pod = 10 thread'ов = 10 partitions. Больше не имеет смысла.

max.poll.interval.ms

R-KFK-CONS-6: лимит времени между poll-ами.

spring:
  kafka:
    consumer:
      properties:
        max.poll.interval.ms: 600000   # 10 минут
        max.poll.records: 50

Default max.poll.interval.ms = 5 минут. Если обработка одного poll (max.poll.records записей) занимает дольше — Kafka считает consumer dead, rebalance-ит.

Сценарий поломки:

  1. Listener получил 500 записей.
  2. Обрабатывает в цикле, каждая запись 800ms (DB-запросы).
  3. Через 5 минут Kafka rebalance: «consumer не отвечал, забираем partition».
  4. Новый consumer получает те же записи → дубликаты.

Варианты:

  • Уменьшить max.poll.records (по умолчанию 500, можно 50).
  • Увеличить max.poll.interval.ms (10-15 минут).
  • Сделать обработку быстрее (batch SQL, parallel processing).

Что запрещено

enable.auto.commit: true в проде

R-KFK-CONS-X1: offset коммитится по таймеру независимо от успеха.

Сценарий: auto-commit interval 5 секунд. Listener получил 100 записей, обрабатывает медленно. Через 5 секунд commit offset за все 100 — но обработано только 20. Crash на 21-й → restart → consumer думает «я обработал все 100», читает с 101. 80 записей потеряны.

Manual ack — единственная защита.

Thread.sleep > 1s в listener

R-KFK-CONS-X2: блокирует poll-цикл.

// КАТАСТРОФА
@KafkaListener(...)
public void handle(...) {
    for (int i = 0; i < 3; i++) {
        try {
            externalApi.call(...);
            break;
        } catch (Exception e) {
            Thread.sleep(60_000);
        }
    }
}

Через 3 минуты Kafka rebalance, partition отбирают, дубликаты гарантированы.

Альтернатива — retry topic с non-blocking delay. Подробнее — Retry topic + DLQ.

group.id отсутствует или общий

R-KFK-CONS-X3: без явного group.id Spring генерирует случайный UUID при каждом старте. Нет sharing-а partitions между pods, нет offset persistence — каждый рестарт всё с начала.

Общий group.id для разных listener-ов размывает rebalance-логику.

HTTP к внешней системе без CB/Bulkhead

R-KFK-CONS-X4: если listener делает HTTP-вызов и внешняя система лежит — RestClient зависает на 30 секунд timeout по умолчанию.

@KafkaListener(...)
public void handle(...) {
    externalService.call(...);
}

Через 30s × 500 records / poll = 15000 секунд = 4+ часа на один poll. Rebalance гарантирован, дубликаты гарантированы, downstream получает шторм повторов.

Защита — @CircuitBreaker + @Bulkhead + timeout. См. Resilience → bulkhead и Resilience → circuit breaker.

Что запрещено — таблица

АнтипаттернПравилоЧто взамен
enable.auto.commit: trueR-KFK-CONS-X1manual ack MANUAL_IMMEDIATE
Thread.sleep > 1s в listenerR-KFK-CONS-X2retry topic
group.id отсутствует или общийR-KFK-CONS-X3<service>-<purpose>
HTTP без CB/Bulkhead в listenerR-KFK-CONS-X4@CircuitBreaker + bulkhead + timeout
auto.offset.reset: latest для criticalR-KFK-CONS-4earliest
Concurrency > partitionsR-KFK-CONS-5≤ partitions
max.poll.records=500 для тяжёлой обработкиR-KFK-CONS-650-100 + max.poll.interval.ms
Listener без try/catch на NonRetryableR-KFK-CONS-2DLQ + ack

Куда дальше

  • Kafka → раздел 2. Consumer — нормативные формулировки.
  • Idempotent consumer — processed_event, dedup.
  • Retry topic + DLQ — non-blocking retry.
  • Configuration — full application.yml.
  • Resilience → bulkhead — изоляция HTTP-вызовов из consumer.
  • Resilience → circuit breaker — защита от cascading.
  • Observability — consumer lag alerts.