Boost Your Asyncio Apps: 9 Proven Tricks to Reach 3000+ Requests/sec
This article reveals nine advanced, battle‑tested asyncio performance techniques—including uvloop, memory‑aware task patterns, semaphore limits, executor offloading, graceful cancellation, debugging helpers, connection pooling, loop saturation monitoring, and leak‑free background tasks—that can transform a modest 200‑request‑per‑second service into a 3000+‑request‑per‑second production‑grade system.
Today I’ll share lesser‑known techniques for building high‑performance asyncio projects that can turn a 200‑req/s application into a 3000+‑req/s powerhouse.
Secret #1: The event loop isn’t magic (you can make it faster)
The default asyncio event loop is decent, but what most developers don’t know is that for true high performance you should switch to uvloop.
import asyncio
import uvloop

# Option A: install uvloop's event loop policy so that loops created from
# here on are uvloop loops. The article credits this line with a 2-4x boost.
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())

# Option B (Python 3.11+): pass uvloop's loop factory to asyncio.Runner.
# `main` is the application's entry-point coroutine function, defined elsewhere.
with asyncio.Runner(loop_factory=uvloop.new_event_loop) as runner:
    runner.run(main())

# uvloop is implemented in Cython and uses libuv—the same C library behind Node.js. In my benchmark switching to uvloop raised echo‑server throughput by 300% and also improves memory allocation and garbage collection.
Secret #2: Memory management is your silent performance killer
In asyncio the biggest performance trap isn’t CPU usage but memory leaks caused by poor task handling; I’ve seen production systems balloon from 100 MB to 2 GB.
Task reference leaks
# ❌ This pattern leaks memory
async def bad_pattern():
    """Anti-pattern, kept on purpose: spawn 10000 tasks but await only 100.

    This is the counter-example for the corrected version; do not copy it.
    ``some_coroutine`` is expected to be defined elsewhere.
    """
    tasks = []
    for i in range(10000):
        task = asyncio.create_task(some_coroutine(i))
        tasks.append(task)
    # If some_coroutine() raises, task references stay forever
    # Worse: we only await the first 100 tasks
    await asyncio.gather(*tasks[:100])
# ✅ Correct task management
async def good_pattern():
    """Spawn 10000 tasks, await every one of them, and log each failure.

    ``some_coroutine`` is expected to be defined elsewhere. If this coroutine
    is interrupted before the tasks finish (for example because it is itself
    cancelled), the ``finally`` block cancels whatever is still pending and
    waits for those tasks to wind down.
    """
    import logging

    log = logging.getLogger(__name__)
    tasks = []
    try:
        for i in range(10000):
            tasks.append(asyncio.create_task(some_coroutine(i)))
        # Always await all tasks and handle exceptions
        results = await asyncio.gather(*tasks, return_exceptions=True)
        for i, result in enumerate(results):
            # BaseException rather than Exception: a cancelled task is
            # reported as asyncio.CancelledError, which is not an Exception.
            if isinstance(result, BaseException):
                log.error(f"Task {i} failed: {result}")
    finally:
        # Ensure all tasks are cleaned up
        pending = [task for task in tasks if not task.done()]
        for task in pending:
            task.cancel()
        if pending:
            await asyncio.gather(*pending, return_exceptions=True)

# Secret #3: The secret semaphore pattern
This pattern completely changed how I handle memory‑intensive operations.
async def memory_conscious_processor(items=None, *, max_concurrency=50):
    """Run ``heavy_processing`` over *items* with bounded concurrency.

    ``heavy_processing`` is expected to be defined elsewhere.

    Args:
        items: Iterable of work items. Defaults to ``range(10000)``.
        max_concurrency: Maximum number of ``heavy_processing`` calls allowed
            to run at the same time. Defaults to 50.

    Returns:
        The successful results, in input order. Items whose processing raised
        or was cancelled are left out.
    """
    if items is None:
        items = range(10000)

    # Limit concurrent memory-intensive operations
    semaphore = asyncio.Semaphore(max_concurrency)

    async def process_item(item):
        async with semaphore:
            return await heavy_processing(item)

    tasks = [asyncio.create_task(process_item(item)) for item in items]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    # BaseException so that CancelledError objects are filtered out as well.
    return [r for r in results if not isinstance(r, BaseException)]

