Backend Development · 19 min read

Python Examples of Distributed Task Queues, Message Brokers, RPC, and Serialization Libraries

This article provides practical Python code examples for various messaging and serialization tools—including Celery, RQ, Huey, ZeroMQ, kafka‑python, Pika, stomp.py, nats‑py, gRPC, Thrift, Protobuf, Avro, msgpack, and Flatbuffers—demonstrating how to set up producers, consumers, and services for asynchronous processing and data exchange.

Test Development Learning Exchange
Test Development Learning Exchange
Test Development Learning Exchange
Python Examples of Distributed Task Queues, Message Brokers, RPC, and Serialization Libraries

1. Celery

Scenario: Use Celery to create a distributed task queue and execute asynchronous tasks.

from celery import Celery
import time
# Configure Celery: Redis acts as both the broker (task transport) and the
# result backend (where return values are stored for .get()).
app = Celery('tasks', broker='redis://localhost:6379/0', backend='redis://localhost:6379/0')
# Define task: the decorator registers the function with the app so a worker
# process can execute it by name.
@app.task
def long_running_task(x, y):
    time.sleep(5)  # Simulate long-running task
    return x + y
# Start Celery worker
# Run in command line: celery -A your_script_name worker --loglevel=info
# Call task: .delay() enqueues the task and returns an AsyncResult immediately.
result = long_running_task.delay(4, 6)
print("Task ID:", result.id)
print("Task Result:", result.get())  # Wait for task to finish and get result

2. RQ (Redis Queue)

Scenario: Use RQ to create a simple task queue and execute asynchronous tasks.

import time  # required by count_words; the original snippet omitted this import

import redis
from rq import Queue
from rq.job import Job
from worker import conn  # Assume you have a worker.py with connection config

# Create a queue bound to the shared Redis connection.
q = Queue(connection=conn)

# Define task function
def count_words(text):
    """Count whitespace-separated words after a simulated delay."""
    time.sleep(5)  # Simulate long-running task
    return len(text.split())

# Enqueue task: a separate `rq worker` process picks it up asynchronously.
job = q.enqueue(count_words, 'Hello, world! This is a test.')
print("Job ID:", job.id)
print("Job Result:", job.result)  # May be None if not finished

3. Huey

Scenario: Use Huey to create a lightweight task queue and execute asynchronous tasks.

from huey import RedisHuey
import time
# Configure Huey: tasks are queued through the local Redis instance.
huey = RedisHuey('my-app', host='localhost', port=6379)
# Define task: the decorator wraps the function so calling it enqueues work
# instead of executing inline.
@huey.task()
def long_running_task(x, y):
    time.sleep(5)  # Simulate long-running task
    return x + y
# Start Huey worker
# Run in command line: huey_consumer.py my_app.huey
# Call task: returns a Result handle immediately, not the final value.
result = long_running_task(4, 6)
print("Task ID:", result.id)
print("Task Result:", result(blocking=True))  # Wait for task to finish and get result

4. ZeroMQ

Scenario: Use ZeroMQ to build a high‑performance asynchronous messaging system.

import zmq
import threading
import time
# Server side: a REP socket answers exactly one request per reply, in lock-step.
def server():
    context = zmq.Context()
    socket = context.socket(zmq.REP)
    socket.bind("tcp://*:5555")
    while True:
        message = socket.recv_string()
        print(f"Received request: {message}")
        time.sleep(1)  # Simulate processing time
        socket.send_string("World")
# Client side: a REQ socket must strictly alternate send/recv to match REP.
def client():
    context = zmq.Context()
    socket = context.socket(zmq.REQ)
    socket.connect("tcp://localhost:5555")
    for i in range(10):
        socket.send_string("Hello")
        reply = socket.recv_string()
        print(f"Received reply: {reply}")
# Start server thread; daemon=True lets the process exit once client() returns,
# killing the otherwise-infinite server loop.
server_thread = threading.Thread(target=server)
server_thread.daemon = True
server_thread.start()
# Start client
client()

5. kafka‑python

