← назад к разделу

Когда два сервиса хотят обменяться данными, простейшее решение — вызов напрямую: один делает HTTP-запрос к другому. Это работает, пока второй сервис всегда доступен, успевает принять все запросы и не падает под нагрузкой.

Как только один из этих пунктов перестаёт выполняться, появляется брокер сообщений. Первый сервис кладёт сообщение в брокер и занимается своими делами; второй забирает его, когда готов. Но чтобы брокер знал, куда класть каждое сообщение и кому его отдавать, нужен протокол — набор правил маршрутизации.

AMQP 0.9.1 (Advanced Message Queuing Protocol) — такой протокол. Его используют RabbitMQ, Apache Qpid и ActiveMQ Classic. Именно AMQP 0.9.1 обычно имеют в виду, когда говорят «AMQP» без уточнения версии.

Как сообщение попадает от отправителя к получателю

В обычной почте вы пишете адрес на конверте, а сортировочный центр решает, на какой склад его везти. В AMQP устроено похоже:

  • Producer (продьюсер) — отправляет сообщение.
  • Exchange — «сортировочный центр». Получает сообщение и решает, в какие очереди его положить.
  • Queue (очередь) — «склад». Хранит сообщения до тех пор, пока потребитель не заберёт их.
  • Consumer (потребитель) — забирает сообщения из очереди и обрабатывает.
  • Binding — правило, по которому exchange знает, в какую очередь направить сообщение.
  • Routing key — «адрес» на конверте, метка-строка, которую продьюсер ставит при отправке.
Producer
   │
   │ publish(exchange, routingKey, body)
   ▼
Exchange  ──── bindings ────► Queue A
                         └──► Queue B
                                   │
                                   ▼
                               Consumer

Ключевая идея: продьюсер не выбирает очередь — он указывает exchange и routing key. Куда именно попадёт сообщение, решают bindings.

Четыре типа exchange

Direct exchange

Сообщение попадает в очередь, если её binding key точно совпадает с routing key.

publish(exchange="orders", routingKey="payment-failed")

Bindings:
  "payment-failed"  → queue "alerts"
  "payment-failed"  → queue "audit-log"
  "order-created"   → queue "fulfillment"

Результат: сообщение попадёт в "alerts" и "audit-log".
           В "fulfillment" — нет.

Частный случай — exchange с именем "" (пустая строка, default direct). Если указать routing key = имя очереди, сообщение уйдёт прямо туда. Это самый простой способ отправить что-то в конкретную очередь.

Topic exchange

Routing key здесь — строка из слов через точку: order.created.eu, metric.cpu.high. В bindings можно использовать маски:

  • * — ровно одно слово
  • # — ноль или больше слов
publish(exchange="events", routingKey="order.created.eu")

Bindings:
  "order.created.*"   → queue "fulfillment-orders"   ✓
  "order.#"           → queue "audit-all-orders"      ✓
  "*.created.eu"      → queue "eu-monitoring"         ✓
  "metric.cpu.*"      → queue "alerts"                ✗

Topic exchange подходит, когда потребители хотят подписаться не на всё, а на определённое подмножество событий.

Fanout exchange

Routing key не учитывается совсем. Сообщение уходит во все очереди, привязанные к этому exchange.

publish(exchange="broadcast", body=...)

Bindings:
  → queue "service-a-cache"
  → queue "service-b-cache"
  → queue "service-c-cache"

Все три получат копию сообщения.

Классическая задача — инвалидация кеша или рассылка системного события всем сервисам одновременно.

Headers exchange

Маршрутизация по заголовкам сообщения, а не по routing key. В binding указывают набор пар ключ: значение и условие совпадения: x-match: all (все должны совпасть) или x-match: any (хотя бы один).

publish(exchange="files", headers={format: "pdf", priority: "high"})

Bindings:
  {format: "pdf", priority: "high", x-match: all}  → queue "vip-pdf"      ✓
  {format: "pdf", x-match: any}                    → queue "pdf-all"      ✓
  {priority: "high", x-match: any}                 → queue "high-prio"    ✓

На практике headers exchange используется редко — чаще ту же задачу решают через topic exchange со структурированным routing key.

Что такое очередь и как она живёт

Очередь хранит сообщения до тех пор, пока потребитель их не заберёт. При объявлении выбирают несколько параметров:

  • durable — выживает ли очередь при перезапуске брокера. Для бизнес-событий — true.
  • exclusive — доступна только одному соединению, удаляется при его разрыве. Удобно для временных RPC-ответов.
  • auto-delete — удаляется, когда последний потребитель отключается.

Каждое сообщение тоже бывает persistent (записывается на диск) или transient (только в памяти). Если брокер перезапустится, transient-сообщения пропадут.

Стандарт для важных данных: durable очередь + persistent сообщение.

Bindings и virtual hosts

Binding создаётся потребителем при запуске — он объявляет, какую очередь создать и к какому exchange привязать:

