← назад к разделу

RabbitMQ — популярный брокер сообщений. Работать с ним из Java можно напрямую через его Java-клиент, но это много низкоуровневого кода: управление соединениями, потоками, ack/nack вручную. Spring AMQP (библиотека spring-rabbit) берёт этот код на себя и даёт удобный Spring-способ отправлять и получать сообщения.

Подключение

Одна зависимость в Gradle добавляет всё нужное:

// build.gradle.kts
dependencies {
    implementation("org.springframework.boot:spring-boot-starter-amqp")
}

Базовая конфигурация в application.yml:

spring:
  rabbitmq:
    host: rabbit
    port: 5672
    username: billing-service
    password: ${RABBIT_PASSWORD}
    virtual-host: /prod

Spring Boot автоматически создаст соединение и настроит все нужные бины.

Как отправить сообщение: RabbitTemplate

Раньше, чтобы опубликовать сообщение, нужно было вручную открывать канал, сериализовать объект, вызывать методы протокола AMQP. Spring AMQP делает это одним вызовом.

Главный инструмент для отправки — RabbitTemplate. Его создаёт Spring Boot автоматически, достаточно внедрить как зависимость:

@Component
@RequiredArgsConstructor
public class OrderEventPublisher {
    private final RabbitTemplate rabbit;

    public void publish(OrderCreatedEvent event) {
        rabbit.convertAndSend("orders", "order.created", event);
    }
}

convertAndSend(exchange, routingKey, payload) — основной метод. Три параметра:

  • "orders" — имя exchange (куда шлём);
  • "order.created" — routing key (по нему exchange решает, в какую очередь направить);
  • event — объект, который станет телом сообщения.

Зачем менять сериализацию

По умолчанию Spring AMQP сериализует объект через Java-сериализацию. Это работает, но у такого подхода есть проблема: другой сервис (или другой язык) не сможет прочитать сообщение. Формат Java-сериализации понимает только Java.

Решение — переключиться на JSON. Для этого нужно объявить конвертер и указать его шаблону:

@Configuration
public class RabbitConfig {

    @Bean
    MessageConverter jacksonConverter(ObjectMapper mapper) {
        return new Jackson2JsonMessageConverter(mapper);
    }

    @Bean
    RabbitTemplate rabbitTemplate(ConnectionFactory cf, MessageConverter converter) {
        var t = new RabbitTemplate(cf);
        t.setMessageConverter(converter);
        return t;
    }
}

После этого payload уходит в виде JSON. Spring автоматически добавит заголовок content-type: application/json — получатель поймёт, как десериализовать.

Как получить сообщение: @RabbitListener

Подписаться на очередь проще всего через аннотацию @RabbitListener:

@Component
public class OrderEventListener {

    @RabbitListener(queues = "orders.fulfillment", concurrency = "3-10")
    public void handle(OrderCreatedEvent event) {
        // обрабатываем событие
    }
}

Spring запустит пул потоков, которые будут читать из очереди и вызывать метод. Параметр concurrency = "3-10" задаёт диапазон: минимум 3 потока постоянно, и до 10 при увеличении нагрузки.

Объект OrderCreatedEvent Spring десериализует автоматически из JSON, если настроен MessageConverter.

Как объявить очередь, exchange и binding

Обычно очередь создаётся вручную в RabbitMQ или через Infrastructure as Code. Но Spring AMQP умеет создавать их при старте приложения — это удобно для разработки и простых сетапов:

@Configuration
public class RabbitTopology {

    @Bean
    Queue ordersFulfillment() {
        return QueueBuilder
            .durable("orders.fulfillment")
            .quorum()
            .withArgument("x-dead-letter-exchange", "orders.dlx")
            .build();
    }

    @Bean
    DirectExchange ordersExchange() {
        return new DirectExchange("orders", true, false);
    }

    @Bean
    Binding binding(Queue ordersFulfillment, DirectExchange ordersExchange) {
        return BindingBuilder
            .bind(ordersFulfillment)
            .to(ordersExchange)
            .with("order.created");
    }
}

