Master RabbitMQ with Python: From Basics to Advanced Patterns
This tutorial explains RabbitMQ fundamentals, shows how to install the Pika client, provides complete producer and consumer Python examples, and covers advanced topics such as work queues, message acknowledgments, persistence, fair dispatch, exchanges, routing, and RPC usage.
Introduction
RabbitMQ acts like a post office: it accepts messages and forwards them. Producers and consumers interact through queues. A queue is effectively an unbounded buffer, limited only by the broker host's memory and disk; many producers can send to one queue, and many consumers can receive from it.
Code
The protocol used by RabbitMQ is AMQP, and the recommended Python client is pika.
pip install pika -i https://pypi.douban.com/simple/
Producer (send.py)
import pika

# Establish a connection to the RabbitMQ server running on this machine
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()  # open a channel
This connects to the local machine; to reach a remote server, replace 'localhost' with its host name or IP address. Before publishing a message, make sure the target queue exists; otherwise RabbitMQ discards the message.
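channel.queue_declare(queue='hello')  # create a queue named hello (no effect if it already exists)
channel.basic_publish(exchange='',
                      routing_key='hello',
                      body='Hello World!')  # send the message through the default exchange
connection.close()  # close the connection
If the message does not arrive, check the broker's free disk space. RabbitMQ blocks publishers when free space falls below its configured limit (disk_free_limit); the 1 GB figure applied to older releases, and the default differs between versions.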
After sending, you can verify the queue content with rabbitmqctl list_queues, which shows hello 1, indicating one message in the hello queue.
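The same check can be made from Python. The following is a minimal sketch, assuming an open pika channel and using pika's passive queue declaration, which inspects a queue without creating it. The helper name queue_depth is hypothetical and not part of pika.

```python
def queue_depth(channel, queue_name):
    """Return the number of ready messages in an existing queue.

    `channel` is assumed to be an open pika channel. passive=True asks the
    broker to report on the queue without creating or modifying it; the
    broker closes the channel with an error if the queue does not exist.
    """
    result = channel.queue_declare(queue=queue_name, passive=True)
    return result.method.message_count
```

Called as queue_depth(channel, 'hello') right after publishing, this should report the same count that rabbitmqctl list_queues shows.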
Consumer (receive.py)
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
As in the producer, connect to the server first.
channel.queue_declare(queue='hello')  # ensure the queue exists

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)

# pika 1.0+ signature; pika 0.x used basic_consume(callback, queue='hello', no_ack=True)
channel.basic_consume(queue='hello',
                      on_message_callback=callback,
                      auto_ack=True)  # auto-acknowledge
channel.start_consuming()  # enter the receive loop
Work Queues (Task Queues)
Work queues distribute time‑consuming tasks to multiple worker processes. Instead of performing heavy work inside an HTTP request, tasks are sent as messages to a queue and processed by separate workers.
channel.basic_publish(exchange='',
                      routing_key='task_queue',
                      body=message,
                      properties=pika.BasicProperties(
                          delivery_mode=2,  # make message persistent
                      ))
Messages are dispatched in round-robin order, so each worker receives a similar number of tasks.
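Round-robin dispatch can be pictured with a few lines of plain Python. This is only a model of the broker's default behaviour, not pika code; the function and worker names are made up for illustration.

```python
from itertools import cycle

def round_robin(tasks, workers):
    """Assign tasks to workers in strict rotation, ignoring how busy each one is."""
    assignment = {worker: [] for worker in workers}
    for task, worker in zip(tasks, cycle(workers)):
        assignment[worker].append(task)
    return assignment

tasks = ["task-%d" % i for i in range(1, 6)]
print(round_robin(tasks, ["worker-A", "worker-B"]))
```

With five tasks and two workers, worker-A receives tasks 1, 3 and 5 and worker-B receives tasks 2 and 4, regardless of how long each task takes.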
Message Acknowledgement
If a worker crashes before completing a task, its message can be lost, because with automatic acknowledgement RabbitMQ removes a message as soon as it is delivered. To prevent this, a worker sends an ack after it finishes processing; if the worker's connection closes without an ack, RabbitMQ re-queues the message for another worker.
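One way to make sure the ack is sent only after success is to wrap the task handler. The sketch below is an assumption layered on top of this tutorial: make_callback and handle are hypothetical names, and basic_nack (a negative acknowledgement that pika channels provide) is an addition to what the text covers.

```python
def make_callback(handle):
    """Wrap a task handler so the message is acked only after it succeeds.

    `handle` is a hypothetical function that processes the message body and
    raises an exception on failure.
    """
    def callback(ch, method, properties, body):
        try:
            handle(body)
        except Exception:
            # Negative ack: hand the message back so the broker can redeliver it
            ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
        else:
            # Positive ack: the broker may now delete the message
            ch.basic_ack(delivery_tag=method.delivery_tag)
    return callback
```

Requeueing on every failure can loop forever on a message that never succeeds, so a real worker would likely cap retries or route such messages elsewhere.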
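Manual acknowledgement is the default. The earlier consumer passed no_ack=True (named auto_ack=True in pika 1.0 and later), which turns it off. Omitting the flag enables it again:
channel.basic_consume(queue='hello', on_message_callback=callback)  # manual ack required
Example callback with manual ack: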
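import time

def callback(ch, method, properties, body):
    print(" [x] Received %r" % (body,))
    time.sleep(body.count(b'.'))  # body is bytes in Python 3; each dot simulates one second of work
    print(" [x] Done")
    ch.basic_ack(delivery_tag=method.delivery_tag)  # send the ack only after the work is done
Message Persistence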
To survive RabbitMQ restarts, declare queues as durable and publish messages with delivery_mode = 2.
channel.queue_declare(queue='task_queue', durable=True)  # the queue survives a broker restart
channel.basic_publish(exchange='',
                      routing_key='task_queue',
                      body=message,
                      properties=pika.BasicProperties(
                          delivery_mode=2,  # make message persistent
                      ))
Even with persistence, a message can still be lost if RabbitMQ crashes before writing it to disk. For stronger guarantees, use publisher confirms.
Fair Dispatch
Round-robin dispatch can be unfair when some tasks are heavier than others: one worker may stay busy while another sits idle. A worker can limit itself to one unacknowledged message at a time:
channel.basic_qos(prefetch_count=1)
This tells RabbitMQ not to send a new message to a worker until it has acked the previous one.
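The effect can be illustrated with a small simulation. This is a simplified pure-Python model rather than pika code: it assumes that with prefetch_count=1 each task goes to whichever worker becomes free first, and the function names and task durations are invented for the example.

```python
import heapq

def round_robin_finish(durations, n_workers):
    """Finish time when task i always goes to worker i % n_workers."""
    busy = [0] * n_workers
    for i, seconds in enumerate(durations):
        busy[i % n_workers] += seconds
    return max(busy)

def prefetch_one_finish(durations, n_workers):
    """Finish time when each task goes to the first worker that becomes free."""
    free_at = [(0, w) for w in range(n_workers)]
    heapq.heapify(free_at)
    for seconds in durations:
        t, w = heapq.heappop(free_at)           # earliest available worker
        heapq.heappush(free_at, (t + seconds, w))
    return max(t for t, _ in free_at)

durations = [5, 1, 5, 1, 5, 1]  # alternating heavy and light tasks
print(round_robin_finish(durations, 2), prefetch_one_finish(durations, 2))  # prints: 15 11
```

Under strict rotation one worker receives all three heavy tasks; in the prefetch model the light worker picks up heavy tasks as soon as it is free, so the batch finishes sooner.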
Broadcast (Fanout Exchange)
To send a message to multiple workers, use a fanout exchange.
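channel.exchange_declare(exchange='logs', exchange_type='fanout')  # pika 0.x named this argument type
Temporary Queues
A subscriber that only cares about current messages can let the server create a randomly named queue, and mark it exclusive so that it is deleted when the consumer disconnects.
# Empty name: the server generates one (pika 0.x: channel.queue_declare() with no arguments)
result = channel.queue_declare(queue='')
# exclusive=True also deletes the queue once the consumer's connection closes
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue  # the server-generated name
Binding Exchanges and Queues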
channel.queue_bind(exchange='logs', queue='hello')
Messages published to the logs exchange are now delivered to the hello queue as well.
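The three steps (declare the exchange, create a temporary queue, bind it) can be combined into one helper. This is a sketch under assumptions: subscribe_to_fanout is a hypothetical name, channel is an open pika channel, and the keyword names follow pika 1.0 and later.

```python
def subscribe_to_fanout(channel, exchange_name):
    """Create a private queue bound to a fanout exchange and return its name."""
    channel.exchange_declare(exchange=exchange_name, exchange_type='fanout')
    # Empty name: the broker generates one; exclusive: deleted when we disconnect
    result = channel.queue_declare(queue='', exclusive=True)
    queue_name = result.method.queue
    # A fanout exchange ignores routing keys, so none is given here
    channel.queue_bind(exchange=exchange_name, queue=queue_name)
    return queue_name
```

Each subscriber that calls this gets its own queue, so every subscriber receives its own copy of each message published to the exchange.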
Routing
Bindings can specify a routing_key to filter messages.
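For example, a consumer that wants only certain severities can add one binding per routing key. The sketch below assumes that the exchange and queue already exist and that channel is an open pika channel; bind_severities is a hypothetical helper name.

```python
def bind_severities(channel, exchange_name, queue_name, severities):
    """Bind one queue to an exchange once per routing key of interest."""
    for severity in severities:
        channel.queue_bind(exchange=exchange_name,
                           queue=queue_name,
                           routing_key=severity)
```

Calling bind_severities(channel, 'direct_logs', queue_name, ['warning', 'error']) would leave the queue receiving only messages published with one of those two routing keys.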
Direct Exchange
channel.exchange_declare(exchange='direct_logs', exchange_type='direct')  # pika 0.x: type='direct'
channel.basic_publish(exchange='direct_logs',
                      routing_key=severity,
                      body=message)
Topic Exchange
Topic exchanges route on dot-separated routing keys. A binding key may contain two wildcards: * matches exactly one word, and # matches zero or more words.
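The wildcard rules can be made concrete with a small matcher. This is a pure-Python illustration of the matching rules as described here, not the broker's actual implementation; topic_matches is a hypothetical name.

```python
def topic_matches(binding_key, routing_key):
    """Return True if a routing key satisfies a topic binding key."""
    pattern = binding_key.split('.')
    words = routing_key.split('.') if routing_key else []

    def match(p, w):
        if p == len(pattern):
            return w == len(words)  # both must be exhausted together
        if pattern[p] == '#':
            # '#' may absorb zero or more of the remaining words
            return any(match(p + 1, k) for k in range(w, len(words) + 1))
        if w < len(words) and pattern[p] in ('*', words[w]):
            return match(p + 1, w + 1)  # '*' or a literal consumes one word
        return False

    return match(0, 0)
```

For instance, the binding key kern.* matches kern.critical but not kern.disk.critical, while kern.# matches both, as well as kern on its own.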
RPC
Remote Procedure Call using RabbitMQ involves a client that creates a temporary callback queue, sends a request with reply_to and a unique correlation_id, then waits for the response.
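The role of correlation_id can be seen in isolation. The class below is a hypothetical sketch (PendingCall is not a pika class): it generates a unique id per request and accepts a reply only if the reply carries the same id.

```python
import uuid

class PendingCall:
    """Tracks one outstanding RPC request and accepts only its matching reply."""

    def __init__(self):
        self.corr_id = str(uuid.uuid4())  # unique per request
        self.response = None

    def on_response(self, ch, method, props, body):
        # Ignore replies that belong to some other (possibly stale) request
        if props.correlation_id == self.corr_id:
            self.response = body
```

Because the callback queue can be shared by many requests, this check is what lets the client discard replies that belong to a different call.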
self.connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
self.channel = self.connection.channel()
result = self.channel.queue_declare(queue='', exclusive=True)  # temporary callback queue
self.callback_queue = result.method.queue
# pika 1.0+ keyword names; pika 0.x used basic_consume(self.on_response, no_ack=True, queue=...)
self.channel.basic_consume(queue=self.callback_queue,
                           on_message_callback=self.on_response,
                           auto_ack=True)
The client publishes the request:
self.channel.basic_publish(exchange='',
                           routing_key='rpc_queue',
                           properties=pika.BasicProperties(
                               reply_to=self.callback_queue,
                               correlation_id=self.corr_id,
                           ),
                           body=str(n))
while self.response is None:
    self.connection.process_data_events()  # keep processing I/O until on_response stores the reply
return int(self.response)
The server consumes from rpc_queue, processes the request, sends the reply to props.reply_to with the same correlation_id, and then acknowledges the request.
# pika 1.0+ keyword names; pika 0.x used basic_consume(on_request, queue='rpc_queue')
channel.basic_consume(queue='rpc_queue', on_message_callback=on_request)
# inside on_request, after processing:
ch.basic_publish(exchange='',
                 routing_key=props.reply_to,
                 properties=pika.BasicProperties(correlation_id=props.correlation_id),
                 body=str(response))
ch.basic_ack(delivery_tag=method.delivery_tag)
MaGe Linux Operations
