
Practical Examples of Python multiprocessing and Celery for Parallel and Distributed Task Processing

This article introduces Python's multiprocessing module and the Celery distributed task queue, explains their core features, and provides ten practical code examples demonstrating multi‑process parallel computation, inter‑process communication, asynchronous tasks, scheduling, retries, and distributed processing for real‑world applications.


When dealing with parallel computing and distributed task handling, Python offers powerful libraries such as multiprocessing and Celery. The multiprocessing module is part of the standard library and enables multi-process parallelism, with support for inter-process communication and shared data. Celery is an open-source distributed task queue framework that handles asynchronous tasks and scales task distribution across multiple worker nodes.

Below are ten practical scenarios that illustrate how to use multiprocessing and Celery for parallel and distributed processing.

1. Multiprocessing: simple parallel computation.

import multiprocessing

def square(x):
    return x ** 2

if __name__ == "__main__":
    with multiprocessing.Pool() as pool:
        numbers = [1, 2, 3, 4, 5]
        results = pool.map(square, numbers)
        print(results)

2. Multiprocessing: data‑parallel processing.

import multiprocessing

def process_data(data):
    # Data-processing logic goes here
    pass

if __name__ == "__main__":
    with multiprocessing.Pool() as pool:
        data = [1, 2, 3, 4, 5]
        pool.map(process_data, data)

3. Multiprocessing: asynchronous tasks with apply_async.

import multiprocessing, time

def process_data(data):
    # Simulate a time-consuming task
    time.sleep(1)
    return data ** 2

if __name__ == "__main__":
    with multiprocessing.Pool() as pool:
        data = [1, 2, 3, 4, 5]
        results = [pool.apply_async(process_data, (x,)) for x in data]
        output = [r.get() for r in results]
        print(output)
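Beyond collecting each result with `get()`, `apply_async` also accepts `callback` (and `error_callback`) functions that run in the parent process as each task finishes. A minimal sketch of that pattern (the `results_store` list is illustrative; callbacks may arrive in any order, hence the `sorted`):

```python
import multiprocessing

def square(x):
    return x ** 2

if __name__ == "__main__":
    results_store = []
    with multiprocessing.Pool() as pool:
        for x in [1, 2, 3]:
            # The callback runs in the parent process with each task's result
            pool.apply_async(square, (x,), callback=results_store.append)
        pool.close()  # no more tasks will be submitted
        pool.join()   # wait until every task (and callback) has finished
    print(sorted(results_store))  # [1, 4, 9]
```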

4. Multiprocessing: inter‑process communication and shared data.

import multiprocessing

def worker(shared_list):
    shared_list.append("Data from worker")

if __name__ == "__main__":
    manager = multiprocessing.Manager()
    shared_list = manager.list()
    processes = []
    for _ in range(5):
        p = multiprocessing.Process(target=worker, args=(shared_list,))
        processes.append(p)
        p.start()
    for p in processes:
        p.join()
    print(shared_list)
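Example 4 shares state through a `Manager`; plain message passing between processes can also be sketched with `multiprocessing.Queue`. The `producer`/`consume_all` names and the `None` sentinel below are illustrative choices, not part of the module's API:

```python
import multiprocessing

def producer(queue, items):
    # Push each item into the shared queue
    for item in items:
        queue.put(item)
    queue.put(None)  # sentinel marking the end of the stream

def consume_all(queue):
    # Drain the queue until the sentinel arrives
    received = []
    while True:
        item = queue.get()
        if item is None:
            break
        received.append(item)
    return received

if __name__ == "__main__":
    queue = multiprocessing.Queue()
    p = multiprocessing.Process(target=producer, args=(queue, [1, 2, 3]))
    p.start()
    print(consume_all(queue))  # [1, 2, 3]
    p.join()
```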

5. Celery: basic asynchronous task.

from celery import Celery

# A result backend is required for result.get() below
app = Celery("tasks",
             broker="redis://localhost:6379/0",
             backend="redis://localhost:6379/0")

@app.task
def process_data(data):
    # Data-processing logic goes here
    pass

if __name__ == "__main__":
    result = process_data.delay(10)
    print(result.get())

6. Celery: explicit task scheduling with a specific queue.

from celery import Celery

# A result backend is required for result.get() below
app = Celery("tasks",
             broker="redis://localhost:6379/0",
             backend="redis://localhost:6379/0")

@app.task
def process_data(data):
    # Data-processing logic goes here
    pass

if __name__ == "__main__":
    result = process_data.apply_async(args=(10,), queue="high_priority")
    print(result.get())

7. Celery: periodic (cron‑style) task scheduling.

from celery import Celery
from celery.schedules import crontab

app = Celery('tasks', broker='redis://localhost:6379/0')

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    # Run the task once every day at 6:00 AM
    sender.add_periodic_task(crontab(hour=6, minute=0), process_data.s())

@app.task
def process_data():
    # Task code that processes the data
    pass

8. Celery: task result callback handling.

from celery import Celery

app = Celery('tasks', broker='redis://localhost:6379/0')

@app.task
def process_data():
    # Task code that processes the data
    result = do_some_work()
    return result

@app.task
def callback_task(result):
    # Callback task that handles process_data's result
    pass

if __name__ == "__main__":
    # link= runs callback_task with process_data's return value
    process_data.apply_async(link=callback_task.s())

9. Celery: retry logic and error handling.

from celery import Celery
from celery.exceptions import MaxRetriesExceededError

app = Celery('tasks', broker='redis://localhost:6379/0')

@app.task(bind=True, max_retries=3)
def process_data(self):
    try:
        # Task code that processes the data
        result = do_some_work()
    except Exception as exc:
        # Retry when an exception occurs
        try:
            raise self.retry(exc=exc)
        except MaxRetriesExceededError:
            # Handle the failure once the retry limit is exceeded
            pass

10. Celery: distributed task processing using groups.

from celery import Celery, group

# A result backend is required for result.get() below
app = Celery('tasks',
             broker='redis://localhost:6379/0',
             backend='redis://localhost:6379/0')

@app.task
def process_data(data):
    # Task code that processes the data
    pass

def distribute_tasks(data_list):
    # Distribute the tasks to multiple worker nodes
    task_group = group(process_data.s(data) for data in data_list)
    result = task_group.apply_async()
    # Collect the task results
    task_results = result.get()
    return task_results

These examples help you understand and apply parallel computing and distributed task processing techniques; you can select and adapt the scenarios that best fit your specific requirements.

Tags: Python, Parallel Computing, Celery, code examples, distributed tasks, multiprocessing
Written by Test Development Learning Exchange