Fundamentals 25 min read

Master Python Concurrency: Threads, Processes, Async and Best Practices

This comprehensive guide explores Python concurrency and parallelism, covering threading, multiprocessing, async/await, the GIL, thread safety, thread pools, process pools, producer‑consumer patterns, performance testing, and practical best‑practice recommendations for choosing the right concurrency model.

Python Crawling & Data Mining
Python Crawling & Data Mining
Python Crawling & Data Mining
Master Python Concurrency: Threads, Processes, Async and Best Practices

Python Concurrency and Parallelism Guide

Welcome! In modern computing, leveraging multiple CPU cores and concurrent execution is essential for performance. This article dives deep into Python concurrency, from basic concepts to advanced techniques, helping you master multithreading, multiprocessing, and asynchronous programming.

1. Concurrency Basics

Concurrency means multiple tasks making progress simultaneously, while parallelism means tasks truly running at the same time on multiple cores. Python’s threading module provides threads, but the Global Interpreter Lock (GIL) limits true parallel execution for CPU‑bound code.

1.1 Multithreading

import time, threading, multiprocessing

def sequential_task():
    for i in range(3):
        time.sleep(1)
        print(f"顺序任务 {i}")

def concurrent_threading():
    def task(name):
        for i in range(3):
            time.sleep(1)
            print(f"{name} 任务 {i}")
    threads = []
    for name in ["线程A", "线程B"]:
        t = threading.Thread(target=task, args=(name,))
        threads.append(t)
        t.start()
    for t in threads:
        t.join()

def parallel_processing():
    def task(name):
        for i in range(3):
            time.sleep(1)
            print(f"{name} 任务 {i}")
    processes = []
    for name in ["进程A", "进程B"]:
        p = multiprocessing.Process(target=task, args=(name,))
        processes.append(p)
        p.start()
    for p in processes:
        p.join()

print("=== 顺序执行 ===")
start = time.time()
sequential_task()
print(f"顺序执行时间: {time.time() - start:.2f}s")

print("=== 多线程并发 ===")
start = time.time()
concurrent_threading()
print(f"多线程时间: {time.time() - start:.2f}s")

print("=== 多进程并行 ===")
start = time.time()
parallel_processing()
print(f"多进程时间: {time.time() - start:.2f}s")

The GIL allows only one thread to execute Python bytecode at a time, making multithreading unsuitable for CPU‑intensive tasks but still valuable for I/O‑bound workloads.

1.2 The Global Interpreter Lock (GIL)

Limitation : Only one thread runs Python bytecode simultaneously.

Impact : Multithreading does not speed up CPU‑bound tasks.

Exception : I/O‑bound tasks still benefit from threading.

Solution : Use multiprocessing or C extensions to bypass the GIL.

2. Multithreaded Programming

2.1 Thread Creation and Management

import threading, queue, random, time

class WorkerThread(threading.Thread):
    def __init__(self, thread_id, task_queue):
        super().__init__()
        self.thread_id = thread_id
        self.task_queue = task_queue
        self.daemon = True
    def run(self):
        while True:
            try:
                task = self.task_queue.get(timeout=1)
                print(f"线程 {self.thread_id} 处理任务: {task}")
                time.sleep(random.uniform(0.1, 0.5))
                self.task_queue.task_done()
            except queue.Empty:
                break

def threading_example():
    task_queue = queue.Queue()
    for i in range(20):
        task_queue.put(f"任务-{i}")
    threads = []
    for i in range(4):
        t = WorkerThread(i, task_queue)
        threads.append(t)
        t.start()
    task_queue.join()
    print("所有任务完成!")

threading_example()

2.2 Thread Synchronization and Locks

import threading, time

class BankAccount:
    def __init__(self, initial_balance=0):
        self.balance = initial_balance
        self.lock = threading.Lock()
    def deposit(self, amount):
        with self.lock:
            old = self.balance
            time.sleep(0.001)
            self.balance = old + amount
    def withdraw(self, amount):
        with self.lock:
            if self.balance >= amount:
                old = self.balance
                time.sleep(0.001)
                self.balance = old - amount
                return True
            return False
    def get_balance(self):
        with self.lock:
            return self.balance

def test_thread_safety():
    account = BankAccount(1000)
    def deposit_task():
        for _ in range(100):
            account.deposit(1)
    def withdraw_task():
        for _ in range(100):
            account.withdraw(1)
    threads = []
    for _ in range(5):
        threads.append(threading.Thread(target=deposit_task))
        threads.append(threading.Thread(target=withdraw_task))
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    print(f"最终余额: {account.get_balance()}")
    print("预期余额: 1000 + 5*100 - 5*100 = 1000")

test_thread_safety()

2.3 Thread Communication (Producer‑Consumer)

import threading, queue, random, time

