← назад к разделу

spring-rabbit (часто называют «Spring AMQP») оборачивает Java-клиент RabbitMQ и даёт декларативную модель, аналогичную spring-kafka. Это всё ещё AMQP — то, что описано в статье про протокол, но через идиоматичный Spring-код.

Подключение и базовая конфигурация

// build.gradle.kts
dependencies {
    implementation("org.springframework.boot:spring-boot-starter-amqp")
}
# application.yml
spring.rabbitmq.host=rabbit
spring.rabbitmq.port=5672
spring.rabbitmq.username=billing-service
spring.rabbitmq.password=${RABBIT_PASSWORD}
spring.rabbitmq.virtual-host=/prod
spring.rabbitmq.publisher-confirm-type=correlated
spring.rabbitmq.publisher-returns=true

publisher-confirm-type=correlated включает publisher confirms — обязательно для критичных сообщений.

RabbitTemplate — публикация

@Component
@RequiredArgsConstructor
public class OrderEventPublisher {
    private final RabbitTemplate rabbit;

    public void publish(OrderCreatedEvent event) {
        rabbit.convertAndSend("orders", "order.created", event);
    }
}

convertAndSend(exchange, routingKey, payload) — основной метод. По умолчанию сериализация — Java-сериализация (плохо: межсервисная совместимость, размер). Заменяем на Jackson:

@Bean
MessageConverter jacksonConverter(ObjectMapper mapper) {
    return new Jackson2JsonMessageConverter(mapper);
}

@Bean
RabbitTemplate rabbitTemplate(ConnectionFactory cf, MessageConverter converter) {
    var t = new RabbitTemplate(cf);
    t.setMessageConverter(converter);
    return t;
}

После этого payload сериализуется в JSON, у сообщения автоматически проставляются content-type: application/json и __TypeId__ (для десериализации на стороне consumer'а в правильный тип).

Publisher confirms

С publisher-confirm-type=correlated каждое сообщение можно отправить с CorrelationData:

public void publishCritical(OrderEvent event) {
    var correlationData = new CorrelationData(event.orderId().toString());
    rabbit.convertAndSend("orders", "order.created", event, correlationData);

    correlationData.getFuture().thenAccept(confirm -> {
        if (!confirm.isAck()) {
            log.error("publish failed for orderId={}, reason={}",
                event.orderId(), confirm.getReason());
            // retry или fallback
        }
    });
}

confirm.isAck() == false означает: брокер отказал в публикации (например, не нашёл queue по routing key с mandatory=true, или превысил memory watermark).

@RabbitListener — потребление

@Component
public class OrderEventListener {

    @RabbitListener(queues = "orders.fulfillment", concurrency = "3-10")
    public void handle(OrderCreatedEvent event) {
        // ...
    }
}

concurrency = "3-10" — динамический пул: минимум 3 потока, до 10 при росте нагрузки.

Декларативное объявление queue/exchange/binding (Spring создаёт при старте, если не существует):

@Configuration
public class RabbitTopology {

    @Bean
    Queue ordersFulfillment() {
        return QueueBuilder
            .durable("orders.fulfillment")
            .quorum()  // Quorum Queue
            .withArgument("x-dead-letter-exchange", "orders.dlx")
            .build();
    }

    @Bean
    DirectExchange ordersExchange() {
        return new DirectExchange("orders", true, false);
    }

    @Bean
    Binding binding(Queue ordersFulfillment, DirectExchange ordersExchange) {
        return BindingBuilder
            .bind(ordersFulfillment)
            .to(ordersExchange)
            .with("order.created");
    }
}

Контейнер listener'ов

Под @RabbitListener живёт SimpleRabbitListenerContainerFactory или DirectMessageListenerContainer. По умолчанию — Simple, держит пул потоков.

@Bean
SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
        ConnectionFactory cf, MessageConverter converter) {
    var factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(cf);
    factory.setMessageConverter(converter);
    factory.setPrefetchCount(20);                            // basic.qos
    factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);       // или AUTO (=ack после успешного return)
    factory.setDefaultRequeueRejected(false);                 // не возвращать при exception, отправлять в DLX
    factory.setMissingQueuesFatal(false);
    return factory;
}