# Secret #4: run_in_executor performance trick
When your asyncio code contains CPU‑bound work, using run_in_executor is not just a best practice—it’s a performance multiplier.
import asyncio
import concurrent.futures
from functools import partial
# ❌ This blocks the whole event loop
async def slow_sync_operation():
    """Anti-pattern, kept on purpose: call the synchronous function inline.

    ``expensive_cpu_calculation`` (defined elsewhere) runs directly in this
    coroutine, so nothing else on the event loop runs until it returns.
    """
    result = expensive_cpu_calculation()  # blocks 2 seconds
    return result
# ✅ Keep the loop responsive
async def fast_async_operation():
    """Run ``expensive_cpu_calculation`` in a worker thread and await it.

    Returns:
        Whatever ``expensive_cpu_calculation`` (defined elsewhere) returns.
    """
    # get_running_loop() is the supported way to obtain the loop from inside
    # a coroutine.
    loop = asyncio.get_running_loop()
    # Run CPU-bound work in a thread pool.
    # NOTE: the thread keeps the event loop responsive; for pure-Python
    # CPU-bound work, real parallelism needs a ProcessPoolExecutor.
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
        result = await loop.run_in_executor(executor, expensive_cpu_calculation)
    return result
# ✅ Better: reuse the executor
class PerformantService:
    """Service that offloads blocking calls to one long-lived thread pool."""

    def __init__(self):
        # One pool for the lifetime of the service, not one per call.
        self.executor = concurrent.futures.ThreadPoolExecutor(max_workers=4)

    async def process(self, data):
        """Run ``expensive_cpu_calculation(data)`` in the pool and return its result."""
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            self.executor, partial(expensive_cpu_calculation, data)
        )

    async def cleanup(self):
        """Shut the pool down, waiting for queued work without blocking the loop."""
        # shutdown(wait=True) blocks until the workers finish, so it is run in
        # a separate thread instead of on the event loop thread.
        await asyncio.to_thread(self.executor.shutdown, wait=True)

# Secret #5: Truly effective task cancellation
Cancelling asyncio tasks sounds simple, but doing it correctly in production is an art. The following pattern works reliably.
class GracefulTaskManager:
    """Track spawned tasks and cancel them all within a bounded time."""

    def __init__(self):
        # Strong references to live tasks; each task removes itself when done.
        self.tasks = set()

    def create_task(self, coro):
        """Schedule *coro* as a task, track it, and return the task."""
        task = asyncio.create_task(coro)
        self.tasks.add(task)
        task.add_done_callback(self.tasks.discard)
        return task

    async def cancel_all(self, timeout=5.0):
        """Cancel every tracked task and wait for them to finish.

        Tasks get two cancellation rounds, each waiting at most *timeout*
        seconds. A task that is still running after both rounds is logged and
        left tracked, so this method never waits indefinitely.

        Args:
            timeout: Maximum number of seconds to wait in each round.
        """
        import logging

        # Snapshot: done callbacks shrink self.tasks while we are waiting.
        snapshot = set(self.tasks)
        if not snapshot:
            return

        pending = snapshot
        for _ in range(2):
            for task in pending:
                task.cancel()
            done, pending = await asyncio.wait(pending, timeout=timeout)
            for task in done:
                # Retrieve the outcome so asyncio does not report
                # "Task exception was never retrieved" later on.
                if not task.cancelled():
                    task.exception()
            if not pending:
                break

        if pending:
            logging.warning(
                "%d task(s) still running after cancellation timeout", len(pending)
            )
        # Stop tracking only the tasks that actually finished.
        self.tasks.difference_update(snapshot - pending)
# Usage example
async def main():
    """Start 100 long-running operations, then cancel them all.

    ``long_running_operation`` is expected to be defined elsewhere.
    """
    manager = GracefulTaskManager()
    for i in range(100):
        manager.create_task(long_running_operation(i))
    await manager.cancel_all()

# Secret #6: Debugging tips that save your sanity
Debugging asyncio in production can be painful; this decorator helps monitor slow coroutines.
import asyncio
import functools
import time
import logging
def monitor_slow_coroutines(threshold=0.1):
    """Decorator to log coroutines that exceed a duration threshold.

    Args:
        threshold: Duration in seconds above which a call is logged as slow.

    Returns:
        A decorator for coroutine functions. The wrapped coroutine returns or
        raises exactly what the original does; the timing check runs either way.
    """
    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            start = time.perf_counter()
            try:
                return await func(*args, **kwargs)
            finally:
                # In finally so that failing calls are timed as well.
                duration = time.perf_counter() - start
                if duration > threshold:
                    logging.warning(
                        "Slow coroutine: %s took %.3fs", func.__name__, duration
                    )
        return wrapper
    return decorator