Scenario: Use kafka‑python to create an Apache Kafka producer and consumer.

from kafka import KafkaProducer, KafkaConsumer
import json
import time

# Producer: send() is asynchronous -- it only appends to an in-memory buffer
# and returns a future; delivery happens on a background thread.
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                         value_serializer=lambda v: json.dumps(v).encode('utf-8'))
for _ in range(10):
    producer.send('test-topic', {'key': 'value'})
    time.sleep(1)
# Block until every buffered record has been delivered to the broker; without
# this, records still sitting in the buffer can be silently lost.
producer.flush()

# Consumer: joins group 'my-group' and, lacking committed offsets, starts from
# the earliest available message. This for-loop polls forever (Ctrl+C to stop).
consumer = KafkaConsumer('test-topic',
                         bootstrap_servers=['localhost:9092'],
                         auto_offset_reset='earliest',
                         enable_auto_commit=True,
                         group_id='my-group',
                         value_deserializer=lambda x: json.loads(x.decode('utf-8')))
for message in consumer:
    print(f"Received message: {message.value}")

6. Pika (RabbitMQ)

Scenario: Use Pika to create a RabbitMQ producer and consumer.

import pika
import threading  # used below to run the producer concurrently; missing in the original
import time

# Producer: publishes ten messages to the 'hello' queue via the default exchange.
def send_message():
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()
    # queue_declare is idempotent: it creates the queue only if absent.
    channel.queue_declare(queue='hello')
    for i in range(10):
        channel.basic_publish(exchange='', routing_key='hello', body=f'Hello World! {i}')
        time.sleep(1)
    connection.close()

# Consumer: blocks in start_consuming() and prints each message as it arrives.
def receive_message():
    def callback(ch, method, properties, body):
        print(f"Received: {body.decode()}")
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()
    channel.queue_declare(queue='hello')
    channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)
    print(' [*] Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()

# Start producer thread (each thread needs its own pika connection -- they are
# not thread-safe to share).
producer_thread = threading.Thread(target=send_message)
producer_thread.start()
# Start consumer
receive_message()

7. stomp.py

Scenario: Use stomp.py to create a STOMP protocol producer and consumer.

import stomp
import time

# Producer listener: starts publishing as soon as the connection is up.
class MyProducer(stomp.ConnectionListener):
    def __init__(self, conn):
        self.conn = conn
    def on_connected(self, frame):
        for i in range(10):
            self.conn.send('/queue/test', f'Hello, World! {i}'.encode())
            time.sleep(1)

# Consumer listener: called for every message on the subscription.
class MyConsumer(stomp.ConnectionListener):
    def on_message(self, frame):
        # frame.body is already a str in stomp.py (frames are auto-decoded),
        # so calling .decode() on it would raise AttributeError.
        print(f"Received: {frame.body}")

# One connection shared by both listeners. Listeners must be registered under
# DISTINCT names: the original registered both under '', so the producer
# silently replaced the consumer. It also called connect() twice.
conn = stomp.Connection([('localhost', 61613)])
conn.set_listener('consumer', MyConsumer())
conn.set_listener('producer', MyProducer(conn))
# Connect once; MyProducer.on_connected fires and starts publishing.
conn.connect('admin', 'password', wait=True)
# Subscribe consumer
conn.subscribe(destination='/queue/test', id=1, ack='auto')
time.sleep(10)  # Keep connection alive to receive messages
conn.disconnect()

8. nats‑py

Scenario: Use nats‑py to create a NATS.io producer and consumer.

import asyncio
import nats
async def run():
    # Connect to NATS
    nc = await nats.connect(servers=["nats://localhost:4222"])
    # Producer: publish ten messages, one per second, on subject "test".
    async def producer():
        for i in range(10):
            await nc.publish("test", f"Message {i}".encode())
            await asyncio.sleep(1)
    # Consumer callback: invoked once per message delivered to the subscription.
    async def consumer(msg):
        print(f"Received: {msg.data.decode()}")
    # Subscribe BEFORE producing so no messages are missed (NATS core has no replay).
    sub = await nc.subscribe("test", cb=consumer)
    # Start producer concurrently on the running event loop.
    asyncio.create_task(producer())
    # Keep connection alive to receive messages
    await asyncio.sleep(10)
    await sub.unsubscribe()
    await nc.close()
if __name__ == '__main__':
    asyncio.run(run())

9. gRPC

Scenario: Use gRPC to create a high‑performance RPC service.

syntax = "proto3";
package helloworld;
// The greeting service: one unary RPC.
service Greeter {
  // Sends a greeting and receives a single reply.
  rpc SayHello (HelloRequest) returns (HelloReply) {}
}
// Request message carrying the caller's name.
message HelloRequest {
  string name = 1;
}
// Response message carrying the greeting text.
message HelloReply {
  string message = 1;
}

Generate Python code:

python -m grpc_tools.protoc -I. --python_out=. --grpc_python_out=. helloworld.proto

Server (server.py):

from concurrent import futures
import grpc
import helloworld_pb2
import helloworld_pb2_grpc
# Service implementation backing the Greeter service from helloworld.proto.
class Greeter(helloworld_pb2_grpc.GreeterServicer):
    def SayHello(self, request, context):
        """Unary RPC handler: build a greeting from the request's name field."""
        return helloworld_pb2.HelloReply(message='Hello, %s!' % request.name)
def serve():
    """Start the gRPC server on port 50051 and block until termination."""
    # The thread pool bounds how many RPCs are handled concurrently.
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=10))
    helloworld_pb2_grpc.add_GreeterServicer_to_server(Greeter(), server)
    server.add_insecure_port('[::]:50051')  # no TLS; listens on all interfaces
    server.start()
    server.wait_for_termination()