Acknowledge modes:

  • AUTO (default) — Spring сам шлёт ack после успешного return из метода листенера; при exception — nack с requeue (по умолчанию true — поэтому опасно: бесконечный цикл).
  • MANUAL — нужно явно объявить параметр Channel + @Header(AmqpHeaders.DELIVERY_TAG) и вызвать channel.basicAck(tag, false). Полностью контролируете ack.
  • NONE — auto-ack на уровне брокера, до прихода сообщения консьюмеру. Для критичных данных нельзя.

Стандарт: AUTO + setDefaultRequeueRejected(false) + Dead Letter Exchange (см. ниже).

Обработка ошибок и retry

Без настройки, если listener бросает exception:

  • AcknowledgeMode.AUTO + defaultRequeueRejected=true → бесконечный requeue, очередь зависает.
  • AcknowledgeMode.AUTO + defaultRequeueRejected=false → сразу nack без возврата, сообщение уходит в DLX (если настроен) или удаляется.

RetryTemplate — retry на стороне consumer

Для transient ошибок (БД temp недоступна, внешний сервис 503) — retry перед отказом:

@Bean
SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(...) {
    var factory = new SimpleRabbitListenerContainerFactory();
    // ... основная конфигурация
    factory.setAdviceChain(retryInterceptor());
    return factory;
}

@Bean
RetryOperationsInterceptor retryInterceptor() {
    return RetryInterceptorBuilder.stateless()
        .maxAttempts(4)
        .backOffOptions(1000L, 2.0, 30000L)  // initial 1s, multiplier 2, max 30s
        .recoverer(new RejectAndDontRequeueRecoverer())  // после 4 попыток — в DLX
        .build();
}

RejectAndDontRequeueRecoverer — встроенный recoverer, который после исчерпания попыток говорит брокеру «не возвращай». Сообщение уходит в DLX или удаляется.

Dead Letter Exchange — декларативно

DLX настраивается на уровне queue через arguments. В Spring AMQP — через QueueBuilder:

@Bean
Queue ordersMain() {
    return QueueBuilder
        .durable("orders.fulfillment")
        .quorum()
        .withArgument("x-dead-letter-exchange", "orders.dlx")
        .withArgument("x-dead-letter-routing-key", "order.failed")
        .build();
}

@Bean
DirectExchange ordersDlx() {
    return new DirectExchange("orders.dlx", true, false);
}

@Bean
Queue ordersDlqStorage() {
    return QueueBuilder
        .durable("orders.dlq")
        .quorum()
        .build();
}

@Bean
Binding dlqBinding(Queue ordersDlqStorage, DirectExchange ordersDlx) {
    return BindingBuilder.bind(ordersDlqStorage).to(ordersDlx).with("order.failed");
}

Поток: основная queue получила exception → reject(requeue=false) → сообщение направлено в DLX по правилам arguments → доставлено в DLQ-storage queue.

В DLQ-storage сидит listener-разборщик: alerting, сохранение в БД для ручной обработки, попытка переотправить с другими параметрами.

Не путать с Spring Kafka @RetryableTopic

В Spring Kafka есть @RetryableTopic с прозрачными retry-топиками. В Spring AMQP прямого аналога нет — но можно собрать руками через несколько queue с x-message-ttl + DLX. Это паттерн delayed retry queue: сообщение публикуется в retry-queue с TTL 30 секунд, через 30 сек уходит в DLX обратно на основной consumer.

Заголовки и трейсинг

Как и в Kafka, технические метаданные — в headers, бизнес-факт — в payload:

// Publish с трейсингом
public void publish(OrderEvent event, String correlationId) {
    var message = MessageBuilder
        .withBody(jsonOf(event))
        .setContentType("application/json")
        .setHeader("X-Correlation-ID", correlationId)
        .setHeader("X-Event-Version", "2")
        .build();
    rabbit.send("orders", "order.created", message);
}

// Consume
@RabbitListener(queues = "orders.fulfillment")
public void handle(
        @Payload OrderEvent event,
        @Header("X-Correlation-ID") String correlationId,
        @Header(value = "X-Event-Version", required = false) String version) {
    MDC.put("correlationId", correlationId);
    // ...
}

Под капотом Spring AMQP кладёт __TypeId__ header — имя класса для десериализации. Если у консьюмера тот же тип лежит в другом package — нужен DefaultJackson2JavaTypeMapper.setIdClassMapping().

