How to Integrate Celery + RabbitMQ with FastAPI for Asynchronous Task Processing
This guide walks through setting up RabbitMQ via Docker, configuring Celery with FastAPI, defining asynchronous tasks, launching workers, testing endpoints, and monitoring execution with Flower, providing a complete, step‑by‑step solution for background processing in Python web applications.
In this article I share my experience using Celery with RabbitMQ in a FastAPI project to run time‑consuming audio‑analysis tasks in the background.
Celery is a distributed task queue that supports real‑time processing and scheduling. When a request triggers a long‑running task, the task is added to Celery’s queue and executed by worker processes while the FastAPI server remains responsive. Celery requires a message broker; RabbitMQ is the default and the simplest choice.
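To make the enqueue-then-return pattern concrete, here is a minimal standard-library sketch. It is only an analogy: a `queue.Queue` stands in for RabbitMQ and a thread stands in for a Celery worker process. The names `worker` and `run_demo` are made up for this illustration and are not part of Celery.

```python
import queue
import threading


def worker(tasks: queue.Queue, results: dict) -> None:
    """Consume (task_id, x, y) jobs until a None sentinel arrives."""
    while True:
        job = tasks.get()
        if job is None:
            break
        task_id, x, y = job
        # Stand-in for slow work such as audio analysis
        results[task_id] = x + y


def run_demo() -> dict:
    tasks: queue.Queue = queue.Queue()
    results: dict = {}
    thread = threading.Thread(target=worker, args=(tasks, results))
    thread.start()
    # The "request handler" side: enqueue jobs and carry on immediately
    tasks.put(("job-1", 5, 6))
    tasks.put(("job-2", 1, 2))
    tasks.put(None)  # tell the worker to stop
    thread.join()
    return results
```

In the real setup the queue lives in the broker rather than in process memory, so workers can run in separate processes or on separate machines from the web server.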
Implementation
First, set up RabbitMQ. On Windows I run it via Docker:
<code>docker run -p 15672:15672 -p 5672:5672 rabbitmq:3-management</code>
After the container starts, the management UI is available at http://127.0.0.1:15672, where the default username and password are both guest.
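Before going further, it can help to confirm that the AMQP port published by the container is accepting connections. The sketch below is a plain TCP check using only the standard library. The function name `broker_is_reachable` is hypothetical, and a successful connection only shows that something is listening on the port, not that RabbitMQ is fully ready.

```python
import socket


def broker_is_reachable(host: str = "127.0.0.1", port: int = 5672,
                        timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and unreachable hosts
        return False
```

Calling `broker_is_reachable()` with no arguments checks the default port mapping from the `docker run` command above.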
Next, install Celery and python‑dotenv:
<code>pip install celery python-dotenv</code>
The FastAPI project structure is:
<code>app/
├── config/
│   ├── __init__.py
│   └── celery_config.py
├── tasks/
│   ├── __init__.py
│   └── celery_tasks.py
├── __init__.py
├── main.py
├── .env
└── requirements.txt</code>
In .env, define the broker URL and result backend:
<code>CELERY_BROKER_URL=amqp://<USERNAME>:<PASSWORD>@localhost:5672//
CELERY_RESULT_BACKEND=rpc://</code>
Create celery_config.py to configure the Celery app:
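<code>import os

from celery import Celery
from dotenv import load_dotenv

# Read CELERY_BROKER_URL and CELERY_RESULT_BACKEND from .env
load_dotenv()

celery_app = Celery(
    __name__,
    broker=os.getenv("CELERY_BROKER_URL"),
    backend=os.getenv("CELERY_RESULT_BACKEND"),
)
celery_app.conf.update(
    imports=["app.tasks.celery_tasks"],  # modules the worker imports to register tasks
    broker_connection_retry_on_startup=True,  # keep retrying if RabbitMQ is not up yet
    task_track_started=True,  # report a STARTED state while a task runs
)
</code>
Define tasks in celery_tasks.py: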
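<code>import asyncio

from app.config.celery_config import celery_app


@celery_app.task
def my_task(x, y):
    ans = x + y
    print(ans)
    return ans


async def my_async_task(x, y):
    # Simulate slow asynchronous work
    await asyncio.sleep(3)
    ans = x + y
    print(ans)
    return ans


@celery_app.task
def my_second_task(x, y):
    # Celery tasks are synchronous, so run the coroutine to completion here
    result = asyncio.run(my_async_task(x, y))
    return result
</code>
Use the tasks in a FastAPI route: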
<code>import uvicorn
from fastapi import FastAPI

from app.tasks.celery_tasks import my_task

app = FastAPI()


@app.get("/run")
def handle_run():
    # .delay() only enqueues the task and returns an AsyncResult immediately
    task_response = my_task.delay(5, 6)
    return {"message": "Task execution started", "task_id": task_response.id}


if __name__ == "__main__":
    uvicorn.run(app, port=8000)
</code>
Start a Celery worker:
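<code>celery --app app.config.celery_config.celery_app worker --loglevel=info --pool=solo</code>
The --pool=solo option runs tasks one at a time inside the worker process, a common workaround on Windows, where the default prefork pool is not officially supported.
Start the FastAPI server: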
<code>uvicorn app.main:app --port 8000</code>
Visit the Swagger UI at http://127.0.0.1:8000/docs, call the /run endpoint, and observe the task execution logs in the Celery worker terminal.
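Watching the worker log shows that a task ran, but a client usually also wants to ask for its status. The sketch below shows one way to turn a Celery result handle into a JSON-friendly dict. The helper name `describe_task` is hypothetical. It is written against the attributes Celery's `AsyncResult` exposes (`id`, `status`, `successful()`, `result`) but accepts any object with that shape, so it can be tried without a broker. It also assumes the /run route hands the task id back to the caller.

```python
def describe_task(result) -> dict:
    """Summarise an AsyncResult-like object as a JSON-friendly dict."""
    payload = {"task_id": result.id, "status": result.status}
    if result.successful():
        # Only read .result once the task has finished without error
        payload["result"] = result.result
    return payload
```

In a FastAPI route this could be called as `describe_task(celery_app.AsyncResult(task_id))`, with `task_id` taken from the URL path. One caveat with the `rpc://` backend configured earlier: results are delivered as messages to the process that sent the task, so a status lookup from a different process may not see them. A persistent result backend may be a better fit if that matters.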
Monitoring tasks
Install Flower to monitor Celery workers:
<code>pip install flower</code>
Run Flower:
<code>celery --app app.config.celery_config.celery_app --broker=amqp://localhost// flower</code>
Open http://localhost:5555/ in a browser to view task status.
This is the simplest way to integrate Celery and RabbitMQ with FastAPI. From here, the setup can be extended with multiple queues, additional configuration, and WebSocket notifications.
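As a rough illustration of the multiple-queue option, the sketch below defines a routing table in the shape Celery's `task_routes` setting expects. The queue name `slow` and the helper `queue_for` are assumptions made for this example. The task name assumes Celery's automatic naming (module path plus function name) for the tasks defined earlier.

```python
# Hypothetical routing table: send the slow task to its own queue
TASK_ROUTES = {
    "app.tasks.celery_tasks.my_second_task": {"queue": "slow"},
}


def queue_for(task_name: str, default: str = "celery") -> str:
    """Look up the queue a task name maps to in TASK_ROUTES.

    "celery" is the name of Celery's default queue.
    """
    return TASK_ROUTES.get(task_name, {}).get("queue", default)
```

The table would be applied with `celery_app.conf.update(task_routes=TASK_ROUTES)` in celery_config.py, and a dedicated worker started with `-Q slow` would then consume only the slow tasks, leaving the default queue free for quick ones.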
Code Mala Tang
Read source code together, write articles together, and enjoy spicy hot pot together.