← назад к разделу

Большинство задач интеграции сводятся к шести типовым паттернам, описанным в книге Hohpe/Woolf «Enterprise Integration Patterns». AMQP с его моделью exchange/binding/queue ложится на них естественно — каждый паттерн соответствует определённой конфигурации exchange-типа и числа queue.

Эта статья — практический проход по каждому паттерну на примерах кода Spring AMQP и без перетягивания одеяла в сторону какой-то одной задачи.

1. Work Queue — распределение задач

Задача: одна задача, пул обработчиков, каждое сообщение обрабатывается ровно одним из них.

Конфигурация: одна queue, несколько consumer'ов на ней. Брокер сам распределяет round-robin (с учётом prefetch).

@Configuration
class ImageProcessingTopology {
    @Bean
    Queue imagesQueue() {
        return QueueBuilder.durable("images.to-process").quorum().build();
    }
}

@Component
class ImageProcessor {
    @RabbitListener(queues = "images.to-process", concurrency = "5-20")
    public void process(ImageJob job) {
        // тяжёлая обработка
    }
}

Один сервис может крутить 5–20 потоков на одну queue; 5 экземпляров сервиса = 25–100 параллельных обработчиков. Брокер распределяет.

Когда брать: задачи в фоне — обработка изображений, отправка писем, генерация PDF, любые «положили в очередь — кто-то возьмёт».

2. Publish/Subscribe — broadcast

Задача: одно событие, все заинтересованные должны получить копию.

Конфигурация: fanout exchange + по одной queue на каждого подписчика.

@Configuration
class CacheInvalidationTopology {

    @Bean FanoutExchange cacheInvalidation() {
        return new FanoutExchange("cache.invalidation", true, false);
    }

    @Bean Queue serviceACache() {
        return QueueBuilder.nonDurable().exclusive().autoDelete().build();
    }

    @Bean
    Binding bindA(Queue serviceACache, FanoutExchange cacheInvalidation) {
        return BindingBuilder.bind(serviceACache).to(cacheInvalidation);
    }
}

@Component
class ServiceACacheListener {
    @RabbitListener(queues = "#{serviceACache.name}")
    public void invalidate(CacheInvalidationEvent event) {
        cache.evict(event.key());
    }
}

Каждый сервис объявляет свою временную queue, привязывает её к общему fanout exchange. При публикации сообщение копируется во все queue.

exclusive + autoDelete — queue уникальна для соединения и удаляется при отключении. Удобно: при рестарте сервиса не накапливается мусор.

Когда брать: инвалидация кешей, обновления конфигурации, broadcast-уведомления всем сервисам кластера.

3. Routing — точечная маршрутизация

Задача: разные события одного типа должны идти в разные обработчики по чёткому правилу.

Конфигурация: direct exchange + queue с разными binding key.

@Configuration
class OrderRoutingTopology {

    @Bean DirectExchange orders() { return new DirectExchange("orders", true, false); }

    @Bean Queue fulfillment() { return QueueBuilder.durable("orders.fulfillment").quorum().build(); }
    @Bean Queue audit()       { return QueueBuilder.durable("orders.audit").quorum().build(); }
    @Bean Queue alerts()      { return QueueBuilder.durable("orders.alerts").quorum().build(); }

    @Bean Binding b1(Queue fulfillment, DirectExchange orders) {
        return BindingBuilder.bind(fulfillment).to(orders).with("order.created");
    }
    @Bean Binding b2(Queue audit, DirectExchange orders) {
        return BindingBuilder.bind(audit).to(orders).with("order.created");
    }
    @Bean Binding b3(Queue audit, DirectExchange orders) {
        return BindingBuilder.bind(audit).to(orders).with("order.cancelled");
    }
    @Bean Binding b4(Queue alerts, DirectExchange orders) {
        return BindingBuilder.bind(alerts).to(orders).with("order.payment-failed");
    }
}
  • order.created → fulfillment + audit (две queue получают).
  • order.cancelled → только audit.
  • order.payment-failed → только alerts.

Когда брать: явное разделение событий — alerts отдельно от audit, основной flow отдельно от мониторинга.

4. Topic — иерархическая маршрутизация

Задача: события с иерархической структурой (domain.entity.action.region), подписки гибкими паттернами.