Spring видит эти бины и при старте проверит, существуют ли они в брокере. Если нет — создаст.

Что происходит при ошибке

По умолчанию, если ваш метод-listener бросает исключение, Spring возвращает сообщение обратно в очередь — и оно тут же приходит снова. Это создаёт бесконечный цикл: сообщение обрабатывается, падает, возвращается, снова обрабатывается.

Чтобы этого избежать, настраивают два механизма вместе: retry (повторные попытки) и Dead Letter Exchange (место, куда уходят сообщения после всех неудачных попыток).

Повторные попытки

Если ошибка временная (база данных на секунду недоступна, внешний сервис ответил 503), имеет смысл попробовать несколько раз с паузой между попытками:

@Bean
RetryOperationsInterceptor retryInterceptor() {
    return RetryInterceptorBuilder.stateless()
        .maxAttempts(4)
        .backOffOptions(1000L, 2.0, 30000L)  // начать с 1с, удваивать, максимум 30с
        .recoverer(new RejectAndDontRequeueRecoverer())
        .build();
}

@Bean
SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
        ConnectionFactory cf, MessageConverter converter) {
    var factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(cf);
    factory.setMessageConverter(converter);
    factory.setAdviceChain(retryInterceptor());
    factory.setDefaultRequeueRejected(false);
    return factory;
}

Что здесь происходит:

  • Spring сделает 4 попытки с экспоненциальной паузой (1с, 2с, 4с).
  • После 4-й неудачной попытки RejectAndDontRequeueRecoverer скажет брокеру «не возвращай это сообщение» — и оно уйдёт в Dead Letter Exchange.
  • setDefaultRequeueRejected(false) — на случай исключений, которые retry не перехватил.

Dead Letter Exchange

Dead Letter Exchange (DLX) — это специальный exchange в RabbitMQ, куда попадают «мёртвые» сообщения: те, которые не удалось обработать. Оттуда их можно проанализировать, отправить уведомление или попробовать обработать по-другому.

Настройка DLX на уровне очереди:

@Bean
Queue ordersMain() {
    return QueueBuilder
        .durable("orders.fulfillment")
        .quorum()
        .withArgument("x-dead-letter-exchange", "orders.dlx")
        .withArgument("x-dead-letter-routing-key", "order.failed")
        .build();
}

@Bean DirectExchange ordersDlx() {
    return new DirectExchange("orders.dlx", true, false);
}

@Bean
Queue ordersDlq() {
    return QueueBuilder.durable("orders.dlq").quorum().build();
}

@Bean
Binding dlqBinding(Queue ordersDlq, DirectExchange ordersDlx) {
    return BindingBuilder.bind(ordersDlq).to(ordersDlx).with("order.failed");
}

Поток при ошибке: основная очередь → исключение → reject без возврата → DLX → очередь orders.dlq.

В очереди orders.dlq обычно сидит отдельный listener, который логирует проблему, сохраняет сообщение в базу для ручного разбора или отправляет алерт:

@RabbitListener(queues = "orders.dlq")
public void inspect(
        OrderCreatedEvent event,
        @Header("x-death") List<Map<String, Object>> deaths) {
    log.error("Не удалось обработать orderId={}, попыток={}",
        event.orderId(), deaths.size());
}

Заголовок x-death добавляет брокер — там история попыток: сколько раз сообщение возвращалось и по какой причине.

Режимы подтверждения

RabbitMQ работает на принципе подтверждений: брокер держит сообщение до тех пор, пока consumer не скажет «получил» (ack) или «не смог» (nack). Это защита от потери сообщений.

Spring AMQP поддерживает три режима:

  • AUTO (по умолчанию) — Spring сам отправляет ack после успешного возврата из метода; при исключении — nack. Самый простой режим.
  • MANUAL — вы сами управляете ack/nack через объект Channel. Даёт полный контроль, но усложняет код.
  • NONE — брокер считает сообщение доставленным сразу, без ожидания подтверждения. Для критичных данных не подходит.

