Backend Development · 7 min read

Using Python multiprocessing and Celery for Parallel and Distributed Task Processing

This article introduces Python's multiprocessing module and the Celery task queue, explains their core concepts, and provides practical code examples for multi‑process parallel computation, inter‑process communication, asynchronous execution, scheduled jobs, result callbacks, retries, and distributed task orchestration.


For parallel computing and distributed task handling, Python offers powerful tools such as the multiprocessing module and the celery task queue.

multiprocessing is a standard library module that enables multi‑process parallelism, offering process creation, management, inter‑process communication, and shared data capabilities.

celery is an open‑source distributed task queue framework for asynchronous tasks and scheduling, providing a scalable architecture that can dispatch work to multiple worker nodes.

Below are practical code snippets demonstrating how to use multiprocessing and celery for various parallel and distributed scenarios.

Using multiprocessing for multi‑process parallel computation:

import multiprocessing

def square(x):
    return x ** 2

if __name__ == "__main__":
    with multiprocessing.Pool() as pool:
        numbers = [1, 2, 3, 4, 5]
        results = pool.map(square, numbers)
        print(results)

Using multiprocessing for data parallel processing:

import multiprocessing

def process_data(data):
    # data-processing logic
    pass

if __name__ == "__main__":
    with multiprocessing.Pool() as pool:
        data = [1, 2, 3, 4, 5]
        pool.map(process_data, data)

Creating a process pool to execute asynchronous tasks with multiprocessing:

import multiprocessing
import time

def process_data(data):
    # simulate a time-consuming task
    time.sleep(1)
    return data ** 2

if __name__ == "__main__":
    with multiprocessing.Pool() as pool:
        data = [1, 2, 3, 4, 5]
        results = [pool.apply_async(process_data, (x,)) for x in data]
        output = [r.get() for r in results]
        print(output)

Using multiprocessing for inter‑process communication and shared data:

import multiprocessing

def worker(shared_list):
    shared_list.append("Data from worker")

if __name__ == "__main__":
    manager = multiprocessing.Manager()
    shared_list = manager.list()
    processes = []
    for _ in range(5):
        p = multiprocessing.Process(target=worker, args=(shared_list,))
        processes.append(p)
        p.start()
    for p in processes:
        p.join()
    print(shared_list)
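Besides Manager-backed shared objects, multiprocessing.Queue provides explicit message passing between processes. A minimal sketch that uses a sentinel value to mark the end of the stream:

```python
import multiprocessing

def producer(queue):
    # the child process sends results back through the queue
    for i in range(3):
        queue.put(i * i)
    queue.put(None)  # sentinel: no more data

if __name__ == "__main__":
    queue = multiprocessing.Queue()
    p = multiprocessing.Process(target=producer, args=(queue,))
    p.start()
    results = []
    while True:
        item = queue.get()
        if item is None:
            break
        results.append(item)
    p.join()
    print(results)  # [0, 1, 4]
```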

Using celery to execute an asynchronous task:

from celery import Celery

# a result backend is required for result.get() to work
app = Celery("tasks", broker="redis://localhost:6379/0", backend="redis://localhost:6379/0")

@app.task
def process_data(data):
    # data-processing logic
    pass

if __name__ == "__main__":
    result = process_data.delay(10)
    print(result.get())
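Note that delay() only places the task on the broker; it executes once a worker picks it up. Assuming the code above is saved as tasks.py and Redis is reachable locally, a worker can be started from the command line (sketch):

```shell
# start a worker that consumes tasks from the default queue
celery -A tasks worker --loglevel=info
```

To consume a named queue such as the high_priority queue used below, add -Q high_priority to the worker command.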

Using celery for distributed task scheduling:

from celery import Celery

# a result backend is required for result.get() to work
app = Celery("tasks", broker="redis://localhost:6379/0", backend="redis://localhost:6379/0")

@app.task
def process_data(data):
    # data-processing logic
    pass

if __name__ == "__main__":
    result = process_data.apply_async(args=(10,), queue="high_priority")
    print(result.get())

Setting up periodic tasks with celery:

from celery import Celery
from celery.schedules import crontab

app = Celery('tasks', broker='redis://localhost:6379/0')

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    # run the task once every day at 6:00 a.m.
    sender.add_periodic_task(crontab(hour=6, minute=0), process_data.s())

@app.task
def process_data():
    # periodic task code
    pass
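add_periodic_task only registers the schedule; the celery beat scheduler must run alongside a worker to actually publish the jobs when they come due (command sketch, assuming the module above is saved as tasks.py):

```shell
# beat publishes due tasks to the broker; a worker consumes and runs them
celery -A tasks beat --loglevel=info
```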

Task result callback handling:

from celery import Celery

app = Celery('tasks', broker='redis://localhost:6379/0')

@app.task
def process_data():
    # task code; the return value is passed to the linked callback
    result = do_some_work()
    return result

@app.task
def callback_task(result):
    # callback code that handles the task result
    pass

if __name__ == "__main__":
    # attach the callback when enqueuing the task
    process_data.apply_async(link=callback_task.s())

Task retry and error handling:

from celery import Celery
from celery.exceptions import MaxRetriesExceededError

app = Celery('tasks', broker='redis://localhost:6379/0')

@app.task(bind=True, max_retries=3)
def process_data(self):
    try:
        # data-processing task code
        result = do_some_work()
    except Exception as exc:
        # retry when an exception occurs
        try:
            raise self.retry(exc=exc)
        except MaxRetriesExceededError:
            # fallback handling once the retry limit is exceeded
            pass

Distributed task processing with celery groups:

from celery import Celery, group

# a result backend is required to collect the group's results
app = Celery('tasks', broker='redis://localhost:6379/0', backend='redis://localhost:6379/0')

@app.task
def process_data(data):
    # data-processing task code
    pass

def distribute_tasks(data_list):
    # fan the tasks out to the worker nodes
    task_group = group(process_data.s(data) for data in data_list)
    result = task_group.apply_async()
    # collect the task results
    return result.get()

These example snippets help you understand and apply parallel computing and distributed task processing techniques; choose the scenarios that match your needs and adapt the code accordingly.

Python · Parallel Computing · Celery · Task Queue · Distributed Tasks · multiprocessing
Written by

Test Development Learning Exchange
