Understanding Concurrency, Parallelism, Synchronization, Asynchronous, Blocking, and Non‑blocking in Python with Code Examples
This article explains the key concepts of concurrency, parallelism, synchronization, asynchronous execution, blocking, and non‑blocking in Python, providing clear explanations and practical code samples for each concept, including API automation examples for HTTP requests.
In Python, understanding concurrency, parallelism, synchronization, asynchronous execution, blocking, and non‑blocking is essential for building high‑performance applications.
1. Concurrency
Concurrency means a program can make progress on multiple tasks during the same period of time: the tasks appear to run simultaneously, but their execution is actually interleaved.
Example: multithreading
import threading
import time
def worker():
    """Print a start message, pause for two seconds, then print a finish message.

    Both messages include the name of the thread that is running the call.
    """
    thread_name = threading.current_thread().name
    print(f"Thread {thread_name} started")
    time.sleep(2)  # simulated work
    print(f"Thread {thread_name} finished")


# Build the five named threads up front, then start them one after another.
threads = [
    threading.Thread(target=worker, name=f"Thread-{i}")
    for i in range(5)
]
for thread in threads:
    thread.start()

# Block until every thread has run to completion.
for thread in threads:
    thread.join()
print("All threads finished")
2. Parallelism
Parallelism refers to the ability of a program to truly execute multiple tasks at the same time, typically requiring multi‑core hardware.
Example: multiprocessing
import multiprocessing
import time
def worker():
    """Print a start message, pause for two seconds, then print a finish message.

    Both messages include the name of the process that is running the call.
    """
    process_name = multiprocessing.current_process().name
    print(f"Process {process_name} started")
    time.sleep(2)  # simulated work
    print(f"Process {process_name} finished")


# The guard is required: under the "spawn" and "forkserver" start methods each
# child re-imports this module, and unguarded process creation would run again
# inside every child.
if __name__ == "__main__":
    # Create multiple processes
    processes = []
    for i in range(5):
        process = multiprocessing.Process(target=worker, name=f"Process-{i}")
        processes.append(process)
        process.start()

    # Wait for all processes to finish
    for process in processes:
        process.join()

    print("All processes finished")


# 3. Synchronization
Synchronization ensures safe access to shared resources in multithreaded or multiprocess environments using locks or other mechanisms.
Example: lock
import threading
import time
def worker(lock):
    """Run the start / sleep / finish sequence while holding ``lock``.

    The lock is held for the whole body, so only one thread at a time can be
    between its "started" and "finished" messages.

    Args:
        lock: The lock shared by all worker threads; it is used as a context
            manager.
    """
    thread_name = threading.current_thread().name
    with lock:
        print(f"Thread {thread_name} started")
        time.sleep(2)  # simulated work done inside the critical section
        print(f"Thread {thread_name} finished")


# A single lock shared by all five workers.
lock = threading.Lock()

threads = [
    threading.Thread(target=worker, args=(lock,), name=f"Thread-{i}")
    for i in range(5)
]
for thread in threads:
    thread.start()

# Wait for every worker to release the lock and exit.
for thread in threads:
    thread.join()
print("All threads finished")
4. Asynchronous
Asynchronous programming allows a program to continue executing other tasks while waiting for an operation to complete, typically using callbacks or coroutines.
Example: asyncio
import asyncio
async def worker():
    """Print a start message, await a two-second sleep, then print a finish message.

    Both messages include the name of the asyncio task running the coroutine.
    """
    task_name = asyncio.current_task().get_name()
    print(f"Worker {task_name} started")
    await asyncio.sleep(2)  # yields to the event loop while waiting
    print(f"Worker {task_name} finished")


async def main():
    """Schedule five named worker tasks and wait until all of them are done."""
    tasks = [
        asyncio.create_task(worker(), name=f"Worker-{i}")
        for i in range(5)
    ]
    await asyncio.gather(*tasks)
asyncio.run(main())
5. Blocking
Blocking operations pause program execution until the operation completes, such as a blocking I/O call.
Example: time.sleep
import time
def blocking_io(seconds=5):
    """Simulate a blocking I/O call by sleeping between two status messages.

    The calling thread does nothing else until the sleep returns.

    Args:
        seconds: How long to block, in seconds. Defaults to 5, the delay used
            in the original example.
    """
    print("Starting blocking IO")
    time.sleep(seconds)
    print("Finished blocking IO")
blocking_io()
6. Non‑blocking
Non‑blocking operations allow the program to continue executing other tasks without waiting for the operation to finish, often used in network or file I/O.
Example: select with sockets
import select
import socket
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server_socket.bind(('localhost', 8000))
server_socket.listen(5)

# Every socket that select() should watch; it starts with just the listener.
sockets_list = [server_socket]