if __name__ == '__main__':
    serve()

Client (client.py):

import grpc
import helloworld_pb2
import helloworld_pb2_grpc


def run():
    """Call the local Greeter service once and print its reply."""
    channel = grpc.insecure_channel('localhost:50051')
    try:
        stub = helloworld_pb2_grpc.GreeterStub(channel)
        request = helloworld_pb2.HelloRequest(name='you')
        reply = stub.SayHello(request)
        print("Greeter client received: " + reply.message)
    finally:
        # Same cleanup the original's `with` block performed on exit.
        channel.close()


if __name__ == '__main__':
    run()

10. Thrift

Scenario: Use Thrift to create a cross‑language service.

namespace py tutorial

// Thrift resolves types in declaration order, so Operation must be declared
// BEFORE the Work struct that references it (the original declared it after).
enum Operation {
  ADD = 1,
  SUBTRACT = 2,
  MULTIPLY = 3,
  DIVIDE = 4
}

// A unit of work submitted to Calculator.calculate().
struct Work {
  1: i32 num1 = 0,
  2: i32 num2,
  3: Operation op,
  4: optional i32 comment,
}

// Raised when calculate() receives an invalid or unsupported operation.
exception InvalidOperation {
  1: i32 whatOp,
  2: string why
}

service Calculator {
  void ping(),
  i32 add(1:i32 num1, 2:i32 num2),
  i32 calculate(1:i32 logid, 2:Work w) throws (1:InvalidOperation ouch),
}

Generate Python code:

thrift --gen py tutorial.thrift

Server (server.py):

from thrift.transport import TSocket
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol
from thrift.server import TServer
from tutorial import Calculator
from tutorial.ttypes import *
# Handler implementing the Calculator service defined in tutorial.thrift.
class CalculatorHandler:
    def ping(self):
        """No-op health check."""
        print("ping()")
    def add(self, num1, num2):
        """Return the sum of two i32 values."""
        print(f"add({num1}, {num2})")
        return num1 + num2
    def calculate(self, logid, work):
        """Apply work.op to work.num1/num2; raises InvalidOperation on bad input."""
        print(f"calculate({logid}, {work})")
        if work.op == Operation.DIVIDE and work.num2 == 0:
            raise InvalidOperation(whatOp=work.op, why="Cannot divide by 0")
        if work.op == Operation.ADD:
            val = work.num1 + work.num2
        elif work.op == Operation.SUBTRACT:
            val = work.num1 - work.num2
        elif work.op == Operation.MULTIPLY:
            val = work.num1 * work.num2
        elif work.op == Operation.DIVIDE:
            val = work.num1 // work.num2
        else:
            raise InvalidOperation(whatOp=work.op, why="Unknown operation")
        return val
