Using Python multiprocessing and Celery for Parallel and Distributed Task Processing
This article introduces Python's multiprocessing module and the Celery task queue, explains their core concepts, and provides practical code examples for multi‑process parallel computation, inter‑process communication, asynchronous execution, scheduled jobs, result callbacks, retries, and distributed task orchestration.
When it comes to parallel computing and distributed task handling, Python offers powerful libraries such as multiprocessing and celery.
multiprocessing is a standard library module that enables multi‑process parallelism, offering process creation, management, inter‑process communication, and shared data capabilities.
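As a concrete illustration of the inter‑process communication capabilities mentioned above, here is a minimal sketch that passes results from a child process back to the parent through a multiprocessing.Queue (the producer, consume, and run_demo names are illustrative, not standard‑library APIs):

```python
import multiprocessing

def producer(queue):
    # child process: put a few results on the shared queue
    for i in range(3):
        queue.put(i * 10)
    queue.put(None)  # sentinel: signals that no more data is coming

def consume(queue):
    # parent process: drain the queue until the sentinel arrives
    items = []
    while True:
        item = queue.get()
        if item is None:
            break
        items.append(item)
    return items

def run_demo():
    queue = multiprocessing.Queue()
    p = multiprocessing.Process(target=producer, args=(queue,))
    p.start()
    items = consume(queue)
    p.join()
    return items

if __name__ == "__main__":
    print(run_demo())  # [0, 10, 20]
```

Unlike the Manager-based example later in this article, a Queue copies each item between processes rather than proxying a shared object, which makes it a good fit for producer/consumer pipelines.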
celery is an open‑source distributed task queue framework for asynchronous tasks and scheduling, providing a scalable architecture that can dispatch work to multiple worker nodes.
Below are practical code snippets demonstrating how to use multiprocessing and celery for various parallel and distributed scenarios.
Using multiprocessing for multi‑process parallel computation:
import multiprocessing

def square(x):
    return x ** 2

if __name__ == "__main__":
    with multiprocessing.Pool() as pool:
        numbers = [1, 2, 3, 4, 5]
        results = pool.map(square, numbers)
        print(results)

Using multiprocessing for data parallel processing:
import multiprocessing

def process_data(data):
    # logic for processing the data
    pass

if __name__ == "__main__":
    with multiprocessing.Pool() as pool:
        data = [1, 2, 3, 4, 5]
        pool.map(process_data, data)

Creating a process pool to execute asynchronous tasks with multiprocessing:
import multiprocessing
import time

def process_data(data):
    # simulate a time-consuming task
    time.sleep(1)
    return data ** 2

if __name__ == "__main__":
    with multiprocessing.Pool() as pool:
        data = [1, 2, 3, 4, 5]
        results = [pool.apply_async(process_data, (x,)) for x in data]
        output = [r.get() for r in results]
        print(output)

Using multiprocessing for inter‑process communication and shared data:
import multiprocessing

def worker(shared_list):
    shared_list.append("Data from worker")

if __name__ == "__main__":
    manager = multiprocessing.Manager()
    shared_list = manager.list()
    processes = []
    for _ in range(5):
        p = multiprocessing.Process(target=worker, args=(shared_list,))
        processes.append(p)
        p.start()
    for p in processes:
        p.join()
    print(shared_list)

Using celery to execute an asynchronous task:
from celery import Celery

# a result backend is required for result.get() to return a value
app = Celery("tasks", broker="redis://localhost:6379/0",
             backend="redis://localhost:6379/1")

@app.task
def process_data(data):
    # logic for processing the data
    pass

if __name__ == "__main__":
    result = process_data.delay(10)
    print(result.get())

Using celery for distributed task scheduling:
from celery import Celery

app = Celery("tasks", broker="redis://localhost:6379/0",
             backend="redis://localhost:6379/1")

@app.task
def process_data(data):
    # logic for processing the data
    pass

if __name__ == "__main__":
    # route the task to a dedicated queue consumed by high-priority workers
    result = process_data.apply_async(args=(10,), queue="high_priority")
    print(result.get())

Setting up periodic tasks with celery:
from celery import Celery
from celery.schedules import crontab

app = Celery('tasks', broker='redis://localhost:6379/0')

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    # run the task once every day at 6:00 AM
    sender.add_periodic_task(crontab(hour=6, minute=0), process_data.s())

@app.task
def process_data():
    # task code that processes the data
    pass

Task result callback handling:
from celery import Celery

app = Celery('tasks', broker='redis://localhost:6379/0')

@app.task
def process_data():
    # task code that processes the data (do_some_work is a placeholder)
    result = do_some_work()
    return result

@app.task
def callback_task(result):
    # callback task that handles the result of process_data
    pass

if __name__ == "__main__":
    # link= attaches callback_task, which receives process_data's return value
    process_data.apply_async(link=callback_task.s())

Task retry and error handling:
from celery import Celery
from celery.exceptions import MaxRetriesExceededError

app = Celery('tasks', broker='redis://localhost:6379/0')

@app.task(bind=True, max_retries=3)
def process_data(self):
    try:
        # task code that processes the data (do_some_work is a placeholder)
        result = do_some_work()
    except Exception as exc:
        # retry when an exception occurs
        try:
            raise self.retry(exc=exc)
        except MaxRetriesExceededError:
            # handling once the retry limit has been exceeded
            pass

Distributed task processing with celery groups:
from celery import Celery, group

# a result backend is required for result.get() to collect the group's results
app = Celery('tasks', broker='redis://localhost:6379/0',
             backend='redis://localhost:6379/1')

@app.task
def process_data(data):
    # task code that processes the data
    pass

def distribute_tasks(data_list):
    # fan the tasks out across the worker nodes
    task_group = group(process_data.s(data) for data in data_list)
    result = task_group.apply_async()
    # collect the results of every task in the group
    task_results = result.get()

These example snippets help you understand and apply parallel computing and distributed task processing techniques; choose the scenarios that match your needs and adapt the code accordingly.
Test Development Learning Exchange