Comprehensive Guide to Using Celery for Asynchronous Task Processing in Python
This article provides a detailed tutorial on Celery, covering its architecture, installation, task structures, basic and advanced usage, integration with Django, and code examples for creating workers, scheduling tasks, and retrieving results, enabling developers to implement robust asynchronous processing in Python applications.
Celery is introduced with official links (http://www.celeryproject.org/, http://docs.celeryproject.org/en/latest/index.html) and a brief overview of its role as an asynchronous task framework.
The architecture consists of three parts: a message broker (e.g., RabbitMQ or Redis), worker processes that execute tasks, and a result store for task outcomes.
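To make the three roles concrete, here is a toy model built only from the standard library. It is not how Celery is implemented: the queue stands in for the broker, a thread for the worker process, and a dictionary for the result store. The names send_task, worker, and result_store are made up for this illustration.

```python
import queue
import threading
import uuid

# Stand-ins for the three parts (illustrative assumptions, not Celery APIs):
broker = queue.Queue()   # message broker: holds pending task messages
result_store = {}        # result store: maps task id -> return value


def add(n1, n2):
    return n1 + n2


def send_task(func, *args):
    """Producer side: enqueue a message and hand back its id immediately."""
    task_id = str(uuid.uuid4())
    broker.put((task_id, func, args))
    return task_id


def worker():
    """Worker side: pull messages, run them, record the outcome."""
    while True:
        message = broker.get()
        if message is None:      # sentinel used in this sketch to stop the loop
            broker.task_done()
            break
        task_id, func, args = message
        result_store[task_id] = func(*args)
        broker.task_done()


thread = threading.Thread(target=worker, daemon=True)
thread.start()

task_id = send_task(add, 10, 20)   # returns without waiting for the result
broker.join()                      # wait until the worker has drained the queue
print(result_store[task_id])       # 30

broker.put(None)                   # ask the worker to exit
thread.join()
```

The caller only ever holds a task id, which is the same pattern the AsyncResult lookup later in this article relies on.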
Typical usage scenarios include asynchronous execution for time‑consuming jobs, delayed execution with apply_async, and periodic execution via Celery beat.
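Delayed execution comes down to naming a point in time. The sketch below shows how a relative delay maps to an absolute, timezone-aware eta value. The helper name eta_in is hypothetical. The comments at the end only indicate how the value would likely be passed to apply_async, using the add task shown later in this article.

```python
from datetime import datetime, timedelta, timezone


def eta_in(seconds, now=None):
    """Return a timezone-aware UTC datetime `seconds` after `now`."""
    if now is None:
        now = datetime.now(timezone.utc)
    return now + timedelta(seconds=seconds)


# A fixed reference time keeps the example deterministic.
fixed_now = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
print(eta_in(10, now=fixed_now).isoformat())  # 2024-01-01T12:00:10+00:00

# With a Celery task object, the value would be passed along the lines of:
#   tasks.add.apply_async(args=(10, 20), eta=eta_in(10))
# For a plain relative delay, apply_async also accepts countdown=10.
```

Using an aware datetime avoids ambiguity about which timezone a naive eta is meant to be in.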
Installation is performed with pip install celery, followed by configuring the broker and backend, for example:
from celery import Celery
# Redis database 1 carries task messages; database 2 stores task results.
broker = 'redis://127.0.0.1:6379/1'
backend = 'redis://127.0.0.1:6379/2'
# include lists the modules whose tasks the worker imports at startup.
app = Celery(broker=broker, backend=backend, include=['celery_task.tasks'])
Two recommended project structures are shown. When using a package, create a celery_task directory containing __init__.py, celery.py (the Celery app), and tasks.py (task definitions). When using a single module, place the Celery app directly in celery.py.
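The broker and backend URLs in the configuration example above are hard-coded. One possible refinement, sketched here as an assumption and not something the article prescribes, is to build them from environment variables. The variable names CELERY_REDIS_HOST and CELERY_REDIS_PORT and the helper redis_url are hypothetical.

```python
import os


def redis_url(db, env=os.environ):
    """Build a Redis URL for database number `db`.

    CELERY_REDIS_HOST and CELERY_REDIS_PORT are hypothetical variable names
    chosen for this sketch; the defaults match the article's local setup.
    """
    host = env.get("CELERY_REDIS_HOST", "127.0.0.1")
    port = env.get("CELERY_REDIS_PORT", "6379")
    return f"redis://{host}:{port}/{db}"


# An empty mapping forces the defaults, reproducing the article's URLs.
broker = redis_url(1, env={})    # 'redis://127.0.0.1:6379/1'
backend = redis_url(2, env={})   # 'redis://127.0.0.1:6379/2'
print(broker, backend)
```

The resulting strings can be passed to Celery(broker=..., backend=...) exactly as in the article's example.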
Basic task examples illustrate how to define and register tasks:
from .celery import app  # the app defined in celery_task/celery.py
@app.task
def add(n1, n2):
    return n1 + n2
@app.task
def low(n1, n2):
    return n1 - n2
Tasks can be queued for immediate execution with tasks.add.delay(10, 20), or scheduled for later with apply_async and an eta value, e.g.:
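from datetime import datetime, timedelta, timezone
from celery_task import tasks
t1 = tasks.add.delay(10, 20)
# Schedule a second call roughly 10 seconds from now, using a timezone-aware eta.
eta_time = datetime.now(timezone.utc) + timedelta(seconds=10)
result = tasks.add.apply_async(args=(10, 20), eta=eta_time)
Results are retrieved using AsyncResult: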
from celery.result import AsyncResult
from celery_task.celery import app
# 'task-id' is a placeholder for a real id, such as t1.id returned by delay().
async_res = AsyncResult(id='task-id', app=app)
if async_res.successful():
    print(async_res.get())
Advanced usage demonstrates configuring periodic tasks with Celery beat, setting the timezone, disabling UTC, and defining a schedule dictionary:
from datetime import timedelta
app.conf.timezone = 'Asia/Shanghai'
app.conf.enable_utc = False
app.conf.beat_schedule = {
    'update_banner': {
        'task': 'celery_task.tasks.update_banner_list',
        'schedule': timedelta(seconds=10),  # run every 10 seconds
    },
}
Integration with Django includes Redis cache settings, a viewset that caches banner data, and a Celery task that updates the cache:
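from django.conf import settings
from django.core.cache import cache
# Banner and BannerSerializer are imported from the project's own apps.
# In the viewset: cache the serialized banners for 24 hours (86400 seconds).
cache.set('banner_list', banner_list, 86400)
# In celery_task/tasks.py: rebuild the cached list in the background.
@app.task
def update_banner_list():
    queryset = Banner.objects.filter(is_delete=False, is_show=True).order_by('-orders')[:settings.BANNER_COUNT]
    banner_list = BannerSerializer(queryset, many=True).data
    for banner in banner_list:
        # Prefix the host so the cached image URLs are absolute.
        banner['image'] = f'http://127.0.0.1:8000{banner["image"]}'
    cache.set('banner_list', banner_list, 86400)
    return True
Finally, the article explains how to start the services: run a worker with celery -A celery_task worker -l info -P eventlet, start the beat scheduler with celery -A celery_task beat -l info, and launch the Django server to serve cached endpoints. Celery 5 and later expect the -A option before the subcommand; the form celery worker -A celery_task used in the original applies to older 4.x releases. The -P eventlet pool requires the eventlet package to be installed.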
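The cached endpoints mentioned above typically follow a read-through pattern: try the cache first, and fall back to the database only on a miss. The article does not show the viewset itself, so the following is a sketch under assumptions. TTLCache is a small in-memory stand-in that mimics the get and set calls of Django's cache, and load_banners_from_db is a hypothetical placeholder for the Banner query and serializer.

```python
import time


class TTLCache:
    """Tiny in-memory stand-in for Django's cache (get/set with a timeout)."""

    def __init__(self):
        self._data = {}

    def set(self, key, value, timeout):
        self._data[key] = (value, time.monotonic() + timeout)

    def get(self, key, default=None):
        value, expires_at = self._data.get(key, (default, None))
        if expires_at is not None and time.monotonic() >= expires_at:
            del self._data[key]          # entry expired: drop it and report a miss
            return default
        return value


cache = TTLCache()
db_hits = 0


def load_banners_from_db():
    """Hypothetical placeholder for the Banner query plus serialization."""
    global db_hits
    db_hits += 1
    return [{"id": 1, "image": "http://127.0.0.1:8000/media/banner1.png"}]


def get_banner_list():
    banner_list = cache.get("banner_list")
    if banner_list is None:              # cache miss: fall back to the database
        banner_list = load_banners_from_db()
        cache.set("banner_list", banner_list, 86400)
    return banner_list


first = get_banner_list()    # miss: reads the "database" and fills the cache
second = get_banner_list()   # hit: served from the cache
print(db_hits)               # 1
```

With the periodic update_banner_list task refreshing the same key every 10 seconds, requests should rarely take the miss path once the worker and beat processes are running.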
Python Programming Learning Circle
A global community of Chinese Python developers offering technical articles, columns, original video tutorials, and problem sets. Topics include web full‑stack development, web scraping, data analysis, natural language processing, image processing, machine learning, automated testing, DevOps automation, and big data.
