RabbitMQ — популярный брокер сообщений. Работать с ним из Java можно напрямую через его Java-клиент, но это много низкоуровневого кода: управление соединениями, потоками, ack/nack вручную. Spring AMQP (библиотека spring-rabbit) берёт этот код на себя и даёт удобный Spring-способ отправлять и получать сообщения.
Подключение
Одна зависимость в Gradle добавляет всё нужное:
// build.gradle.kts
dependencies {
implementation("org.springframework.boot:spring-boot-starter-amqp")
}
Базовая конфигурация в application.yml:
spring:
rabbitmq:
host: rabbit
port: 5672
username: billing-service
password: ${RABBIT_PASSWORD}
virtual-host: /prod
Spring Boot автоматически создаст соединение и настроит все нужные бины.
Как отправить сообщение: RabbitTemplate
Раньше, чтобы опубликовать сообщение, нужно было вручную открывать канал, сериализовать объект, вызывать методы протокола AMQP. Spring AMQP делает это одним вызовом.
Главный инструмент для отправки — RabbitTemplate. Его создаёт Spring Boot автоматически, достаточно внедрить как зависимость:
@Component
@RequiredArgsConstructor
public class OrderEventPublisher {
private final RabbitTemplate rabbit;
public void publish(OrderCreatedEvent event) {
rabbit.convertAndSend("orders", "order.created", event);
}
}
convertAndSend(exchange, routingKey, payload) — основной метод. Три параметра:
"orders"— имя exchange (куда шлём);"order.created"— routing key (по нему exchange решает, в какую очередь направить);event— объект, который станет телом сообщения.
Зачем менять сериализацию
По умолчанию Spring AMQP сериализует объект через Java-сериализацию. Это работает, но у такого подхода есть проблема: другой сервис (или другой язык) не сможет прочитать сообщение. Формат Java-сериализации понимает только Java.
Решение — переключиться на JSON. Для этого нужно объявить конвертер и указать его шаблону:
@Configuration
public class RabbitConfig {
@Bean
MessageConverter jacksonConverter(ObjectMapper mapper) {
return new Jackson2JsonMessageConverter(mapper);
}
@Bean
RabbitTemplate rabbitTemplate(ConnectionFactory cf, MessageConverter converter) {
var t = new RabbitTemplate(cf);
t.setMessageConverter(converter);
return t;
}
}
После этого payload уходит в виде JSON. Spring автоматически добавит заголовок content-type: application/json — получатель поймёт, как десериализовать.
Как получить сообщение: @RabbitListener
Подписаться на очередь проще всего через аннотацию @RabbitListener:
@Component
public class OrderEventListener {
@RabbitListener(queues = "orders.fulfillment", concurrency = "3-10")
public void handle(OrderCreatedEvent event) {
// обрабатываем событие
}
}
Spring запустит пул потоков, которые будут читать из очереди и вызывать метод. Параметр concurrency = "3-10" задаёт диапазон: минимум 3 потока постоянно, и до 10 при увеличении нагрузки.
Объект OrderCreatedEvent Spring десериализует автоматически из JSON, если настроен MessageConverter.
Как объявить очередь, exchange и binding
Обычно очередь создаётся вручную в RabbitMQ или через Infrastructure as Code. Но Spring AMQP умеет создавать их при старте приложения — это удобно для разработки и простых сетапов:
@Configuration
public class RabbitTopology {
@Bean
Queue ordersFulfillment() {
return QueueBuilder
.durable("orders.fulfillment")
.quorum()
.withArgument("x-dead-letter-exchange", "orders.dlx")
.build();
}
@Bean
DirectExchange ordersExchange() {
return new DirectExchange("orders", true, false);
}
@Bean
Binding binding(Queue ordersFulfillment, DirectExchange ordersExchange) {
return BindingBuilder
.bind(ordersFulfillment)
.to(ordersExchange)
.with("order.created");
}
}
Spring видит эти бины и при старте проверит, существуют ли они в брокере. Если нет — создаст.
Что происходит при ошибке
По умолчанию, если ваш метод-listener бросает исключение, Spring возвращает сообщение обратно в очередь — и оно тут же приходит снова. Это создаёт бесконечный цикл: сообщение обрабатывается, падает, возвращается, снова обрабатывается.
Чтобы этого избежать, настраивают два механизма вместе: retry (повторные попытки) и Dead Letter Exchange (место, куда уходят сообщения после всех неудачных попыток).
Повторные попытки
Если ошибка временная (база данных на секунду недоступна, внешний сервис ответил 503), имеет смысл попробовать несколько раз с паузой между попытками:
@Bean
RetryOperationsInterceptor retryInterceptor() {
return RetryInterceptorBuilder.stateless()
.maxAttempts(4)
.backOffOptions(1000L, 2.0, 30000L) // начать с 1с, удваивать, максимум 30с
.recoverer(new RejectAndDontRequeueRecoverer())
.build();
}
@Bean
SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
ConnectionFactory cf, MessageConverter converter) {
var factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(cf);
factory.setMessageConverter(converter);
factory.setAdviceChain(retryInterceptor());
factory.setDefaultRequeueRejected(false);
return factory;
}
Что здесь происходит:
- Spring сделает 4 попытки с экспоненциальной паузой (1с, 2с, 4с).
- После 4-й неудачной попытки
RejectAndDontRequeueRecovererскажет брокеру «не возвращай это сообщение» — и оно уйдёт в Dead Letter Exchange. setDefaultRequeueRejected(false)— на случай исключений, которые retry не перехватил.
Dead Letter Exchange
Dead Letter Exchange (DLX) — это специальный exchange в RabbitMQ, куда попадают «мёртвые» сообщения: те, которые не удалось обработать. Оттуда их можно проанализировать, отправить уведомление или попробовать обработать по-другому.
Настройка DLX на уровне очереди:
@Bean
Queue ordersMain() {
return QueueBuilder
.durable("orders.fulfillment")
.quorum()
.withArgument("x-dead-letter-exchange", "orders.dlx")
.withArgument("x-dead-letter-routing-key", "order.failed")
.build();
}
@Bean DirectExchange ordersDlx() {
return new DirectExchange("orders.dlx", true, false);
}
@Bean
Queue ordersDlq() {
return QueueBuilder.durable("orders.dlq").quorum().build();
}
@Bean
Binding dlqBinding(Queue ordersDlq, DirectExchange ordersDlx) {
return BindingBuilder.bind(ordersDlq).to(ordersDlx).with("order.failed");
}
Поток при ошибке: основная очередь → исключение → reject без возврата → DLX → очередь orders.dlq.
В очереди orders.dlq обычно сидит отдельный listener, который логирует проблему, сохраняет сообщение в базу для ручного разбора или отправляет алерт:
@RabbitListener(queues = "orders.dlq")
public void inspect(
OrderCreatedEvent event,
@Header("x-death") List<Map<String, Object>> deaths) {
log.error("Не удалось обработать orderId={}, попыток={}",
event.orderId(), deaths.size());
}
Заголовок x-death добавляет брокер — там история попыток: сколько раз сообщение возвращалось и по какой причине.
Режимы подтверждения
RabbitMQ работает на принципе подтверждений: брокер держит сообщение до тех пор, пока consumer не скажет «получил» (ack) или «не смог» (nack). Это защита от потери сообщений.
Spring AMQP поддерживает три режима:
AUTO(по умолчанию) — Spring сам отправляет ack после успешного возврата из метода; при исключении — nack. Самый простой режим.MANUAL— вы сами управляете ack/nack через объектChannel. Даёт полный контроль, но усложняет код.NONE— брокер считает сообщение доставленным сразу, без ожидания подтверждения. Для критичных данных не подходит.
Стандартный рецепт: AUTO + setDefaultRequeueRejected(false) + DLX. Этого достаточно для большинства случаев.
Заголовки в сообщениях
Технические метаданные (идентификатор запроса, версия события) удобно передавать через заголовки сообщения, а не смешивать с бизнес-данными в теле:
// При отправке
public void publish(OrderEvent event, String correlationId) {
var message = MessageBuilder
.withBody(jsonOf(event))
.setContentType("application/json")
.setHeader("X-Correlation-ID", correlationId)
.setHeader("X-Event-Version", "2")
.build();
rabbit.send("orders", "order.created", message);
}
// При получении
@RabbitListener(queues = "orders.fulfillment")
public void handle(
@Payload OrderEvent event,
@Header("X-Correlation-ID") String correlationId) {
MDC.put("correlationId", correlationId);
// ...
}
Целостный пример
Вот как выглядит минимальный рабочий сетап: топология, listener и обработчик мёртвых сообщений вместе:
@Configuration
public class OrdersTopology {
@Bean DirectExchange ordersEx() { return new DirectExchange("orders", true, false); }
@Bean DirectExchange ordersDlx() { return new DirectExchange("orders.dlx", true, false); }
@Bean
Queue ordersFulfillment() {
return QueueBuilder.durable("orders.fulfillment").quorum()
.withArgument("x-dead-letter-exchange", "orders.dlx")
.withArgument("x-dead-letter-routing-key", "order.failed")
.build();
}
@Bean Queue ordersDlq() { return QueueBuilder.durable("orders.dlq").quorum().build(); }
@Bean
Binding bindFulfillment(Queue ordersFulfillment, DirectExchange ordersEx) {
return BindingBuilder.bind(ordersFulfillment).to(ordersEx).with("order.created");
}
@Bean
Binding bindDlq(Queue ordersDlq, DirectExchange ordersDlx) {
return BindingBuilder.bind(ordersDlq).to(ordersDlx).with("order.failed");
}
}
@Component
@RequiredArgsConstructor
class OrderListener {
private final OrderHandler handler;
@RabbitListener(queues = "orders.fulfillment", concurrency = "3-10")
public void on(OrderCreatedEvent event) {
handler.process(event);
}
}
@Component
class DlqInspector {
@RabbitListener(queues = "orders.dlq")
public void inspect(
OrderCreatedEvent event,
@Header("x-death") List<Map<String, Object>> deaths) {
log.error("Не удалось обработать: orderId={}", event.orderId());
}
}
Коротко
- Spring AMQP (
spring-boot-starter-amqp) оборачивает RabbitMQ-клиент и убирает низкоуровневый код. - RabbitTemplate — для отправки; основной метод
convertAndSend(exchange, routingKey, payload). - По умолчанию сериализация — Java, для межсервисного взаимодействия нужен
Jackson2JsonMessageConverter. @RabbitListener— для получения; параметрconcurrencyзадаёт размер пула потоков.- При ошибке без настройки возникает бесконечный цикл — нужны
setDefaultRequeueRejected(false)и DLX. - RetryTemplate делает несколько попыток с паузой; после исчерпания — сообщение уходит в Dead Letter Exchange.
- DLX/DLQ настраивается через
QueueBuilder.withArgument("x-dead-letter-exchange", ...). - Стандартный режим подтверждений:
AUTO+requeue=false+ DLX.
Что почитать дальше
- Протокол AMQP — модель доставки: exchange, queue, binding, ack.
- RabbitMQ в production — Quorum Queues, кластеризация, мониторинг.
- Messaging-паттерны через AMQP — work queue, pub/sub, RPC, идемпотентный consumer.