← назад к разделу

При остановке приложения Kafka-потребитель и продюсер нужно завершать аккуратно. Иначе одна из двух бед: либо сообщения обрабатываются повторно при следующем запуске, либо часть сообщений, которые уже отправил продюсер, так и не добирается до брокера.

Разберём оба случая по порядку.

Что происходит с потребителем при остановке

Kafka-потребитель читает сообщения пачками. После обработки он фиксирует прогресс в брокере — это называется «закоммитить offset». Если приложение убить до коммита, при следующем запуске потребитель прочитает те же сообщения заново (replay).

Spring управляет потребителями через ConcurrentMessageListenerContainer. При получении сигнала остановки он вызывает stop(timeout) на каждом контейнере: ждёт, пока текущая пачка доберётся до конца, затем коммитит offset и закрывает KafkaConsumer.

Ключевая настройка — время ожидания:

spring:
  kafka:
    listener:
      shutdown-timeout: 20s
      ack-mode: BATCH

shutdown-timeout: 20s — максимум, сколько Spring ждёт завершения текущей пачки на каждый контейнер. Если listener не укладывается — Spring прерывает его принудительно, offset не фиксируется, и при следующем старте сообщения придут снова.

Почему нельзя делать долгие операции внутри listener

Listener обрабатывает сообщение и укладывается в shutdown-timeout. Проблема возникает, когда внутри listener идут несколько HTTP-вызовов с повторными попытками:

// Опасно — суммарное время 65s, в shutdown-timeout 20s не влезет
@KafkaListener(...)
public void onConfirmed(OrderConfirmedEvent event, Acknowledgment ack) {
    paymentClient.charge(...);        // 5s + повторы 30s
    notificationClient.send(...);     // 3s + повторы 15s
    analyticsClient.track(...);       // 2s + повторы 10s
    ack.acknowledge();
}

В худшем случае listener занимает 65 секунд. shutdown-timeout: 20s истечёт, Spring прервёт обработку, offset не зафиксируется. При перезапуске событие придёт снова — если paymentClient.charge уже прошёл, получится двойное списание.

Решение — listener делает только быструю локальную работу, а «тяжёлые» операции откладывает через outbox-паттерн:

@KafkaListener(...)
@Transactional
public void onConfirmed(OrderConfirmedEvent event, Acknowledgment ack) {
    if (!processedEventRepository.tryMarkProcessed(event.eventId(), "billing")) {
        ack.acknowledge();
        return;
    }
    billingService.recordChargeIntent(event.orderId(), event.totalAmount());
    outboxRepository.append(
        new ChargePaymentRequested(event.orderId(), event.totalAmount())
    );
    ack.acknowledge();
}

Listener сохраняет намерение в базу и кладёт событие в outbox — всё в одной локальной транзакции, завершается за миллисекунды. Реальный HTTP-вызов к платёжному сервису делает отдельный процессор outbox. Shutdown проходит чисто.

Режим подтверждения сообщений — ack-mode

ack-mode определяет, когда именно потребитель сообщает брокеру «я обработал эти сообщения». Это важно для поведения при остановке.

ack-modeКогда фиксируется offsetПоведение при shutdown
BATCHПосле всей пачкиЛибо вся пачка закоммичена, либо нет — чисто
RECORDПосле каждого сообщенияТочно, но больше запросов к брокеру
MANUALПри явном ack.acknowledge()Контролируется кодом
AUTOАвтоматически каждые N секундОпасен при shutdown

Для большинства случаев подходит BATCH: один коммит на всю пачку, минимум накладных расходов, при остановке поведение предсказуемо — либо вся пачка обработана и зафиксирована, либо придёт на повтор целиком.

Самая опасная ошибка — enable.auto.commit

Есть настройка, которая кажется удобной, но при остановке приложения приводит к потере сообщений:

# Опасно
spring:
  kafka:
    consumer:
      enable-auto-commit: true
      auto-commit-interval-ms: 5000

Вот как происходит потеря. Отдельный поток автоматически каждые 5 секунд фиксирует текущий offset — независимо от того, обработаны ли сообщения на самом деле. При остановке приложения этот поток может зафиксировать offset 200, хотя реально обработаны только сообщения 100–149. После перезапуска потребитель начнёт читать с 200 — сообщения 150–200 потеряны навсегда.

Правильно — enable-auto-commit: false и управлять коммитами явно через ack-mode: BATCH или MANUAL.

Продюсер — flush при остановке

Kafka-продюсер не отправляет каждое сообщение немедленно: он накапливает их в буфере и сбрасывает пачками. Если приложение остановить до сброса — накопленные сообщения пропадут.

Spring Boot при остановке автоматически вызывает KafkaTemplate.destroy(), который делает Producer.close(Duration.ofSeconds(30)). Это сбрасывает все накопленные сообщения на брокер и закрывает соединение. Если вы используете стандартный KafkaTemplate — ничего дополнительно настраивать не нужно.

Если в проекте используется KafkaProducer напрямую (нестандартная ситуация), нужно явно задать destroyMethod:

@Bean(destroyMethod = "")
KafkaProducer<String, Object> rawProducer() {
    return new KafkaProducer<>(producerProperties());
}

@PreDestroy
public void close() {
    producer.close(Duration.ofSeconds(15)); // явный flush с таймаутом
}

Без явного вызова close(Duration) — накопленные сообщения потеряются.

Коротко

  • При остановке Spring вызывает ConcurrentMessageListenerContainer.stop(timeout) — потребитель дожидается конца текущей пачки и коммитит offset.
  • spring.kafka.listener.shutdown-timeout: 20s — сколько ждать завершения на каждый контейнер; если listener не успевает, offset не фиксируется и сообщения придут повторно.
  • Listener должен завершаться быстро — тяжёлые операции с повторами выносят в outbox, иначе приложение не уложится в timeout.
  • ack-mode: BATCH — рекомендуемый режим: один коммит на пачку, при остановке поведение предсказуемо.
  • enable.auto.commit: true — опасен: offset может зафиксироваться раньше реальной обработки, при shutdown сообщения теряются безвозвратно.
  • KafkaTemplate автоматически сбрасывает буфер продюсера при остановке; для прямого KafkaProducer нужен явный close(Duration).

Что почитать дальше

  • JVM/Spring конфигурация — базовая настройка graceful shutdown в Spring Boot.
  • Идемпотентность при остановке — как защититься от повторной обработки при replay.
  • Бюджеты и наблюдаемость — Kafka в общем бюджете времени на shutdown.