Master RabbitMQ: From Basics to Advanced RPC and Exchange Patterns
This guide explains what a message queue is, why it’s used in asynchronous architectures, walks through installing RabbitMQ on Linux and macOS, and demonstrates core patterns such as simple queues, work queues, publish/subscribe, routing, topics, and RPC with complete Python code examples.
What Is a Message Queue
A message is data transferred between applications; it can be a simple text string or a complex object. A Message Queue (MQ) is an inter‑application communication mechanism where the sender publishes a message to the queue and the consumer retrieves it, without either party needing to know about the other.
Why Use a Message Queue
Message queues enable asynchronous collaboration. In an order‑processing system, actions such as inventory deduction, document generation, red‑packet distribution, and SMS notification can be split: the critical path (inventory, document) completes quickly, while non‑critical tasks are sent as messages to an MQ for later processing, improving performance and scalability.
RabbitMQ Introduction
RabbitMQ is an open‑source AMQP implementation written in Erlang. It provides reliable message delivery, high availability, and language‑agnostic communication, allowing producers and consumers to be decoupled.
RabbitMQ Installation
Linux
# Install EPEL repository
$ rpm -ivh http://dl.fedoraproject.org/pub/epel/6/i386/epel-release-6-8.noarch.rpm
# Install Erlang
$ yum -y install erlang
# Install RabbitMQ
$ yum -y install rabbitmq-server
# Start/stop the service
service rabbitmq-server start/stopmacOS
# brew install rabbitmq
# export PATH=$PATH:/usr/local/sbin
# rabbitmq-serverRabbitMQ Work Model – Simple Queue
Producer example:
#!/usr/bin/env python
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
print("[x] Sent 'Hello World!'")
connection.close()Consumer example:
#!/usr/bin/env python
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')
def callback(ch, method, properties, body):
print("[x] Received %r" % body)
# acknowledge after processing
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_consume(callback, queue='hello', no_ack=False)
print('[*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()Related Parameters
no_ack = False – if a consumer crashes, RabbitMQ will re‑queue the message.
In the callback, acknowledge with ch.basic_ack(delivery_tag=method.delivery_tag) Set no_ack=False in basic_consume to enable acknowledgments
Message Ordering and Prefetch
By default, messages are delivered in FIFO order. Using channel.basic_qos(prefetch_count=1) distributes messages to whichever consumer is ready, rather than strict alternating order.
#!/usr/bin/env python
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.211.55.4'))
channel = connection.channel()
channel.queue_declare(queue='hello')
def callback(ch, method, properties, body):
print("[x] Received %r" % body)
import time; time.sleep(10)
print('ok')
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback, queue='hello', no_ack=False)
print('[*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()Exchange Models
Publish/Subscribe
In the publish/subscribe model, a message is delivered to all bound queues, so every subscriber receives a copy.
# Producer
#!/usr/bin/env python
import pika, sys
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs', exchange_type='fanout')
message = ' '.join(sys.argv[1:]) or "info: Hello World!"
channel.basic_publish(exchange='logs', routing_key='', body=message)
print("[x] Sent %r" % message)
connection.close()
# Consumer
#!/usr/bin/env python
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs', exchange_type='fanout')
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
channel.queue_bind(exchange='logs', queue=queue_name)
print('[*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print("[x] %r" % body)
channel.basic_consume(callback, queue=queue_name, no_ack=True)
channel.start_consuming()Direct (Routing) Exchange
Messages are sent to queues whose binding key matches the routing key specified by the producer.
# Producer
#!/usr/bin/env python
import pika, sys
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
severities = sys.argv[1:]
if not severities:
sys.stderr.write('Usage: %s [info] [warning] [error]
' % sys.argv[0])
sys.exit(1)
for severity in severities:
channel.queue_bind(exchange='direct_logs', queue='my_queue', routing_key=severity)
print('[*] Sending logs')
# (publishing omitted for brevity)Topic Exchange (Wildcard Matching)
Queues can bind with patterns using * (match one word) and # (match zero or more words). The exchange routes messages based on these patterns.
# Producer
#!/usr/bin/env python
import pika, sys
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='topic_logs', exchange_type='topic')
binding_keys = sys.argv[1:]
if not binding_keys:
sys.stderr.write('Usage: %s [binding_key]...
' % sys.argv[0])
sys.exit(1)
for binding_key in binding_keys:
channel.queue_bind(exchange='topic_logs', queue='my_queue', routing_key=binding_key)
print('[*] Waiting for logs')
# (publishing omitted for brevity)RabbitMQ RPC
Callback Queue and Correlation ID
A client sends a request to an RPC queue and includes a reply_to property (the callback queue) and a unique correlation_id. The server processes the request, then publishes the response to the callback queue, preserving the same correlation_id so the client can match the reply to the original request.
Server Implementation
#!/usr/bin/env python
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='rpc_queue')
def fib(n):
if n == 0:
return 0
elif n == 1:
return 1
else:
return fib(n-1) + fib(n-2)
def on_request(ch, method, props, body):
n = int(body)
print("[.] fib(%s)" % n)
response = fib(n)
ch.basic_publish(exchange='', routing_key=props.reply_to,
properties=pika.BasicProperties(correlation_id=props.correlation_id),
body=str(response))
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(on_request, queue='rpc_queue')
print('[x] Awaiting RPC requests')
channel.start_consuming()Client Implementation
#!/usr/bin/env python
import pika, uuid
class FibonacciRpcClient(object):
def __init__(self):
self.connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
self.channel = self.connection.channel()
result = self.channel.queue_declare(exclusive=True)
self.callback_queue = result.method.queue
self.channel.basic_consume(self.on_response, no_ack=True, queue=self.callback_queue)
def on_response(self, ch, method, props, body):
if self.corr_id == props.correlation_id:
self.response = body
def call(self, n):
self.response = None
self.corr_id = str(uuid.uuid4())
self.channel.basic_publish(exchange='', routing_key='rpc_queue',
properties=pika.BasicProperties(reply_to=self.callback_queue,
correlation_id=self.corr_id),
body=str(n))
while self.response is None:
self.connection.process_data_events()
return int(self.response)
fibonacci_rpc = FibonacciRpcClient()
print("[x] Requesting fib(30)")
response = fibonacci_rpc.call(30)
print("[.] Got %r" % response)Signed-in readers can open the original source through BestHub's protected redirect.
This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contactand we will review it promptly.
MaGe Linux Operations
Founded in 2009, MaGe Education is a top Chinese high‑end IT training brand. Its graduates earn 12K+ RMB salaries, and the school has trained tens of thousands of students. It offers high‑pay courses in Linux cloud operations, Python full‑stack, automation, data analysis, AI, and Go high‑concurrency architecture. Thanks to quality courses and a solid reputation, it has talent partnerships with numerous internet firms.
How this landed with the community
Was this worth your time?
0 Comments
Thoughtful readers leave field notes, pushback, and hard-won operational detail here.
