Comprehensive Guide to Using Celery for Distributed Task Processing in Python
This article provides a detailed overview of Celery: its architecture, common use cases, installation, project structure, task definition, periodic tasks, worker management, distributed deployment, task composition, custom task classes, task routing, and monitoring with Flower, illustrated throughout with practical code examples.
Celery is a simple, flexible, and reliable distributed task execution framework that follows a producer‑consumer model, where producers submit tasks to a broker queue and multiple workers consume and execute them.
Architecture: Celery consists of three core components – a message broker (e.g., Redis), worker processes that execute tasks, and a result backend for storing task outcomes. Asynchronous tasks are submitted to the broker and picked up by workers as soon as one is free; periodic tasks are enqueued on schedule by Celery Beat.
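This producer-consumer workflow can be illustrated with a toy sketch in plain Python (not Celery itself): a queue plays the broker's role, threads play the workers, and a dict stands in for the result backend.

```python
import queue
import threading

broker = queue.Queue()  # stands in for the message broker
results = {}            # stands in for the result backend

def worker():
    # each worker consumes (task_id, func, args) tuples and stores outcomes
    while True:
        task_id, func, args = broker.get()
        results[task_id] = func(*args)
        broker.task_done()

# start two "worker" processes (threads here, for illustration)
for _ in range(2):
    threading.Thread(target=worker, daemon=True).start()

# the "producer" submits five addition tasks
for i in range(5):
    broker.put((i, lambda x, y: x + y, (i, i)))

broker.join()   # wait until every task has been consumed
print(results)  # e.g. {0: 0, 1: 2, 2: 4, 3: 6, 4: 8}
```

In real Celery the queue lives in Redis, the workers are separate OS processes (possibly on other machines), and results are keyed by task id in the backend.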
Typical Application Scenarios: large‑scale asynchronous jobs such as file uploads, high‑concurrency real‑time inference, and scheduled jobs such as email delivery or system monitoring.
Installation:
<code># celery installation
pip install celery
# flower monitoring tool
pip install flower
pip install redis</code>
<code># redis installation
yum install redis
# start redis
redis-server /etc/redis.conf</code>
Project Layout:
<code># project structure
wedo
├── config.py
├── __init__.py
├── period_task.py
└── tasks.py</code>
Celery Instance Initialization (in __init__.py) sets the broker and backend URLs, result serializer, result expiration, timezone, and the task modules to import.
<code>from celery import Celery
app = Celery('wedo')
app.config_from_object('wedo.config')
# config.py
BROKER_URL = 'redis://10.8.238.2:6379/0'
CELERY_RESULT_BACKEND = 'redis://10.8.238.2:6379/0'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TASK_RESULT_EXPIRES = 60 * 60 * 24  # keep results for 24 hours
CELERY_TIMEZONE = 'Asia/Shanghai'
CELERY_IMPORTS = ('wedo.tasks', 'wedo.period_task')
</code>
Task Definition uses the @app.task decorator. Examples include a simple addition task and a multiplication task that sleeps before returning.
<code>@app.task
def sum(x, y):
return x + y
@app.task
def mul(x, y):
time.sleep(5)
return x * y
</code>
Periodic Tasks are defined in period_task.py using the @app.on_after_configure.connect signal, scheduled either with crontab expressions or at fixed intervals.
<code>@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
sender.add_periodic_task(5.0, to_string.s('celery period task'), name='to_string')
sender.add_periodic_task(crontab(minute='*/10'), send_mail.s('hello, this is a celery'), name='send_mail')
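
# The two tasks referenced above are assumed to live in the same module;
# minimal hypothetical versions for illustration:
@app.task
def to_string(text):
    return 'to_string: %s' % text

@app.task
def send_mail(body):
    # placeholder: a real task would use smtplib or an email service
    return 'mail sent: %s' % body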
</code>
Worker and Beat Startup:
<code># start workers (4 processes)
celery worker -A wedo -l debug -c 4
# start beat scheduler
celery beat -A wedo.period_task
</code>Workers can be stopped using ps auxww | awk '/celery worker/ {print $2}' | xargs kill -9 and similarly for beat.
Task Invocation can be performed with task.apply_async(...) or the shortcut task.delay(...), both returning an AsyncResult that provides status, result, and success checks.
<code>result = mul.apply_async(args=(2, 2))
value = result.get()
print(result.state) # PENDING -> STARTED -> SUCCESS/FAIL
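# Additional checks on the AsyncResult (standard Celery API):
print(result.ready())       # True once the task has finished
print(result.successful())  # True if it completed without raising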
</code>
Task Composition supports parallel execution with group and sequential pipelines with chain.
<code># parallel sum tasks
result = group(sum.s(i, i) for i in range(5))()
# chained tasks
result = chain(sum.s(1,2), sum.s(3), mul.s(3))()
</code>
Distributed Cluster Deployment is as simple as copying the project to other servers and running the same worker command; the shared broker (Redis) coordinates task distribution across nodes.
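The group and chain calls in the preceding section dispatch real tasks through the broker, but their composition semantics can be sketched in plain Python (sum_ and mul below are ordinary functions, not Celery tasks):

```python
# group: run independent signatures and collect one result per task
def sum_(x, y):
    return x + y

def mul(x, y):
    return x * y

group_result = [sum_(i, i) for i in range(5)]
print(group_result)  # [0, 2, 4, 6, 8]

# chain: each task's return value becomes the next signature's first
# argument, so chain(sum.s(1, 2), sum.s(3), mul.s(3)) computes:
chain_result = mul(sum_(sum_(1, 2), 3), 3)
print(chain_result)  # ((1 + 2) + 3) * 3 = 18
```

With Celery the same arithmetic happens, only spread across workers: the group's subtasks run concurrently, and the chain's intermediate results flow through the broker from one task to the next.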
Advanced Features include custom task classes for monitoring, progress updates, retries, and failure handling, as well as routing tasks to specific workers using queues.
<code>class TaskMonitor(celery.Task):
def on_failure(self, exc, task_id, args, kwargs, einfo):
logger.info('task %s failed: %r' % (task_id, exc))
def on_success(self, retval, task_id, args, kwargs):
logger.info('task %s succeeded' % task_id)
def on_retry(self, exc, task_id, args, kwargs, einfo):
logger.info('task %s retry: %s' % (task_id, exc))
@app.task(base=TaskMonitor, bind=True, name='post_file')
def post_file(self, file_names):
for i, file in enumerate(file_names):
self.update_state(state='PROGRESS', meta={'current': i, 'total': len(file_names)})
time.sleep(2)
raise self.retry(exc=Exception('error'), countdown=3, max_retries=5)
</code>
Task routing is achieved by starting workers that listen on specific queues (-Q celery,hipri) and specifying the queue when calling a task (queue='hipri').
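Routing can also be declared statically in the configuration; the sketch below uses the old-style setting name matching the config shown earlier, and the 'hipri' queue name is illustrative.

```python
# config.py (sketch): always send wedo.tasks.mul to the 'hipri' queue
CELERY_ROUTES = {
    'wedo.tasks.mul': {'queue': 'hipri'},
}
```

A per-call override such as mul.apply_async(args=(2, 2), queue='hipri') takes precedence over static routes, and the worker must be started with -Q celery,hipri to consume from both queues.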
Monitoring with Flower provides a web UI to inspect workers, tasks, and queues.
<code>flower -A wedo --port=6006  # access at http://<host>:6006/</code>
Overall, the guide walks through setting up Celery, defining tasks, managing workers, scaling across a cluster, and extending functionality for robust asynchronous processing in Python.