def handle_client(client_socket):
    """Read one request, reply with a greeting, and close the connection.

    The socket is always removed from ``sockets_list`` before it is closed,
    so a later ``select()`` call is never handed a closed socket.

    Args:
        client_socket: A connected socket that ``select()`` reported as
            readable.
    """
    try:
        request = client_socket.recv(1024)
        if request:
            # errors="replace" keeps undecodable bytes from raising.
            print(f"Received: {request.decode(errors='replace')}")
            client_socket.sendall(b"Hello, World!\n")
        # An empty read means the peer closed the connection; nothing to answer.
    except OSError as exc:
        print(f"Client connection error: {exc}")
    finally:
        if client_socket in sockets_list:
            sockets_list.remove(client_socket)
        client_socket.close()


while True:
    read_sockets, _, exception_sockets = select.select(sockets_list, [], sockets_list)
    for notified_socket in read_sockets:
        if notified_socket is server_socket:
            # The listener is readable: a new connection is waiting.
            client_socket, _ = server_socket.accept()
            sockets_list.append(client_socket)
        else:
            handle_client(notified_socket)
    for notified_socket in exception_sockets:
        # handle_client may already have removed this socket in this pass.
        if notified_socket in sockets_list:
            sockets_list.remove(notified_socket)
        notified_socket.close()


# API Automation Code Samples
Below are practical examples of using the above concepts for HTTP request automation.
1. Concurrency – multithreaded HTTP requests
import threading
import requests
import time
def send_request(url, headers, payload, timeout=10):
    """POST ``payload`` as JSON to ``url`` and print the status code and body.

    Args:
        url: Target URL.
        headers: HTTP headers to send with the request.
        payload: Object sent as the JSON request body.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout a stalled server would block the calling thread forever.
    """
    response = requests.post(url, headers=headers, json=payload, timeout=timeout)
    print(f"Response status code: {response.status_code}")
    print(f"Response content: {response.text}")


test_cases = [
    {"url": "https://api.example.com/v1/resource", "headers": {"Authorization": "Bearer token1"}, "payload": {"key": "value1"}},
    {"url": "https://api.example.com/v1/resource", "headers": {"Authorization": "Bearer token2"}, "payload": {"key": "value2"}}
]

# One thread per test case, so the requests wait on the network concurrently.
threads = []
for tc in test_cases:
    t = threading.Thread(target=send_request, args=(tc["url"], tc["headers"], tc["payload"]))
    threads.append(t)
    t.start()

# Wait for every request thread to finish.
for t in threads:
    t.join()
print("All requests finished")
2. Parallelism – multiprocessing HTTP requests
import multiprocessing
import requests
import time
def send_request(url, headers, payload, timeout=10):
    """POST ``payload`` as JSON to ``url`` and print the status code and body.

    Args:
        url: Target URL.
        headers: HTTP headers to send with the request.
        payload: Object sent as the JSON request body.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout a stalled server would block the worker process forever.
    """
    response = requests.post(url, headers=headers, json=payload, timeout=timeout)
    print(f"Response status code: {response.status_code}")
    print(f"Response content: {response.text}")


test_cases = [
    {"url": "https://api.example.com/v1/resource", "headers": {"Authorization": "Bearer token1"}, "payload": {"key": "value1"}},
    {"url": "https://api.example.com/v1/resource", "headers": {"Authorization": "Bearer token2"}, "payload": {"key": "value2"}}
]

# The guard is required: under the "spawn" and "forkserver" start methods each
# child re-imports this module, and unguarded process creation would run again
# inside every child.
if __name__ == "__main__":
    # One process per test case.
    processes = []
    for tc in test_cases:
        p = multiprocessing.Process(target=send_request, args=(tc["url"], tc["headers"], tc["payload"]))
        processes.append(p)
        p.start()

    # Wait for every request process to finish.
    for p in processes:
        p.join()

    print("All requests finished")


# 3. Synchronization – lock‑protected multithreaded HTTP requests
import threading
import requests
import time
def send_request(lock, url, headers, payload, timeout=10):
    """POST ``payload`` as JSON to ``url`` while holding ``lock``, then print the result.

    Args:
        lock: Lock shared by all request threads; it is used as a context
            manager.
        url: Target URL.
        headers: HTTP headers to send with the request.
        payload: Object sent as the JSON request body.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout a stalled server would hold the lock forever and block
            every other thread.
    """
    # NOTE(review): the lock covers the network call as well as the prints, so
    # the requests run one at a time. Narrow the critical section to the two
    # print calls if only the output needs protecting.
    with lock:
        response = requests.post(url, headers=headers, json=payload, timeout=timeout)
        print(f"Response status code: {response.status_code}")
        print(f"Response content: {response.text}")


test_cases = [
    {"url": "https://api.example.com/v1/resource", "headers": {"Authorization": "Bearer token1"}, "payload": {"key": "value1"}},
    {"url": "https://api.example.com/v1/resource", "headers": {"Authorization": "Bearer token2"}, "payload": {"key": "value2"}}
]

