← назад к разделу

Когда сервисы начинают общаться через брокер, возникает вопрос: как правильно организовать очереди, обменники и подписки? Большинство задач укладываются в несколько типовых схем. Разберём каждую на примерах Spring AMQP.

Раздать задачи нескольким обработчикам — Work Queue

Представьте: пользователи загружают фотографии, и каждую надо сжать и нарезать в несколько размеров. Это долго. Делать это прямо в HTTP-запросе нельзя — пользователь будет ждать минуты.

Решение — work queue (очередь задач): сохранить задание в очередь и отдать её одному из пула обработчиков.

producer → [одна queue] → consumer 1
                        → consumer 2
                        → consumer 3

Брокер сам распределяет задачи между обработчиками. Каждое сообщение получает ровно один из них.

@Configuration
class ImageProcessingTopology {
    @Bean
    Queue imagesQueue() {
        return QueueBuilder.durable("images.to-process").quorum().build();
    }
}

@Component
class ImageProcessor {
    @RabbitListener(queues = "images.to-process", concurrency = "5-20")
    public void process(ImageJob job) {
        // обработка изображения
    }
}

concurrency = "5-20" означает: минимум 5 потоков, максимум 20. Если запустить 5 копий сервиса — получится 25–100 параллельных обработчиков.

Когда брать: фоновые задачи — обработка файлов, отправка писем, генерация отчётов, любые «положили в очередь — кто-то возьмёт».

Отправить событие всем сервисам сразу — Publish/Subscribe

Другая задача: произошло событие «конфигурация обновлена» — каждый сервис должен обновить свой кеш. Нельзя знать заранее, кто именно подписан и сколько сервисов запущено.

Это publish/subscribe (рассылка всем): издатель отправляет одно сообщение, а копию получают все подписчики одновременно.

Здесь нужен fanout exchange — он копирует каждое сообщение во все привязанные очереди. Каждый сервис объявляет свою очередь и привязывает её к общему обменнику.

@Configuration
class CacheInvalidationTopology {

    @Bean FanoutExchange cacheInvalidation() {
        return new FanoutExchange("cache.invalidation", true, false);
    }

    @Bean Queue serviceACache() {
        return QueueBuilder.nonDurable().exclusive().autoDelete().build();
    }

    @Bean Binding bindA(Queue serviceACache, FanoutExchange cacheInvalidation) {
        return BindingBuilder.bind(serviceACache).to(cacheInvalidation);
    }
}

@Component
class ServiceACacheListener {
    @RabbitListener(queues = "#{serviceACache.name}")
    public void invalidate(CacheInvalidationEvent event) {
        cache.evict(event.key());
    }
}

exclusive + autoDelete — очередь принадлежит одному соединению и удаляется при отключении. При перезапуске сервиса не накапливается мусор в брокере.

Когда брать: инвалидация кешей, broadcast-уведомления всему кластеру, обновления конфигурации.

Направить событие в нужный обработчик — Routing

Иногда нужно не «всем», а «именно тому, кому надо». Например: событие order.created должно идти в сервис выполнения и в аудит, а order.payment-failed — только в алерты.

Это routing (точечная маршрутизация): direct exchange смотрит на ключ маршрутизации сообщения и отправляет его только в очереди с совпадающим binding key.

@Configuration
class OrderRoutingTopology {

    @Bean DirectExchange orders() { return new DirectExchange("orders", true, false); }

    @Bean Queue fulfillment() { return QueueBuilder.durable("orders.fulfillment").quorum().build(); }
    @Bean Queue audit()       { return QueueBuilder.durable("orders.audit").quorum().build(); }
    @Bean Queue alerts()      { return QueueBuilder.durable("orders.alerts").quorum().build(); }

    @Bean Binding b1(Queue fulfillment, DirectExchange orders) {
        return BindingBuilder.bind(fulfillment).to(orders).with("order.created");
    }
    @Bean Binding b2(Queue audit, DirectExchange orders) {
        return BindingBuilder.bind(audit).to(orders).with("order.created");
    }
    @Bean Binding b3(Queue audit, DirectExchange orders) {
        return BindingBuilder.bind(audit).to(orders).with("order.cancelled");
    }
    @Bean Binding b4(Queue alerts, DirectExchange orders) {
        return BindingBuilder.bind(alerts).to(orders).with("order.payment-failed");
    }
}
  • order.created → fulfillment + audit.
  • order.cancelled → только audit.
  • order.payment-failed → только alerts.

