Опирается на правила:
R-SHUT-KFK-1…R-SHUT-KFK-4иR-SHUT-KFK-X1из Graceful Shutdown Style Guide → раздел 3. Kafka shutdown.
Важно знать
@KafkaListenerдожимает текущий batch и коммитит offset перед остановкой.spring.kafka.listener.shutdown-timeout: 20s— max ожидание per-container.- Listener-метод не запускает долгий cascade — не уложится в shutdown-timeout.
ack-mode: BATCHилиRECORDявно — неMANUAL_IMMEDIATEбез обоснования.- Producer.flush() автоматически через
KafkaTemplate.destroy(). Raw producer — явныйclose(Duration.ofSeconds(15)).enable.auto.commit: true— потеря сообщений при shutdown.
Kafka shutdown — место, где легко потерять данные. Если consumer убит до commit offset — следующий запуск получит те же сообщения (replay). Если producer убит до flush — отправленные но не сброшенные batch теряются. UCP формулирует: дожать batch + явный flush + idempotent consumer на случай replay.
Listener container stop
R-SHUT-KFK-1: текущий batch завершается, offset коммитится.
spring:
kafka:
listener:
shutdown-timeout: 20s
ack-mode: BATCH
Что делает Spring при shutdown:
ConcurrentMessageListenerContainer.stop(timeout)вызывается на каждом listener.- Каждый consumer thread завершает обработку текущего batch.
- Если
MANUAL_IMMEDIATE— последнийack.acknowledge()коммитит offset. - Если
BATCH— Spring коммитит offset после успешной обработки всего batch. - Consumer закрывается (
KafkaConsumer.close()), отсоединение от group → rebalance.
shutdown-timeout: 20s — максимум, сколько ждать на каждый container. Если listener зависит от внешнего HTTP и тот лагает 30s — listener не уложится, Spring force-shutdown-ит, offset не закоммичен → replay при следующем старте.
@KafkaListener(
topics = "orders.confirmed",
groupId = "billing-service-confirmations",
containerFactory = "kafkaListenerContainerFactory"
)
@Transactional
public void onConfirmed(OrderConfirmedEvent event, Acknowledgment ack) {
if (!processedEventRepository.tryMarkProcessed(event.eventId(), "billing-confirmations")) {
ack.acknowledge();
return;
}
billingService.charge(event.orderId(), event.totalAmount());
ack.acknowledge();
}
Listener не запускает долгий cascade
R-SHUT-KFK-2: чистая обработка в листенере.
// ОПАСНО — chain HTTP-вызовов
@KafkaListener(...)
public void onConfirmed(OrderConfirmedEvent event, Acknowledgment ack) {
paymentClient.charge(...); // 5s + retry 30s
notificationClient.send(...); // 3s + retry 15s
analyticsClient.track(...); // 2s + retry 10s
ack.acknowledge();
}
Худший случай — 65 секунд. shutdown-timeout: 20s не успеет, force-shutdown, offset не коммитится, при перезапуске тот же event обрабатывается снова. Если payment-charge прошёл, но не было ack → второй заряд.
Идемпотентность (см. Idempotent consumer) защищает от двойной обработки, но shutdown всё равно затягивается и приводит к SIGKILL.
Корректно — chain через outbox:
@KafkaListener(...)
@Transactional
public void onConfirmed(OrderConfirmedEvent event, Acknowledgment ack) {
if (!processedEventRepository.tryMarkProcessed(event.eventId(), "billing")) {
ack.acknowledge();
return;
}
billingService.recordChargeIntent(event.orderId(), event.totalAmount());
outboxRepository.append(new ChargePaymentRequested(event.orderId(), event.totalAmount()));
ack.acknowledge();
}
Listener делает локальную транзакцию + outbox event. Реальный HTTP charge — в отдельном task-queue worker. Listener завершается за < 100ms, shutdown safe.
ack-mode явный
R-SHUT-KFK-3: BATCH или RECORD, не MANUAL_IMMEDIATE по умолчанию.
spring:
kafka:
listener:
ack-mode: BATCH
| ack-mode | Когда commit | Поведение при shutdown |
|---|---|---|
| BATCH | После всего batch | Optimal: либо весь batch обработан и committed, либо весь не committed (replay) |
| RECORD | После каждого record | Точно, но overhead на commit |
| MANUAL | ack.acknowledge() | Точно, контролируется кодом |
| MANUAL_IMMEDIATE | ack.acknowledge() сразу | Точно, immediate commit |
| AUTO (auto-commit) | Каждые N секунд асинхронно | Опасно — см. ниже |
Для большинства случаев — BATCH: один commit per batch, минимум overhead, поведение при shutdown предсказуемо.
MANUAL_IMMEDIATE нужен только когда хочется гарантировать commit сразу после каждого ack — обычно для money с critical reliability. Но в UCP при money используется outbox + idempotent consumer, что устраняет необходимость в MANUAL_IMMEDIATE.
Producer flush
R-SHUT-KFK-4: автоматически через KafkaTemplate.destroy().
@Configuration
public class KafkaConfig {
@Bean
KafkaTemplate<String, Object> kafkaTemplate(ProducerFactory<String, Object> pf) {
return new KafkaTemplate<>(pf);
}
}
Spring Boot при shutdown вызывает KafkaTemplate.destroy(), который вызывает Producer.close(Duration.ofSeconds(30)). Это flush все pending batches на broker, потом close connection.
Если используется raw KafkaProducer (редко, обычно не нужно в UCP):
@Bean(destroyMethod = "close")
KafkaProducer<String, Object> rawProducer() {
return new KafkaProducer<>(producerProperties());
}
@PreDestroy
public void close() {
producer.close(Duration.ofSeconds(15));
}
destroyMethod = "close" вызывает дефолтный close() — Spring 5 автоматически распознаёт Closeable. Явный close(Duration.ofSeconds(15)) даёт явный timeout для flush.
Без flush — отправленные producer.send(...) но ещё не batch-отправленные сообщения потеряются.
Что запрещено
enable.auto.commit: true
R-SHUT-KFK-X1: классический антипаттерн.
# КАТАСТРОФА
spring:
kafka:
consumer:
enable-auto-commit: true
auto-commit-interval-ms: 5000
Сценарий потери:
- Auto-commit thread каждые 5s коммитит offset.
- Listener получил batch с offset 100-200, начал обработку.
- На offset 150 — SIGTERM.
- Auto-commit thread (если успел) уже зафиксировал offset 200 как «обработано».
- Force shutdown, offset 200 committed, реально обработано 100-149.
- При следующем старте consumer читает с offset 200 → записи 150-200 потеряны.
Корректно — enable-auto-commit: false + manual ack или BATCH. Уже запрещено в Kafka style guide (R-KFK-CONS-X1), здесь — повторение для shutdown-контекста.
Что запрещено — таблица
| Антипаттерн | Правило | Что взамен |
|---|---|---|
enable.auto.commit: true | R-SHUT-KFK-X1 | false + manual / BATCH ack |
shutdown-timeout не задан или > 30s | R-SHUT-KFK-1 | 15-25s, помещается в budget |
| Long cascade в listener (HTTP + retry > 20s) | R-SHUT-KFK-2 | outbox + task-queue |
ack-mode: AUTO без обоснования | R-SHUT-KFK-3 | BATCH дефолт |
Raw KafkaProducer без close(Duration) | R-SHUT-KFK-4 | destroyMethod = "close" |
Listener без @Transactional | R-SHUT-KFK-1 | TX обертка для consistency |
Producer.send без .get() для critical messages | R-SHUT-KFK-4 | .get() или async с error handling |
MANUAL_IMMEDIATE для high-throughput | R-SHUT-KFK-3 | BATCH для производительности |
Куда дальше
- Graceful Shutdown → раздел 3. Kafka shutdown — нормативные формулировки.
- JVM/Spring конфигурация — общий graceful Spring.
- Идемпотентность in-flight — replay-safety.
- Бюджеты и observability — Kafka в общем 60s budget.
- Kafka → consumer — manual ack details.
- Kafka → idempotent consumer —
processed_eventдля replay. - Kafka → outbox publishing — выносим cascade из listener.