Опирается на правила: R-SHUT-KFK-1R-SHUT-KFK-4 и R-SHUT-KFK-X1 из Graceful Shutdown Style Guide → раздел 3. Kafka shutdown.

Важно знать

  • @KafkaListener дожимает текущий batch и коммитит offset перед остановкой.
  • spring.kafka.listener.shutdown-timeout: 20s — max ожидание per-container.
  • Listener-метод не запускает долгий cascade — не уложится в shutdown-timeout.
  • ack-mode: BATCH или RECORD явно — не MANUAL_IMMEDIATE без обоснования.
  • Producer.flush() автоматически через KafkaTemplate.destroy(). Raw producer — явный close(Duration.ofSeconds(15)).
  • enable.auto.commit: true — потеря сообщений при shutdown.

Kafka shutdown — место, где легко потерять данные. Если consumer убит до commit offset — следующий запуск получит те же сообщения (replay). Если producer убит до flush — отправленные но не сброшенные batch теряются. UCP формулирует: дожать batch + явный flush + idempotent consumer на случай replay.

Listener container stop

R-SHUT-KFK-1: текущий batch завершается, offset коммитится.

spring:
  kafka:
    listener:
      shutdown-timeout: 20s
      ack-mode: BATCH

Что делает Spring при shutdown:

  1. ConcurrentMessageListenerContainer.stop(timeout) вызывается на каждом listener.
  2. Каждый consumer thread завершает обработку текущего batch.
  3. Если MANUAL_IMMEDIATE — последний ack.acknowledge() коммитит offset.
  4. Если BATCH — Spring коммитит offset после успешной обработки всего batch.
  5. Consumer закрывается (KafkaConsumer.close()), отсоединение от group → rebalance.

shutdown-timeout: 20s — максимум, сколько ждать на каждый container. Если listener зависит от внешнего HTTP и тот лагает 30s — listener не уложится, Spring force-shutdown-ит, offset не закоммичен → replay при следующем старте.

@KafkaListener(
    topics = "orders.confirmed",
    groupId = "billing-service-confirmations",
    containerFactory = "kafkaListenerContainerFactory"
)
@Transactional
public void onConfirmed(OrderConfirmedEvent event, Acknowledgment ack) {
    if (!processedEventRepository.tryMarkProcessed(event.eventId(), "billing-confirmations")) {
        ack.acknowledge();
        return;
    }
    billingService.charge(event.orderId(), event.totalAmount());
    ack.acknowledge();
}

Listener не запускает долгий cascade

R-SHUT-KFK-2: чистая обработка в листенере.

// ОПАСНО — chain HTTP-вызовов
@KafkaListener(...)
public void onConfirmed(OrderConfirmedEvent event, Acknowledgment ack) {
    paymentClient.charge(...);          // 5s + retry 30s
    notificationClient.send(...);       // 3s + retry 15s
    analyticsClient.track(...);         // 2s + retry 10s
    ack.acknowledge();
}

Худший случай — 65 секунд. shutdown-timeout: 20s не успеет, force-shutdown, offset не коммитится, при перезапуске тот же event обрабатывается снова. Если payment-charge прошёл, но не было ack → второй заряд.

Идемпотентность (см. Idempotent consumer) защищает от двойной обработки, но shutdown всё равно затягивается и приводит к SIGKILL.

Корректно — chain через outbox:

@KafkaListener(...)
@Transactional
public void onConfirmed(OrderConfirmedEvent event, Acknowledgment ack) {
    if (!processedEventRepository.tryMarkProcessed(event.eventId(), "billing")) {
        ack.acknowledge();
        return;
    }
    billingService.recordChargeIntent(event.orderId(), event.totalAmount());
    outboxRepository.append(new ChargePaymentRequested(event.orderId(), event.totalAmount()));
    ack.acknowledge();
}

Listener делает локальную транзакцию + outbox event. Реальный HTTP charge — в отдельном task-queue worker. Listener завершается за < 100ms, shutdown safe.

ack-mode явный

R-SHUT-KFK-3: BATCH или RECORD, не MANUAL_IMMEDIATE по умолчанию.

