← назад к разделу

AMQP (Advanced Message Queuing Protocol) — открытый протокол messaging-брокеров. Самая распространённая версия — AMQP 0.9.1 (RabbitMQ, ActiveMQ Classic, Qpid). Существует ещё AMQP 1.0, но это, по сути, другой протокол с другой моделью; в индустрии под «AMQP» обычно подразумевают 0.9.1, на нём и сфокусируемся.

Главное преимущество AMQP перед собственными протоколами брокеров — разделение маршрутизации и хранения. Продьюсер не знает, куда уйдёт сообщение; он публикует в exchange, тот по правилам bindings раскладывает по queues, откуда читают консьюмеры. Меняется маршрутизация — продьюсер не трогается.

Модель доставки

                 Producer
                    │
                    │ publish(exchange, routingKey, body)
                    ▼
              ┌──────────┐
              │ Exchange │
              └─────┬────┘
                    │ применить bindings
       ┌────────────┼────────────┐
       ▼            ▼            ▼
   ┌───────┐   ┌───────┐    ┌───────┐
   │ Queue │   │ Queue │    │ Queue │
   └───┬───┘   └───┬───┘    └───┬───┘
       │           │            │
       ▼           ▼            ▼
   Consumer    Consumer     Consumer

Producer не выбирает queue — выбирает exchange. Маршрутизация дальше — забота брокера.

Четыре типа exchange

Direct exchange

Сообщение уходит в queue, у которой binding key точно равен routing key публикации.

publish(exchange="orders", routingKey="payment-failed", body=...)

Bindings:
  payment-failed   → queue "alerts"
  payment-failed   → queue "audit-log"
  order-created    → queue "fulfillment"

Сообщение попадает в "alerts" и "audit-log", не в "fulfillment".

Когда брать: точечная маршрутизация одного типа события в несколько обработчиков. Самый частый случай для work-queue паттерна — exchange "" (default direct), routing key = имя queue.

Topic exchange

Маршрутизация по паттерну с подстановочными символами * (одно слово) и # (ноль или больше слов). Routing key — строка вида audit.order.created, metric.cpu.high.

publish(exchange="events", routingKey="order.created.eu", ...)

Bindings:
  order.created.*        → queue "fulfillment-orders"
  order.#                → queue "audit-all-orders"
  *.created.eu           → queue "eu-monitoring"
  metric.cpu.*           → queue "alerts" (не сработает)

Когда брать: иерархическая событийная модель, где консьюмерам нужно подписаться на подмножество событий по гибкому паттерну.

Fanout exchange

Сообщение уходит во все queues, привязанные к exchange, routing key игнорируется.

publish(exchange="broadcast", routingKey=ignored, ...)

Bindings:
  → queue "service-a-cache"
  → queue "service-b-cache"
  → queue "service-c-cache"

Все три queue получают копию.

Когда брать: классический publish/subscribe — инвалидация кеша, broadcast системного события всем сервисам, real-time-обновления.

Headers exchange

Маршрутизация по заголовкам сообщения, не по routing key. Bindings содержат пары {key: value} и совпадение x-match: all (все ключи) или x-match: any (хотя бы один).

publish(exchange="files", headers={format: "pdf", priority: "high"}, ...)

Bindings:
  {format: "pdf", priority: "high", x-match: all}  → queue "vip-pdf-processor"
  {format: "pdf", x-match: any}                    → queue "pdf-all"
  {priority: "high", x-match: any}                 → queue "high-priority"

Когда брать: маршрутизация по нескольким измерениям, которые не сводятся к одной строке routing key. На практике используется редко — обычно выручают topic exchange + структурированный routing key.

Queues — где сообщения ждут

Queue — это то, что хранит сообщения до доставки потребителю. Главные свойства:

  • durable — выживает рестарт брокера (для долгоживущих очередей бизнес-событий — true).
  • exclusive — доступна только одному соединению, удаляется при его закрытии (для временных RPC-ответных queue).
  • auto-delete — удаляется, когда отвалится последний подписчик.

Каждое сообщение тоже бывает persistent (delivery-mode = 2) или transient (delivery-mode = 1). Persistent сообщение в durable queue фактически сохраняется на диск; transient остаётся в памяти и теряется при рестарте.

Combo durable queue + persistent message — стандарт для бизнес-событий. auto-delete + transient — для RPC-ответных queue.

Bindings и virtual hosts

Binding — правило соединения exchange и queue. Создаётся обычно консьюмером при старте (но может и продьюсером).

# pseudocode
channel.queue_declare("payment-failed-alerts", durable=True)
channel.queue_bind(
    queue="payment-failed-alerts",
    exchange="orders",
    routing_key="payment-failed"
)
channel.basic_consume(queue="payment-failed-alerts", on_message=...)

Virtual host (vhost) — пространство имён для exchange/queue/bindings + разрешений. Один брокер обычно держит несколько vhost: / (default), /prod, /staging, /team-billing. Это и логическое разделение, и граница безопасности.

Ack/nack/reject — подтверждение обработки

Когда брокер отдал сообщение консьюмеру, оно перестаёт быть доступно другим, но остаётся в queue до подтверждения.

