Practical Examples of Python multiprocessing and Celery for Parallel and Distributed Task Processing
This article introduces Python's multiprocessing module and the Celery distributed task queue, explains their core features, and provides ten practical code examples demonstrating multi‑process parallel computation, inter‑process communication, asynchronous tasks, scheduling, retries, and distributed processing for real‑world applications.
When dealing with parallel computing and distributed task handling, Python offers powerful libraries such as multiprocessing and Celery. The multiprocessing module is part of the standard library and enables multi‑process parallelism with support for inter‑process communication and shared data. Celery is an open‑source distributed task queue framework that handles asynchronous tasks and scales task distribution across multiple worker nodes.
Below are ten practical scenarios that illustrate how to use multiprocessing and celery for parallel and distributed processing.
1. Multiprocessing: simple parallel computation.
import multiprocessing

def square(x):
    return x ** 2

if __name__ == "__main__":
    with multiprocessing.Pool() as pool:
        numbers = [1, 2, 3, 4, 5]
        results = pool.map(square, numbers)
        print(results)

2. Multiprocessing: data‑parallel processing.
import multiprocessing

def process_data(data):
    # data-processing logic goes here
    pass

if __name__ == "__main__":
    with multiprocessing.Pool() as pool:
        data = [1, 2, 3, 4, 5]
        pool.map(process_data, data)

3. Multiprocessing: asynchronous tasks with apply_async.
import multiprocessing
import time

def process_data(data):
    # simulate a time-consuming task
    time.sleep(1)
    return data ** 2

if __name__ == "__main__":
    with multiprocessing.Pool() as pool:
        data = [1, 2, 3, 4, 5]
        results = [pool.apply_async(process_data, (x,)) for x in data]
        output = [r.get() for r in results]
        print(output)

4. Multiprocessing: inter‑process communication and shared data.
import multiprocessing

def worker(shared_list):
    shared_list.append("Data from worker")

if __name__ == "__main__":
    manager = multiprocessing.Manager()
    shared_list = manager.list()
    processes = []
    for _ in range(5):
        p = multiprocessing.Process(target=worker, args=(shared_list,))
        processes.append(p)
        p.start()
    for p in processes:
        p.join()
    print(shared_list)

5. Celery: basic asynchronous task.
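The Celery examples that follow assume a Redis broker reachable at redis://localhost:6379/0 and at least one worker consuming tasks; with the app defined in a tasks.py module, a worker is typically started with a command like:

```shell
# start a worker for the Celery app defined in tasks.py
# (assumes Redis is already running)
celery -A tasks worker --loglevel=info
```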
from celery import Celery

# a result backend is required for result.get() to work
app = Celery("tasks", broker="redis://localhost:6379/0",
             backend="redis://localhost:6379/1")

@app.task
def process_data(data):
    # data-processing logic goes here
    pass

if __name__ == "__main__":
    result = process_data.delay(10)
    print(result.get())

6. Celery: explicit task scheduling with a specific queue.
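Instead of naming the queue at every call site, the routing can also be declared once in the app configuration; a sketch of the equivalent task_routes setting (the queue name is illustrative):

```python
from celery import Celery

app = Celery("tasks", broker="redis://localhost:6379/0")

# route every invocation of tasks.process_data to the high_priority
# queue, so a plain process_data.delay(...) no longer needs to name it
app.conf.task_routes = {
    "tasks.process_data": {"queue": "high_priority"},
}
```

Either way, a worker must consume that queue (for example, celery -A tasks worker -Q high_priority) or the messages will sit unclaimed.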
from celery import Celery

app = Celery("tasks", broker="redis://localhost:6379/0",
             backend="redis://localhost:6379/1")

@app.task
def process_data(data):
    # data-processing logic goes here
    pass

if __name__ == "__main__":
    # send this call to the "high_priority" queue
    result = process_data.apply_async(args=(10,), queue="high_priority")
    print(result.get())

7. Celery: periodic (cron‑style) task scheduling.
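The add_periodic_task hook shown below registers the schedule in code; the same schedule can also be expressed declaratively through the beat_schedule setting, which the celery beat scheduler reads at startup. A sketch, assuming the task lives in tasks.py:

```python
from celery import Celery
from celery.schedules import crontab

app = Celery("tasks", broker="redis://localhost:6379/0")

# run tasks.process_data every day at 06:00; a separate
# "celery -A tasks beat" process must be running to fire it
app.conf.beat_schedule = {
    "process-data-daily": {
        "task": "tasks.process_data",
        "schedule": crontab(hour=6, minute=0),
    },
}
```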
from celery import Celery
from celery.schedules import crontab

app = Celery('tasks', broker='redis://localhost:6379/0')

@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    # run the task once every day at 6:00 a.m.
    sender.add_periodic_task(crontab(hour=6, minute=0), process_data.s())

@app.task
def process_data():
    # task code that processes the data
    pass

8. Celery: task result callback handling.
from celery import Celery

app = Celery('tasks', broker='redis://localhost:6379/0')

@app.task
def process_data():
    # task code that processes the data
    result = do_some_work()
    return result

@app.task
def callback_task(result):
    # callback that receives process_data's return value
    pass

if __name__ == "__main__":
    # link the callback so it runs automatically when process_data succeeds
    process_data.apply_async(link=callback_task.s())

9. Celery: retry logic and error handling.
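The manual self.retry pattern below gives full control over when and how a task retries; for the common case, Celery also supports declaring retries directly on the task decorator. A sketch (the exception type is illustrative):

```python
from celery import Celery

app = Celery("tasks", broker="redis://localhost:6379/0")

# retry automatically whenever process_data raises ConnectionError,
# up to 3 times, with exponential backoff between attempts
@app.task(autoretry_for=(ConnectionError,),
          retry_backoff=True, max_retries=3)
def process_data(data):
    ...
```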
from celery import Celery
from celery.exceptions import MaxRetriesExceededError

app = Celery('tasks', broker='redis://localhost:6379/0')

@app.task(bind=True, max_retries=3)
def process_data(self):
    try:
        # task code that processes the data
        result = do_some_work()
        return result
    except Exception as exc:
        # retry when an exception occurs
        try:
            raise self.retry(exc=exc)
        except MaxRetriesExceededError:
            # handling code once the retry limit is exceeded
            pass

10. Celery: distributed task processing using groups.
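A group, as used in this example, runs a batch of tasks in parallel; when a single follow-up step should consume all of their results, Celery's chord primitive combines a header group with one callback task. A sketch, assuming a Redis result backend (chords require one; the task bodies are illustrative):

```python
from celery import Celery, chord

app = Celery("tasks", broker="redis://localhost:6379/0",
             backend="redis://localhost:6379/1")

@app.task
def process_data(data):
    return data * 2

@app.task
def summarize(results):
    # runs once, after every task in the header group has finished
    return sum(results)

def distribute_and_summarize(data_list):
    # the header group fans out; summarize receives the combined results
    job = chord(process_data.s(x) for x in data_list)(summarize.s())
    return job.get()
```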
from celery import Celery, group

app = Celery('tasks', broker='redis://localhost:6379/0',
             backend='redis://localhost:6379/1')

@app.task
def process_data(data):
    # task code that processes the data
    pass

def distribute_tasks(data_list):
    # fan the tasks out across the worker nodes
    task_group = group(process_data.s(data) for data in data_list)
    result = task_group.apply_async()
    # collect the results of all tasks in the group
    task_results = result.get()
    return task_results

These examples help you understand and apply parallel computing and distributed task processing techniques; you can select and adapt the scenarios that best fit your specific requirements.
Test Development Learning Exchange