Когда сервисы начинают общаться через брокер, возникает вопрос: как правильно организовать очереди, обменники и подписки? Большинство задач укладываются в несколько типовых схем. Разберём каждую на примерах Spring AMQP.
Раздать задачи нескольким обработчикам — Work Queue
Представьте: пользователи загружают фотографии, и каждую надо сжать и нарезать в несколько размеров. Это долго. Делать это прямо в HTTP-запросе нельзя — пользователь будет ждать минуты.
Решение — work queue (очередь задач): сохранить задание в очередь и отдать её одному из пула обработчиков.
producer → [одна queue] → consumer 1
→ consumer 2
→ consumer 3
Брокер сам распределяет задачи между обработчиками. Каждое сообщение получает ровно один из них.
@Configuration
class ImageProcessingTopology {
@Bean
Queue imagesQueue() {
return QueueBuilder.durable("images.to-process").quorum().build();
}
}
@Component
class ImageProcessor {
@RabbitListener(queues = "images.to-process", concurrency = "5-20")
public void process(ImageJob job) {
// обработка изображения
}
}
concurrency = "5-20" означает: минимум 5 потоков, максимум 20. Если запустить 5 копий сервиса — получится 25–100 параллельных обработчиков.
Когда брать: фоновые задачи — обработка файлов, отправка писем, генерация отчётов, любые «положили в очередь — кто-то возьмёт».
Отправить событие всем сервисам сразу — Publish/Subscribe
Другая задача: произошло событие «конфигурация обновлена» — каждый сервис должен обновить свой кеш. Нельзя знать заранее, кто именно подписан и сколько сервисов запущено.
Это publish/subscribe (рассылка всем): издатель отправляет одно сообщение, а копию получают все подписчики одновременно.
Здесь нужен fanout exchange — он копирует каждое сообщение во все привязанные очереди. Каждый сервис объявляет свою очередь и привязывает её к общему обменнику.
@Configuration
class CacheInvalidationTopology {
@Bean FanoutExchange cacheInvalidation() {
return new FanoutExchange("cache.invalidation", true, false);
}
@Bean Queue serviceACache() {
return QueueBuilder.nonDurable().exclusive().autoDelete().build();
}
@Bean Binding bindA(Queue serviceACache, FanoutExchange cacheInvalidation) {
return BindingBuilder.bind(serviceACache).to(cacheInvalidation);
}
}
@Component
class ServiceACacheListener {
@RabbitListener(queues = "#{serviceACache.name}")
public void invalidate(CacheInvalidationEvent event) {
cache.evict(event.key());
}
}
exclusive + autoDelete — очередь принадлежит одному соединению и удаляется при отключении. При перезапуске сервиса не накапливается мусор в брокере.
Когда брать: инвалидация кешей, broadcast-уведомления всему кластеру, обновления конфигурации.
Направить событие в нужный обработчик — Routing
Иногда нужно не «всем», а «именно тому, кому надо». Например: событие order.created должно идти в сервис выполнения и в аудит, а order.payment-failed — только в алерты.
Это routing (точечная маршрутизация): direct exchange смотрит на ключ маршрутизации сообщения и отправляет его только в очереди с совпадающим binding key.
@Configuration
class OrderRoutingTopology {
@Bean DirectExchange orders() { return new DirectExchange("orders", true, false); }
@Bean Queue fulfillment() { return QueueBuilder.durable("orders.fulfillment").quorum().build(); }
@Bean Queue audit() { return QueueBuilder.durable("orders.audit").quorum().build(); }
@Bean Queue alerts() { return QueueBuilder.durable("orders.alerts").quorum().build(); }
@Bean Binding b1(Queue fulfillment, DirectExchange orders) {
return BindingBuilder.bind(fulfillment).to(orders).with("order.created");
}
@Bean Binding b2(Queue audit, DirectExchange orders) {
return BindingBuilder.bind(audit).to(orders).with("order.created");
}
@Bean Binding b3(Queue audit, DirectExchange orders) {
return BindingBuilder.bind(audit).to(orders).with("order.cancelled");
}
@Bean Binding b4(Queue alerts, DirectExchange orders) {
return BindingBuilder.bind(alerts).to(orders).with("order.payment-failed");
}
}
order.created→ fulfillment + audit.order.cancelled→ только audit.order.payment-failed→ только alerts.
Когда брать: явное разделение потоков — алерты отдельно от аудита, основной обработчик отдельно от мониторинга.
Подписаться по маске — Topic
Routing хорош для жёстких правил. Но что если сервис хочет подписаться на «все события по заказам»? Или «всё из EU-региона»?
Topic exchange позволяет задавать подписки через шаблоны. Ключи сообщений строятся через точку (order.created.eu), а в подписке можно использовать:
*— ровно одно слово,#— ноль и более слов.
@Configuration
class TopicRoutingTopology {
@Bean TopicExchange events() { return new TopicExchange("events", true, false); }
@Bean Queue auditAllOrders() { return QueueBuilder.durable("audit.orders").quorum().build(); }
@Bean Queue euDashboard() { return QueueBuilder.durable("dashboard.eu").quorum().build(); }
@Bean Queue alerts() { return QueueBuilder.durable("alerts.critical").quorum().build(); }
@Bean Binding b1(Queue auditAllOrders, TopicExchange events) {
return BindingBuilder.bind(auditAllOrders).to(events).with("order.#");
}
@Bean Binding b2(Queue euDashboard, TopicExchange events) {
return BindingBuilder.bind(euDashboard).to(events).with("*.*.eu");
}
@Bean Binding b3(Queue alerts, TopicExchange events) {
return BindingBuilder.bind(alerts).to(events).with("payment.failed.#");
}
}
Сообщение с ключом order.cancelled.eu попадёт в auditAllOrders (по order.#) и в euDashboard (по *.*.eu).
Когда брать: события с иерархической структурой, когда нужно гибко подписываться без переделки топологии при добавлении новых типов событий.
Запрос-ответ через очередь — RPC
Иногда нужен синхронный ответ, но HTTP не подходит: сервис за NAT, нет публичного адреса, или хочется балансировки по пулу обработчиков.
RPC через очередь: клиент отправляет запрос и ждёт ответа. Брокер доставляет запрос одному из обработчиков, тот отвечает в отдельную очередь-ответ. Для сопоставления запроса и ответа используется correlation-id.
В Spring AMQP это скрыто за sendAndReceive:
// Клиент
@Component
@RequiredArgsConstructor
class PricingClient {
private final RabbitTemplate rabbit;
public PriceQuote quote(QuoteRequest request) {
return (PriceQuote) rabbit.convertSendAndReceive(
"pricing.exchange", "pricing.quote", request);
}
}
// Сервер
@Component
class PricingServer {
@RabbitListener(queues = "pricing.quote")
public PriceQuote handle(QuoteRequest request) {
return PriceQuote.compute(request); // возврат автоматически уходит в reply-to
}
}
Spring AMQP сам создаёт временную очередь-ответ, проставляет reply-to и correlation-id, ждёт ответа. Возврат из @RabbitListener автоматически публикуется обратно.
Когда брать: нужен синхронный вызов, но HTTP не работает (NAT, firewall, нет публичного адреса); нужна балансировка запросов по пулу обработчиков.
Когда не брать: если HTTP/gRPC просто работает — RPC через брокер сложнее в отладке и дороже.
Что делать с повторной доставкой — Idempotent Consumer
AMQP гарантирует at-least-once доставку: одно и то же сообщение может прийти дважды. Это случается, когда брокер не получил подтверждение обработки (например, из-за сетевой проблемы) и переотправляет сообщение.
Обработчик обязан уметь работать с повторами, не ломая бизнес-логику.
Ключ идемпотентности в базе данных
Самый надёжный способ — запоминать уже обработанные сообщения:
@RabbitListener(queues = "payments")
@Transactional
public void process(PaymentEvent event) {
if (processedEventsRepo.existsByIdempotencyKey(event.idempotencyKey())) {
return; // уже обработано — просто подтверждаем получение
}
processedEventsRepo.save(new ProcessedEvent(event.idempotencyKey()));
accountRepo.debit(event.accountId(), event.amount());
}
Таблица processed_events с уникальным индексом на idempotency_key. Если два одинаковых сообщения придут одновременно — база данных поймает дубль через нарушение уникального ограничения.
Проверка состояния объекта
Если событие переводит объект в новое состояние, достаточно проверить текущее:
@Transactional
public void onOrderConfirmed(OrderConfirmedEvent event) {
var order = orderRepo.findById(event.orderId()).orElseThrow();
if (order.status() == OrderStatus.CONFIRMED) {
return; // уже в нужном состоянии
}
order.confirm();
orderRepo.save(order);
}
Не требует отдельной таблицы — состояние уже хранится в бизнес-объекте.
Retry с задержкой и Dead Letter Queue
Что если обработчик упал не из-за бага, а из-за временной недоступности внешнего сервиса? Нужно попробовать снова, но не немедленно.
В Spring AMQP delayed retry собирается через x-message-ttl и Dead Letter Exchange:
@Bean Queue retryQueue() {
return QueueBuilder.durable("orders.retry")
.withArgument("x-message-ttl", 30_000) // ждём 30 секунд
.withArgument("x-dead-letter-exchange", "orders")
.withArgument("x-dead-letter-routing-key", "order.created")
.quorum().build();
}
Поток: обработчик отклонил сообщение → оно попадает в retry очередь → через 30 секунд по истечении TTL уходит через DLX обратно в основную очередь → новая попытка.
Количество попыток считается через заголовок x-death.count — его нужно проверять вручную, встроенного ограничителя нет.
Сообщения, которые не получилось обработать после всех попыток, уходят в Dead Letter Queue (DLQ) — отдельную очередь для разбора вручную или через алерты.
Гарантированная публикация — Outbox
Бывает задача: сохранить заказ в базу данных и опубликовать событие — атомарно. Если сначала сохранить, потом опубликовать, то сервис может упасть между двумя операциями. Событие потеряется.
Outbox pattern: событие сохраняется в ту же транзакцию, что и бизнес-данные. Отдельный процесс читает таблицу и публикует в AMQP.
@Transactional
public void confirm(OrderId orderId) {
var order = orderRepo.findById(orderId).orElseThrow();
order.confirm();
orderRepo.save(order);
outboxRepo.save(new OutboxEvent(
UUID.randomUUID(),
"order.confirmed",
"orders",
toJson(new OrderConfirmedEvent(orderId))
));
}
@Scheduled(fixedDelay = 500)
@Transactional
public void publishOutbox() {
var batch = outboxRepo.fetchUnpublished(100);
for (var event : batch) {
rabbit.convertAndSend(event.exchange(), event.routingKey(), event.payload());
outboxRepo.markPublished(event.id());
}
}
Либо оба изменения зафиксированы, либо ни одного. Дубли возможны (публикация прошла, но пометить как отправленное не успело) — поэтому получатель всё равно должен быть идемпотентным.
Шпаргалка по выбору
| Задача | Паттерн | Тип обменника |
|---|---|---|
| Распределить нагрузку между обработчиками | Work Queue | direct (default) |
| Broadcast события всем сервисам | Publish/Subscribe | fanout |
| Разные события в разные очереди | Routing | direct |
| Подписка по маске на иерархические события | Topic | topic |
| Синхронный вызов через очередь | RPC | direct + reply-to |
| Защита от повторной доставки | Idempotent Consumer | любой |
| Retry с задержкой | Delayed Retry | direct + DLX |
| Атомарная публикация вместе с записью в БД | Outbox | direct |
Коротко
- Work Queue — одна очередь, несколько обработчиков, каждое сообщение получает ровно один. Для фоновых задач.
- Publish/Subscribe — fanout exchange копирует сообщение во все привязанные очереди. Для broadcast-событий.
- Routing — direct exchange смотрит на ключ маршрутизации. Для точного разделения потоков.
- Topic — как routing, но с шаблонами
*и#. Для иерархических событий с гибкой подпиской. - RPC через очередь — запрос-ответ через брокер с
reply-toиcorrelation-id. Для вызовов без HTTP. - Idempotent Consumer — at-least-once означает возможные дубли. Защита: ключ идемпотентности в БД или проверка состояния объекта.
- Delayed Retry — TTL + DLX: сообщение «паркуется» на время, потом возвращается.
- Outbox — событие сохраняется в той же транзакции, что и данные. Атомарность без двухфазного коммита.
Что почитать дальше
- Протокол AMQP — модель exchange/binding/queue изнутри.
- Spring AMQP — конфигурация, RabbitTemplate, аннотации.
- RabbitMQ в production — Quorum Queues, кластеризация, мониторинг.
- AMQP vs Kafka — какой брокер и когда брать.