Mastering Celery: Periodic Tasks, Sync Calls, Result Storage, and Monitoring

Explore how to configure Celery’s periodic (Beat) tasks, perform synchronous task calls, persist results using Redis, monitor workers with Flower, and debug remotely via telnet, with practical code examples and step‑by‑step instructions for robust backend task management.

AI Cyberspace

Celery Periodic (Scheduled) Tasks

Celery’s periodic tasks are driven by the Beat scheduler process, which sends each task listed in beat_schedule to the queue whenever its schedule comes due.

# filename: app_factory.py
from __future__ import absolute_import
from celery import Celery
from kombu import Queue, Exchange

def make_app():
    app = Celery('proj')
    app.config_from_object('proj.celeryconfig')

    default_exchange = Exchange('default', type='direct')
    web_exchange = Exchange('task', type='direct')
    app.conf.task_default_queue = 'default'
    app.conf.task_default_exchange = 'default'
    app.conf.task_default_routing_key = 'default'

    app.conf.task_queues = (
        Queue('default', default_exchange, routing_key='default'),
        Queue('high_queue', web_exchange, routing_key='high_task'),
        Queue('low_queue', web_exchange, routing_key='low_task'),
    )

    # set Beat timezone, default UTC
    app.conf.timezone = 'Asia/Shanghai'
    # declare periodic tasks in beat_schedule
    app.conf.beat_schedule = {
        'periodic_task_add': {
            'task': 'proj.task.tasks.add',
            'schedule': 3.0,
            'args': (2, 2)
        },
    }
    return app

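Start a Celery worker with the -B option to run an embedded Beat scheduler in the same process, for example celery -A proj worker -B -l info. Embedding Beat this way suits development or a single-worker setup; with several workers, run Beat as its own process so that each scheduled task is triggered only once.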
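The schedule value of 3.0 in the example above is read as an interval in seconds. As a small sketch, the same entry can be written with a datetime.timedelta, which Celery also accepts for the schedule key. It is shown as a plain module-level setting on the assumption that it lives in the proj.celeryconfig module loaded by config_from_object; the file name is illustrative.

```python
# filename: celeryconfig.py (assumed location; any module loaded via config_from_object works)
from datetime import timedelta

beat_schedule = {
    'periodic_task_add': {
        'task': 'proj.task.tasks.add',
        # Same cadence as 'schedule': 3.0, written as an explicit interval.
        'schedule': timedelta(seconds=3),
        'args': (2, 2),
    },
}
```

Spelling out the unit tends to make longer intervals (minutes, hours) easier to read than a bare number of seconds.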

Celery Synchronous Calls

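Calling delay() returns an AsyncResult, and its get() method blocks until the task finishes and then returns the task’s return value, which makes the call effectively synchronous. A result backend must be configured for get() to work.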

# filename: tasks.py
import time
from proj.celery import app

@app.task
def add(x, y, debug=False):
    # Test sync invoke.
    time.sleep(10)
    for i in range(10):
        print("Waiting: %s s" % i)
    if debug:
        print("x: %s; y: %s" % (x, y))
    return x + y

Example of calling the task synchronously:

>>> from proj.task.tasks import add
>>> add.delay(2, 2).get()
4
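Because get() blocks, a stalled worker would hang the caller indefinitely. AsyncResult.get() accepts a timeout argument in seconds, so a cautious caller can bound the wait. The sketch below wraps that call in a helper; fetch_result and FakeResult are hypothetical names introduced only for illustration, and FakeResult stands in for a real AsyncResult so the snippet runs without a broker.

```python
def fetch_result(async_result, timeout=15.0):
    """Block until the task finishes, giving up after `timeout` seconds."""
    # With a real AsyncResult, get() raises a timeout error if no result
    # arrives within the given number of seconds.
    return async_result.get(timeout=timeout)


class FakeResult(object):
    """Hypothetical stand-in for an AsyncResult (illustration only)."""

    def __init__(self, value):
        self.value = value

    def get(self, timeout=None):
        return self.value


print(fetch_result(FakeResult(4)))  # prints 4
```

Against a running worker the call would be fetch_result(add.delay(2, 2)); the task above sleeps for 10 seconds, so a 15-second timeout leaves some headroom.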

Celery Result Storage

When task results are critical, a backend such as Redis can be used to persist results for later analysis.

# filename: app_factory.py
from celery.schedules import crontab

... 
    app.conf.beat_schedule = {
        'periodic_task_add': {
            'task': 'proj.task.tasks.add',
            'schedule': crontab(minute='*/1'),
            'args': (2, 2)
        },
    }
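The schedule above only generates results; for them to be written to Redis, a result backend has to be configured. A minimal sketch of that setting follows, assuming a Redis server on 127.0.0.1:6379 (the address used in the redis-cli session in this section) and a configuration module named celeryconfig.py. The Redis backend also needs the redis Python package installed.

```python
# filename: celeryconfig.py (assumed; the module loaded by config_from_object)

# Store task results in database 0 of a local Redis server (address is an assumption).
result_backend = 'redis://127.0.0.1:6379/0'

# Optional: expire stored results after one hour (value in seconds).
result_expires = 3600
```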

Inspect stored results in Redis:

$ redis-cli
127.0.0.1:6379> keys *
1) "celery-task-meta-da3f6f3d-f977-4b39-a795-eaa89aca03ec"
2) "celery-task-meta-38437d5c-ebd8-442c-8605-435a48853085"
... 
35) "celery-task-meta-65cee5e0-5f4f-4d2b-b52f-6904e7f2b6ab"
... 
127.0.0.1:6379> GET 'celery-task-meta-65cee5e0-5f4f-4d2b-b52f-6904e7f2b6ab'
{"status": "SUCCESS", "traceback": null, "result": 4, "task_id": "65cee5e0-5f4f-4d2b-b52f-6904e7f2b6ab", "children": []}

Celery Monitoring

Flower is a web-based monitoring tool for Celery, recommended in the Celery documentation. It uses the Events interface to display worker, task, broker, and pool information in real time.

Step 1: Install Flower

$ pip install flower

Step 2: Enable Celery Events

# -E enables events; global options such as -A go before the sub-command
$ celery -A proj worker -E -l info

Step 3: Activate RabbitMQ Management Plugin

$ rabbitmq-plugins enable rabbitmq_management
$ service rabbitmq-server restart

Step 4: Start Flower with the broker management API URL

$ celery flower -l info --broker_api=http://guest:guest@<rabbitmq_server_ip>:15672/api/

Step 5: Open the dashboard URL

Visit http://<flower_ip>:5555/dashboard to view the monitoring UI.
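Step 2 can also be expressed in configuration rather than on the command line. As a sketch, assuming the settings live in the celeryconfig.py module loaded by config_from_object, the following turns on the events that Flower consumes:

```python
# filename: celeryconfig.py (assumed; the module loaded by config_from_object)

# Have workers emit task events, as the -E command-line flag does.
worker_send_task_events = True

# Also emit an event when a task is sent, so it is visible
# before a worker picks it up.
task_send_sent_event = True
```

Keeping this in configuration means events stay enabled even if a worker is started without -E.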

Celery Debugging

Celery can be debugged remotely using telnet and the celery.contrib.rdb module.

Step 1: Insert rdb.set_trace() in the task.

# filename: tasks.py
from proj.celery import app
from celery.contrib import rdb

@app.task
def add(x, y):
    # set breakpoint
    rdb.set_trace()
    return x + y

Step 2: Restart the Celery worker process.

When a task reaches the breakpoint, the worker log prints the telnet command to use (host and port). Run it from another terminal to enter the familiar pdb shell.
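By default rdb listens on the loopback address only, so the telnet session has to start on the worker's own machine. The listening address and base port are controlled by the CELERY_RDB_HOST and CELERY_RDB_PORT environment variables. The sketch below sets them from Python to their default values; for truly remote access CELERY_RDB_HOST would need to be an address reachable from the other machine, which exposes an unauthenticated debugger and should be done with care.

```python
import os

# Set these before celery.contrib.rdb is imported: the module appears to
# read them when it is first loaded. Exporting them in the worker's shell
# before starting the worker achieves the same thing.
os.environ['CELERY_RDB_HOST'] = '127.0.0.1'  # default: loopback only
os.environ['CELERY_RDB_PORT'] = '6899'       # default base port
```

If the base port is taken, rdb searches upward for a free one, so the port shown in the worker log is the one to connect to.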