def producer_consumer():
    shared_queue = queue.Queue(maxsize=5)
    def producer():
        for i in range(10):
            item = f"产品-{i}"
            shared_queue.put(item)
            print(f"生产: {item}")
            time.sleep(random.uniform(0.1, 0.3))
        shared_queue.put(None)  # stop signal
    def consumer():
        while True:
            item = shared_queue.get()
            if item is None:
                shared_queue.put(None)
                break
            print(f"消费: {item}")
            time.sleep(random.uniform(0.2, 0.4))
            shared_queue.task_done()
    prod_thread = threading.Thread(target=producer)
    cons_threads = [threading.Thread(target=consumer) for _ in range(2)]
    prod_thread.start()
    for t in cons_threads:
        t.start()
    prod_thread.join()
    for t in cons_threads:
        t.join()
    print("生产消费完成")

producer_consumer()

3. Multiprocessing Programming

3.1 Process Creation and Management

import multiprocessing, os, time

def cpu_intensive_task(data_chunk):
    pid = os.getpid()
    result = sum(x*x for x in data_chunk)
    print(f"进程 {pid} 处理了 {len(data_chunk)} 个元素,结果: {result}")
    return result

def multiprocessing_example():
    data = list(range(1_000_000))
    chunk_size = len(data) // 4
    chunks = [data[i:i+chunk_size] for i in range(0, len(data), chunk_size)]
    with multiprocessing.Pool(processes=4) as pool:
        results = pool.map(cpu_intensive_task, chunks)
    total = sum(results)
    print(f"最终结果: {total}")

multiprocessing_example()

3.2 Inter‑Process Communication

import multiprocessing, time, queue

def worker_process(worker_id, task_queue, result_queue):
    while True:
        try:
            task = task_queue.get(timeout=5)
            if task == "STOP":
                break
            result = f"进程{worker_id}处理了: {task}"
            result_queue.put(result)
            time.sleep(0.1)
        except Exception:
            break

def process_communication():
    task_queue = multiprocessing.Queue()
    result_queue = multiprocessing.Queue()
    processes = []
    for i in range(3):
        p = multiprocessing.Process(target=worker_process, args=(i, task_queue, result_queue))
        processes.append(p)
        p.start()
    for i in range(10):
        task_queue.put(f"任务-{i}")
    for _ in range(3):
        task_queue.put("STOP")
    results = []
    while len(results) < 10:
        try:
            results.append(result_queue.get(timeout=1))
        except Exception:
            break
    for p in processes:
        p.join()
    print("所有结果:", results)

process_communication()

4. Asynchronous Programming and Coroutines

4.1 asyncio Basics

import asyncio, aiohttp