Три варианта ответа консьюмера:

  • basic.ack — обработано успешно, удалить из queue.
  • basic.nack(requeue=true) — не обработано, вернуть в queue (повтор сразу же).
  • basic.nack(requeue=false) или basic.reject(requeue=false) — не обработано, удалить из queue (или отправить в DLX, если настроен).

Если консьюмер падает до ack, брокер видит обрыв соединения и автоматически возвращает сообщение в queue (requeue=true по умолчанию). Гарантия: at-least-once delivery.

auto-ack

Альтернатива — basic.consume(autoAck=true). Брокер считает сообщение доставленным сразу после отправки, не ждёт ack. Опасно: при падении консьюмера прямо во время обработки сообщение теряется.

Использовать auto-ack только для метрик и логов, где потеря приемлема.

Prefetch (basic.qos) — сколько сообщений выдавать без ack

По умолчанию брокер шлёт консьюмеру столько сообщений, сколько может, и они копятся в локальном буфере. Это убивает справедливость: первый медленный потребитель забирает всю очередь, остальные ждут.

basic.qos(prefetch_count=10)

После этого консьюмер получает максимум 10 unacked сообщений. Брокер не отправит 11-е, пока какое-нибудь из 10 не получит ack.

Правило: prefetch = concurrency × среднее_время_обработки_мс / 100. Грубо: 1–10 для медленных обработчиков (несколько секунд на сообщение), 100–1000 для быстрых (миллисекунды). По умолчанию во всех клиентах prefetch = 1 или 250 — без явной настройки производительность непредсказуема.

Publisher confirms — гарантия публикации

По умолчанию basic.publish возвращает успех немедленно, до того как сообщение попало в queue. Если брокер упал между приёмом сообщения и записью на диск — сообщение потеряно, продьюсер об этом не узнает.

Publisher confirms (RabbitMQ-расширение к AMQP) включают режим, где брокер шлёт basic.ack после того как сообщение надёжно сохранено:

channel.confirm_select()
channel.basic_publish(exchange="orders", routing_key="order.created", body=...)
# здесь блокирующее ожидание ack от брокера, либо timeout/callback

Включать confirms для критичных бизнес-событий обязательно. Для метрик и логов — не обязательно (потеря допустима).

Mandatory / immediate

Флаги при публикации, влияющие на поведение, когда сообщение некуда отправить:

  • mandatory — если ни один binding не привёл к queue, брокер возвращает сообщение продьюсеру через basic.return. Иначе сообщение тихо удаляется.
  • immediate — если в queue нет активных консьюмеров прямо сейчас, сообщение возвращается. RabbitMQ удалил immediate в 3.0 (2012) — нет, не используем.

Mandatory полезен для отлова misconfiguration: продьюсер публикует в exchange, к которому никто не привязал нужную queue, и видит сразу basic.return вместо тихой потери.

TTL, max-length, expiry

Queue-уровневые параметры (объявляются через queue.declare arguments):

  • x-message-ttl — время жизни сообщения в очереди. После — отправляется в DLX (если настроен) или удаляется.
  • x-expires — время простоя queue, после которого она удаляется (полезно для временных queue).
  • x-max-length / x-max-length-bytes — максимальный размер. При превышении — старые сообщения удаляются или отправляются в DLX (стратегия x-overflow: drop-head / reject-publish / reject-publish-dlx).

Полезно для:

  • ограничения «pile-up» — если консьюмер упал на сутки, не дать queue вырасти в гигабайты;
  • автоудаления неактуальных команд (например, «обновить кэш» старше 30 секунд бесполезен).

Универсальность AMQP

AMQP 0.9.1 поддерживают:

  • RabbitMQ — самый зрелый, флагман.
  • Apache Qpid — реализация от Apache, поддерживает AMQP 1.0 и 0.9.1.
  • Apache ActiveMQ Classic — поддерживает множество протоколов, включая AMQP 0.9.1.

Из практики: AMQP 0.9.1 + RabbitMQ — стандартный стек для очередей задач и pub/sub. Если код написан против чистого AMQP (через com.rabbitmq:amqp-client или Spring AMQP) и не использует RabbitMQ-расширения (federation, plugins) — он портируется.

RabbitMQ-расширения, которыми обычно пользуются: publisher confirms, consumer cancel notify, basic.nack, Dead Letter Exchange через arguments, priority queues. Большинство из них работают и в Qpid с небольшими отличиями.

Когда что выбирать

СценарийТип exchange
Worker queue: одно сообщение — один обработчик из пула"" (default direct), routing key = имя queue
Точечная маршрутизация события одного типа в несколько обработчиковdirect
Иерархические события (order.*.eu, metric.cpu.#)topic
Broadcast: один сигнал — все слышатfanout
Многомерная маршрутизация по нескольким атрибутамheaders (редко; обычно сводится к topic)

Что почитать дальше

  • RabbitMQ в production — операционные аспекты конкретной реализации.
  • Spring AMQP — практический код на Java/Spring.
  • Messaging-паттерны через AMQP — как протокол ложится на классические паттерны (work queue, RPC, pub/sub).
  • AMQP vs Kafka — когда AMQP, когда лог-модель.