Стандартный рецепт: AUTO + setDefaultRequeueRejected(false) + DLX. Этого достаточно для большинства случаев.

Заголовки в сообщениях

Технические метаданные (идентификатор запроса, версия события) удобно передавать через заголовки сообщения, а не смешивать с бизнес-данными в теле:

// При отправке
public void publish(OrderEvent event, String correlationId) {
    var message = MessageBuilder
        .withBody(jsonOf(event))
        .setContentType("application/json")
        .setHeader("X-Correlation-ID", correlationId)
        .setHeader("X-Event-Version", "2")
        .build();
    rabbit.send("orders", "order.created", message);
}

// При получении
@RabbitListener(queues = "orders.fulfillment")
public void handle(
        @Payload OrderEvent event,
        @Header("X-Correlation-ID") String correlationId) {
    MDC.put("correlationId", correlationId);
    // ...
}

Целостный пример

Вот как выглядит минимальный рабочий сетап: топология, listener и обработчик мёртвых сообщений вместе:

@Configuration
public class OrdersTopology {

    @Bean DirectExchange ordersEx()   { return new DirectExchange("orders", true, false); }
    @Bean DirectExchange ordersDlx()  { return new DirectExchange("orders.dlx", true, false); }

    @Bean
    Queue ordersFulfillment() {
        return QueueBuilder.durable("orders.fulfillment").quorum()
            .withArgument("x-dead-letter-exchange", "orders.dlx")
            .withArgument("x-dead-letter-routing-key", "order.failed")
            .build();
    }

    @Bean Queue ordersDlq() { return QueueBuilder.durable("orders.dlq").quorum().build(); }

    @Bean
    Binding bindFulfillment(Queue ordersFulfillment, DirectExchange ordersEx) {
        return BindingBuilder.bind(ordersFulfillment).to(ordersEx).with("order.created");
    }

    @Bean
    Binding bindDlq(Queue ordersDlq, DirectExchange ordersDlx) {
        return BindingBuilder.bind(ordersDlq).to(ordersDlx).with("order.failed");
    }
}

@Component
@RequiredArgsConstructor
class OrderListener {

    private final OrderHandler handler;

    @RabbitListener(queues = "orders.fulfillment", concurrency = "3-10")
    public void on(OrderCreatedEvent event) {
        handler.process(event);
    }
}

@Component
class DlqInspector {
    @RabbitListener(queues = "orders.dlq")
    public void inspect(
            OrderCreatedEvent event,
            @Header("x-death") List<Map<String, Object>> deaths) {
        log.error("Не удалось обработать: orderId={}", event.orderId());
    }
}

Коротко

  • Spring AMQP (spring-boot-starter-amqp) оборачивает RabbitMQ-клиент и убирает низкоуровневый код.
  • RabbitTemplate — для отправки; основной метод convertAndSend(exchange, routingKey, payload).
  • По умолчанию сериализация — Java, для межсервисного взаимодействия нужен Jackson2JsonMessageConverter.
  • @RabbitListener — для получения; параметр concurrency задаёт размер пула потоков.
  • При ошибке без настройки возникает бесконечный цикл — нужны setDefaultRequeueRejected(false) и DLX.
  • RetryTemplate делает несколько попыток с паузой; после исчерпания — сообщение уходит в Dead Letter Exchange.
  • DLX/DLQ настраивается через QueueBuilder.withArgument("x-dead-letter-exchange", ...).
  • Стандартный режим подтверждений: AUTO + requeue=false + DLX.

Что почитать дальше

  • Протокол AMQP — модель доставки: exchange, queue, binding, ack.
  • RabbitMQ в production — Quorum Queues, кластеризация, мониторинг.
  • Messaging-паттерны через AMQP — work queue, pub/sub, RPC, идемпотентный consumer.