# Wire the handler into a single-threaded Thrift server on port 9090:
# buffered transport + binary protocol must match the client's configuration.
handler = CalculatorHandler()
processor = Calculator.Processor(handler)
transport = TSocket.TServerSocket(host='localhost', port=9090)
tfactory = TTransport.TBufferedTransportFactory()
pfactory = TBinaryProtocol.TBinaryProtocolFactory()
# TSimpleServer handles one connection at a time -- fine for a tutorial.
server = TServer.TSimpleServer(processor, transport, tfactory, pfactory)
print('Starting the server...')
server.serve()

Client (client.py):

from thrift import Thrift
from thrift.transport import TSocket
from thrift.transport import TTransport
from thrift.protocol import TBinaryProtocol
from tutorial import Calculator
from tutorial.ttypes import *
try:
    transport = TSocket.TSocket('localhost', 9090)
    transport = TTransport.TBufferedTransport(transport)
    protocol = TBinaryProtocol.TBinaryProtocol(transport)
    client = Calculator.Client(protocol)
    transport.open()
    print('ping()')
    client.ping()
    print('add(1, 1)')
    print('1+1=%d' % client.add(1, 1))
    work = Work()
    work.op = Operation.DIVIDE
    work.num1 = 1
    work.num2 = 0
    try:
        quotient = client.calculate(1, work)
        print('Whoa? You know how to divide by zero?')
    except InvalidOperation as e:
        print('InvalidOperation: %r' % e)
    work.op = Operation.SUBTRACT
    work.num1 = 15
    work.num2 = 10
    diff = client.calculate(1, work)
    print('15-10=%d' % diff)
    transport.close()
except Thrift.TException as tx:
    print('%s' % tx.message)

11. Protobuf

Scenario: Use Protobuf for efficient data serialization.

syntax = "proto3";
// A single contact entry.
message Person {
  string name = 1;
  int32 id = 2;     // Unique ID number for this person.
  string email = 3;
}
// Container for any number of Person entries.
message AddressBook {
  repeated Person people = 1;
}

Generate Python code:

python -m grpc_tools.protoc -I. --python_out=. person.proto

Serialization / Deserialization:

import person_pb2

# Build a Person message and populate its scalar fields.
contact = person_pb2.Person()
contact.id = 1234
contact.name = "John Doe"
contact.email = "[email protected]"

# Encode the message into its compact binary wire format.
payload = contact.SerializeToString()
print("Serialized data:", payload)

# Decode the bytes back into a fresh Person message.
decoded = person_pb2.Person()
decoded.ParseFromString(payload)
print("Deserialized data:")
print(decoded)

12. Avro‑Python3

Scenario: Use Avro‑Python3 for data serialization.

{
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "favorite_number", "type": ["int", "null"]},
    {"name": "favorite_color", "type": ["string", "null"]}
  ]
}

Serialization / Deserialization:

import avro.schema
from avro.datafile import DataFileReader, DataFileWriter
from avro.io import DatumReader, DatumWriter

# Load schema. The original leaked the schema file handle; `with` closes it.
# NOTE(review): the deprecated avro-python3 package spells this
# avro.schema.Parse (capital P) -- confirm against the installed package.
with open("user.avsc", "rb") as schema_file:
    schema = avro.schema.parse(schema_file.read())

# Write data. DataFileWriter takes ownership of the file object; the context
# manager flushes and closes it even if an append() raises.
with DataFileWriter(open("users.avro", "wb"), DatumWriter(), schema) as writer:
    writer.append({"name": "Alyssa", "favorite_number": 256})
    writer.append({"name": "Ben", "favorite_number": 7, "favorite_color": "red"})