# Enable asyncio debug mode in development
with asyncio.Runner(debug=True) as runner:
    # The threshold must be set on the loop that will actually run main(),
    # and before it runs; setting it on another loop afterwards does nothing.
    loop = runner.get_loop()
    loop.slow_callback_duration = 0.1
    runner.run(main())

# Secret #7: Truly scalable connection pool
Creating a new aiohttp session for every request is a performance killer. Reuse a configured session instead.
# ❌ New session per request
async def slow_http_client():
    """Anti-pattern, kept on purpose: open a fresh session for one request.

    Returns:
        The decoded JSON body of the response.
    """
    async with aiohttp.ClientSession() as session:
        async with session.get('https://api.example.com') as response:
            return await response.json()
# ✅ Reuse connections with a pool
class HighPerformanceHTTPClient:
    """HTTP client that reuses one pooled ``aiohttp.ClientSession``.

    The session is created lazily on first use, so the client itself can be
    constructed anywhere, while the session is only built from inside a
    coroutine. Also usable as ``async with HighPerformanceHTTPClient() as c:``.
    """

    def __init__(self):
        self._session = None

    @property
    def session(self):
        """The shared session, created on first access."""
        if self._session is None:
            connector = aiohttp.TCPConnector(
                limit=100,
                limit_per_host=30,
                ttl_dns_cache=300,
                use_dns_cache=True,
                keepalive_timeout=30,
                enable_cleanup_closed=True,
            )
            timeout = aiohttp.ClientTimeout(total=30, connect=5, sock_read=10)
            self._session = aiohttp.ClientSession(connector=connector, timeout=timeout)
        return self._session

    async def get(self, url):
        """GET *url* and return the decoded JSON body."""
        async with self.session.get(url) as response:
            return await response.json()

    async def close(self):
        """Close the session if one was opened; safe to call more than once."""
        session, self._session = self._session, None
        if session is not None:
            await session.close()

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc, tb):
        await self.close()
# Usage
async def fetch_items_example():
    """Fetch 1000 items concurrently through one shared client.

    ``await`` is only valid inside a coroutine, so the example is wrapped in
    one; run it with ``asyncio.run(fetch_items_example())``.

    Returns:
        The decoded JSON bodies, in request order.
    """
    client = HighPerformanceHTTPClient()
    try:
        results = await asyncio.gather(*[client.get(f'https://api.example.com/item/{i}') for i in range(1000)])
    finally:
        # Close the shared session even when a request fails.
        await client.close()
    return results

# Secret #8: Event loop saturation detection mode
Detect when the event loop is overloaded by measuring actual sleep intervals.
import asyncio
import time
import logging
class EventLoopMonitor:
    """Detect event loop saturation by timing how late ``asyncio.sleep`` wakes.

    Args:
        check_interval: Seconds to sleep between checks. A wake-up later than
            1.5 times this value is logged as saturation.
    """

    def __init__(self, check_interval=1.0):
        self.check_interval = check_interval
        self.last_check = time.perf_counter()
        self.monitoring_task = None

    async def start_monitoring(self):
        """Start the background check; a no-op when it is already running."""
        if self.monitoring_task is not None and not self.monitoring_task.done():
            return
        # Measure from now rather than from construction time; otherwise any
        # delay between __init__ and start shows up as a false first warning.
        self.last_check = time.perf_counter()
        self.monitoring_task = asyncio.create_task(self._monitor_loop())

    async def _monitor_loop(self):
        while True:
            await asyncio.sleep(self.check_interval)
            now = time.perf_counter()
            actual_interval = now - self.last_check
            if actual_interval > self.check_interval * 1.5:
                logging.warning(f"Event loop saturation detected: expected {self.check_interval}s, actual {actual_interval:.3f}s")
            self.last_check = now

    async def stop_monitoring(self):
        """Cancel the background check and wait for it to finish."""
        task, self.monitoring_task = self.monitoring_task, None
        if task:
            task.cancel()
            await asyncio.gather(task, return_exceptions=True)

