Comprehensive Guide to Using Celery for Asynchronous Task Processing in Python
This article provides a detailed tutorial on Celery: its architecture, installation, project layout, basic and advanced usage, and integration with Django, with code examples for creating workers, scheduling tasks, and retrieving results, so developers can implement robust asynchronous processing in Python applications.
Celery is introduced with links to the official site (http://www.celeryproject.org/) and documentation (http://docs.celeryproject.org/en/latest/index.html), along with a brief overview of its role as a distributed asynchronous task framework.
The architecture consists of three parts: a message broker (e.g., RabbitMQ or Redis), worker processes that execute tasks, and a result store for task outcomes.
Typical usage scenarios include asynchronous execution for time-consuming jobs, delayed execution with `apply_async`, and periodic execution via Celery beat.
Installation is performed with `pip install celery`, followed by configuring the broker and result backend, for example:

```python
from celery import Celery

broker = 'redis://127.0.0.1:6379/1'   # message broker
backend = 'redis://127.0.0.1:6379/2'  # result store
app = Celery(broker=broker, backend=backend, include=['celery_task.tasks'])
```
Two recommended project structures are shown. When using a package, create a `celery_task` directory containing `__init__.py`, `celery.py` (the Celery app), and `tasks.py` (task definitions). When using a single module, place the Celery app directly in `celery.py`.
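A minimal sketch of the package wiring described above, assuming the Redis URLs from the configuration step (the worker is later pointed at this package with `-A celery_task`); the key line is `tasks.py` importing the shared app:

```python
# celery_task/__init__.py  -- left empty
# celery_task/celery.py    -- holds the Celery app shown in the configuration step
# celery_task/tasks.py     -- task definitions bound to that shared app:
from celery_task.celery import app

@app.task
def add(n1, n2):
    # registered under the name 'celery_task.tasks.add'
    return n1 + n2
```

Because `celery.py` lists the package's `tasks` module in `include=[...]`, the worker imports it at startup and registers every `@app.task` it finds.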
Basic task examples illustrate how to define and register tasks:

```python
@app.task
def add(n1, n2):
    return n1 + n2

@app.task
def low(n1, n2):
    return n1 - n2
```
Tasks can be submitted for immediate execution with `delay`, or scheduled for later with `apply_async` and an `eta` value, e.g.:

```python
from datetime import datetime, timedelta

# run as soon as a worker picks it up
t1 = tasks.add.delay(10, 20)

# run roughly 10 seconds from now (eta is interpreted as UTC)
eta_time = datetime.utcnow() + timedelta(seconds=10)
result = tasks.low.apply_async(args=(10, 20), eta=eta_time)
```
Results are retrieved using `AsyncResult`:

```python
from celery.result import AsyncResult
from celery_task.celery import app

async_res = AsyncResult(id='task-id', app=app)
if async_res.successful():
    print(async_res.get())
```
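Beyond `successful()`, an `AsyncResult` also exposes `failed()` and the task's current state. A sketch of a simple polling helper built on these, assuming the `app` from the examples above and a running broker and result backend (`wait_for` is a hypothetical helper, not a Celery API):

```python
import time
from celery.result import AsyncResult
from celery_task.celery import app

def wait_for(task_id, timeout=30):
    """Poll a task until it finishes or the timeout elapses."""
    res = AsyncResult(id=task_id, app=app)
    deadline = time.time() + timeout
    while time.time() < deadline:
        if res.successful():      # state == 'SUCCESS'
            return res.get()      # fetch the return value from the backend
        if res.failed():          # state == 'FAILURE'
            raise res.result      # the exception raised inside the task
        time.sleep(0.5)           # still PENDING / STARTED / RETRY
    raise TimeoutError(f'task {task_id} did not finish in {timeout}s')
```

In practice `res.get(timeout=30)` already blocks and re-raises task exceptions itself; the explicit loop above just makes the intermediate states visible.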
Advanced usage demonstrates configuring periodic tasks with Celery beat, setting the timezone, disabling UTC, and defining a schedule dictionary:

```python
from datetime import timedelta

app.conf.timezone = 'Asia/Shanghai'
app.conf.enable_utc = False
app.conf.beat_schedule = {
    'update_banner': {
        'task': 'celery_task.tasks.update_banner_list',
        'schedule': timedelta(seconds=10),
    },
}
```
Integration with Django includes Redis cache settings, a viewset that caches banner data with `cache.set('banner_list', banner_list, 86400)`, and a Celery task that rebuilds the cached list:

```python
from django.conf import settings
from django.core.cache import cache
# Banner and BannerSerializer come from the Django app's models/serializers

@app.task
def update_banner_list():
    queryset = Banner.objects.filter(is_delete=False, is_show=True) \
                             .order_by('-orders')[:settings.BANNER_COUNT]
    banner_list = BannerSerializer(queryset, many=True).data
    # the serializer emits relative image paths; prefix the host
    for banner in banner_list:
        banner['image'] = f'http://127.0.0.1:8000{banner["image"]}'
    cache.set('banner_list', banner_list, 86400)  # refresh the 24-hour cache
    return True
```
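The viewset itself is only mentioned, not shown. A minimal cache-first sketch, assuming Django REST framework and the `Banner`/`BannerSerializer` names used above (`BannerView` is a hypothetical name; adjust imports to the app's actual modules):

```python
from django.core.cache import cache
from rest_framework.views import APIView
from rest_framework.response import Response
# from .models import Banner                    (app-specific)
# from .serializers import BannerSerializer     (app-specific)

class BannerView(APIView):
    """Serve banners from the Redis cache; fall back to the database on a miss."""

    def get(self, request):
        banner_list = cache.get('banner_list')
        if banner_list is None:
            queryset = Banner.objects.filter(is_delete=False, is_show=True) \
                                     .order_by('-orders')
            banner_list = BannerSerializer(queryset, many=True).data
            cache.set('banner_list', banner_list, 86400)  # same 24h TTL as the task
        return Response(banner_list)
```

With the beat task refreshing the cache every 10 seconds, the fallback branch mainly covers cold starts.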
Finally, the article explains how to start the services: run a worker, start the beat scheduler, and launch the Django server to serve the cached endpoints:

```shell
celery worker -A celery_task -l info -P eventlet   # worker (Celery 3/4 option order)
celery beat -A celery_task -l info                 # beat scheduler
```

(Note that Celery 5 moved the subcommand before the options: `celery -A celery_task worker -l info`.)