Опирается на правила:
R-KFK-CONS-1…R-KFK-CONS-6иR-KFK-CONS-X1…R-KFK-CONS-X4из Kafka Style Guide → раздел 2. Consumer.
Важно знать
- Уникальный
group.idформата<service>-<consumer-purpose>. Один group = одна логическая роль.- Manual ack обязателен (
ack-mode: MANUAL_IMMEDIATE). Auto-commit = потеря данных при crash.- Listener idempotent — at-least-once delivery, duplicate-detection на стороне consumer.
auto.offset.reset: earliestдля critical-consumer'ов.latestпропускает события.- Concurrency ≤ числа partition'ов — иначе лишние consumer-instance простаивают.
max.poll.interval.ms≥ ожидаемого времени обработки batch + buffer.Thread.sleep > 1sв listener — блокирует poll-цикл, провоцирует rebalance.- HTTP к внешней системе без CB/Bulkhead — listener зависает, rebalance, дубликаты.
Consumer — точка, где сервис принимает факт из внешнего мира. Главные ошибки здесь — потерянное событие (broker отдал, мы не обработали) и rebalance loop (consumer не успевает poll-нуть, Kafka считает его dead, переотдаёт partition'ы).
group.id per logical purpose
R-KFK-CONS-1: формат <service>-<consumer-purpose>.
spring:
kafka:
consumer:
group-id: order-service-payments-listener
Или per-listener через аннотацию:
@KafkaListener(
topics = "payment.events",
groupId = "order-service-payments-listener"
)
public void onPaymentEvent(PaymentEvent event, Acknowledgment ack) { ... }
@KafkaListener(
topics = "inventory.events",
groupId = "order-service-inventory-listener"
)
public void onInventoryEvent(InventoryEvent event, Acknowledgment ack) { ... }
Один consumer-group = одна логическая роль. Каждый group имеет независимый offset, независимый rebalance.
Если бы был общий group.id: order-service для всех listener-ов — Kafka бы распределяла partitions между instance-ами одного group, не понимая, какой listener на каком partition. При rebalance одного listener-а ребаланс-ятся все.
Manual ack
R-KFK-CONS-2: ack-mode: MANUAL_IMMEDIATE (или MANUAL).
spring:
kafka:
listener:
ack-mode: MANUAL_IMMEDIATE
consumer:
enable-auto-commit: false
@KafkaListener(topics = "orders.confirmed", groupId = "billing-service")
public void handle(ConsumerRecord<String, OrderConfirmedEvent> record, Acknowledgment ack) {
try {
processEvent(record.value());
ack.acknowledge();
} catch (RetryableException e) {
throw e;
} catch (NonRetryableException e) {
dlqProducer.send(record, e);
ack.acknowledge();
}
}
Логика: ack.acknowledge() коммитит offset только после успешной обработки. Если listener бросает exception до ack — следующий poll получит то же сообщение снова. Это даёт at-least-once: ни одно событие не теряется, дубликаты возможны и обрабатываются через Idempotent consumer.
MANUAL_IMMEDIATE коммитит сразу при ack.acknowledge(). MANUAL — собирает acks и коммитит batch-ом. Для большинства случаев MANUAL_IMMEDIATE проще и предсказуемее.
Listener idempotent
R-KFK-CONS-3: сообщение может прийти 2+ раз — это норма at-least-once.
Сценарии дубликатов:
- Producer retry с
enable.idempotence: false(но в UCP всегдаtrue). - Consumer rebalance до
ack.acknowledge(). - DLQ replay.
- Schema redeploy с offset reset.
Защита — processed_event таблица в PG, проверка eventId перед обработкой. Подробнее — Idempotent consumer.
auto.offset.reset: earliest
R-KFK-CONS-4: для critical-consumer'ов.
spring:
kafka:
consumer:
auto-offset-reset: earliest
Когда срабатывает: новая consumer-group без сохранённого offset, или offset устарел (вышел за retention).
latest(Spring default) — начать с самого нового. Пропускаем все сообщения, которые были до старта consumer-а. Для денег/orders — катастрофа.earliest— начать с самого старого retained-сообщения. Прочитаем всё. Для critical — обязательно.
Для analytics или metrics, где старые данные не нужны — latest приемлемо. Но это исключение.
Concurrency ≤ partitions
R-KFK-CONS-5: настраивается per-listener.
@KafkaListener(
topics = "orders.confirmed",
groupId = "billing-service",
concurrency = "3"
)
public void handle(...) { ... }
Concurrency = количество thread-ов внутри одного pod-а, обрабатывающих partitions параллельно.
Правило: concurrency × pods ≤ partitions. Иначе лишние threads простаивают (Kafka назначает partition только одному consumer в группе).
Если у топика 10 partitions, у нас 2 pods, и хотим максимальный параллелизм — concurrency=5 на каждом pod = 10 thread'ов = 10 partitions. Больше не имеет смысла.
max.poll.interval.ms
R-KFK-CONS-6: лимит времени между poll-ами.
spring:
kafka:
consumer:
properties:
max.poll.interval.ms: 600000 # 10 минут
max.poll.records: 50
Default max.poll.interval.ms = 5 минут. Если обработка одного poll (max.poll.records записей) занимает дольше — Kafka считает consumer dead, rebalance-ит.
Сценарий поломки:
- Listener получил 500 записей.
- Обрабатывает в цикле, каждая запись 800ms (DB-запросы).
- Через 5 минут Kafka rebalance: «consumer не отвечал, забираем partition».
- Новый consumer получает те же записи → дубликаты.
Варианты:
- Уменьшить
max.poll.records(по умолчанию 500, можно 50). - Увеличить
max.poll.interval.ms(10-15 минут). - Сделать обработку быстрее (batch SQL, parallel processing).
Что запрещено
enable.auto.commit: true в проде
R-KFK-CONS-X1: offset коммитится по таймеру независимо от успеха.
Сценарий: auto-commit interval 5 секунд. Listener получил 100 записей, обрабатывает медленно. Через 5 секунд commit offset за все 100 — но обработано только 20. Crash на 21-й → restart → consumer думает «я обработал все 100», читает с 101. 80 записей потеряны.
Manual ack — единственная защита.
Thread.sleep > 1s в listener
R-KFK-CONS-X2: блокирует poll-цикл.
// КАТАСТРОФА
@KafkaListener(...)
public void handle(...) {
for (int i = 0; i < 3; i++) {
try {
externalApi.call(...);
break;
} catch (Exception e) {
Thread.sleep(60_000);
}
}
}
Через 3 минуты Kafka rebalance, partition отбирают, дубликаты гарантированы.
Альтернатива — retry topic с non-blocking delay. Подробнее — Retry topic + DLQ.
group.id отсутствует или общий
R-KFK-CONS-X3: без явного group.id Spring генерирует случайный UUID при каждом старте. Нет sharing-а partitions между pods, нет offset persistence — каждый рестарт всё с начала.
Общий group.id для разных listener-ов размывает rebalance-логику.
HTTP к внешней системе без CB/Bulkhead
R-KFK-CONS-X4: если listener делает HTTP-вызов и внешняя система лежит — RestClient зависает на 30 секунд timeout по умолчанию.
@KafkaListener(...)
public void handle(...) {
externalService.call(...);
}
Через 30s × 500 records / poll = 15000 секунд = 4+ часа на один poll. Rebalance гарантирован, дубликаты гарантированы, downstream получает шторм повторов.
Защита — @CircuitBreaker + @Bulkhead + timeout. См. Resilience → bulkhead и Resilience → circuit breaker.
Что запрещено — таблица
| Антипаттерн | Правило | Что взамен |
|---|---|---|
enable.auto.commit: true | R-KFK-CONS-X1 | manual ack MANUAL_IMMEDIATE |
Thread.sleep > 1s в listener | R-KFK-CONS-X2 | retry topic |
group.id отсутствует или общий | R-KFK-CONS-X3 | <service>-<purpose> |
| HTTP без CB/Bulkhead в listener | R-KFK-CONS-X4 | @CircuitBreaker + bulkhead + timeout |
auto.offset.reset: latest для critical | R-KFK-CONS-4 | earliest |
| Concurrency > partitions | R-KFK-CONS-5 | ≤ partitions |
max.poll.records=500 для тяжёлой обработки | R-KFK-CONS-6 | 50-100 + max.poll.interval.ms |
| Listener без try/catch на NonRetryable | R-KFK-CONS-2 | DLQ + ack |
Куда дальше
- Kafka → раздел 2. Consumer — нормативные формулировки.
- Idempotent consumer —
processed_event, dedup. - Retry topic + DLQ — non-blocking retry.
- Configuration — full
application.yml. - Resilience → bulkhead — изоляция HTTP-вызовов из consumer.
- Resilience → circuit breaker — защита от cascading.
- Observability — consumer lag alerts.