Master RabbitMQ with Python: From Basics to Advanced Patterns

This tutorial explains RabbitMQ fundamentals, shows how to install the Pika client, provides complete producer and consumer Python examples, and covers advanced topics such as work queues, message acknowledgments, persistence, fair dispatch, exchanges, routing, and RPC usage.

MaGe Linux Operations

Introduction

RabbitMQ acts like a post office: it accepts messages and forwards them. Producers and consumers interact through queues. A queue is essentially a large message buffer, bounded only by the memory and disk of the host. Many producers can send to the same queue, and many consumers can receive from it.

Code

RabbitMQ supports several protocols; this tutorial uses AMQP 0-9-1, for which the recommended Python client is Pika.

pip install pika -i https://pypi.douban.com/simple/

Producer (send.py)

import pika

# connect to the local RabbitMQ server
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()  # get a channel

The example connects to a broker on the local machine; to reach a remote server, pass its host name or IP address instead of 'localhost'. Before publishing, make sure the target queue exists: a message sent to a queue that does not exist is silently discarded.

channel.queue_declare(queue='hello')  # create a queue named hello
channel.basic_publish(exchange='',
                      routing_key='hello',
                      body='Hello World!')  # send message
connection.close()  # close connection
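The producer above connects to localhost. For a remote broker, one option is to describe the connection as an AMQP URL and pass it to pika.URLParameters. The sketch below is only an illustration: amqp_url, connect, the host mq.example.com and the credentials are all placeholders introduced here, and a stock RabbitMQ installation only lets the default guest user connect from localhost, so a dedicated user is assumed.

```python
from urllib.parse import quote


def amqp_url(host, user, password, port=5672, vhost="/"):
    """Build an AMQP URL; user, password and vhost are percent-encoded."""
    return "amqp://{}:{}@{}:{}/{}".format(
        quote(user, safe=""),
        quote(password, safe=""),
        host,
        port,
        quote(vhost, safe=""),  # the default vhost "/" becomes "%2F"
    )


def connect(url):
    """Open a blocking connection (needs the pika package and a reachable broker)."""
    import pika  # imported here so amqp_url() can be used without pika installed

    return pika.BlockingConnection(pika.URLParameters(url))


def main():
    # Hypothetical host and credentials; replace them with real values.
    connection = connect(amqp_url("mq.example.com", "app_user", "s3cret"))
    channel = connection.channel()
    channel.queue_declare(queue="hello")
    channel.basic_publish(exchange="", routing_key="hello", body="Hello World!")
    connection.close()
```

Calling main() performs the same publish as send.py, only against the remote host.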

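If publishing appears to do nothing, check the broker's free disk space. When it falls below the configured threshold (disk_free_limit, whose default depends on the RabbitMQ version), the broker blocks publishers until space is freed or the limit is lowered.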

After sending, you can verify the queue content with rabbitmqctl list_queues, which shows hello 1, indicating one message in the hello queue.

Consumer (receive.py)

import pika
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

Again, connect to the server first.

channel.queue_declare(queue='hello')  # ensure the queue exists

def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)

channel.basic_consume(queue='hello',
                      on_message_callback=callback,
                      auto_ack=True)  # auto-acknowledge (this argument was called no_ack before Pika 1.0)
channel.start_consuming()  # start receiving loop

Work Queues (Task Queues)

Work queues distribute time‑consuming tasks to multiple worker processes. Instead of performing heavy work inside an HTTP request, tasks are sent as messages to a queue and processed by separate workers.

channel.basic_publish(exchange='',
                      routing_key='task_queue',
                      body=message,
                      properties=pika.BasicProperties(
                          delivery_mode=2,  # make message persistent
                      ))

Messages are dispatched in round‑robin order, giving each worker a similar number of tasks.
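Round-robin dispatch can be pictured with a few lines of plain Python. This is a toy model of what the broker does, not Pika code, and the function and worker names are made up for the illustration.

```python
from itertools import cycle


def round_robin(tasks, workers):
    """Hand out tasks to workers in turn, ignoring how long each task takes."""
    assignment = {worker: [] for worker in workers}
    for task, worker in zip(tasks, cycle(workers)):
        assignment[worker].append(task)
    return assignment


if __name__ == "__main__":
    print(round_robin(["t1", "t2", "t3", "t4", "t5"], ["w1", "w2"]))
```

With five tasks and two workers, the first worker receives the first, third and fifth task and the second worker the other two, regardless of how heavy each task is.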

Message Acknowledgement

With automatic acknowledgement, RabbitMQ removes a message from the queue as soon as it has been delivered, so a task is lost if its worker crashes before finishing it. To prevent this, a worker sends an ack only after it has processed the message. If the worker's channel or connection closes before the ack arrives, RabbitMQ re-queues the message and delivers it to another worker.

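Manual acknowledgement is the default in Pika. The earlier example passed auto_ack=True (spelled no_ack=True in Pika releases before 1.0), which turns it off. Leaving that argument out restores manual acks:

channel.basic_consume(queue='hello', on_message_callback=callback)  # manual ack (the default)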

Example callback with manual ack:

import time

def callback(ch, method, properties, body):
    print(" [x] Received %r" % (body,))
    time.sleep(body.count(b'.'))  # body is bytes; each dot stands for one second of work
    print(" [x] Done")
    ch.basic_ack(delivery_tag = method.delivery_tag)  # send ack
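Assembled into a complete worker, the pieces might look like the sketch below. It assumes Pika 1.x (where basic_consume takes on_message_callback and auto_ack), a broker on localhost and the hello queue from the first example; work_seconds is a helper name introduced here for illustration.

```python
import time


def work_seconds(body):
    """Simulated cost of a task: one second per '.' in the message body (bytes)."""
    return body.count(b".")


def callback(ch, method, properties, body):
    print(" [x] Received %r" % (body,))
    time.sleep(work_seconds(body))
    print(" [x] Done")
    # Ack only after the work is finished, so a crash mid-task leads to redelivery.
    ch.basic_ack(delivery_tag=method.delivery_tag)


def main():
    import pika  # imported here so the helpers above can be used without a broker

    connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
    channel = connection.channel()
    channel.queue_declare(queue="hello")
    # auto_ack is left at its default (False), so the callback must ack explicitly.
    channel.basic_consume(queue="hello", on_message_callback=callback)
    print(" [*] Waiting for messages. To exit press CTRL+C")
    channel.start_consuming()
```

Running main() in two terminals and stopping one worker in the middle of a task should show the unacknowledged message arriving at the other worker.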

Message Persistence

To survive RabbitMQ restarts, declare queues as durable and publish messages with delivery_mode = 2.

channel.queue_declare(queue='task_queue', durable=True)
channel.basic_publish(exchange='',
                      routing_key="task_queue",
                      body=message,
                      properties=pika.BasicProperties(
                          delivery_mode = 2,  # make message persistent
                      ))

Even with persistence, a message can be lost if RabbitMQ crashes before writing it to disk. For stronger guarantees, use publisher confirms.

Fair Dispatch

Round-robin dispatch can be unfair when some tasks are heavier than others, because RabbitMQ hands out messages without considering how many each worker still has unacknowledged. A worker can limit itself to one unacknowledged message at a time:

channel.basic_qos(prefetch_count=1)

This tells RabbitMQ not to send a new message to a worker until it has acknowledged the previous one.
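The effect of prefetch_count=1 can be illustrated with a small simulation. The model below is a simplification written for this tutorial (task durations are known in advance, and a free worker picks up the next task immediately); it is not how the broker is implemented, and both function names are invented here.

```python
import heapq


def round_robin_dispatch(durations, worker_count):
    """Total busy time per worker when tasks are handed out strictly in turn."""
    busy_time = [0] * worker_count
    for index, duration in enumerate(durations):
        busy_time[index % worker_count] += duration
    return busy_time


def fair_dispatch(durations, worker_count):
    """Total busy time per worker when each task goes to the first free worker."""
    # Heap of (time at which the worker becomes free, worker index).
    free_at = [(0, worker) for worker in range(worker_count)]
    heapq.heapify(free_at)
    busy_time = [0] * worker_count
    for duration in durations:
        now, worker = heapq.heappop(free_at)
        busy_time[worker] += duration
        heapq.heappush(free_at, (now + duration, worker))
    return busy_time


if __name__ == "__main__":
    tasks = [5, 1, 5, 1, 5, 1]  # alternating heavy and light tasks
    print("round robin:", round_robin_dispatch(tasks, 2))
    print("fair:       ", fair_dispatch(tasks, 2))
```

With alternating heavy and light tasks, strict round-robin leaves one worker with all the heavy tasks, while dispatching to the first free worker spreads the load more evenly.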

Broadcast (Fanout Exchange)

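To deliver a copy of every message to all consumers rather than to a single worker, publish to a fanout exchange, which forwards each message to every queue bound to it.

channel.exchange_declare(exchange='logs', exchange_type='fanout')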

Temporary Queues

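A subscriber that only cares about new messages can let the server create a fresh queue for it. Passing an empty queue name makes RabbitMQ generate a random one, and exclusive=True deletes the queue when the declaring connection closes.

result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue  # the server-generated name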

Binding Exchanges and Queues

channel.queue_bind(exchange='logs', queue='hello')

Now messages published to the logs exchange are also delivered to the hello queue.

Routing

Bindings can specify a routing_key to filter messages.

Direct Exchange

channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
channel.basic_publish(exchange='direct_logs',
                      routing_key=severity,
                      body=message)
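On the receiving side, a queue gets one binding per routing key it is interested in. The sketch below assumes Pika 1.x and a broker on localhost. direct_route is a toy model of the matching rule, added only so the behaviour can be checked without a broker, and the queue names in it are invented.

```python
def direct_route(bindings, routing_key):
    """Toy direct exchange: bindings is a list of (queue, binding_key) pairs."""
    return sorted({queue for queue, key in bindings if key == routing_key})


def main(severities):
    import pika  # imported here so direct_route() can be used without pika installed

    connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
    channel = connection.channel()
    channel.exchange_declare(exchange="direct_logs", exchange_type="direct")

    # Server-named queue that disappears when this connection closes.
    result = channel.queue_declare(queue="", exclusive=True)
    queue_name = result.method.queue

    # One binding per severity of interest.
    for severity in severities:
        channel.queue_bind(exchange="direct_logs", queue=queue_name, routing_key=severity)

    def callback(ch, method, properties, body):
        print(" [x] %r:%r" % (method.routing_key, body))

    channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
    channel.start_consuming()
```

For example, main(["warning", "error"]) would receive only messages published with one of those two routing keys.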

Topic Exchange

Topic exchanges use dot‑separated routing keys with * (single word) and # (zero or more words) wildcards.

*  matches one word
#  matches zero or more words
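The wildcard rules can be tried out without a broker. The function below is a plain-Python model of topic matching written for this tutorial, not RabbitMQ's own implementation; topic_matches is a hypothetical name and the routing keys are made-up examples.

```python
def topic_matches(pattern, routing_key):
    """Return True if a dot-separated routing key matches a topic binding pattern."""
    pattern_words = pattern.split(".")
    key_words = routing_key.split(".") if routing_key else []

    def match(i, j):
        # i indexes pattern_words, j indexes key_words.
        if i == len(pattern_words):
            return j == len(key_words)
        if pattern_words[i] == "#":
            # '#' may absorb zero or more words: try every possible split.
            return any(match(i + 1, k) for k in range(j, len(key_words) + 1))
        if j == len(key_words):
            return False
        if pattern_words[i] == "*" or pattern_words[i] == key_words[j]:
            return match(i + 1, j + 1)
        return False

    return match(0, 0)


if __name__ == "__main__":
    for pattern, key in [("*.orange.*", "quick.orange.rabbit"),
                         ("lazy.#", "lazy"),
                         ("*.orange.*", "orange")]:
        print(pattern, key, topic_matches(pattern, key))
```

Note that "lazy.#" matches the bare key "lazy" because # may stand for zero words, whereas "*.orange.*" requires exactly three words.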

RPC

Remote Procedure Call using RabbitMQ involves a client that creates a temporary callback queue, sends a request with reply_to and a unique correlation_id, then waits for the response.

self.connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
self.channel = self.connection.channel()
result = self.channel.queue_declare(queue='', exclusive=True)  # server-named callback queue
self.callback_queue = result.method.queue
self.channel.basic_consume(queue=self.callback_queue,
                           on_message_callback=self.on_response,
                           auto_ack=True)

Client publishes the request:

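def call(self, n):
    self.response = None
    self.corr_id = str(uuid.uuid4())  # unique per request; needs "import uuid"
    self.channel.basic_publish(exchange='',
                               routing_key='rpc_queue',
                               properties=pika.BasicProperties(
                                   reply_to=self.callback_queue,
                                   correlation_id=self.corr_id,
                               ),
                               body=str(n))
    while self.response is None:
        self.connection.process_data_events()  # let Pika deliver the reply
    return int(self.response)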
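The client code refers to an on_response callback, a correlation id and a response attribute without showing how they fit together. One possible arrangement, kept free of Pika so it can be read and tested in isolation, is sketched below. ReplyTracker and new_request are names invented for this illustration; only the (ch, method, props, body) callback signature comes from Pika.

```python
import uuid


class ReplyTracker:
    """Remembers the correlation id of the outstanding request and its reply."""

    def __init__(self):
        self.corr_id = None
        self.response = None

    def new_request(self):
        """Reset the stored reply and return a fresh, unique correlation id."""
        self.response = None
        self.corr_id = str(uuid.uuid4())
        return self.corr_id

    def on_response(self, ch, method, props, body):
        """Consumer callback: accept only the reply to the current request."""
        if props.correlation_id == self.corr_id:
            self.response = body
```

Ignoring replies with an unknown correlation id matters because the callback queue may still hold an answer to an earlier request, for instance one that was abandoned by the client.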

Server consumes from rpc_queue, processes the request, and sends the reply back to props.reply_to with the same correlation_id, then acknowledges the request.

channel.basic_consume(queue='rpc_queue', on_message_callback=on_request)
# after processing:
ch.basic_publish(exchange='',
                 routing_key=props.reply_to,
                 properties=pika.BasicProperties(correlation_id=props.correlation_id),
                 body=str(response))
ch.basic_ack(delivery_tag=method.delivery_tag)
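A server along these lines could be sketched as follows. The text does not say what the remote procedure computes, so a Fibonacci function is used here purely as a stand-in workload; Pika 1.x and a broker on localhost are also assumptions.

```python
def fib(n):
    """Iterative Fibonacci, used only as an example workload."""
    a, b = 0, 1
    for _ in range(n):
        a, b = b, a + b
    return a


def on_request(ch, method, props, body):
    import pika  # imported here so fib() can be used without pika installed

    n = int(body)
    print(" [.] fib(%d)" % n)
    response = fib(n)
    # Reply to the queue named by the client, echoing its correlation id.
    ch.basic_publish(exchange="",
                     routing_key=props.reply_to,
                     properties=pika.BasicProperties(correlation_id=props.correlation_id),
                     body=str(response))
    # Acknowledge the request only after the reply has been published.
    ch.basic_ack(delivery_tag=method.delivery_tag)


def main():
    import pika

    connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
    channel = connection.channel()
    channel.queue_declare(queue="rpc_queue")
    channel.basic_qos(prefetch_count=1)  # share load fairly across several servers
    channel.basic_consume(queue="rpc_queue", on_message_callback=on_request)
    print(" [x] Awaiting RPC requests")
    channel.start_consuming()
```

Several copies of this server can consume from rpc_queue at the same time; the prefetch setting keeps a busy server from being handed further requests.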