Когда два сервиса хотят обменяться данными, простейшее решение — вызов напрямую: один делает HTTP-запрос к другому. Это работает, пока второй сервис всегда доступен, успевает принять все запросы и не падает под нагрузкой.
Как только один из этих пунктов перестаёт выполняться, появляется брокер сообщений. Первый сервис кладёт сообщение в брокер и занимается своими делами; второй забирает его, когда готов. Но чтобы брокер знал, куда класть каждое сообщение и кому его отдавать, нужен протокол — набор правил маршрутизации.
AMQP 0.9.1 (Advanced Message Queuing Protocol) — такой протокол. Его используют RabbitMQ, Apache Qpid и ActiveMQ Classic. Именно AMQP 0.9.1 обычно имеют в виду, когда говорят «AMQP» без уточнения версии.
Как сообщение попадает от отправителя к получателю
В обычной почте вы пишете адрес на конверте, а сортировочный центр решает, на какой склад его везти. В AMQP устроено похоже:
- Producer (продьюсер) — отправляет сообщение.
- Exchange — «сортировочный центр». Получает сообщение и решает, в какие очереди его положить.
- Queue (очередь) — «склад». Хранит сообщения до тех пор, пока потребитель не заберёт их.
- Consumer (потребитель) — забирает сообщения из очереди и обрабатывает.
- Binding — правило, по которому exchange знает, в какую очередь направить сообщение.
- Routing key — «адрес» на конверте, метка-строка, которую продьюсер ставит при отправке.
Producer
│
│ publish(exchange, routingKey, body)
▼
Exchange ──── bindings ────► Queue A
└──► Queue B
│
▼
Consumer
Ключевая идея: продьюсер не выбирает очередь — он указывает exchange и routing key. Куда именно попадёт сообщение, решают bindings.
Четыре типа exchange
Direct exchange
Сообщение попадает в очередь, если её binding key точно совпадает с routing key.
publish(exchange="orders", routingKey="payment-failed")
Bindings:
"payment-failed" → queue "alerts"
"payment-failed" → queue "audit-log"
"order-created" → queue "fulfillment"
Результат: сообщение попадёт в "alerts" и "audit-log".
В "fulfillment" — нет.
Частный случай — exchange с именем "" (пустая строка, default direct). Если указать routing key = имя очереди, сообщение уйдёт прямо туда. Это самый простой способ отправить что-то в конкретную очередь.
Topic exchange
Routing key здесь — строка из слов через точку: order.created.eu, metric.cpu.high. В bindings можно использовать маски:
*— ровно одно слово#— ноль или больше слов
publish(exchange="events", routingKey="order.created.eu")
Bindings:
"order.created.*" → queue "fulfillment-orders" ✓
"order.#" → queue "audit-all-orders" ✓
"*.created.eu" → queue "eu-monitoring" ✓
"metric.cpu.*" → queue "alerts" ✗
Topic exchange подходит, когда потребители хотят подписаться не на всё, а на определённое подмножество событий.
Fanout exchange
Routing key не учитывается совсем. Сообщение уходит во все очереди, привязанные к этому exchange.
publish(exchange="broadcast", body=...)
Bindings:
→ queue "service-a-cache"
→ queue "service-b-cache"
→ queue "service-c-cache"
Все три получат копию сообщения.
Классическая задача — инвалидация кеша или рассылка системного события всем сервисам одновременно.
Headers exchange
Маршрутизация по заголовкам сообщения, а не по routing key. В binding указывают набор пар ключ: значение и условие совпадения: x-match: all (все должны совпасть) или x-match: any (хотя бы один).
publish(exchange="files", headers={format: "pdf", priority: "high"})
Bindings:
{format: "pdf", priority: "high", x-match: all} → queue "vip-pdf" ✓
{format: "pdf", x-match: any} → queue "pdf-all" ✓
{priority: "high", x-match: any} → queue "high-prio" ✓
На практике headers exchange используется редко — чаще ту же задачу решают через topic exchange со структурированным routing key.
Что такое очередь и как она живёт
Очередь хранит сообщения до тех пор, пока потребитель их не заберёт. При объявлении выбирают несколько параметров:
- durable — выживает ли очередь при перезапуске брокера. Для бизнес-событий —
true. - exclusive — доступна только одному соединению, удаляется при его разрыве. Удобно для временных RPC-ответов.
- auto-delete — удаляется, когда последний потребитель отключается.
Каждое сообщение тоже бывает persistent (записывается на диск) или transient (только в памяти). Если брокер перезапустится, transient-сообщения пропадут.
Стандарт для важных данных: durable очередь + persistent сообщение.
Bindings и virtual hosts
Binding создаётся потребителем при запуске — он объявляет, какую очередь создать и к какому exchange привязать:
channel.queue_declare("payment-failed-alerts", durable=True)
channel.queue_bind(
queue="payment-failed-alerts",
exchange="orders",
routing_key="payment-failed"
)
channel.basic_consume(queue="payment-failed-alerts", on_message=handler)
Virtual host (vhost) — это пространство имён: свои exchange, очереди и права доступа, изолированные от других vhost. Один брокер обычно держит несколько vhost: / (по умолчанию), /prod, /staging. Это удобный способ разделить окружения без запуска отдельных серверов.
Ack, nack, reject — как потребитель подтверждает обработку
Когда брокер отдаёт сообщение потребителю, оно ещё не удаляется из очереди — только «резервируется». Оно будет удалено только после подтверждения.
У потребителя три варианта:
basic.ack— обработано успешно, удалить из очереди.basic.nack(requeue=true)— не обработано, вернуть в очередь (потребитель попробует снова).basic.nack(requeue=false)илиbasic.reject(requeue=false)— не обработано, удалить (или отправить в Dead Letter Exchange, если настроен).
Если потребитель упал, не отправив ack, брокер видит обрыв соединения и автоматически возвращает сообщение в очередь. Это гарантия at-least-once: каждое сообщение будет обработано хотя бы один раз, даже если потребитель падал в процессе.
Автоподтверждение
Если использовать basic.consume(autoAck=true), брокер сразу считает сообщение доставленным — без ожидания ack. Если потребитель упадёт до конца обработки, сообщение будет потеряно. Подходит только для метрик и логов, где потеря допустима.
Prefetch — сколько сообщений отдавать за раз
По умолчанию брокер отправляет потребителю столько сообщений, сколько может. Если потребитель медленный — все сообщения скопятся у него в буфере, а другие потребители будут простаивать без работы.
basic.qos ограничивает количество неподтверждённых сообщений у одного потребителя:
basic.qos(prefetch_count=10)
После этого брокер не пришлёт 11-е сообщение, пока хотя бы одно из первых десяти не получит ack.
Практические ориентиры: 1–10 для медленных обработчиков (секунды на сообщение), 100–1000 для быстрых (миллисекунды). Без явной настройки поведение непредсказуемо.
Publisher confirms — гарантия на стороне отправителя
По умолчанию basic.publish возвращает управление немедленно, не дожидаясь, пока сообщение окажется в очереди. Если брокер упадёт в этот момент — сообщение пропадёт, а продьюсер об этом не узнает.
Publisher confirms — расширение, при котором брокер отправляет basic.ack только после надёжного сохранения сообщения:
channel.confirm_select()
channel.basic_publish(exchange="orders", routing_key="order.created", body=...)
# ждём ack от брокера перед продолжением
Для важных бизнес-событий publisher confirms нужно включать всегда. Для метрик и логов — по желанию.
TTL и ограничения на размер очереди
При объявлении очереди можно задать дополнительные параметры:
x-message-ttl— время жизни сообщения в очереди. Просроченное сообщение удаляется или уходит в Dead Letter Exchange.x-expires— через сколько миллисекунд неиспользуемая очередь удалится сама. Удобно для временных очередей.x-max-length/x-max-length-bytes— максимальное число сообщений или байт. При превышении старые сообщения вытесняются или отклоняются (зависит от стратегииx-overflow).
Практические случаи: не дать очереди вырасти до гигабайт, пока потребитель не работает; автоматически отбрасывать устаревшие команды (например, «обновить кеш» старше 30 секунд уже неактуален).
Что выбрать для каждого случая
| Задача | Тип exchange |
|---|---|
| Одно сообщение — один обработчик из пула | "" (default direct), routing key = имя очереди |
| Событие → несколько конкретных очередей | direct |
Иерархические события (order.*.eu, metric.cpu.#) | topic |
| Один сигнал — все слышат | fanout |
| Маршрутизация по нескольким атрибутам | headers (редко) |
Коротко
- Producer публикует в exchange, не в очередь напрямую. Exchange раскладывает по очередям по правилам bindings.
- Direct — точное совпадение routing key. Topic — маски
*и#. Fanout — всем без разбора. Headers — по заголовкам сообщения. - durable + persistent — стандарт для данных, которые нельзя терять.
- Ack подтверждает успешную обработку. Без ack — сообщение вернётся в очередь при обрыве соединения. Это at-least-once.
- Prefetch ограничивает, сколько неподтверждённых сообщений у потребителя. Без него — нагрузка распределится неравномерно.
- Publisher confirms дают гарантию на стороне отправителя: брокер подтверждает сохранение перед тем, как вернуть управление.
- TTL и max-length защищают от бесконтрольного роста очереди.
Что почитать дальше
- RabbitMQ в production — кластеризация, Quorum Queues, мониторинг.
- Spring AMQP — практический код на Java/Spring.
- Messaging-паттерны через AMQP — work queue, RPC, pub/sub на практике.
- AMQP vs Kafka — когда очереди, когда лог-модель.