Asynchronous Database Operations in Python with aiomysql, asyncpg, and SQLAlchemy

This guide demonstrates how to perform asynchronous CRUD operations on MySQL, PostgreSQL, and Redis using Python libraries such as aiomysql, asyncpg, and SQLAlchemy, covering connection setup, querying, inserting, updating, deleting, and handling result objects with practical code examples.

Python Programming Learning Circle

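On Python 3.8+, asyncio combined with an asynchronous database driver lets a web service handle other requests while it waits for database I/O. The async support comes from third-party drivers such as aiomysql and asyncpg, not from the standard library itself.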
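To make the benefit concrete, here is a minimal sketch that uses only the standard library. The function names are hypothetical, and asyncio.sleep stands in for a real database round trip, so this illustrates the scheduling behaviour rather than any driver's API.

```python
import asyncio
import time


async def fake_query(name: str, delay: float) -> str:
    # asyncio.sleep is a stand-in for waiting on a database response.
    await asyncio.sleep(delay)
    return f"{name} done"


async def run_concurrently() -> list:
    # gather() starts all three coroutines together and returns
    # their results in the order the coroutines were passed in.
    return await asyncio.gather(
        fake_query("a", 0.2),
        fake_query("b", 0.2),
        fake_query("c", 0.2),
    )


def timed_run():
    # Returns the results and the wall-clock time the whole batch took.
    start = time.perf_counter()
    results = asyncio.run(run_concurrently())
    return results, time.perf_counter() - start
```

Run one after another, the three waits would take about 0.6 seconds. Because each coroutine yields control while it waits, the batch should finish in roughly the time of a single wait.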

The article first shows how to use aiomysql to connect to a MySQL server, create an async engine, acquire a connection, and execute SELECT statements. It demonstrates fetching a single record, all records, or a limited number of rows and shows how the returned RowProxy objects can be treated like dictionaries.

import asyncio
import aiomysql.sa as aio_sa

async def main():
    engine = await aio_sa.create_engine(host="xx.xxx.xx.xxx",
                                         port=3306,
                                         user="root",
                                         password="root",
                                         db="_hanser",
                                         connect_timeout=10)
    async with engine.acquire() as conn:
        result = await conn.execute("SELECT * FROM girl")
        data = await result.fetchone()
        print(data.keys())
        print(list(data.keys()))
        print(data.values())
        print(list(data.values()))
        print(data.items())
        print(list(data.items()))
        print(dict(data))
    engine.close()
    await engine.wait_closed()

# asyncio.run creates the event loop, runs main(), and closes the loop.
asyncio.run(main())
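The listing above only calls fetchone. A minimal sketch of the other two cases, all rows or a limited number of rows, could look like the following. The helper name fetch_as_dicts is hypothetical, and conn is assumed to be a connection obtained from engine.acquire() as in the listing.

```python
async def fetch_as_dicts(conn, sql, limit=None):
    """Run sql on an aiomysql.sa connection and return rows as plain dicts."""
    result = await conn.execute(sql)
    if limit is None:
        # No limit given: read every remaining row.
        rows = await result.fetchall()
    else:
        # Read at most `limit` rows from the result.
        rows = await result.fetchmany(limit)
    # Each row is dictionary-like, so dict() copies it into a plain dict.
    return [dict(row) for row in rows]
```

Inside `async with engine.acquire() as conn:` this would be called as `await fetch_as_dicts(conn, "SELECT * FROM girl", limit=2)`.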

Insertion with aiomysql is performed by building an INSERT statement via SQLAlchemy’s Table.insert(), executing it inside a transaction, and retrieving lastrowid and rowcount. Updating and deleting follow a similar pattern using Table.update() and Table.delete().

import asyncio
from pprint import pprint
import aiomysql.sa as aio_sa
from sqlalchemy import Table, MetaData, create_engine

async def main():
    async with aio_sa.create_engine(host="xx.xx.xx.xxx",
                                    port=3306,
                                    user="root",
                                    password="root",
                                    db="_hanser",
                                    connect_timeout=10) as engine:
        async with engine.acquire() as conn:
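            # A synchronous engine is used only to reflect the table definition.
            s_engine = create_engine("mysql+pymysql://root:root@xx.xx.xx.xxx:3306/_hanser")
            # autoload_with replaces MetaData(bind=...) and autoload=True, which SQLAlchemy 2.0 removed.
            tbl = Table("girl", MetaData(), autoload_with=s_engine)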
            insert_sql = tbl.insert().values([
                {"name": "十六夜咲夜", "age": 17, "place": "红魔馆"},
                {"name": "琪露诺", "age": 60, "place": "雾之湖"}
            ])
            async with conn.begin():
                result = await conn.execute(insert_sql)
                print(result.lastrowid)
                print(result.rowcount)
        async with engine.acquire() as conn:
            data = await (await conn.execute("select * from girl")).fetchall()
            data = list(map(dict, data))
            pprint(data)

# asyncio.run creates the event loop, runs main(), and closes the loop.
asyncio.run(main())
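Update and delete are described but not shown. A minimal sketch of the shared pattern, assuming the same aiomysql.sa connection and reflected tbl as in the insert listing, is a small helper that runs any statement in a transaction. The helper name execute_in_transaction is hypothetical.

```python
async def execute_in_transaction(conn, statement):
    """Execute one statement inside a transaction and return its rowcount."""
    # conn.begin() is used as in the insert listing: the transaction is
    # committed on normal exit and rolled back if the body raises.
    async with conn.begin():
        result = await conn.execute(statement)
        return result.rowcount
```

The statement would be built with SQLAlchemy, for example `tbl.update().where(tbl.c.name == "琪露诺").values(age=9)` for an update or `tbl.delete().where(tbl.c.age > 50)` for a delete. The column names follow the girl table used above, and the values are illustrative only.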

For PostgreSQL the article compares asyncpg and aiopg, recommending asyncpg for its performance. It shows how to create a connection, fetch a single record with fetchrow, fetch multiple records with fetch, and explains that the returned Record behaves like a dictionary, supporting key access, get, and iteration over keys, values, and items.

import asyncio
from pprint import pprint
import asyncpg

async def main():
    conn = await asyncpg.connect(host="localhost",
                                 port=5432,
                                 user="postgres",
                                 password="zgghyys123",
                                 database="postgres",
                                 timeout=10)
    row1 = await conn.fetchrow("select * from girl")
    row2 = await conn.fetch("select * from girl")
    print(row1)
    pprint(row2)
    await conn.close()

# asyncio.run creates the event loop, runs main(), and closes the loop.
asyncio.run(main())
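In the listing above, conn.close() is skipped if a query raises. One possible way to guard against that, sketched here with a hypothetical helper, is a try/finally around the queries. The connect argument is assumed to be a zero-argument callable that returns an awaitable connection, such as `lambda: asyncpg.connect(dsn)`.

```python
async def fetch_all_girls(connect):
    """Open a connection, fetch every row as a dict, and always close it."""
    conn = await connect()
    try:
        records = await conn.fetch("select * from girl")
        # Record objects are dictionary-like, so dict() copies each one.
        return [dict(record) for record in records]
    finally:
        # Runs on success and on error, so the connection is not leaked.
        await conn.close()
```

Passing the connection factory in, rather than importing the driver inside the helper, also makes the function easy to exercise with a stand-in connection.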

The article also demonstrates how to work with the Record object, accessing fields by key, using get with default values, and converting the record to a dictionary.

import asyncio
import asyncpg

async def main():
    conn = await asyncpg.connect("postgres://postgres:zgghyys123@localhost:5432/postgres")
    row = await conn.fetchrow("select * from girl")
    print(type(row))
    print(row)
    print(row["id"], row["name"])
    print(row.get("id"), row.get("name"))
    print(row.get("xxx"), row.get("xxx", "不存在的字段"))
    print(list(row.keys()))
    print(list(row.values()))
    print(dict(row.items()))
    await conn.close()

if __name__ == '__main__':
    asyncio.run(main())

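Insertion in PostgreSQL uses conn.execute for a single row and conn.executemany for bulk inserts. conn.execute returns the command status string (for example INSERT 0 1), while executemany returns None. The article notes that executemany does not provide transaction semantics; this describes older asyncpg releases, since the asyncpg documentation states that executemany is atomic from version 0.22.0 onward.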

import asyncio
from pprint import pprint
import asyncpg

async def main():
    conn = await asyncpg.connect("postgres://postgres:zgghyys123@localhost:5432/postgres")
    row = await conn.execute("insert into girl(name, age, place) values ($1, $2, $3)",
                             '十六夜咲夜', 17, '红魔馆')
    pprint(row)
    await conn.close()

asyncio.run(main())

import asyncio
import asyncpg

async def main():
    conn = await asyncpg.connect("postgres://postgres:zgghyys123@localhost:5432/postgres")
    rows = await conn.executemany(
        "insert into girl(name, age, place) values ($1, $2, $3)",
        [
            ('十六夜咲夜', 17, '红魔馆'),
            ('琪露诺', 60, '雾之湖')
        ]
    )
    print(rows)
    await conn.close()

asyncio.run(main())
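Two small helpers can make the return values above easier to work with. This is a sketch under assumptions: the function names are hypothetical, conn is an asyncpg connection, and the status string is assumed to follow the PostgreSQL command tag format, where the last field is the row count.

```python
def affected_rows(status: str) -> int:
    # execute() returns a command tag such as "INSERT 0 1", "UPDATE 3"
    # or "DELETE 2"; the final field is the number of rows affected.
    return int(status.split()[-1])


async def insert_girls(conn, rows):
    """Bulk-insert rows so that either all of them are stored or none."""
    sql = "insert into girl(name, age, place) values ($1, $2, $3)"
    # The explicit transaction makes the all-or-nothing intent visible and
    # does not rely on the atomicity of any particular asyncpg version.
    async with conn.transaction():
        await conn.executemany(sql, rows)
    # executemany returns None, so report the number of rows submitted.
    return len(rows)
```

For example, `affected_rows(await conn.execute(...))` gives 1 for the single-row insert shown earlier, whose status string is INSERT 0 1.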

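The guide also shows how to combine asyncpg with SQLAlchemy’s Select objects by converting the SQL expression to a string before passing it to conn.fetch, and demonstrates placeholder usage with the $1 syntax. The Select([...], whereclause=..., from_obj=...) constructor used here is the SQLAlchemy 1.x calling style.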

import asyncio
from pprint import pprint
import asyncpg
from sqlalchemy.sql.selectable import Select
from sqlalchemy import text

async def main():
    conn = await asyncpg.connect("postgres://postgres:zgghyys123@localhost:5432/postgres")
    sql = Select([text("id, name, place")], whereclause=text("id != 1"), from_obj=text("girl"))
    rows = await conn.fetch(str(sql))
    pprint(list(map(dict, rows)))
    await conn.close()

if __name__ == '__main__':
    asyncio.run(main())
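The listing above inlines the condition id != 1 into the SQL text. A minimal sketch of the same query with a $1 placeholder, using a hypothetical helper name and assuming conn is an asyncpg connection, could look like this:

```python
async def fetch_girls_except(conn, excluded_id):
    """Fetch id, name and place for every row whose id differs from excluded_id."""
    # $1 is bound to the first positional argument after the SQL string,
    # so the value is sent as a parameter rather than spliced into the text.
    sql = "select id, name, place from girl where id != $1"
    records = await conn.fetch(sql, excluded_id)
    return [dict(record) for record in records]
```

Calling `await fetch_girls_except(conn, 1)` corresponds to the query built with Select above, without any string formatting of the value.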
