Master Python Concurrency: Threads, Processes, Async and Best Practices
This comprehensive guide explores Python concurrency and parallelism, covering threading, multiprocessing, async/await, the GIL, thread safety, thread pools, process pools, producer‑consumer patterns, performance testing, and practical best‑practice recommendations for choosing the right concurrency model.
Python Concurrency and Parallelism Guide
Welcome! In modern computing, leveraging multiple CPU cores and concurrent execution is essential for performance. This article dives deep into Python concurrency, from basic concepts to advanced techniques, helping you master multithreading, multiprocessing, and asynchronous programming.
1. Concurrency Basics
Concurrency means multiple tasks making progress simultaneously, while parallelism means tasks truly running at the same time on multiple cores. Python’s threading module provides threads, but the Global Interpreter Lock (GIL) limits true parallel execution for CPU‑bound code.
1.1 Multithreading
```python
import time
import threading
import multiprocessing

def sequential_task():
    for i in range(3):
        time.sleep(1)
        print(f"Sequential task {i}")

def task(name):
    # Defined at module level so multiprocessing can pickle it on
    # platforms that use the "spawn" start method (Windows, macOS).
    for i in range(3):
        time.sleep(1)
        print(f"{name} task {i}")

def concurrent_threading():
    threads = []
    for name in ["Thread-A", "Thread-B"]:
        t = threading.Thread(target=task, args=(name,))
        threads.append(t)
        t.start()
    for t in threads:
        t.join()

def parallel_processing():
    processes = []
    for name in ["Process-A", "Process-B"]:
        p = multiprocessing.Process(target=task, args=(name,))
        processes.append(p)
        p.start()
    for p in processes:
        p.join()

if __name__ == "__main__":
    print("=== Sequential execution ===")
    start = time.time()
    sequential_task()
    print(f"Sequential time: {time.time() - start:.2f}s")

    print("=== Multithreaded concurrency ===")
    start = time.time()
    concurrent_threading()
    print(f"Threading time: {time.time() - start:.2f}s")

    print("=== Multiprocess parallelism ===")
    start = time.time()
    parallel_processing()
    print(f"Multiprocessing time: {time.time() - start:.2f}s")
```

The GIL allows only one thread to execute Python bytecode at a time, making multithreading unsuitable for CPU‑intensive tasks but still valuable for I/O‑bound workloads.
1.2 The Global Interpreter Lock (GIL)
Limitation : Only one thread runs Python bytecode simultaneously.
Impact : Multithreading does not speed up CPU‑bound tasks.
Exception : I/O‑bound tasks still benefit from threading.
Solution : Use multiprocessing or C extensions to bypass the GIL.
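A quick way to observe the GIL's effect is to time the same CPU-bound loop run sequentially and then split across two threads. This is a minimal sketch (the counting loop stands in for real computation); on standard CPython, the threaded version is typically no faster:

```python
import threading
import time

def count_down(n):
    # Pure-Python CPU-bound loop; the GIL serializes its bytecode.
    while n > 0:
        n -= 1

N = 5_000_000

# Sequential: two runs back to back.
start = time.perf_counter()
count_down(N)
count_down(N)
seq_time = time.perf_counter() - start

# Threaded: two threads do the same total amount of work "concurrently".
threads = [threading.Thread(target=count_down, args=(N,)) for _ in range(2)]
start = time.perf_counter()
for t in threads:
    t.start()
for t in threads:
    t.join()
thr_time = time.perf_counter() - start

print(f"sequential: {seq_time:.2f}s, threaded: {thr_time:.2f}s")
```

On a typical CPython build the two timings come out close, and the threaded run can even be slower because of lock contention; repeating the experiment with `multiprocessing.Process` instead of threads is what actually scales with core count.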
2. Multithreaded Programming
2.1 Thread Creation and Management
```python
import threading
import queue
import random
import time

class WorkerThread(threading.Thread):
    def __init__(self, thread_id, task_queue):
        super().__init__()
        self.thread_id = thread_id
        self.task_queue = task_queue
        self.daemon = True

    def run(self):
        while True:
            try:
                task = self.task_queue.get(timeout=1)
                print(f"Thread {self.thread_id} processing: {task}")
                time.sleep(random.uniform(0.1, 0.5))
                self.task_queue.task_done()
            except queue.Empty:
                break

def threading_example():
    task_queue = queue.Queue()
    for i in range(20):
        task_queue.put(f"task-{i}")
    threads = []
    for i in range(4):
        t = WorkerThread(i, task_queue)
        threads.append(t)
        t.start()
    task_queue.join()
    print("All tasks completed!")

threading_example()
```

2.2 Thread Synchronization and Locks
```python
import threading
import time

class BankAccount:
    def __init__(self, initial_balance=0):
        self.balance = initial_balance
        self.lock = threading.Lock()

    def deposit(self, amount):
        with self.lock:
            old = self.balance
            time.sleep(0.001)  # widen the race window to make the test meaningful
            self.balance = old + amount

    def withdraw(self, amount):
        with self.lock:
            if self.balance >= amount:
                old = self.balance
                time.sleep(0.001)
                self.balance = old - amount
                return True
            return False

    def get_balance(self):
        with self.lock:
            return self.balance

def test_thread_safety():
    account = BankAccount(1000)

    def deposit_task():
        for _ in range(100):
            account.deposit(1)

    def withdraw_task():
        for _ in range(100):
            account.withdraw(1)

    threads = []
    for _ in range(5):
        threads.append(threading.Thread(target=deposit_task))
        threads.append(threading.Thread(target=withdraw_task))
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    print(f"Final balance: {account.get_balance()}")
    print("Expected balance: 1000 + 5*100 - 5*100 = 1000")

test_thread_safety()
```

2.3 Thread Communication (Producer‑Consumer)
```python
import threading
import queue
import random
import time

def producer_consumer():
    shared_queue = queue.Queue(maxsize=5)

    def producer():
        for i in range(10):
            item = f"product-{i}"
            shared_queue.put(item)
            print(f"Produced: {item}")
            time.sleep(random.uniform(0.1, 0.3))
        shared_queue.put(None)  # stop signal

    def consumer():
        while True:
            item = shared_queue.get()
            if item is None:
                # Re-queue the sentinel so the other consumer also stops.
                shared_queue.put(None)
                break
            print(f"Consumed: {item}")
            time.sleep(random.uniform(0.2, 0.4))
            shared_queue.task_done()

    prod_thread = threading.Thread(target=producer)
    cons_threads = [threading.Thread(target=consumer) for _ in range(2)]
    prod_thread.start()
    for t in cons_threads:
        t.start()
    prod_thread.join()
    for t in cons_threads:
        t.join()
    print("Producer-consumer finished")

producer_consumer()
```

3. Multiprocessing Programming
3.1 Process Creation and Management
```python
import multiprocessing
import os

def cpu_intensive_task(data_chunk):
    pid = os.getpid()
    result = sum(x * x for x in data_chunk)
    print(f"Process {pid} handled {len(data_chunk)} items, result: {result}")
    return result

def multiprocessing_example():
    data = list(range(1_000_000))
    chunk_size = len(data) // 4
    chunks = [data[i:i + chunk_size] for i in range(0, len(data), chunk_size)]
    with multiprocessing.Pool(processes=4) as pool:
        results = pool.map(cpu_intensive_task, chunks)
    total = sum(results)
    print(f"Final result: {total}")

if __name__ == "__main__":
    multiprocessing_example()
```

3.2 Inter‑Process Communication
```python
import multiprocessing
import queue
import time

def worker_process(worker_id, task_queue, result_queue):
    while True:
        try:
            task = task_queue.get(timeout=5)
            if task == "STOP":
                break
            result = f"Process {worker_id} handled: {task}"
            result_queue.put(result)
            time.sleep(0.1)
        except queue.Empty:
            break

def process_communication():
    task_queue = multiprocessing.Queue()
    result_queue = multiprocessing.Queue()
    processes = []
    for i in range(3):
        p = multiprocessing.Process(target=worker_process,
                                    args=(i, task_queue, result_queue))
        processes.append(p)
        p.start()
    for i in range(10):
        task_queue.put(f"task-{i}")
    for _ in range(3):
        task_queue.put("STOP")  # one stop sentinel per worker
    results = []
    while len(results) < 10:
        try:
            results.append(result_queue.get(timeout=1))
        except queue.Empty:
            break
    for p in processes:
        p.join()
    print("All results:", results)

if __name__ == "__main__":
    process_communication()
```

4. Asynchronous Programming and Coroutines
4.1 asyncio Basics
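The HTTP example below requires network access and the third-party aiohttp package. As a dependency-free warm-up, here is a minimal sketch of the same `asyncio.gather` fan-out pattern, with `asyncio.sleep` standing in for network I/O:

```python
import asyncio
import time

async def fake_fetch(name, delay):
    # Simulates an I/O wait; the event loop runs other coroutines meanwhile.
    await asyncio.sleep(delay)
    return f"{name}: done after {delay}s"

async def main():
    start = time.perf_counter()
    # Awaited sequentially these would take 1 + 2 + 1 = 4 s;
    # gather() overlaps them, so the total is about 2 s.
    results = await asyncio.gather(
        fake_fetch("task-1", 1),
        fake_fetch("task-2", 2),
        fake_fetch("task-3", 1),
    )
    elapsed = time.perf_counter() - start
    for r in results:
        print(r)
    print(f"total: {elapsed:.1f}s")
    return results, elapsed

results, elapsed = asyncio.run(main())
```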
```python
import asyncio
import aiohttp  # third-party: pip install aiohttp

async def async_fetch(session, url):
    async with session.get(url) as response:
        content = await response.text()
        return f"{url}: {len(content)} bytes"

async def concurrent_fetch():
    urls = [
        "https://httpbin.org/delay/1",
        "https://httpbin.org/delay/2",
        "https://httpbin.org/delay/1",
    ]
    # Share one session across all requests instead of opening one per URL.
    async with aiohttp.ClientSession() as session:
        tasks = [async_fetch(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
    for r in results:
        print(r)

asyncio.run(concurrent_fetch())
```

4.2 Asynchronous Producer‑Consumer
```python
import asyncio

async def async_producer_consumer():
    queue = asyncio.Queue(maxsize=3)
    NUM_CONSUMERS = 2

    async def producer():
        for i in range(10):
            item = f"item-{i}"
            await queue.put(item)
            print(f"Produced: {item}")
            await asyncio.sleep(0.1)
        for _ in range(NUM_CONSUMERS):
            await queue.put(None)  # one stop sentinel per consumer

    async def consumer():
        while True:
            item = await queue.get()
            if item is None:
                # Mark the sentinel done too; otherwise queue.join() never returns.
                queue.task_done()
                break
            print(f"Consumed: {item}")
            await asyncio.sleep(0.2)
            queue.task_done()

    prod = asyncio.create_task(producer())
    cons = [asyncio.create_task(consumer()) for _ in range(NUM_CONSUMERS)]
    await prod
    await queue.join()
    await asyncio.gather(*cons)
    print("Async producer-consumer finished")

asyncio.run(async_producer_consumer())
```

5. Common Concurrency Issues and Solutions
5.1 Deadlock Prevention
```python
import threading
import time

lock_a = threading.Lock()
lock_b = threading.Lock()

# Both tasks acquire the locks in the same global order (A before B).
# Consistent lock ordering is what prevents the classic two-lock deadlock.
def safe_task1():
    with lock_a:
        print("Task 1 acquired lock A")
        time.sleep(0.1)
        with lock_b:
            print("Task 1 acquired lock B")

def safe_task2():
    with lock_a:
        print("Task 2 acquired lock A")
        time.sleep(0.1)
        with lock_b:
            print("Task 2 acquired lock B")

threads = [threading.Thread(target=safe_task1),
           threading.Thread(target=safe_task2)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print("Completed safely")
```

5.2 Race Condition Handling
```python
import threading
import time

counter = 0
counter_lock = threading.Lock()

def unsafe_increment():
    global counter
    temp = counter
    time.sleep(0.001)  # invite a context switch between read and write
    counter = temp + 1

def safe_increment():
    global counter
    with counter_lock:
        temp = counter
        time.sleep(0.001)
        counter = temp + 1

# Test the unsafe version: lost updates are likely, so the count falls short.
counter = 0
threads = [threading.Thread(target=unsafe_increment) for _ in range(100)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(f"Unsafe count: {counter}")

# Test the safe version: the lock guarantees the count reaches 100.
counter = 0
threads = [threading.Thread(target=safe_increment) for _ in range(100)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(f"Safe count: {counter}")
```

6. Performance Optimization and Debugging
6.1 Concurrency Performance Testing
```python
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def io_work():
    time.sleep(0.01)
    return "done"

def cpu_work(n):
    return sum(i * i for i in range(n))

def concurrency_performance_test():
    with ThreadPoolExecutor(10) as thread_pool, ProcessPoolExecutor(4) as process_pool:
        test_cases = [
            ("Sequential I/O", lambda: [io_work() for _ in range(100)]),
            ("Thread pool I/O", lambda: list(thread_pool.map(lambda _: io_work(), range(100)))),
            ("Sequential CPU", lambda: [cpu_work(1000) for _ in range(10)]),
            ("Process pool CPU", lambda: list(process_pool.map(cpu_work, [1000] * 10))),
        ]
        for name, func in test_cases:
            start = time.time()
            result = func()
            elapsed = time.time() - start
            print(f"{name}: {elapsed:.3f}s, results: {len(result)}")

if __name__ == "__main__":
    concurrency_performance_test()
```

6.2 Debugging Tips
```python
import logging
import threading
import time
import traceback
from concurrent.futures import ThreadPoolExecutor

logging.basicConfig(level=logging.DEBUG)

def debug_task(name):
    # Tag every log line with the thread id so interleaved output stays readable.
    thread_id = threading.current_thread().ident
    logging.debug(f"Thread {thread_id} running task {name}")
    time.sleep(0.1)
    return f"{name}-done"

def failing_task():
    try:
        raise ValueError("simulated error")
    except Exception as e:
        logging.error(f"Task failed: {e}\n{traceback.format_exc()}")
        raise

def debug_concurrent_code():
    with ThreadPoolExecutor(2) as executor:
        future = executor.submit(failing_task)
        try:
            result = future.result(timeout=5)
        except Exception as e:
            logging.error(f"Task execution failed: {e}")

debug_concurrent_code()
```

7. Real‑World Application Cases
7.1 Concurrent Web Requests
```python
import asyncio
import aiohttp  # third-party: pip install aiohttp

async def fetch_with_retry(session, url, retries=3):
    for attempt in range(retries):
        try:
            timeout = aiohttp.ClientTimeout(total=5)
            async with session.get(url, timeout=timeout) as response:
                if response.status == 200:
                    return await response.text()
                print(f"Request failed: {response.status}")
        except Exception as e:
            print(f"Attempt {attempt + 1} failed: {e}")
        await asyncio.sleep(1)
    return None

async def process_urls(urls):
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_with_retry(session, url) for url in urls]
        return await asyncio.gather(*tasks, return_exceptions=True)

urls = [
    "https://httpbin.org/get",
    "https://httpbin.org/delay/2",
    "https://httpbin.org/status/404",
    "https://httpbin.org/get",
]
results = asyncio.run(process_urls(urls))
for url, result in zip(urls, results):
    if isinstance(result, Exception):
        print(f"{url}: error - {result}")
    elif result is None:
        print(f"{url}: no successful response")
    else:
        print(f"{url}: success - {len(result)} bytes")
```

7.2 Parallel Data Processing
```python
import time
import multiprocessing

def process_data_chunk(chunk):
    return [item * 2 + 1 for item in chunk]

def parallel_data_processing():
    data = list(range(1_000_000))
    chunk_size = len(data) // 8

    # Sequential baseline
    start = time.time()
    seq_result = process_data_chunk(data)
    seq_time = time.time() - start

    # Parallel version. Note: for work this light per item, process startup
    # and inter-process transfer can outweigh the parallel gain.
    start = time.time()
    with multiprocessing.Pool(8) as pool:
        chunks = [data[i:i + chunk_size] for i in range(0, len(data), chunk_size)]
        results = pool.map(process_data_chunk, chunks)
    parallel_result = [item for sublist in results for item in sublist]
    par_time = time.time() - start

    print(f"Sequential time: {seq_time:.3f}s")
    print(f"Parallel time: {par_time:.3f}s")
    print(f"Speedup: {seq_time / par_time:.2f}x")
    print(f"Results match: {seq_result == parallel_result}")

if __name__ == "__main__":
    parallel_data_processing()
```

Conclusion: Choose the Right Concurrency Model
Scenario : I/O‑bound → Recommended : Multithreading / Async → Reason : Utilizes waiting time.
Scenario : CPU‑bound → Recommended : Multiprocessing → Reason : Bypasses GIL, uses multiple cores.
Scenario : Simple parallel tasks → Recommended : Process pool → Reason : Easy to use, automatic management.
Scenario : Complex coordination → Recommended : Manual thread/process management → Reason : Fine‑grained control.
Scenario : High‑concurrency I/O → Recommended : Async programming → Reason : Extremely high concurrency.
Best Practices :
Analyze first – determine I/O vs CPU bound.
Choose the appropriate model based on analysis.
Optimize with proper synchronization primitives and communication mechanisms.
Test thoroughly for correctness and performance under concurrent loads.
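The recommendations above can be sketched with `concurrent.futures`, which lets you switch between a thread pool (I/O-bound) and a process pool (CPU-bound) behind one interface. The `run_in_pool` helper and the task functions here are illustrative, not a standard API:

```python
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def io_task(i):
    time.sleep(0.01)  # stand-in for a network or disk wait
    return i

def cpu_task(i):
    return sum(x * x for x in range(10_000))  # stand-in for real computation

def run_in_pool(kind, func, items):
    # Hypothetical helper: pick the executor that matches the workload.
    executor_cls = ThreadPoolExecutor if kind == "io" else ProcessPoolExecutor
    with executor_cls(max_workers=4) as pool:
        return list(pool.map(func, items))

if __name__ == "__main__":
    print(run_in_pool("io", io_task, range(4)))        # threads: GIL released during waits
    print(len(run_in_pool("cpu", cpu_task, range(4))))  # processes: true parallelism
```

Keeping the workload decision in one place like this makes it cheap to re-measure later and flip a task from one pool to the other if profiling says you guessed wrong.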
Feel free to share your toughest concurrency challenge and how you solved it in the comments!