spring-rabbit (часто называют «Spring AMQP») оборачивает Java-клиент RabbitMQ и даёт декларативную модель, аналогичную spring-kafka. Это всё ещё AMQP — то, что описано в статье про протокол, но через идиоматичный Spring-код.
Подключение и базовая конфигурация
// build.gradle.kts
dependencies {
implementation("org.springframework.boot:spring-boot-starter-amqp")
}
# application.yml
spring.rabbitmq.host=rabbit
spring.rabbitmq.port=5672
spring.rabbitmq.username=billing-service
spring.rabbitmq.password=${RABBIT_PASSWORD}
spring.rabbitmq.virtual-host=/prod
spring.rabbitmq.publisher-confirm-type=correlated
spring.rabbitmq.publisher-returns=true
publisher-confirm-type=correlated включает publisher confirms — обязательно для критичных сообщений.
RabbitTemplate — публикация
@Component
@RequiredArgsConstructor
public class OrderEventPublisher {
private final RabbitTemplate rabbit;
public void publish(OrderCreatedEvent event) {
rabbit.convertAndSend("orders", "order.created", event);
}
}
convertAndSend(exchange, routingKey, payload) — основной метод. По умолчанию сериализация — Java-сериализация (плохо: межсервисная совместимость, размер). Заменяем на Jackson:
@Bean
MessageConverter jacksonConverter(ObjectMapper mapper) {
return new Jackson2JsonMessageConverter(mapper);
}
@Bean
RabbitTemplate rabbitTemplate(ConnectionFactory cf, MessageConverter converter) {
var t = new RabbitTemplate(cf);
t.setMessageConverter(converter);
return t;
}
После этого payload сериализуется в JSON, у сообщения автоматически проставляются content-type: application/json и __TypeId__ (для десериализации на стороне consumer'а в правильный тип).
Publisher confirms
С publisher-confirm-type=correlated каждое сообщение можно отправить с CorrelationData:
public void publishCritical(OrderEvent event) {
var correlationData = new CorrelationData(event.orderId().toString());
rabbit.convertAndSend("orders", "order.created", event, correlationData);
correlationData.getFuture().thenAccept(confirm -> {
if (!confirm.isAck()) {
log.error("publish failed for orderId={}, reason={}",
event.orderId(), confirm.getReason());
// retry или fallback
}
});
}
confirm.isAck() == false означает: брокер отказал в публикации (например, не нашёл queue по routing key с mandatory=true, или превысил memory watermark).
@RabbitListener — потребление
@Component
public class OrderEventListener {
@RabbitListener(queues = "orders.fulfillment", concurrency = "3-10")
public void handle(OrderCreatedEvent event) {
// ...
}
}
concurrency = "3-10" — динамический пул: минимум 3 потока, до 10 при росте нагрузки.
Декларативное объявление queue/exchange/binding (Spring создаёт при старте, если не существует):
@Configuration
public class RabbitTopology {
@Bean
Queue ordersFulfillment() {
return QueueBuilder
.durable("orders.fulfillment")
.quorum() // Quorum Queue
.withArgument("x-dead-letter-exchange", "orders.dlx")
.build();
}
@Bean
DirectExchange ordersExchange() {
return new DirectExchange("orders", true, false);
}
@Bean
Binding binding(Queue ordersFulfillment, DirectExchange ordersExchange) {
return BindingBuilder
.bind(ordersFulfillment)
.to(ordersExchange)
.with("order.created");
}
}
Контейнер listener'ов
Под @RabbitListener живёт SimpleRabbitListenerContainerFactory или DirectMessageListenerContainer. По умолчанию — Simple, держит пул потоков.
@Bean
SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
ConnectionFactory cf, MessageConverter converter) {
var factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(cf);
factory.setMessageConverter(converter);
factory.setPrefetchCount(20); // basic.qos
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); // или AUTO (=ack после успешного return)
factory.setDefaultRequeueRejected(false); // не возвращать при exception, отправлять в DLX
factory.setMissingQueuesFatal(false);
return factory;
}
Acknowledge modes:
AUTO(default) — Spring сам шлёт ack после успешного return из метода листенера; при exception —nackсrequeue(по умолчаниюtrue— поэтому опасно: бесконечный цикл).MANUAL— нужно явно объявить параметрChannel+@Header(AmqpHeaders.DELIVERY_TAG)и вызватьchannel.basicAck(tag, false). Полностью контролируете ack.NONE— auto-ack на уровне брокера, до прихода сообщения консьюмеру. Для критичных данных нельзя.
Стандарт: AUTO + setDefaultRequeueRejected(false) + Dead Letter Exchange (см. ниже).
Обработка ошибок и retry
Без настройки, если listener бросает exception:
AcknowledgeMode.AUTO+defaultRequeueRejected=true→ бесконечный requeue, очередь зависает.AcknowledgeMode.AUTO+defaultRequeueRejected=false→ сразуnackбез возврата, сообщение уходит в DLX (если настроен) или удаляется.
RetryTemplate — retry на стороне consumer
Для transient ошибок (БД temp недоступна, внешний сервис 503) — retry перед отказом:
@Bean
SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(...) {
var factory = new SimpleRabbitListenerContainerFactory();
// ... основная конфигурация
factory.setAdviceChain(retryInterceptor());
return factory;
}
@Bean
RetryOperationsInterceptor retryInterceptor() {
return RetryInterceptorBuilder.stateless()
.maxAttempts(4)
.backOffOptions(1000L, 2.0, 30000L) // initial 1s, multiplier 2, max 30s
.recoverer(new RejectAndDontRequeueRecoverer()) // после 4 попыток — в DLX
.build();
}
RejectAndDontRequeueRecoverer — встроенный recoverer, который после исчерпания попыток говорит брокеру «не возвращай». Сообщение уходит в DLX или удаляется.
Dead Letter Exchange — декларативно
DLX настраивается на уровне queue через arguments. В Spring AMQP — через QueueBuilder:
@Bean
Queue ordersMain() {
return QueueBuilder
.durable("orders.fulfillment")
.quorum()
.withArgument("x-dead-letter-exchange", "orders.dlx")
.withArgument("x-dead-letter-routing-key", "order.failed")
.build();
}
@Bean
DirectExchange ordersDlx() {
return new DirectExchange("orders.dlx", true, false);
}
@Bean
Queue ordersDlqStorage() {
return QueueBuilder
.durable("orders.dlq")
.quorum()
.build();
}
@Bean
Binding dlqBinding(Queue ordersDlqStorage, DirectExchange ordersDlx) {
return BindingBuilder.bind(ordersDlqStorage).to(ordersDlx).with("order.failed");
}
Поток: основная queue получила exception → reject(requeue=false) → сообщение направлено в DLX по правилам arguments → доставлено в DLQ-storage queue.
В DLQ-storage сидит listener-разборщик: alerting, сохранение в БД для ручной обработки, попытка переотправить с другими параметрами.
Не путать с Spring Kafka @RetryableTopic
В Spring Kafka есть @RetryableTopic с прозрачными retry-топиками. В Spring AMQP прямого аналога нет — но можно собрать руками через несколько queue с x-message-ttl + DLX. Это паттерн delayed retry queue: сообщение публикуется в retry-queue с TTL 30 секунд, через 30 сек уходит в DLX обратно на основной consumer.
Заголовки и трейсинг
Как и в Kafka, технические метаданные — в headers, бизнес-факт — в payload:
// Publish с трейсингом
public void publish(OrderEvent event, String correlationId) {
var message = MessageBuilder
.withBody(jsonOf(event))
.setContentType("application/json")
.setHeader("X-Correlation-ID", correlationId)
.setHeader("X-Event-Version", "2")
.build();
rabbit.send("orders", "order.created", message);
}
// Consume
@RabbitListener(queues = "orders.fulfillment")
public void handle(
@Payload OrderEvent event,
@Header("X-Correlation-ID") String correlationId,
@Header(value = "X-Event-Version", required = false) String version) {
MDC.put("correlationId", correlationId);
// ...
}
Под капотом Spring AMQP кладёт __TypeId__ header — имя класса для десериализации. Если у консьюмера тот же тип лежит в другом package — нужен DefaultJackson2JavaTypeMapper.setIdClassMapping().
Transactional listener
@Transactional поверх @RabbitListener оборачивает обработку в локальную транзакцию AMQP (через RabbitTransactionManager). На уровне AMQP это tx.select / tx.commit — медленнее publisher confirms, поэтому используется редко.
Обычно нужен другой паттерн: транзакция БД + ack после успеха БД. С AcknowledgeMode.MANUAL:
@RabbitListener(queues = "orders.fulfillment")
public void handle(
OrderEvent event,
Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException {
try {
orderRepository.process(event); // транзакция БД внутри
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
channel.basicNack(deliveryTag, false, false); // в DLX
throw e;
}
}
Альтернативно — chained transaction manager или Outbox pattern: пишем событие в БД-таблицу outbox в той же транзакции что и бизнес-данные, отдельный процесс читает её и публикует в RabbitMQ. См. распределённые паттерны.
Mandatory + publisher returns
Если хотим узнавать, что сообщение не попало ни в одну queue:
spring.rabbitmq.template.mandatory=true
spring.rabbitmq.publisher-returns=true
@Bean
RabbitTemplate rabbitTemplate(ConnectionFactory cf, MessageConverter converter) {
var t = new RabbitTemplate(cf);
t.setMessageConverter(converter);
t.setMandatory(true);
t.setReturnsCallback(returned -> {
log.error("Returned: routingKey={}, replyText={}",
returned.getRoutingKey(), returned.getReplyText());
});
return t;
}
mandatory=true + callback ловит misconfiguration (продьюсер шлёт в exchange, к которому не привязана ожидаемая queue) сразу, а не через тихую потерю сообщений.
Sample: целостный сетап для бизнес-сообщения
@Configuration
public class OrdersTopology {
private static final String EX = "orders";
private static final String EX_DLX = "orders.dlx";
private static final String Q_FULFILLMENT = "orders.fulfillment";
private static final String Q_DLQ = "orders.dlq";
@Bean DirectExchange ordersEx() { return new DirectExchange(EX, true, false); }
@Bean DirectExchange ordersDlx() { return new DirectExchange(EX_DLX, true, false); }
@Bean
Queue ordersFulfillment() {
return QueueBuilder.durable(Q_FULFILLMENT).quorum()
.withArgument("x-dead-letter-exchange", EX_DLX)
.withArgument("x-dead-letter-routing-key", "order.failed")
.build();
}
@Bean Queue ordersDlq() { return QueueBuilder.durable(Q_DLQ).quorum().build(); }
@Bean
Binding bindFulfillment(Queue ordersFulfillment, DirectExchange ordersEx) {
return BindingBuilder.bind(ordersFulfillment).to(ordersEx).with("order.created");
}
@Bean
Binding bindDlq(Queue ordersDlq, DirectExchange ordersDlx) {
return BindingBuilder.bind(ordersDlq).to(ordersDlx).with("order.failed");
}
}
@Component
@RequiredArgsConstructor
class OrderListener {
private final OrderHandler handler;
@RabbitListener(queues = "orders.fulfillment", concurrency = "3-10")
public void on(OrderCreatedEvent event) {
handler.process(event);
}
}
@Component
@RequiredArgsConstructor
class DlqInspector {
@RabbitListener(queues = "orders.dlq")
public void inspect(OrderCreatedEvent event,
@Header("x-death") List<Map<String, Object>> deaths) {
log.error("Dead letter: orderId={} deaths={}", event.orderId(), deaths);
// alert, save для ручной обработки
}
}
x-death header — массив с историей попыток (брокер дописывает при каждом dead-letter routing). Полезно для диагностики «сколько раз и почему».
Что почитать дальше
- Протокол AMQP — модель доставки, без Spring-кода.
- RabbitMQ в production — операционные аспекты.
- Messaging-паттерны через AMQP — work queue, pub/sub, RPC, idempotent consumer.
- Apache Kafka в production — параллельная статья про Spring Kafka.
- Spring AMQP Reference — официальная документация.