channel.queue_declare("payment-failed-alerts", durable=True)
channel.queue_bind(
    queue="payment-failed-alerts",
    exchange="orders",
    routing_key="payment-failed"
)
channel.basic_consume(queue="payment-failed-alerts", on_message=handler)

Virtual host (vhost) — это пространство имён: свои exchange, очереди и права доступа, изолированные от других vhost. Один брокер обычно держит несколько vhost: / (по умолчанию), /prod, /staging. Это удобный способ разделить окружения без запуска отдельных серверов.

Ack, nack, reject — как потребитель подтверждает обработку

Когда брокер отдаёт сообщение потребителю, оно ещё не удаляется из очереди — только «резервируется». Оно будет удалено только после подтверждения.

У потребителя три варианта:

  • basic.ack — обработано успешно, удалить из очереди.
  • basic.nack(requeue=true) — не обработано, вернуть в очередь (потребитель попробует снова).
  • basic.nack(requeue=false) или basic.reject(requeue=false) — не обработано, удалить (или отправить в Dead Letter Exchange, если настроен).

Если потребитель упал, не отправив ack, брокер видит обрыв соединения и автоматически возвращает сообщение в очередь. Это гарантия at-least-once: каждое сообщение будет обработано хотя бы один раз, даже если потребитель падал в процессе.

Автоподтверждение

Если использовать basic.consume(autoAck=true), брокер сразу считает сообщение доставленным — без ожидания ack. Если потребитель упадёт до конца обработки, сообщение будет потеряно. Подходит только для метрик и логов, где потеря допустима.

Prefetch — сколько сообщений отдавать за раз

По умолчанию брокер отправляет потребителю столько сообщений, сколько может. Если потребитель медленный — все сообщения скопятся у него в буфере, а другие потребители будут простаивать без работы.

basic.qos ограничивает количество неподтверждённых сообщений у одного потребителя:

basic.qos(prefetch_count=10)

После этого брокер не пришлёт 11-е сообщение, пока хотя бы одно из первых десяти не получит ack.

Практические ориентиры: 1–10 для медленных обработчиков (секунды на сообщение), 100–1000 для быстрых (миллисекунды). Без явной настройки поведение непредсказуемо.

Publisher confirms — гарантия на стороне отправителя

По умолчанию basic.publish возвращает управление немедленно, не дожидаясь, пока сообщение окажется в очереди. Если брокер упадёт в этот момент — сообщение пропадёт, а продьюсер об этом не узнает.

Publisher confirms — расширение, при котором брокер отправляет basic.ack только после надёжного сохранения сообщения:

channel.confirm_select()
channel.basic_publish(exchange="orders", routing_key="order.created", body=...)
# ждём ack от брокера перед продолжением

Для важных бизнес-событий publisher confirms нужно включать всегда. Для метрик и логов — по желанию.

TTL и ограничения на размер очереди

При объявлении очереди можно задать дополнительные параметры:

  • x-message-ttl — время жизни сообщения в очереди. Просроченное сообщение удаляется или уходит в Dead Letter Exchange.
  • x-expires — через сколько миллисекунд неиспользуемая очередь удалится сама. Удобно для временных очередей.
  • x-max-length / x-max-length-bytes — максимальное число сообщений или байт. При превышении старые сообщения вытесняются или отклоняются (зависит от стратегии x-overflow).

Практические случаи: не дать очереди вырасти до гигабайт, пока потребитель не работает; автоматически отбрасывать устаревшие команды (например, «обновить кеш» старше 30 секунд уже неактуален).

Что выбрать для каждого случая

ЗадачаТип exchange
Одно сообщение — один обработчик из пула"" (default direct), routing key = имя очереди
Событие → несколько конкретных очередейdirect
Иерархические события (order.*.eu, metric.cpu.#)topic
Один сигнал — все слышатfanout
Маршрутизация по нескольким атрибутамheaders (редко)

Коротко

  • Producer публикует в exchange, не в очередь напрямую. Exchange раскладывает по очередям по правилам bindings.
  • Direct — точное совпадение routing key. Topic — маски * и #. Fanout — всем без разбора. Headers — по заголовкам сообщения.
  • durable + persistent — стандарт для данных, которые нельзя терять.
  • Ack подтверждает успешную обработку. Без ack — сообщение вернётся в очередь при обрыве соединения. Это at-least-once.
  • Prefetch ограничивает, сколько неподтверждённых сообщений у потребителя. Без него — нагрузка распределится неравномерно.
  • Publisher confirms дают гарантию на стороне отправителя: брокер подтверждает сохранение перед тем, как вернуть управление.
  • TTL и max-length защищают от бесконтрольного роста очереди.

Что почитать дальше

  • RabbitMQ в production — кластеризация, Quorum Queues, мониторинг.
  • Spring AMQP — практический код на Java/Spring.
  • Messaging-паттерны через AMQP — work queue, RPC, pub/sub на практике.
  • AMQP vs Kafka — когда очереди, когда лог-модель.