Конфигурация: topic exchange + binding с * (одно слово) и # (ноль или более слов).

@Configuration
class TopicRoutingTopology {
    @Bean TopicExchange events() { return new TopicExchange("events", true, false); }

    @Bean Queue auditAllOrders() { return QueueBuilder.durable("audit.orders").quorum().build(); }
    @Bean Queue euDashboard()    { return QueueBuilder.durable("dashboard.eu").quorum().build(); }
    @Bean Queue alerts()         { return QueueBuilder.durable("alerts.critical").quorum().build(); }

    @Bean Binding b1(Queue auditAllOrders, TopicExchange events) {
        // все события про заказы любого типа
        return BindingBuilder.bind(auditAllOrders).to(events).with("order.#");
    }
    @Bean Binding b2(Queue euDashboard, TopicExchange events) {
        // только EU-события
        return BindingBuilder.bind(euDashboard).to(events).with("*.*.eu");
    }
    @Bean Binding b3(Queue alerts, TopicExchange events) {
        // payment.failed в любой регион + любое critical
        return BindingBuilder.bind(alerts).to(events).with("payment.failed.#");
    }
}

publish("events", "order.cancelled.eu", ...) попадает в auditAllOrders (order.#) и в euDashboard (*.*.eu).

Когда брать: системы событий с понятной таксономией, где консьюмеру нужно подписаться на «всё про заказы», «всё в EU», «всё, что относится к платежам» — без переделки топологии при добавлении новых событий.

5. RPC — запрос-ответ через очередь

Задача: синхронный запрос-ответ, но через брокер, не HTTP. Полезно, когда сервис-получатель сидит за NAT, или когда нужна асинхронность на стороне consumer без блокировки producer.

Конфигурация: request queue + временная reply-to queue для каждого producer'а + correlation-id для сопоставления запросов и ответов.

В Spring AMQP это упрощено через sendAndReceive:

// Producer (RPC client)
@Component
@RequiredArgsConstructor
class PricingClient {
    private final RabbitTemplate rabbit;

    public PriceQuote quote(QuoteRequest request) {
        return (PriceQuote) rabbit.convertSendAndReceive(
            "pricing.exchange", "pricing.quote", request);
    }
}

// Consumer (RPC server)
@Component
class PricingServer {
    @RabbitListener(queues = "pricing.quote")
    public PriceQuote handle(QuoteRequest request) {
        return PriceQuote.compute(request);  // return автоматически отправляется в reply-to
    }
}

Под капотом Spring AMQP создаёт временную reply queue, проставляет reply-to header в сообщение, ставит correlation-id, ждёт ответ. Возврат из метода @RabbitListener автоматически становится сообщением, публикуемым в reply-to.

Когда брать: cross-сервисный синхронный вызов, где нужна асинхронность по консьюмер-стороне (consumer может обработать пачкой); распределение запросов по пулу консьюмеров с балансировкой; ситуация, где HTTP не подходит (firewall, NAT, no public IP).

Когда не брать: когда HTTP/gRPC просто работает. RPC через брокер дороже и сложнее в отладке.

6. Idempotent Consumer — обработка повторов

Задача: AMQP даёт at-least-once delivery. Это значит: одно и то же сообщение может прийти дважды (брокер не получил ack из-за сетевой проблемы, ретранслировал). Консьюмер обязан быть идемпотентным.

Три типичных подхода:

Idempotency key + БД

@RabbitListener(queues = "payments")
@Transactional
public void process(PaymentEvent event) {
    if (processedEventsRepo.existsByIdempotencyKey(event.idempotencyKey())) {
        log.info("duplicate, skipping: {}", event.idempotencyKey());
        return;  // ack, ничего не делаем
    }
    processedEventsRepo.save(new ProcessedEvent(event.idempotencyKey()));
    accountRepo.debit(event.accountId(), event.amount());
}

Таблица processed_events с UNIQUE индексом на idempotencyKey. Race condition между двумя одновременными повторами ловится через constraint violation.

Состояние агрегата

Если событие меняет состояние, проверять текущее состояние:

@Transactional
public void onOrderConfirmed(OrderConfirmedEvent event) {
    var order = orderRepo.findById(event.orderId()).orElseThrow();
    if (order.status() == OrderStatus.CONFIRMED) {
        return;  // уже обработано, ack
    }
    order.confirm();
    orderRepo.save(order);
}

Простой и работающий способ — не требует отдельной таблицы.

Outbox/Inbox pattern

Идемпотентность на уровне инфраструктуры — см. Distributed Patterns.

Дополнительные паттерны

Delayed Retry Queue

В Spring Kafka есть @RetryableTopic с задержками. В Spring AMQP — собирается через x-message-ttl + DLX:

@Bean Queue retryQueue() {
    return QueueBuilder.durable("orders.retry")
        .withArgument("x-message-ttl", 30_000)
        .withArgument("x-dead-letter-exchange", "orders")
        .withArgument("x-dead-letter-routing-key", "order.created")
        .quorum().build();
}

Поток: основной обработчик упал → reject → сообщение в retry-queue → через 30 сек по TTL уходит в DLX → обратно в основной exchange → новая попытка.

Минус: нет встроенного счётчика попыток (его надо считать через header x-death.count).

Dead Letter Pattern с разбором

Не просто «отправляем в DLQ и забываем», а разбираем причину:

@RabbitListener(queues = "orders.dlq")
public void inspect(
        OrderEvent event,
        @Header("x-death") List<Map<String, Object>> deaths,
        @Header(AmqpHeaders.RECEIVED_EXCHANGE) String origExchange,
        @Header("x-original-exception") String exceptionType) {

    var deathCount = deaths.stream()
        .mapToLong(d -> (long) d.get("count"))
        .sum();

    if (deathCount < 10) {
        // mb transient — пробуем переотправить
        rabbit.send(origExchange, "order.created", convert(event));
    } else {
        // persistent failure — alert + сохранить для ручного разбора
        alertService.send("DLQ overflow: " + event.orderId() + " — " + exceptionType);
        deadLetterRepo.save(event);
    }
}

Outbox через AMQP

Outbox — паттерн atomic write «БД + событие». Транзакция: запись бизнес-данных + запись события в таблицу outbox. Отдельный процесс (poller или CDC) читает outbox и публикует в AMQP.

@Transactional
public void confirm(OrderId orderId) {
    var order = orderRepo.findById(orderId).orElseThrow();
    order.confirm();
    orderRepo.save(order);
    outboxRepo.save(new OutboxEvent(
        UUID.randomUUID(),
        "order.confirmed",
        "orders",
        toJson(new OrderConfirmedEvent(orderId))
    ));
}

@Scheduled(fixedDelay = 500)
@Transactional
public void publishOutbox() {
    var batch = outboxRepo.fetchUnpublished(100);
    for (var event : batch) {
        rabbit.convertAndSend(event.exchange(), event.routingKey(), event.payload());
        outboxRepo.markPublished(event.id());
    }
}

Главный плюс: транзакция БД и событие либо обе применились, либо ни одна. Дубли возможны (publisher confirm не прошёл, перечитали outbox) — поэтому консьюмер обязан быть идемпотентным.

Distributed Patterns Style Guide — детальный разбор Outbox/Inbox.

Шпаргалка по выбору

ЗадачаПаттернExchangeКонфигурация
Распределить нагрузку между обработчикамиWork Queuedefault direct ("")1 queue, N consumers
Broadcast события всем сервисамPub/Subfanout1 queue на сервис
Точечная маршрутизация события в подписчиковRoutingdirectbinding по routing key
Иерархическая фильтрацияTopictopicbinding с * / #
Cross-service синхронный вызов через очередьRPCdirect или dedicatedreply-to + correlation-id
At-least-once → at-most-once на стороне consumerIdempotent Consumerлюбойidempotency key + UNIQUE
Retry с backoffDelayed Retry Queuedirect + DLXTTL queue → DLX → основной
БД-транзакция + событие атомарноOutboxdirectтаблица outbox + poller

Что почитать дальше

  • Протокол AMQP — модель доставки.
  • RabbitMQ в production — Quorum Queues, кластеризация, мониторинг.
  • Spring AMQP — фреймворковый код.
  • AMQP vs Kafka — какой брокер брать.
  • Distributed Patterns Style Guide — Saga, Outbox, Idempotent Consumer.
  • Enterprise Integration Patterns (Hohpe, Woolf) — классическая книга.