Когда брать: явное разделение потоков — алерты отдельно от аудита, основной обработчик отдельно от мониторинга.

Подписаться по маске — Topic

Routing хорош для жёстких правил. Но что если сервис хочет подписаться на «все события по заказам»? Или «всё из EU-региона»?

Topic exchange позволяет задавать подписки через шаблоны. Ключи сообщений строятся через точку (order.created.eu), а в подписке можно использовать:

  • * — ровно одно слово,
  • # — ноль и более слов.
@Configuration
class TopicRoutingTopology {
    @Bean TopicExchange events() { return new TopicExchange("events", true, false); }

    @Bean Queue auditAllOrders() { return QueueBuilder.durable("audit.orders").quorum().build(); }
    @Bean Queue euDashboard()    { return QueueBuilder.durable("dashboard.eu").quorum().build(); }
    @Bean Queue alerts()         { return QueueBuilder.durable("alerts.critical").quorum().build(); }

    @Bean Binding b1(Queue auditAllOrders, TopicExchange events) {
        return BindingBuilder.bind(auditAllOrders).to(events).with("order.#");
    }
    @Bean Binding b2(Queue euDashboard, TopicExchange events) {
        return BindingBuilder.bind(euDashboard).to(events).with("*.*.eu");
    }
    @Bean Binding b3(Queue alerts, TopicExchange events) {
        return BindingBuilder.bind(alerts).to(events).with("payment.failed.#");
    }
}

