Большинство задач интеграции сводятся к шести типовым паттернам, описанным в книге Hohpe/Woolf «Enterprise Integration Patterns». AMQP с его моделью exchange/binding/queue ложится на них естественно — каждый паттерн соответствует определённой конфигурации exchange-типа и числа queue.
Эта статья — практический проход по каждому паттерну на примерах кода Spring AMQP и без перетягивания одеяла в сторону какой-то одной задачи.
1. Work Queue — распределение задач
Задача: одна задача, пул обработчиков, каждое сообщение обрабатывается ровно одним из них.
Конфигурация: одна queue, несколько consumer'ов на ней. Брокер сам распределяет round-robin (с учётом prefetch).
@Configuration
class ImageProcessingTopology {
@Bean
Queue imagesQueue() {
return QueueBuilder.durable("images.to-process").quorum().build();
}
}
@Component
class ImageProcessor {
@RabbitListener(queues = "images.to-process", concurrency = "5-20")
public void process(ImageJob job) {
// тяжёлая обработка
}
}
Один сервис может крутить 5–20 потоков на одну queue; 5 экземпляров сервиса = 25–100 параллельных обработчиков. Брокер распределяет.
Когда брать: задачи в фоне — обработка изображений, отправка писем, генерация PDF, любые «положили в очередь — кто-то возьмёт».
2. Publish/Subscribe — broadcast
Задача: одно событие, все заинтересованные должны получить копию.
Конфигурация: fanout exchange + по одной queue на каждого подписчика.
@Configuration
class CacheInvalidationTopology {
@Bean FanoutExchange cacheInvalidation() {
return new FanoutExchange("cache.invalidation", true, false);
}
@Bean Queue serviceACache() {
return QueueBuilder.nonDurable().exclusive().autoDelete().build();
}
@Bean
Binding bindA(Queue serviceACache, FanoutExchange cacheInvalidation) {
return BindingBuilder.bind(serviceACache).to(cacheInvalidation);
}
}
@Component
class ServiceACacheListener {
@RabbitListener(queues = "#{serviceACache.name}")
public void invalidate(CacheInvalidationEvent event) {
cache.evict(event.key());
}
}
Каждый сервис объявляет свою временную queue, привязывает её к общему fanout exchange. При публикации сообщение копируется во все queue.
exclusive + autoDelete — queue уникальна для соединения и удаляется при отключении. Удобно: при рестарте сервиса не накапливается мусор.
Когда брать: инвалидация кешей, обновления конфигурации, broadcast-уведомления всем сервисам кластера.
3. Routing — точечная маршрутизация
Задача: разные события одного типа должны идти в разные обработчики по чёткому правилу.
Конфигурация: direct exchange + queue с разными binding key.
@Configuration
class OrderRoutingTopology {
@Bean DirectExchange orders() { return new DirectExchange("orders", true, false); }
@Bean Queue fulfillment() { return QueueBuilder.durable("orders.fulfillment").quorum().build(); }
@Bean Queue audit() { return QueueBuilder.durable("orders.audit").quorum().build(); }
@Bean Queue alerts() { return QueueBuilder.durable("orders.alerts").quorum().build(); }
@Bean Binding b1(Queue fulfillment, DirectExchange orders) {
return BindingBuilder.bind(fulfillment).to(orders).with("order.created");
}
@Bean Binding b2(Queue audit, DirectExchange orders) {
return BindingBuilder.bind(audit).to(orders).with("order.created");
}
@Bean Binding b3(Queue audit, DirectExchange orders) {
return BindingBuilder.bind(audit).to(orders).with("order.cancelled");
}
@Bean Binding b4(Queue alerts, DirectExchange orders) {
return BindingBuilder.bind(alerts).to(orders).with("order.payment-failed");
}
}
order.created→ fulfillment + audit (две queue получают).order.cancelled→ только audit.order.payment-failed→ только alerts.
Когда брать: явное разделение событий — alerts отдельно от audit, основной flow отдельно от мониторинга.
4. Topic — иерархическая маршрутизация
Задача: события с иерархической структурой (domain.entity.action.region), подписки гибкими паттернами.
Конфигурация: topic exchange + binding с * (одно слово) и # (ноль или более слов).
@Configuration
class TopicRoutingTopology {
@Bean TopicExchange events() { return new TopicExchange("events", true, false); }
@Bean Queue auditAllOrders() { return QueueBuilder.durable("audit.orders").quorum().build(); }
@Bean Queue euDashboard() { return QueueBuilder.durable("dashboard.eu").quorum().build(); }
@Bean Queue alerts() { return QueueBuilder.durable("alerts.critical").quorum().build(); }
@Bean Binding b1(Queue auditAllOrders, TopicExchange events) {
// все события про заказы любого типа
return BindingBuilder.bind(auditAllOrders).to(events).with("order.#");
}
@Bean Binding b2(Queue euDashboard, TopicExchange events) {
// только EU-события
return BindingBuilder.bind(euDashboard).to(events).with("*.*.eu");
}
@Bean Binding b3(Queue alerts, TopicExchange events) {
// payment.failed в любой регион + любое critical
return BindingBuilder.bind(alerts).to(events).with("payment.failed.#");
}
}
publish("events", "order.cancelled.eu", ...) попадает в auditAllOrders (order.#) и в euDashboard (*.*.eu).
Когда брать: системы событий с понятной таксономией, где консьюмеру нужно подписаться на «всё про заказы», «всё в EU», «всё, что относится к платежам» — без переделки топологии при добавлении новых событий.
5. RPC — запрос-ответ через очередь
Задача: синхронный запрос-ответ, но через брокер, не HTTP. Полезно, когда сервис-получатель сидит за NAT, или когда нужна асинхронность на стороне consumer без блокировки producer.
Конфигурация: request queue + временная reply-to queue для каждого producer'а + correlation-id для сопоставления запросов и ответов.
В Spring AMQP это упрощено через sendAndReceive:
// Producer (RPC client)
@Component
@RequiredArgsConstructor
class PricingClient {
private final RabbitTemplate rabbit;
public PriceQuote quote(QuoteRequest request) {
return (PriceQuote) rabbit.convertSendAndReceive(
"pricing.exchange", "pricing.quote", request);
}
}
// Consumer (RPC server)
@Component
class PricingServer {
@RabbitListener(queues = "pricing.quote")
public PriceQuote handle(QuoteRequest request) {
return PriceQuote.compute(request); // return автоматически отправляется в reply-to
}
}
Под капотом Spring AMQP создаёт временную reply queue, проставляет reply-to header в сообщение, ставит correlation-id, ждёт ответ. Возврат из метода @RabbitListener автоматически становится сообщением, публикуемым в reply-to.
Когда брать: cross-сервисный синхронный вызов, где нужна асинхронность по консьюмер-стороне (consumer может обработать пачкой); распределение запросов по пулу консьюмеров с балансировкой; ситуация, где HTTP не подходит (firewall, NAT, no public IP).
Когда не брать: когда HTTP/gRPC просто работает. RPC через брокер дороже и сложнее в отладке.
6. Idempotent Consumer — обработка повторов
Задача: AMQP даёт at-least-once delivery. Это значит: одно и то же сообщение может прийти дважды (брокер не получил ack из-за сетевой проблемы, ретранслировал). Консьюмер обязан быть идемпотентным.
Три типичных подхода:
Idempotency key + БД
@RabbitListener(queues = "payments")
@Transactional
public void process(PaymentEvent event) {
if (processedEventsRepo.existsByIdempotencyKey(event.idempotencyKey())) {
log.info("duplicate, skipping: {}", event.idempotencyKey());
return; // ack, ничего не делаем
}
processedEventsRepo.save(new ProcessedEvent(event.idempotencyKey()));
accountRepo.debit(event.accountId(), event.amount());
}
Таблица processed_events с UNIQUE индексом на idempotencyKey. Race condition между двумя одновременными повторами ловится через constraint violation.
Состояние агрегата
Если событие меняет состояние, проверять текущее состояние:
@Transactional
public void onOrderConfirmed(OrderConfirmedEvent event) {
var order = orderRepo.findById(event.orderId()).orElseThrow();
if (order.status() == OrderStatus.CONFIRMED) {
return; // уже обработано, ack
}
order.confirm();
orderRepo.save(order);
}
Простой и работающий способ — не требует отдельной таблицы.
Outbox/Inbox pattern
Идемпотентность на уровне инфраструктуры — см. Distributed Patterns.
Дополнительные паттерны
Delayed Retry Queue
В Spring Kafka есть @RetryableTopic с задержками. В Spring AMQP — собирается через x-message-ttl + DLX:
@Bean Queue retryQueue() {
return QueueBuilder.durable("orders.retry")
.withArgument("x-message-ttl", 30_000)
.withArgument("x-dead-letter-exchange", "orders")
.withArgument("x-dead-letter-routing-key", "order.created")
.quorum().build();
}
Поток: основной обработчик упал → reject → сообщение в retry-queue → через 30 сек по TTL уходит в DLX → обратно в основной exchange → новая попытка.
Минус: нет встроенного счётчика попыток (его надо считать через header x-death.count).
Dead Letter Pattern с разбором
Не просто «отправляем в DLQ и забываем», а разбираем причину:
@RabbitListener(queues = "orders.dlq")
public void inspect(
OrderEvent event,
@Header("x-death") List<Map<String, Object>> deaths,
@Header(AmqpHeaders.RECEIVED_EXCHANGE) String origExchange,
@Header("x-original-exception") String exceptionType) {
var deathCount = deaths.stream()
.mapToLong(d -> (long) d.get("count"))
.sum();
if (deathCount < 10) {
// mb transient — пробуем переотправить
rabbit.send(origExchange, "order.created", convert(event));
} else {
// persistent failure — alert + сохранить для ручного разбора
alertService.send("DLQ overflow: " + event.orderId() + " — " + exceptionType);
deadLetterRepo.save(event);
}
}
Outbox через AMQP
Outbox — паттерн atomic write «БД + событие». Транзакция: запись бизнес-данных + запись события в таблицу outbox. Отдельный процесс (poller или CDC) читает outbox и публикует в AMQP.
@Transactional
public void confirm(OrderId orderId) {
var order = orderRepo.findById(orderId).orElseThrow();
order.confirm();
orderRepo.save(order);
outboxRepo.save(new OutboxEvent(
UUID.randomUUID(),
"order.confirmed",
"orders",
toJson(new OrderConfirmedEvent(orderId))
));
}
@Scheduled(fixedDelay = 500)
@Transactional
public void publishOutbox() {
var batch = outboxRepo.fetchUnpublished(100);
for (var event : batch) {
rabbit.convertAndSend(event.exchange(), event.routingKey(), event.payload());
outboxRepo.markPublished(event.id());
}
}
Главный плюс: транзакция БД и событие либо обе применились, либо ни одна. Дубли возможны (publisher confirm не прошёл, перечитали outbox) — поэтому консьюмер обязан быть идемпотентным.
Distributed Patterns Style Guide — детальный разбор Outbox/Inbox.
Шпаргалка по выбору
| Задача | Паттерн | Exchange | Конфигурация |
|---|---|---|---|
| Распределить нагрузку между обработчиками | Work Queue | default direct ("") | 1 queue, N consumers |
| Broadcast события всем сервисам | Pub/Sub | fanout | 1 queue на сервис |
| Точечная маршрутизация события в подписчиков | Routing | direct | binding по routing key |
| Иерархическая фильтрация | Topic | topic | binding с * / # |
| Cross-service синхронный вызов через очередь | RPC | direct или dedicated | reply-to + correlation-id |
| At-least-once → at-most-once на стороне consumer | Idempotent Consumer | любой | idempotency key + UNIQUE |
| Retry с backoff | Delayed Retry Queue | direct + DLX | TTL queue → DLX → основной |
| БД-транзакция + событие атомарно | Outbox | direct | таблица outbox + poller |
Что почитать дальше
- Протокол AMQP — модель доставки.
- RabbitMQ в production — Quorum Queues, кластеризация, мониторинг.
- Spring AMQP — фреймворковый код.
- AMQP vs Kafka — какой брокер брать.
- Distributed Patterns Style Guide — Saga, Outbox, Idempotent Consumer.
- Enterprise Integration Patterns (Hohpe, Woolf) — классическая книга.