async def async_fetch(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            content = await response.text()
            return f"{url}: {len(content)} bytes"

async def concurrent_fetch():
    urls = ["https://httpbin.org/delay/1", "https://httpbin.org/delay/2", "https://httpbin.org/delay/1"]
    tasks = [async_fetch(url) for url in urls]
    results = await asyncio.gather(*tasks)
    for r in results:
        print(r)

asyncio.run(concurrent_fetch())

4.2 Asynchronous Producer‑Consumer

async def async_producer_consumer():
    queue = asyncio.Queue(maxsize=3)
    async def producer():
        for i in range(10):
            item = f"项目-{i}"
            await queue.put(item)
            print(f"生产: {item}")
            await asyncio.sleep(0.1)
        await queue.put(None)
    async def consumer():
        while True:
            item = await queue.get()
            if item is None:
                await queue.put(None)
                break
            print(f"消费: {item}")
            await asyncio.sleep(0.2)
            queue.task_done()
    prod = asyncio.create_task(producer())
    cons = [asyncio.create_task(consumer()) for _ in range(2)]
    await prod
    await queue.join()
    for c in cons:
        c.cancel()
    await asyncio.gather(*cons, return_exceptions=True)
    print("异步生产消费完成")

asyncio.run(async_producer_consumer())

5. Common Concurrency Issues and Solutions

5.1 Deadlock Prevention

import threading, time
lock_a = threading.Lock()
lock_b = threading.Lock()

def safe_task1():
    with lock_a:
        print("安全任务1获取锁A")
        time.sleep(0.1)
        with lock_b:
            print("安全任务1获取锁B")

def safe_task2():
    with lock_a:
        print("安全任务2获取锁A")
        time.sleep(0.1)
        with lock_b:
            print("安全任务2获取锁B")
threads = [threading.Thread(target=safe_task1), threading.Thread(target=safe_task2)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print("安全完成")

5.2 Race Condition Handling

import threading, time
counter = 0
counter_lock = threading.Lock()

def unsafe_increment():
    global counter
    temp = counter
    time.sleep(0.001)
    counter = temp + 1

def safe_increment():
    global counter
    with counter_lock:
        temp = counter
        time.sleep(0.001)
        counter = temp + 1
# Test unsafe version
counter = 0
threads = [threading.Thread(target=unsafe_increment) for _ in range(100)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(f"不安全计数: {counter}")
# Test safe version
counter = 0
threads = [threading.Thread(target=safe_increment) for _ in range(100)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(f"安全计数: {counter}")

6. Performance Optimization and Debugging

6.1 Concurrency Performance Testing

import time, asyncio
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def io_work():
    time.sleep(0.01)
    return "done"

def cpu_work(n):
    return sum(i*i for i in range(n))

def concurrency_performance_test():
    test_cases = [
        ("顺序IO", lambda: [io_work() for _ in range(100)]),
        ("线程池IO", lambda: list(ThreadPoolExecutor(10).map(lambda _: io_work(), range(100)))),
        ("顺序CPU", lambda: [cpu_work(1000) for _ in range(10)]),
        ("进程池CPU", lambda: list(ProcessPoolExecutor(4).map(cpu_work, [1000]*10))
    ]
    for name, func in test_cases:
        start = time.time()
        result = func()
        elapsed = time.time() - start
        print(f"{name}: {elapsed:.3f}s, 结果数量: {len(result)}")

concurrency_performance_test()

6.2 Debugging Tips

import logging, threading, asyncio, traceback
logging.basicConfig(level=logging.DEBUG)

def debug_task(name):
    thread_id = threading.current_thread().ident
    logging.debug(f"线程 {thread_id} 执行任务 {name}")
    time.sleep(0.1)
    return f"{name}-完成"

def failing_task():
    try:
        raise ValueError("模拟错误")
    except Exception as e:
        logging.error(f"任务失败: {e}
{traceback.format_exc()}")
        raise

async def debug_concurrent_code():
    with ThreadPoolExecutor(2) as executor:
        future = executor.submit(failing_task)
        try:
            result = future.result(timeout=5)
        except Exception as e:
            logging.error(f"任务执行失败: {e}")

asyncio.run(debug_concurrent_code())

7. Real‑World Application Cases

7.1 Concurrent Web Requests

import asyncio, aiohttp

async def fetch_with_retry(session, url, retries=3):
    for attempt in range(retries):
        try:
            async with session.get(url, timeout=5) as response:
                if response.status == 200:
                    return await response.text()
                print(f"请求失败: {response.status}")
        except Exception as e:
            print(f"尝试 {attempt+1} 失败: {e}")
            await asyncio.sleep(1)
    return None

async def process_urls(urls):
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_with_retry(session, url) for url in urls]
        return await asyncio.gather(*tasks, return_exceptions=True)

urls = ["https://httpbin.org/get", "https://httpbin.org/delay/2", "https://httpbin.org/status/404", "https://httpbin.org/get"]
results = asyncio.run(process_urls(urls))
for url, result in zip(urls, results):
    if isinstance(result, Exception):
        print(f"{url}: 错误 - {result}")
    else:
        print(f"{url}: 成功 - {len(result)} 字节")

7.2 Parallel Data Processing

import time, multiprocessing

def process_data_chunk(chunk):
    return [item*2+1 for item in chunk]

def parallel_data_processing():
    data = list(range(1_000_000))
    chunk_size = len(data) // 8
    # Sequential
    start = time.time()
    seq_result = process_data_chunk(data)
    seq_time = time.time() - start
    # Parallel
    start = time.time()
    with multiprocessing.Pool(8) as pool:
        chunks = [data[i:i+chunk_size] for i in range(0, len(data), chunk_size)]
        results = list(pool.map(process_data_chunk, chunks))
    parallel_result = [item for sublist in results for item in sublist]
    par_time = time.time() - start
    print(f"顺序处理时间: {seq_time:.3f}s")
    print(f"并行处理时间: {par_time:.3f}s")
    print(f"加速比: {seq_time/par_time:.2f}x")
    print(f"结果验证: {seq_result == parallel_result}")

parallel_data_processing()

Conclusion: Choose the Right Concurrency Model

Scenario : I/O‑bound → Recommended : Multithreading / Async → Reason : Utilizes waiting time.

Scenario : CPU‑bound → Recommended : Multiprocessing → Reason : Bypasses GIL, uses multiple cores.

Scenario : Simple parallel tasks → Recommended : Process pool → Reason : Easy to use, automatic management.

Scenario : Complex coordination → Recommended : Manual thread/process management → Reason : Fine‑grained control.

Scenario : High‑concurrency I/O → Recommended : Async programming → Reason : Extremely high concurrency.

Best Practices :

Analyze first – determine I/O vs CPU bound.

Choose the appropriate model based on analysis.

Optimize with proper synchronization primitives and communication mechanisms.

Test thoroughly for correctness and performance under concurrent loads.

Feel free to share your toughest concurrency challenge and how you solved it in the comments!

Python Concurrency
Python Concurrency
PerformanceConcurrencyThread Safetybest-practicesGILasynciomultiprocessing
Python Crawling & Data Mining
Written by

Python Crawling & Data Mining

Life's short, I code in Python. This channel shares Python web crawling, data mining, analysis, processing, visualization, automated testing, DevOps, big data, AI, cloud computing, machine learning tools, resources, news, technical articles, tutorial videos and learning materials. Join us!

0 followers
Reader feedback

How this landed with the community

Sign in to like

Rate this article

Was this worth your time?

Sign in to rate
Discussion

0 Comments

Thoughtful readers leave field notes, pushback, and hard-won operational detail here.