Сообщение с ключом order.cancelled.eu попадёт в auditAllOrders (по order.#) и в euDashboard (по *.*.eu).

Когда брать: события с иерархической структурой, когда нужно гибко подписываться без переделки топологии при добавлении новых типов событий.

Запрос-ответ через очередь — RPC

Иногда нужен синхронный ответ, но HTTP не подходит: сервис за NAT, нет публичного адреса, или хочется балансировки по пулу обработчиков.

RPC через очередь: клиент отправляет запрос и ждёт ответа. Брокер доставляет запрос одному из обработчиков, тот отвечает в отдельную очередь-ответ. Для сопоставления запроса и ответа используется correlation-id.

В Spring AMQP это скрыто за sendAndReceive:

// Клиент
@Component
@RequiredArgsConstructor
class PricingClient {
    private final RabbitTemplate rabbit;

    public PriceQuote quote(QuoteRequest request) {
        return (PriceQuote) rabbit.convertSendAndReceive(
            "pricing.exchange", "pricing.quote", request);
    }
}

// Сервер
@Component
class PricingServer {
    @RabbitListener(queues = "pricing.quote")
    public PriceQuote handle(QuoteRequest request) {
        return PriceQuote.compute(request); // возврат автоматически уходит в reply-to
    }
}

Spring AMQP сам создаёт временную очередь-ответ, проставляет reply-to и correlation-id, ждёт ответа. Возврат из @RabbitListener автоматически публикуется обратно.

Когда брать: нужен синхронный вызов, но HTTP не работает (NAT, firewall, нет публичного адреса); нужна балансировка запросов по пулу обработчиков.

Когда не брать: если HTTP/gRPC просто работает — RPC через брокер сложнее в отладке и дороже.

Что делать с повторной доставкой — Idempotent Consumer

AMQP гарантирует at-least-once доставку: одно и то же сообщение может прийти дважды. Это случается, когда брокер не получил подтверждение обработки (например, из-за сетевой проблемы) и переотправляет сообщение.

Обработчик обязан уметь работать с повторами, не ломая бизнес-логику.

Ключ идемпотентности в базе данных

Самый надёжный способ — запоминать уже обработанные сообщения:

@RabbitListener(queues = "payments")
@Transactional
public void process(PaymentEvent event) {
    if (processedEventsRepo.existsByIdempotencyKey(event.idempotencyKey())) {
        return; // уже обработано — просто подтверждаем получение
    }
    processedEventsRepo.save(new ProcessedEvent(event.idempotencyKey()));
    accountRepo.debit(event.accountId(), event.amount());
}

Таблица processed_events с уникальным индексом на idempotency_key. Если два одинаковых сообщения придут одновременно — база данных поймает дубль через нарушение уникального ограничения.

Проверка состояния объекта

Если событие переводит объект в новое состояние, достаточно проверить текущее:

@Transactional
public void onOrderConfirmed(OrderConfirmedEvent event) {
    var order = orderRepo.findById(event.orderId()).orElseThrow();
    if (order.status() == OrderStatus.CONFIRMED) {
        return; // уже в нужном состоянии
    }
    order.confirm();
    orderRepo.save(order);
}

Не требует отдельной таблицы — состояние уже хранится в бизнес-объекте.

Retry с задержкой и Dead Letter Queue

Что если обработчик упал не из-за бага, а из-за временной недоступности внешнего сервиса? Нужно попробовать снова, но не немедленно.

В Spring AMQP delayed retry собирается через x-message-ttl и Dead Letter Exchange:

@Bean Queue retryQueue() {
    return QueueBuilder.durable("orders.retry")
        .withArgument("x-message-ttl", 30_000)             // ждём 30 секунд
        .withArgument("x-dead-letter-exchange", "orders")
        .withArgument("x-dead-letter-routing-key", "order.created")
        .quorum().build();
}

Поток: обработчик отклонил сообщение → оно попадает в retry очередь → через 30 секунд по истечении TTL уходит через DLX обратно в основную очередь → новая попытка.

Количество попыток считается через заголовок x-death.count — его нужно проверять вручную, встроенного ограничителя нет.

Сообщения, которые не получилось обработать после всех попыток, уходят в Dead Letter Queue (DLQ) — отдельную очередь для разбора вручную или через алерты.

Гарантированная публикация — Outbox

Бывает задача: сохранить заказ в базу данных и опубликовать событие — атомарно. Если сначала сохранить, потом опубликовать, то сервис может упасть между двумя операциями. Событие потеряется.

Outbox pattern: событие сохраняется в ту же транзакцию, что и бизнес-данные. Отдельный процесс читает таблицу и публикует в AMQP.

@Transactional
public void confirm(OrderId orderId) {
    var order = orderRepo.findById(orderId).orElseThrow();
    order.confirm();
    orderRepo.save(order);
    outboxRepo.save(new OutboxEvent(
        UUID.randomUUID(),
        "order.confirmed",
        "orders",
        toJson(new OrderConfirmedEvent(orderId))
    ));
}

@Scheduled(fixedDelay = 500)
@Transactional
public void publishOutbox() {
    var batch = outboxRepo.fetchUnpublished(100);
    for (var event : batch) {
        rabbit.convertAndSend(event.exchange(), event.routingKey(), event.payload());
        outboxRepo.markPublished(event.id());
    }
}

Либо оба изменения зафиксированы, либо ни одного. Дубли возможны (публикация прошла, но пометить как отправленное не успело) — поэтому получатель всё равно должен быть идемпотентным.

Шпаргалка по выбору

ЗадачаПаттернТип обменника
Распределить нагрузку между обработчикамиWork Queuedirect (default)
Broadcast события всем сервисамPublish/Subscribefanout
Разные события в разные очередиRoutingdirect
Подписка по маске на иерархические событияTopictopic
Синхронный вызов через очередьRPCdirect + reply-to
Защита от повторной доставкиIdempotent Consumerлюбой
Retry с задержкойDelayed Retrydirect + DLX
Атомарная публикация вместе с записью в БДOutboxdirect

Коротко

  • Work Queue — одна очередь, несколько обработчиков, каждое сообщение получает ровно один. Для фоновых задач.
  • Publish/Subscribe — fanout exchange копирует сообщение во все привязанные очереди. Для broadcast-событий.
  • Routing — direct exchange смотрит на ключ маршрутизации. Для точного разделения потоков.
  • Topic — как routing, но с шаблонами * и #. Для иерархических событий с гибкой подпиской.
  • RPC через очередь — запрос-ответ через брокер с reply-to и correlation-id. Для вызовов без HTTP.
  • Idempotent Consumer — at-least-once означает возможные дубли. Защита: ключ идемпотентности в БД или проверка состояния объекта.
  • Delayed Retry — TTL + DLX: сообщение «паркуется» на время, потом возвращается.
  • Outbox — событие сохраняется в той же транзакции, что и данные. Атомарность без двухфазного коммита.

Что почитать дальше

  • Протокол AMQP — модель exchange/binding/queue изнутри.
  • Spring AMQP — конфигурация, RabbitTemplate, аннотации.
  • RabbitMQ в production — Quorum Queues, кластеризация, мониторинг.
  • AMQP vs Kafka — какой брокер и когда брать.