
Using asyncio and aiohttp for Asynchronous HTTP Requests in Python

This article demonstrates how to use Python's asyncio library together with the aiohttp package to perform asynchronous HTTP requests, covering installation, concurrent request execution, GET and POST examples, error handling, timeouts, SSL verification, and proxy usage, with complete code snippets.

Test Development Learning Exchange

Introduction

In Python, the asyncio library provides the event loop and the async/await machinery for asynchronous programming. The third-party aiohttp library is an asynchronous HTTP client/server framework built on top of asyncio, so its requests can be awaited like any other coroutine.

1. Install aiohttp

First, ensure aiohttp is installed:

pip install aiohttp

2. Example: Concurrently execute multiple HTTP requests

The following code shows how to fetch several URLs concurrently using asyncio.gather:

import asyncio
import aiohttp

async def fetch(session, url):
    async with session.get(url) as response:
        return await response.text()

async def main():
    urls = [
        "https://api.example.com/data1",
        "https://api.example.com/data2",
        "https://api.example.com/data3"
    ]
    async with aiohttp.ClientSession() as session:
        tasks = [fetch(session, url) for url in urls]
        responses = await asyncio.gather(*tasks)
    for response in responses:
        print(response)

asyncio.run(main())
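
With many URLs, it can help to cap how many requests are in flight at once and to keep one failure from discarding every other result. The sketch below is one possible approach, not part of the original example: bounded_gather, fake_fetch, and demo are hypothetical names, and fake_fetch stands in for fetch(session, url) so the snippet runs without network access.

```python
import asyncio
import functools

async def bounded_gather(jobs, limit=5):
    """Run zero-argument coroutine functions with at most `limit` in flight."""
    semaphore = asyncio.Semaphore(limit)

    async def run_one(job):
        async with semaphore:
            return await job()

    # return_exceptions=True returns failures as exception objects in place,
    # so one bad URL does not discard the results of the others.
    return await asyncio.gather(*(run_one(job) for job in jobs), return_exceptions=True)

async def fake_fetch(url):
    # Hypothetical stand-in for fetch(session, url); it performs no network I/O.
    await asyncio.sleep(0.01)
    if url.endswith("/bad"):
        raise ValueError(f"cannot fetch {url}")
    return f"body of {url}"

async def demo():
    urls = ["https://api.example.com/data1", "https://api.example.com/bad"]
    jobs = [functools.partial(fake_fetch, url) for url in urls]
    return await bounded_gather(jobs, limit=2)
```

With a real session, each job could be built as functools.partial(fetch, session, url). Results come back in the same order as the input, so callers can check each entry with isinstance(result, Exception).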

3. Example: Simulate a GET request

Simple GET request example:

import asyncio
import aiohttp

async def fetch(session, url):
    async with session.get(url) as response:
        return await response.text()

async def main():
    url = "https://api.example.com/data"
    async with aiohttp.ClientSession() as session:
        response_text = await fetch(session, url)
        print(response_text)

asyncio.run(main())
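
GET requests often carry query parameters and return JSON rather than plain text. The following is a minimal sketch under the assumption that the endpoint returns a JSON body with a JSON content type; fetch_json and the page and limit parameters are hypothetical names chosen for illustration.

```python
import asyncio
import aiohttp

async def fetch_json(session, url, params=None):
    """GET `url` with optional query parameters and decode the JSON body."""
    # aiohttp URL-encodes `params` and appends them to the query string.
    async with session.get(url, params=params) as response:
        # response.json() parses the body; it raises if the content type is not JSON.
        return await response.json()

async def main():
    url = "https://api.example.com/data"  # placeholder URL from the article
    async with aiohttp.ClientSession() as session:
        data = await fetch_json(session, url, params={"page": 1, "limit": 10})
        print(data)

# Run with: asyncio.run(main())
```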

4. Example: Simulate a POST request

import asyncio
import aiohttp

async def post(session, url, payload):
    async with session.post(url, json=payload) as response:
        return await response.text()

async def main():
    url = "https://api.example.com/data"
    payload = {"key": "value"}
    async with aiohttp.ClientSession() as session:
        response_text = await post(session, url, payload)
        print(response_text)

asyncio.run(main())
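
The json= argument above sends a JSON body. Some endpoints expect form-encoded fields instead, which aiohttp sends when a dict is passed as data=. This is a small sketch of that variant; post_form is a hypothetical helper name, and the URL and fields are placeholders.

```python
import asyncio
import aiohttp

async def post_form(session, url, fields):
    """POST `fields` as application/x-www-form-urlencoded data."""
    async with session.post(url, data=fields) as response:
        # Return the status alongside the body so the caller can inspect both.
        return response.status, await response.text()

async def main():
    url = "https://api.example.com/data"  # placeholder URL from the article
    async with aiohttp.ClientSession() as session:
        status, body = await post_form(session, url, {"key": "value"})
        print(status, body)

# Run with: asyncio.run(main())
```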

5. Example: Handle HTTP errors

import asyncio
import aiohttp

async def fetch(session, url):
    async with session.get(url) as response:
        if response.status == 200:
            return await response.text()
        else:
            raise Exception(f"Failed to fetch data from {url}: HTTP {response.status}")

async def main():
    url = "https://api.example.com/data"
    async with aiohttp.ClientSession() as session:
        try:
            response_text = await fetch(session, url)
            print(response_text)
        except Exception as e:
            print(f"Error: {e}")

asyncio.run(main())
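
As an alternative to checking the status code by hand, aiohttp responses offer raise_for_status(), which raises aiohttp.ClientResponseError for 4xx and 5xx responses. The sketch below shows one way to separate HTTP errors from connection-level failures; fetch_checked and fetch_or_none are hypothetical helper names, and returning None on failure is an assumption made for brevity.

```python
import asyncio
import aiohttp

async def fetch_checked(session, url):
    async with session.get(url) as response:
        # Raises aiohttp.ClientResponseError when the status is 400 or higher.
        response.raise_for_status()
        return await response.text()

async def fetch_or_none(session, url):
    try:
        return await fetch_checked(session, url)
    except aiohttp.ClientResponseError as e:
        # The server answered, but with an error status.
        print(f"HTTP {e.status} for {url}: {e.message}")
    except aiohttp.ClientError as e:
        # Base class for other client failures, such as connection errors.
        print(f"Request to {url} failed: {e}")
    return None
```

ClientResponseError is a subclass of ClientError, so the more specific handler has to come first.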

6. Example: Use request timeout

import asyncio
import aiohttp

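async def fetch(session, url):
    # Allow at most 5 seconds for the whole request, including reading the body.
    # aiohttp expects a ClientTimeout object; a bare number is deprecated.
    timeout = aiohttp.ClientTimeout(total=5)
    try:
        async with session.get(url, timeout=timeout) as response:
            return await response.text()
    except asyncio.TimeoutError:
        raise Exception("Request timed out")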

async def main():
    url = "https://api.example.com/data"
    async with aiohttp.ClientSession() as session:
        try:
            response_text = await fetch(session, url)
            print(response_text)
        except Exception as e:
            print(f"Error: {e}")

asyncio.run(main())
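
A timeout can also be set once on the session so that every request inherits it, and aiohttp.ClientTimeout accepts separate budgets for connecting and for reading. The values below are assumptions for illustration only, and DEFAULT_TIMEOUT and fetch_or_timeout are hypothetical names.

```python
import asyncio
import aiohttp

# Assumed budgets in seconds; tune them for the service being called.
DEFAULT_TIMEOUT = aiohttp.ClientTimeout(total=10, connect=3, sock_read=5)

async def fetch_or_timeout(session, url):
    """Return the body, or None if the request exceeds the session timeout."""
    try:
        async with session.get(url) as response:
            return await response.text()
    except asyncio.TimeoutError:
        return None

async def main():
    url = "https://api.example.com/data"  # placeholder URL from the article
    # Every request made through this session uses DEFAULT_TIMEOUT.
    async with aiohttp.ClientSession(timeout=DEFAULT_TIMEOUT) as session:
        print(await fetch_or_timeout(session, url))

# Run with: asyncio.run(main())
```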

7. Example: Use SSL verification

import asyncio
import aiohttp
import ssl

async def fetch(session, url):
    async with session.get(url) as response:
        return await response.text()

async def main():
    url = "https://api.example.com/data"
    ssl_context = ssl.create_default_context()
    async with aiohttp.ClientSession(connector=aiohttp.TCPConnector(ssl=ssl_context)) as session:
        response_text = await fetch(session, url)
        print(response_text)

asyncio.run(main())
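
The default context verifies certificates against the system trust store. When a service is signed by a private certificate authority, one option is to build the context from that CA bundle. The sketch below assumes a hypothetical build_ssl_context helper; no real bundle path is implied, and passing cafile replaces the system defaults for that context rather than adding to them.

```python
import asyncio
import ssl
import aiohttp

def build_ssl_context(cafile=None):
    """Return a verifying SSL context, using `cafile` as the CA bundle if given."""
    # With cafile=None the system's default CA certificates are loaded.
    return ssl.create_default_context(cafile=cafile)

async def main():
    url = "https://api.example.com/data"  # placeholder URL from the article
    context = build_ssl_context()  # pass cafile=... for a private CA bundle
    connector = aiohttp.TCPConnector(ssl=context)
    async with aiohttp.ClientSession(connector=connector) as session:
        async with session.get(url) as response:
            print(response.status)

# Run with: asyncio.run(main())
```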

8. Example: Use a proxy

import asyncio
import aiohttp

async def fetch(session, url, proxy_url):
    # aiohttp takes the proxy per request; TCPConnector has no proxy argument.
    async with session.get(url, proxy=proxy_url) as response:
        return await response.text()

async def main():
    url = "https://api.example.com/data"
    proxy_url = "http://proxy.example.com:8080"
    async with aiohttp.ClientSession() as session:
        response_text = await fetch(session, url, proxy_url)
        print(response_text)

asyncio.run(main())
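
Proxies that require credentials can be handled with the proxy_auth argument and aiohttp.BasicAuth. The following is a sketch only: fetch_via_proxy is a hypothetical helper, and the user name and password are placeholders.

```python
import asyncio
import aiohttp

async def fetch_via_proxy(session, url, proxy_url, proxy_auth=None):
    """GET `url` through `proxy_url`, optionally with proxy credentials."""
    async with session.get(url, proxy=proxy_url, proxy_auth=proxy_auth) as response:
        return await response.text()

async def main():
    url = "https://api.example.com/data"  # placeholder URL from the article
    proxy_url = "http://proxy.example.com:8080"
    auth = aiohttp.BasicAuth("user", "password")  # placeholder credentials
    async with aiohttp.ClientSession() as session:
        print(await fetch_via_proxy(session, url, proxy_url, proxy_auth=auth))

# Run with: asyncio.run(main())
```

Creating the session with aiohttp.ClientSession(trust_env=True) is another option; aiohttp then reads proxy settings from environment variables such as HTTP_PROXY and HTTPS_PROXY.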

Conclusion

The examples above illustrate how to combine asyncio and aiohttp to perform asynchronous HTTP operations in Python, including concurrent requests, GET/POST methods, error handling, timeouts, SSL verification, and proxy usage.