Transactional listener

@Transactional поверх @RabbitListener оборачивает обработку в локальную транзакцию AMQP (через RabbitTransactionManager). На уровне AMQP это tx.select / tx.commit — медленнее publisher confirms, поэтому используется редко.

Обычно нужен другой паттерн: транзакция БД + ack после успеха БД. С AcknowledgeMode.MANUAL:

@RabbitListener(queues = "orders.fulfillment")
public void handle(
        OrderEvent event,
        Channel channel,
        @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException {
    try {
        orderRepository.process(event);   // транзакция БД внутри
        channel.basicAck(deliveryTag, false);
    } catch (Exception e) {
        channel.basicNack(deliveryTag, false, false);  // в DLX
        throw e;
    }
}

Альтернативно — chained transaction manager или Outbox pattern: пишем событие в БД-таблицу outbox в той же транзакции что и бизнес-данные, отдельный процесс читает её и публикует в RabbitMQ. См. распределённые паттерны.

Mandatory + publisher returns

Если хотим узнавать, что сообщение не попало ни в одну queue:

spring.rabbitmq.template.mandatory=true
spring.rabbitmq.publisher-returns=true
@Bean
RabbitTemplate rabbitTemplate(ConnectionFactory cf, MessageConverter converter) {
    var t = new RabbitTemplate(cf);
    t.setMessageConverter(converter);
    t.setMandatory(true);
    t.setReturnsCallback(returned -> {
        log.error("Returned: routingKey={}, replyText={}",
            returned.getRoutingKey(), returned.getReplyText());
    });
    return t;
}

mandatory=true + callback ловит misconfiguration (продьюсер шлёт в exchange, к которому не привязана ожидаемая queue) сразу, а не через тихую потерю сообщений.

Sample: целостный сетап для бизнес-сообщения

@Configuration
public class OrdersTopology {

    private static final String EX = "orders";
    private static final String EX_DLX = "orders.dlx";
    private static final String Q_FULFILLMENT = "orders.fulfillment";
    private static final String Q_DLQ = "orders.dlq";

    @Bean DirectExchange ordersEx()   { return new DirectExchange(EX, true, false); }
    @Bean DirectExchange ordersDlx()  { return new DirectExchange(EX_DLX, true, false); }

    @Bean
    Queue ordersFulfillment() {
        return QueueBuilder.durable(Q_FULFILLMENT).quorum()
            .withArgument("x-dead-letter-exchange", EX_DLX)
            .withArgument("x-dead-letter-routing-key", "order.failed")
            .build();
    }

    @Bean Queue ordersDlq() { return QueueBuilder.durable(Q_DLQ).quorum().build(); }

    @Bean
    Binding bindFulfillment(Queue ordersFulfillment, DirectExchange ordersEx) {
        return BindingBuilder.bind(ordersFulfillment).to(ordersEx).with("order.created");
    }

    @Bean
    Binding bindDlq(Queue ordersDlq, DirectExchange ordersDlx) {
        return BindingBuilder.bind(ordersDlq).to(ordersDlx).with("order.failed");
    }
}

@Component
@RequiredArgsConstructor
class OrderListener {

    private final OrderHandler handler;

    @RabbitListener(queues = "orders.fulfillment", concurrency = "3-10")
    public void on(OrderCreatedEvent event) {
        handler.process(event);
    }
}

@Component
@RequiredArgsConstructor
class DlqInspector {
    @RabbitListener(queues = "orders.dlq")
    public void inspect(OrderCreatedEvent event,
                        @Header("x-death") List<Map<String, Object>> deaths) {
        log.error("Dead letter: orderId={} deaths={}", event.orderId(), deaths);
        // alert, save для ручной обработки
    }
}

x-death header — массив с историей попыток (брокер дописывает при каждом dead-letter routing). Полезно для диагностики «сколько раз и почему».

Что почитать дальше

  • Протокол AMQP — модель доставки, без Spring-кода.
  • RabbitMQ в production — операционные аспекты.
  • Messaging-паттерны через AMQP — work queue, pub/sub, RPC, idempotent consumer.
  • Apache Kafka в production — параллельная статья про Spring Kafka.
  • Spring AMQP Reference — официальная документация.