Backend Development 7 min read

How to Build a FastAPI Service with RabbitMQ Consumer & Publisher

This guide walks through creating a FastAPI application that both publishes and consumes messages via RabbitMQ, detailing the setup of a Pika client, asynchronous consumer, message handling, API routes, and deployment with uvicorn, complete with code snippets and testing instructions.

Code Mala Tang
Code Mala Tang
Code Mala Tang
How to Build a FastAPI Service with RabbitMQ Consumer & Publisher

When building a microservice with FastAPI, I needed a way for services to both publish and consume messages, so I chose RabbitMQ as the broker and used the Pika library.

First, a PikaClient class is created to handle all communication with RabbitMQ:

<code>class PikaClient:
    def __init__(self, process_callable):
        self.publish_queue_name = env('PUBLISH_QUEUE', 'foo_publish_queue')
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(host=env('RABBIT_HOST', '127.0.0.1'))
        )
        self.channel = self.connection.channel()
        self.publish_queue = self.channel.queue_declare(queue=self.publish_queue_name)
        self.callback_queue = self.publish_queue.method.queue
        self.response = None
        self.process_callable = process_callable
        logger.info('Pika connection initialized')
</code>

The client needs an asynchronous consumer that runs with the event loop:

<code>async def consume(self, loop):
    """Set up a message listener that works with the current loop"""
    connection = await connect_robust(host=env('RABBIT_HOST', '127.0.0.1'), port=5672, loop=loop)
    channel = await connection.channel()
    queue = await channel.declare_queue(env('CONSUME_QUEUE', 'foo_consume_queue'))
    await queue.consume(self.process_incoming_message, no_ack=False)
    logger.info('Pika async listener established')
    return connection
</code>

Incoming messages are processed by:

<code>async def process_incoming_message(self, message):
    """Handle incoming RabbitMQ messages"""
    message.ack()
    body = message.body
    logger.info('Message received')
    if body:
        self.process_callable(json.loads(body))
</code>

To publish messages a simple method is added:

<code>def send_message(self, message: dict):
    """Publish a message to RabbitMQ"""
    self.channel.basic_publish(
        exchange='',
        routing_key=self.publish_queue_name,
        properties=pika.BasicProperties(
            reply_to=self.callback_queue,
            correlation_id=str(uuid.uuid4())
        ),
        body=json.dumps(message)
    )
</code>

A FastAPI application class wires the client together:

<code>class FooApp(FastAPI):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.pika_client = PikaClient(self.log_incoming_message)

    @classmethod
    def log_incoming_message(cls, message: dict):
        """Process incoming messages meaningfully"""
        logger.info('Received message: %s', message)
</code>

The startup event launches the consumer:

<code>foo_app = FooApp()
foo_app.include_router(router)

@foo_app.on_event('startup')
async def startup():
    loop = asyncio.get_running_loop()
    task = loop.create_task(foo_app.pika_client.consume(loop))
    await task
</code>

An API router defines a POST endpoint that forwards the payload to RabbitMQ:

<code>router = APIRouter(
    tags=['items'],
    responses={404: {"description": "Not found"}}
)

@router.post('/send-message')
async def send_message(payload: MessageSchema, request: Request):
    request.app.pika_client.send_message({"message": payload.message})
    return {"status": "ok"}
</code>

The request schema is defined with Pydantic:

<code>from pydantic import BaseModel

class MessageSchema(BaseModel):
    message: str
</code>

To test the API, send a POST request such as:

<code>POST http://127.0.0.1:8001/send-message
Accept: application/json

{
  "message": "I am testing sending a message"
}
</code>

Run the service with uvicorn:

<code>uvicorn test_app:foo_app --reload --reload-dir .src/ --port 8001
</code>

After starting, the console shows the Pika connection message and the startup event log. Posting to the endpoint creates a message in the RabbitMQ management UI, and consuming from the foo_consume_queue displays the message, confirming the end‑to‑end flow.

PythonMessage QueueRabbitMQAsyncFastAPIPika
Code Mala Tang
Written by

Code Mala Tang

Read source code together, write articles together, and enjoy spicy hot pot together.

0 followers
Reader feedback

How this landed with the community

login Sign in to like

Rate this article

Was this worth your time?

Sign in to rate
Discussion

0 Comments

Thoughtful readers leave field notes, pushback, and hard-won operational detail here.