# A single lock shared by every request thread.
lock = threading.Lock()

threads = []
for tc in test_cases:
    t = threading.Thread(target=send_request, args=(lock, tc["url"], tc["headers"], tc["payload"]))
    threads.append(t)
    t.start()

# Wait for every request thread to finish.
for t in threads:
    t.join()
print("All requests finished")
4. Asynchronous – asyncio HTTP requests with aiohttp
import asyncio
import aiohttp
async def send_request(url, headers, payload):
    """POST ``payload`` as JSON to ``url`` and print the status code and body.

    Args:
        url: Target URL.
        headers: HTTP headers to send with the request.
        payload: Object sent as the JSON request body.
    """
    # NOTE(review): a new ClientSession is opened for every call; consider
    # sharing one session across requests if connection reuse matters.
    async with aiohttp.ClientSession() as session, session.post(
        url, headers=headers, json=payload
    ) as response:
        print(f"Response status code: {response.status}")
        body = await response.text()
        print(f"Response content: {body}")


test_cases = [
    {"url": "https://api.example.com/v1/resource", "headers": {"Authorization": "Bearer token1"}, "payload": {"key": "value1"}},
    {"url": "https://api.example.com/v1/resource", "headers": {"Authorization": "Bearer token2"}, "payload": {"key": "value2"}}
]


async def main():
    """Start one task per test case and wait until all of them are done."""
    tasks = [
        asyncio.create_task(send_request(tc["url"], tc["headers"], tc["payload"]))
        for tc in test_cases
    ]
    await asyncio.gather(*tasks)
asyncio.run(main())
5. Blocking – sequential HTTP requests
import requests
import time
def send_request(url, headers, payload, timeout=10):
    """POST ``payload`` as JSON to ``url`` and print the status code and body.

    Args:
        url: Target URL.
        headers: HTTP headers to send with the request.
        payload: Object sent as the JSON request body.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout a stalled server would block the whole script forever.
    """
    response = requests.post(url, headers=headers, json=payload, timeout=timeout)
    print(f"Response status code: {response.status_code}")
    print(f"Response content: {response.text}")


test_cases = [
    {"url": "https://api.example.com/v1/resource", "headers": {"Authorization": "Bearer token1"}, "payload": {"key": "value1"}},
    {"url": "https://api.example.com/v1/resource", "headers": {"Authorization": "Bearer token2"}, "payload": {"key": "value2"}}
]

# Requests run strictly one after another: each call blocks until it returns.
for tc in test_cases:
    send_request(tc["url"], tc["headers"], tc["payload"])
print("All requests finished")
6. Non‑blocking – select‑based concurrent HTTP requests
import select
import socket
import requests
import time
def send_request(url, headers, payload, timeout=10):
    """POST ``payload`` as JSON to ``url`` and print the status code and body.

    Args:
        url: Target URL.
        headers: HTTP headers to send with the request.
        payload: Object sent as the JSON request body.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout a stalled server would block the select loop forever.
    """
    response = requests.post(url, headers=headers, json=payload, timeout=timeout)
    print(f"Response status code: {response.status_code}")
    print(f"Response content: {response.text}")


test_cases = [
    {"url": "https://api.example.com/v1/resource", "headers": {"Authorization": "Bearer token1"}, "payload": {"key": "value1"}},
    {"url": "https://api.example.com/v1/resource", "headers": {"Authorization": "Bearer token2"}, "payload": {"key": "value2"}}
]

sockets_list = []
# Map each socket to its own test case. Looking the case up by the socket's
# position in sockets_list is wrong, because positions shift as sockets are
# removed.
case_for_socket = {}
for tc in test_cases:
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.connect(("localhost", 8000))
    sockets_list.append(sock)
    case_for_socket[sock] = tc

# NOTE(review): select() waits for readability here, so a request is only sent
# once the peer on localhost:8000 has written data or closed the connection.
# Confirm this is the intended trigger.
while sockets_list:
    ready, _, _ = select.select(sockets_list, [], [])
    for sock in ready:
        tc = case_for_socket.pop(sock)
        try:
            send_request(tc["url"], tc["headers"], tc["payload"])
        finally:
            # Stop watching the socket and release it even if the request fails.
            sockets_list.remove(sock)
            sock.close()
print("All requests finished")
Conclusion
The examples above illustrate how Python handles concurrency, parallelism, synchronization, asynchronous execution, blocking, and non‑blocking operations, providing practical code snippets that can be adapted for real‑world API automation and high‑performance application development.
Test Development Learning Exchange
Test Development Learning Exchange
How this landed with the community
Was this worth your time?
0 Comments
Thoughtful readers leave field notes, pushback, and hard-won operational detail here.