Mastering Celery: Build Distributed Task Queues with RabbitMQ & Redis
This article introduces Celery, a flexible Python distributed task queue, explains its architecture—including broker, beat, worker, and result backend—then provides a complete, step‑by‑step setup using RabbitMQ and Redis, covering installation, configuration, task definition, worker launch, custom exchanges, and queue routing.
Preface
Celery is a must‑know component in the Python backend skill set. It works together with RabbitMQ (see the RabbitMQ series) to form a powerful distributed task queue.
Overview
Celery is a simple, flexible, and reliable distributed task queue . Queues follow FIFO semantics and decouple producers from consumers, enabling asynchronous calls. Tasks are functions wrapped with the celery.task decorator, and the queued "task" contains all parameters needed for execution.
Celery itself does not provide a queue service; it acts as a controller. Therefore, a third‑party message broker such as RabbitMQ or Redis is required.
RabbitMQ focuses on managing the message queue, while Celery focuses on real‑time task processing.
Celery Use Cases
Instant response requirements – Asynchronous calls reduce response time, similar to Ajax loading on the front end.
Periodic tasks (Cron) – Celery Beat supports crontab‑style scheduling for heartbeat checks, log archiving, etc.
High concurrency and scalability – Celery can run tasks with multiple threads, processes, or coroutines and can scale dynamically.
Architecture Components
Broker
The message broker temporarily stores tasks and provides queue services. Producers send tasks to the broker; consumers retrieve them.
Beat
The scheduler that triggers periodic Celery tasks, reading the task list from CeleryConfig and sending due tasks to the queue.
Worker
The execution unit that runs tasks. Each worker has a concurrency pool (prefork, eventlet, gevent, thread) and listens to subscribed queues.
Result Backend/Store
Stores task execution status and results. Celery can return real‑time status and final results via this backend.
Basic Celery Setup
Below is a step‑by‑step example using RabbitMQ as the broker and Redis as the result backend.
Step 1: Install
pip install celery
pip install redis
sudo apt-get install -yq rabbitmq-serverStep 2: Initialize project
proj/
├── app_factory.py # Celery application factory
├── celeryconfig.py # Configuration module
├── celery.py # Celery startup module
├── __init__.py
└── task # Task package
├── api.py # Task API
├── tasks.py # Task implementations
└── __init__.pyStep 3: Configure Celery
# celeryconfig.py
BROKER_URL = 'amqp://guest:guest@localhost:5672//'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
CELERY_IMPORTS = ['proj.task.tasks']Step 4: App Factory
from __future__ import absolute_import
from celery import Celery
def make_app():
app = Celery('proj')
app.config_from_object('proj.celeryconfig')
return appStep 5: Celery Runner
# celery.py
from proj.app_factory import make_app
app = make_app()Step 6: Define Tasks
# tasks.py
from proj.celery import app
@app.task
def add(x, y):
return x + yStep 7: Start Worker /workspace$ celery worker -A proj -l info Options such as -A/--app specify the project; Celery loads the app object from celery.py.
Step 8: Execute Task
>> from proj.task import tasks
>>> result = tasks.add.delay(1, 2)
>>> result.status
u'SUCCESS'
>>> result.result
3NOTE: Retrieving results requires a configured Result Backend; otherwise an exception is raised.
Step 9: Custom Exchange & Queue
from __future__ import absolute_import
from celery import Celery
from kombu import Queue, Exchange
def make_app():
app = Celery('proj')
app.config_from_object('proj.celeryconfig')
default_exchange = Exchange('default', type='direct')
web_exchange = Exchange('task', type='direct')
app.conf.task_default_queue = 'default'
app.conf.task_default_exchange = 'default'
app.conf.task_default_routing_key = 'default'
app.conf.task_queues = (
Queue('default', default_exchange, routing_key='default'),
Queue('high_queue', web_exchange, routing_key='high_task'),
Queue('low_queue', web_exchange, routing_key='low_task'),
)
return appStep 10: Send Task to Specific Queue
>> from proj.task import tasks
>>> result = tasks.add.apply_async(args=(1, 2), queue='low_queue')
>>> result.status
u'SUCCESS'
>>> result.result
3NOTE: apply_async and delay trigger the task; calling the function directly executes it synchronously.
Signed-in readers can open the original source through BestHub's protected redirect.
This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contactand we will review it promptly.
How this landed with the community
Was this worth your time?
0 Comments
Thoughtful readers leave field notes, pushback, and hard-won operational detail here.