# Read data back; union fields that were omitted come back as None.
with DataFileReader(open("users.avro", "rb"), DatumReader()) as reader:
    for user in reader:
        print(user)

13. msgpack‑python

Scenario: Use msgpack‑python for efficient data serialization.

import msgpack

# Sample record to round-trip through MessagePack.
record = {
    'name': 'Alice',
    'age': 30,
    'is_student': False,
    'courses': ['Math', 'Physics'],
}

# Encode the dict into MessagePack's compact binary representation.
blob = msgpack.packb(record)
print("Packed data:", blob)

# Decode the binary blob back into native Python objects.
restored = msgpack.unpackb(blob)
print("Unpacked data:", restored)

14. Flatbuffers

Scenario: Use Flatbuffers for high‑performance serialization.

// Root table describing a game character.
table Monster {
  name:string;
  health:int;
  mana:int;
  attack:short;
  defense:short;
  equipment:[Equipment];  // vector of Equipment sub-tables
}

// A single piece of equipment carried by a Monster.
table Equipment {
  type:Type;
  power:float;
}

// Equipment categories; byte-backed to keep the buffer compact.
enum Type:byte { None, Weapon, Armor, Potion }
root_type Monster;

Compile schema:

flatc --python monster.fbs

Serialization / Deserialization:

import monster_generated
import flatbuffers

builder = flatbuffers.Builder(0)

# Build the Equipment sub-tables first (scalar fields only, so building them
# one after another is allowed) and collect their offsets for the vector.
equipment_list = []
for i in range(3):
    monster_generated.EquipmentStart(builder)
    monster_generated.EquipmentAddType(builder, i)
    monster_generated.EquipmentAddPower(builder, 10.0 + i)
    equipment_list.append(monster_generated.EquipmentEnd(builder))

# Vectors are assembled by prepending offsets in reverse order.
monster_generated.MonsterStartEquipmentVector(builder, 3)
for item in reversed(equipment_list):
    builder.PrependUOffsetTRelative(item)
equipment_vector = builder.EndVector(3)

# FlatBuffers forbids creating any object (including strings) while a table is
# under construction, so the name string MUST be created before MonsterStart.
# The original called builder.CreateString() between MonsterStart and
# MonsterEnd, which raises "Error: object serialization must not be nested".
name_offset = builder.CreateString("Orc")

monster_generated.MonsterStart(builder)
monster_generated.MonsterAddName(builder, name_offset)
monster_generated.MonsterAddHealth(builder, 100)
monster_generated.MonsterAddMana(builder, 50)
monster_generated.MonsterAddAttack(builder, 10)
monster_generated.MonsterAddDefense(builder, 5)
monster_generated.MonsterAddEquipment(builder, equipment_vector)
monster = monster_generated.MonsterEnd(builder)
builder.Finish(monster)

# Serialize to binary
buf = builder.Output()
print("Serialized data:", buf)

# Deserialize: read fields straight out of the buffer with zero copying.
monster_obj = monster_generated.Monster.GetRootAsMonster(buf, 0)
print("Name:", monster_obj.Name().decode('utf-8'))
print("Health:", monster_obj.Health())
print("Mana:", monster_obj.Mana())
print("Attack:", monster_obj.Attack())
print("Defense:", monster_obj.Defense())
for i in range(monster_obj.EquipmentLength()):
    equipment = monster_obj.Equipment(i)
    print(f"Equipment {i}: Type={equipment.Type()}, Power={equipment.Power()}")
Tags: python, RPC, Serialization, Message Queue, tutorial, distributed tasks
Test Development Learning Exchange
Written by

Test Development Learning Exchange

Test Development Learning Exchange

0 followers
Reader feedback

How this landed with the community

login Sign in to like

Rate this article

Was this worth your time?

Sign in to rate
Discussion

0 Comments

Thoughtful readers leave field notes, pushback, and hard-won operational detail here.