AMQP (Advanced Message Queuing Protocol) — открытый протокол messaging-брокеров. Самая распространённая версия — AMQP 0.9.1 (RabbitMQ, ActiveMQ Classic, Qpid). Существует ещё AMQP 1.0, но это, по сути, другой протокол с другой моделью; в индустрии под «AMQP» обычно подразумевают 0.9.1, на нём и сфокусируемся.
Главное преимущество AMQP перед собственными протоколами брокеров — разделение маршрутизации и хранения. Продьюсер не знает, куда уйдёт сообщение; он публикует в exchange, тот по правилам bindings раскладывает по queues, откуда читают консьюмеры. Меняется маршрутизация — продьюсер не трогается.
Модель доставки
Producer
│
│ publish(exchange, routingKey, body)
▼
┌──────────┐
│ Exchange │
└─────┬────┘
│ применить bindings
┌────────────┼────────────┐
▼ ▼ ▼
┌───────┐ ┌───────┐ ┌───────┐
│ Queue │ │ Queue │ │ Queue │
└───┬───┘ └───┬───┘ └───┬───┘
│ │ │
▼ ▼ ▼
Consumer Consumer Consumer
Producer не выбирает queue — выбирает exchange. Маршрутизация дальше — забота брокера.
Четыре типа exchange
Direct exchange
Сообщение уходит в queue, у которой binding key точно равен routing key публикации.
publish(exchange="orders", routingKey="payment-failed", body=...)
Bindings:
payment-failed → queue "alerts"
payment-failed → queue "audit-log"
order-created → queue "fulfillment"
Сообщение попадает в "alerts" и "audit-log", не в "fulfillment".
Когда брать: точечная маршрутизация одного типа события в несколько обработчиков. Самый частый случай для work-queue паттерна — exchange "" (default direct), routing key = имя queue.
Topic exchange
Маршрутизация по паттерну с подстановочными символами * (одно слово) и # (ноль или больше слов). Routing key — строка вида audit.order.created, metric.cpu.high.
publish(exchange="events", routingKey="order.created.eu", ...)
Bindings:
order.created.* → queue "fulfillment-orders"
order.# → queue "audit-all-orders"
*.created.eu → queue "eu-monitoring"
metric.cpu.* → queue "alerts" (не сработает)
Когда брать: иерархическая событийная модель, где консьюмерам нужно подписаться на подмножество событий по гибкому паттерну.
Fanout exchange
Сообщение уходит во все queues, привязанные к exchange, routing key игнорируется.
publish(exchange="broadcast", routingKey=ignored, ...)
Bindings:
→ queue "service-a-cache"
→ queue "service-b-cache"
→ queue "service-c-cache"
Все три queue получают копию.
Когда брать: классический publish/subscribe — инвалидация кеша, broadcast системного события всем сервисам, real-time-обновления.
Headers exchange
Маршрутизация по заголовкам сообщения, не по routing key. Bindings содержат пары {key: value} и совпадение x-match: all (все ключи) или x-match: any (хотя бы один).
publish(exchange="files", headers={format: "pdf", priority: "high"}, ...)
Bindings:
{format: "pdf", priority: "high", x-match: all} → queue "vip-pdf-processor"
{format: "pdf", x-match: any} → queue "pdf-all"
{priority: "high", x-match: any} → queue "high-priority"
Когда брать: маршрутизация по нескольким измерениям, которые не сводятся к одной строке routing key. На практике используется редко — обычно выручают topic exchange + структурированный routing key.
Queues — где сообщения ждут
Queue — это то, что хранит сообщения до доставки потребителю. Главные свойства:
- durable — выживает рестарт брокера (для долгоживущих очередей бизнес-событий —
true). - exclusive — доступна только одному соединению, удаляется при его закрытии (для временных RPC-ответных queue).
- auto-delete — удаляется, когда отвалится последний подписчик.
Каждое сообщение тоже бывает persistent (delivery-mode = 2) или transient (delivery-mode = 1). Persistent сообщение в durable queue фактически сохраняется на диск; transient остаётся в памяти и теряется при рестарте.
Combo durable queue + persistent message — стандарт для бизнес-событий. auto-delete + transient — для RPC-ответных queue.
Bindings и virtual hosts
Binding — правило соединения exchange и queue. Создаётся обычно консьюмером при старте (но может и продьюсером).
# pseudocode
channel.queue_declare("payment-failed-alerts", durable=True)
channel.queue_bind(
queue="payment-failed-alerts",
exchange="orders",
routing_key="payment-failed"
)
channel.basic_consume(queue="payment-failed-alerts", on_message=...)
Virtual host (vhost) — пространство имён для exchange/queue/bindings + разрешений. Один брокер обычно держит несколько vhost: / (default), /prod, /staging, /team-billing. Это и логическое разделение, и граница безопасности.
Ack/nack/reject — подтверждение обработки
Когда брокер отдал сообщение консьюмеру, оно перестаёт быть доступно другим, но остаётся в queue до подтверждения.
Три варианта ответа консьюмера:
basic.ack— обработано успешно, удалить из queue.basic.nack(requeue=true)— не обработано, вернуть в queue (повтор сразу же).basic.nack(requeue=false)илиbasic.reject(requeue=false)— не обработано, удалить из queue (или отправить в DLX, если настроен).
Если консьюмер падает до ack, брокер видит обрыв соединения и автоматически возвращает сообщение в queue (requeue=true по умолчанию). Гарантия: at-least-once delivery.
auto-ack
Альтернатива — basic.consume(autoAck=true). Брокер считает сообщение доставленным сразу после отправки, не ждёт ack. Опасно: при падении консьюмера прямо во время обработки сообщение теряется.
Использовать auto-ack только для метрик и логов, где потеря приемлема.
Prefetch (basic.qos) — сколько сообщений выдавать без ack
По умолчанию брокер шлёт консьюмеру столько сообщений, сколько может, и они копятся в локальном буфере. Это убивает справедливость: первый медленный потребитель забирает всю очередь, остальные ждут.
basic.qos(prefetch_count=10)
После этого консьюмер получает максимум 10 unacked сообщений. Брокер не отправит 11-е, пока какое-нибудь из 10 не получит ack.
Правило: prefetch = concurrency × среднее_время_обработки_мс / 100. Грубо: 1–10 для медленных обработчиков (несколько секунд на сообщение), 100–1000 для быстрых (миллисекунды). По умолчанию во всех клиентах prefetch = 1 или 250 — без явной настройки производительность непредсказуема.
Publisher confirms — гарантия публикации
По умолчанию basic.publish возвращает успех немедленно, до того как сообщение попало в queue. Если брокер упал между приёмом сообщения и записью на диск — сообщение потеряно, продьюсер об этом не узнает.
Publisher confirms (RabbitMQ-расширение к AMQP) включают режим, где брокер шлёт basic.ack после того как сообщение надёжно сохранено:
channel.confirm_select()
channel.basic_publish(exchange="orders", routing_key="order.created", body=...)
# здесь блокирующее ожидание ack от брокера, либо timeout/callback
Включать confirms для критичных бизнес-событий обязательно. Для метрик и логов — не обязательно (потеря допустима).
Mandatory / immediate
Флаги при публикации, влияющие на поведение, когда сообщение некуда отправить:
- mandatory — если ни один binding не привёл к queue, брокер возвращает сообщение продьюсеру через
basic.return. Иначе сообщение тихо удаляется. - immediate — если в queue нет активных консьюмеров прямо сейчас, сообщение возвращается. RabbitMQ удалил
immediateв 3.0 (2012) — нет, не используем.
Mandatory полезен для отлова misconfiguration: продьюсер публикует в exchange, к которому никто не привязал нужную queue, и видит сразу basic.return вместо тихой потери.
TTL, max-length, expiry
Queue-уровневые параметры (объявляются через queue.declare arguments):
x-message-ttl— время жизни сообщения в очереди. После — отправляется в DLX (если настроен) или удаляется.x-expires— время простоя queue, после которого она удаляется (полезно для временных queue).x-max-length/x-max-length-bytes— максимальный размер. При превышении — старые сообщения удаляются или отправляются в DLX (стратегияx-overflow:drop-head/reject-publish/reject-publish-dlx).
Полезно для:
- ограничения «pile-up» — если консьюмер упал на сутки, не дать queue вырасти в гигабайты;
- автоудаления неактуальных команд (например, «обновить кэш» старше 30 секунд бесполезен).
Универсальность AMQP
AMQP 0.9.1 поддерживают:
- RabbitMQ — самый зрелый, флагман.
- Apache Qpid — реализация от Apache, поддерживает AMQP 1.0 и 0.9.1.
- Apache ActiveMQ Classic — поддерживает множество протоколов, включая AMQP 0.9.1.
Из практики: AMQP 0.9.1 + RabbitMQ — стандартный стек для очередей задач и pub/sub. Если код написан против чистого AMQP (через com.rabbitmq:amqp-client или Spring AMQP) и не использует RabbitMQ-расширения (federation, plugins) — он портируется.
RabbitMQ-расширения, которыми обычно пользуются: publisher confirms, consumer cancel notify, basic.nack, Dead Letter Exchange через arguments, priority queues. Большинство из них работают и в Qpid с небольшими отличиями.
Когда что выбирать
| Сценарий | Тип exchange |
|---|---|
| Worker queue: одно сообщение — один обработчик из пула | "" (default direct), routing key = имя queue |
| Точечная маршрутизация события одного типа в несколько обработчиков | direct |
Иерархические события (order.*.eu, metric.cpu.#) | topic |
| Broadcast: один сигнал — все слышат | fanout |
| Многомерная маршрутизация по нескольким атрибутам | headers (редко; обычно сводится к topic) |
Что почитать дальше
- RabbitMQ в production — операционные аспекты конкретной реализации.
- Spring AMQP — практический код на Java/Spring.
- Messaging-паттерны через AMQP — как протокол ложится на классические паттерны (work queue, RPC, pub/sub).
- AMQP vs Kafka — когда AMQP, когда лог-модель.