# Secret #9: No‑leak background task pattern
Background tasks are notorious for leaking memory. The following manager tracks and cleans them up safely.
class BackgroundTaskManager:
    """Track fire-and-forget tasks, log their failures, and cancel them on shutdown."""

    def __init__(self):
        # Strong references keep the tasks alive; each one removes itself when done.
        self.background_tasks = set()

    def create_background_task(self, coro):
        """Schedule *coro* as a tracked background task and return the task."""
        task = asyncio.create_task(coro)
        self.background_tasks.add(task)
        task.add_done_callback(self.background_tasks.discard)
        task.add_done_callback(self._handle_task_result)
        return task

    def _handle_task_result(self, task):
        """Done callback: log the task's exception, ignoring cancellation."""
        try:
            task.result()
        except asyncio.CancelledError:
            pass
        except Exception as e:
            logging.exception("Background task failed: %s", e)

    async def shutdown(self):
        """Cancel every tracked task and wait until all of them have finished."""
        # Snapshot once: done callbacks mutate the set while we wait.
        tasks = list(self.background_tasks)
        for task in tasks:
            task.cancel()
        if tasks:
            await asyncio.gather(*tasks, return_exceptions=True)
# Usage
async def run_background_tasks_example():
    """Start two background tasks, then shut the manager down.

    ``await`` is only valid inside a coroutine, so the example is wrapped in
    one; run it with ``asyncio.run(run_background_tasks_example())``.
    ``periodic_cleanup`` and ``heartbeat_monitor`` are expected to be defined
    elsewhere.
    """
    manager = BackgroundTaskManager()
    manager.create_background_task(periodic_cleanup())
    manager.create_background_task(heartbeat_monitor())
    await manager.shutdown()

# Real‑world performance test
Below is a complete example that combines all the tricks.
import asyncio
import aiohttp
import uvloop
import time
from contextlib import asynccontextmanager
class HighPerformanceAsyncService:
    """Batch URL fetcher combining a pooled session, a concurrency cap,
    tracked tasks, and event loop monitoring.

    NOTE(review): the session is created in ``__init__``, so instances should
    be constructed from inside a running event loop — confirm against the
    aiohttp version in use.
    """

    def __init__(self):
        # At most 100 requests in flight at once.
        self.semaphore = asyncio.Semaphore(100)
        self.task_manager = BackgroundTaskManager()
        self.monitor = EventLoopMonitor()
        connector = aiohttp.TCPConnector(limit=100, limit_per_host=20, ttl_dns_cache=300, use_dns_cache=True)
        self.session = aiohttp.ClientSession(connector=connector)

    @asynccontextmanager
    async def managed_lifecycle(self):
        """Start monitoring, yield the service, then tear everything down.

        Teardown steps are nested so that a failure in one of them does not
        skip the rest; in particular the HTTP session is always closed.
        """
        await self.monitor.start_monitoring()
        try:
            yield self
        finally:
            try:
                await self.task_manager.shutdown()
            finally:
                try:
                    await self.monitor.stop_monitoring()
                finally:
                    await self.session.close()

    async def process_batch(self, urls):
        """Fetch every URL in *urls* concurrently.

        Returns:
            One entry per URL, in order: the decoded JSON body, or the
            exception raised while fetching it.
        """
        async def process_url(url):
            async with self.semaphore:
                async with self.session.get(url) as response:
                    return await response.json()

        tasks = [self.task_manager.create_background_task(process_url(url)) for url in urls]
        return await asyncio.gather(*tasks, return_exceptions=True)
async def main():
    """Fetch 1000 example URLs through the service and print the throughput."""
    urls = [f'https://api.example.com/item/{i}' for i in range(1000)]
    async with HighPerformanceAsyncService().managed_lifecycle() as service:
        start_time = time.perf_counter()
        results = await service.process_batch(urls)
        duration = time.perf_counter() - start_time
        # BaseException so that cancelled requests are not counted as successes.
        success_count = sum(1 for r in results if not isinstance(r, BaseException))
        print(f"Processed {success_count}/{len(urls)} URLs in {duration:.2f}s")
        print(f"Rate: {len(urls)/duration:.1f} requests/second")


if __name__ == "__main__":
    # uvloop has to be chosen before the loop is created. Installing the
    # policy inside main() came too late: asyncio.run() had already started
    # a default loop by then.
    with asyncio.Runner(loop_factory=uvloop.new_event_loop) as runner:
        runner.run(main())

# Conclusion
These patterns are not just theoretical optimizations—they are battle‑tested techniques that can elevate an asyncio application from amateur to production‑grade. The gap between a slow and a fast asyncio service lies not only in syntax but in deep understanding of memory management, task lifecycles, and event‑loop tuning.
Signed-in readers can open the original source through BestHub's protected redirect.
This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contact us and we will review it promptly.
Code Mala Tang
Read source code together, write articles together, and enjoy spicy hot pot together.
How this landed with the community
Was this worth your time?
0 Comments
Thoughtful readers leave field notes, pushback, and hard-won operational detail here.