spring:
  kafka:
    listener:
      ack-mode: BATCH
ack-modeКогда commitПоведение при shutdown
BATCHПосле всего batchOptimal: либо весь batch обработан и committed, либо весь не committed (replay)
RECORDПосле каждого recordТочно, но overhead на commit
MANUALack.acknowledge()Точно, контролируется кодом
MANUAL_IMMEDIATEack.acknowledge() сразуТочно, immediate commit
AUTO (auto-commit)Каждые N секунд асинхронноОпасно — см. ниже

Для большинства случаев — BATCH: один commit per batch, минимум overhead, поведение при shutdown предсказуемо.

MANUAL_IMMEDIATE нужен только когда хочется гарантировать commit сразу после каждого ack — обычно для money с critical reliability. Но в UCP при money используется outbox + idempotent consumer, что устраняет необходимость в MANUAL_IMMEDIATE.

Producer flush

R-SHUT-KFK-4: автоматически через KafkaTemplate.destroy().

@Configuration
public class KafkaConfig {

    @Bean
    KafkaTemplate<String, Object> kafkaTemplate(ProducerFactory<String, Object> pf) {
        return new KafkaTemplate<>(pf);
    }
}

Spring Boot при shutdown вызывает KafkaTemplate.destroy(), который вызывает Producer.close(Duration.ofSeconds(30)). Это flush все pending batches на broker, потом close connection.

Если используется raw KafkaProducer (редко, обычно не нужно в UCP):

@Bean(destroyMethod = "close")
KafkaProducer<String, Object> rawProducer() {
    return new KafkaProducer<>(producerProperties());
}

@PreDestroy
public void close() {
    producer.close(Duration.ofSeconds(15));
}

destroyMethod = "close" вызывает дефолтный close() — Spring 5 автоматически распознаёт Closeable. Явный close(Duration.ofSeconds(15)) даёт явный timeout для flush.

Без flush — отправленные producer.send(...) но ещё не batch-отправленные сообщения потеряются.

Что запрещено

enable.auto.commit: true

R-SHUT-KFK-X1: классический антипаттерн.

# КАТАСТРОФА
spring:
  kafka:
    consumer:
      enable-auto-commit: true
      auto-commit-interval-ms: 5000

Сценарий потери:

  1. Auto-commit thread каждые 5s коммитит offset.
  2. Listener получил batch с offset 100-200, начал обработку.
  3. На offset 150 — SIGTERM.
  4. Auto-commit thread (если успел) уже зафиксировал offset 200 как «обработано».
  5. Force shutdown, offset 200 committed, реально обработано 100-149.
  6. При следующем старте consumer читает с offset 200 → записи 150-200 потеряны.

Корректно — enable-auto-commit: false + manual ack или BATCH. Уже запрещено в Kafka style guide (R-KFK-CONS-X1), здесь — повторение для shutdown-контекста.

Что запрещено — таблица

АнтипаттернПравилоЧто взамен
enable.auto.commit: trueR-SHUT-KFK-X1false + manual / BATCH ack
shutdown-timeout не задан или > 30sR-SHUT-KFK-115-25s, помещается в budget
Long cascade в listener (HTTP + retry > 20s)R-SHUT-KFK-2outbox + task-queue
ack-mode: AUTO без обоснованияR-SHUT-KFK-3BATCH дефолт
Raw KafkaProducer без close(Duration)R-SHUT-KFK-4destroyMethod = "close"
Listener без @TransactionalR-SHUT-KFK-1TX обертка для consistency
Producer.send без .get() для critical messagesR-SHUT-KFK-4.get() или async с error handling
MANUAL_IMMEDIATE для high-throughputR-SHUT-KFK-3BATCH для производительности

Куда дальше

  • Graceful Shutdown → раздел 3. Kafka shutdown — нормативные формулировки.
  • JVM/Spring конфигурация — общий graceful Spring.
  • Идемпотентность in-flight — replay-safety.
  • Бюджеты и observability — Kafka в общем 60s budget.
  • Kafka → consumer — manual ack details.
  • Kafka → idempotent consumer — processed_event для replay.
  • Kafka → outbox publishing — выносим cascade из listener.