python-aiomysql
aiomysql — Async MySQL for Python
aiomysql provides asyncio-native access to MySQL databases. It wraps PyMySQL with async/await support and exposes Connection, Cursor, and Pool primitives that mirror the synchronous DBAPI interface.
Requirements: Python 3.9+, PyMySQL. Install with:
pip install aiomysql
# Optional SQLAlchemy expression layer:
pip install aiomysql sqlalchemy
Core Principles
- Always use a connection pool (
create_pool) in production — never bareconnect()for long-lived services. - Always use async context managers (
async with) to guarantee connection and cursor release. - Never format SQL strings manually. Always pass parameters as the second argument to
execute(). - Commit explicitly;
autocommitdefaults toFalse. - Close the pool cleanly on shutdown:
pool.close()thenawait pool.wait_closed().
Connection Pool (Preferred Pattern)
Create one pool at application startup and share it across the lifetime of the service.
import asyncio
import aiomysql
async def create_app_pool() -> aiomysql.Pool:
return await aiomysql.create_pool(
host="127.0.0.1",
port=3306,
user="appuser",
password="secret",
db="mydb",
minsize=2,
maxsize=10,
autocommit=False,
pool_recycle=3600, # recycle stale connections after 1 h
charset="utf8mb4",
)
async def main() -> None:
pool = await create_app_pool()
try:
async with pool.acquire() as conn:
async with conn.cursor() as cur:
await cur.execute("SELECT 1")
(result,) = await cur.fetchone()
print(result) # 1
finally:
pool.close()
await pool.wait_closed()
asyncio.run(main())
Key parameters:
| Parameter | Default | Purpose |
|---|---|---|
minsize |
1 | Connections pre-created at startup |
maxsize |
10 | Hard ceiling on pool size |
pool_recycle |
-1 (off) | Seconds before a connection is recycled |
autocommit |
False | Set True for read-only workloads |
charset |
'' |
Use utf8mb4 for full Unicode support |
Parameterized Queries — SQL Injection Prevention
Pass values as the second argument to execute(). Never use f-strings or .format() for SQL parameters.
# CORRECT — parameterized
await cur.execute(
"SELECT id, name FROM users WHERE email = %s AND active = %s",
(email, True),
)
# CORRECT — bulk insert via executemany (batched automatically)
rows = [("Alice", "alice@example.com"), ("Bob", "bob@example.com")]
await cur.executemany(
"INSERT INTO users (name, email) VALUES (%s, %s)",
rows,
)
# WRONG — string interpolation, never do this
await cur.execute(f"SELECT * FROM users WHERE email = '{email}'")
Cursor Types
Choose the right cursor for the job:
| Cursor | Import | Returns | Use when |
|---|---|---|---|
Cursor (default) |
built-in | tuple |
General queries, small result sets |
DictCursor |
aiomysql.DictCursor |
dict |
Named-column access, readability |
SSCursor |
aiomysql.SSCursor |
tuple |
Large result sets (unbuffered) |
SSDictCursor |
aiomysql.SSDictCursor |
dict |
Large result sets, named columns |
# DictCursor — access columns by name
async with conn.cursor(aiomysql.DictCursor) as cur:
await cur.execute("SELECT id, name FROM users WHERE id = %s", (42,))
row = await cur.fetchone()
print(row["name"]) # "Alice"
# SSCursor — stream large result sets without buffering all rows in memory
async with conn.cursor(aiomysql.SSCursor) as cur:
await cur.execute("SELECT * FROM large_table")
async for row in cur:
process(row)
Transaction Management
Explicit transaction handling avoids silent data loss and partial writes.
async def transfer_funds(
pool: aiomysql.Pool, from_id: int, to_id: int, amount: float
) -> None:
async with pool.acquire() as conn:
async with conn.cursor() as cur:
try:
await conn.begin()
await cur.execute(
"UPDATE accounts SET balance = balance - %s WHERE id = %s",
(amount, from_id),
)
await cur.execute(
"UPDATE accounts SET balance = balance + %s WHERE id = %s",
(amount, to_id),
)
await conn.commit()
except Exception:
await conn.rollback()
raise
Never rely on auto-rollback. Always call conn.rollback() explicitly in the except block when autocommit=False.
Fetch Strategies
Choose the fetch method based on result size:
# Single row — stops fetching immediately
row = await cur.fetchone()
# All rows — fine for small to medium result sets
rows = await cur.fetchall()
# Paginated — process in chunks to bound memory usage
while True:
chunk = await cur.fetchmany(size=500)
if not chunk:
break
process_chunk(chunk)
Use SSCursor (unbuffered) for very large result sets instead of fetchall().
Accessing INSERT IDs and Row Counts
await cur.execute(
"INSERT INTO orders (user_id, total) VALUES (%s, %s)",
(user_id, total),
)
await conn.commit()
new_id = cur.lastrowid # AUTO_INCREMENT value of inserted row
affected = cur.rowcount # rows affected by last DML statement
Lifecycle: Single Connection (Scripts / Tests)
Use bare connect() only in short-lived scripts or test fixtures — not in services.
import aiomysql
async def run_script() -> None:
conn = await aiomysql.connect(
host="127.0.0.1", port=3306,
user="root", password="", db="mydb",
charset="utf8mb4",
)
try:
async with conn.cursor() as cur:
await cur.execute("SELECT VERSION()")
(version,) = await cur.fetchone()
print(f"MySQL {version}")
finally:
conn.close() # synchronous; flushes and closes socket
Note: conn.close() is synchronous. Use await conn.ensure_closed() when you need the async variant that sends a quit command before closing.
SQLAlchemy Expression Layer (aiomysql.sa)
Use aiomysql.sa for type-safe query construction when raw SQL becomes unwieldy.
import asyncio
import sqlalchemy as sa
from aiomysql.sa import create_engine
metadata = sa.MetaData()
users = sa.Table(
"users",
metadata,
sa.Column("id", sa.Integer, primary_key=True),
sa.Column("name", sa.String(128)),
sa.Column("email", sa.String(255)),
)
async def main() -> None:
engine = await create_engine(
user="appuser", password="secret",
host="127.0.0.1", db="mydb",
)
async with engine.acquire() as conn:
async with conn.begin() as tx:
await conn.execute(users.insert().values(name="Alice", email="a@x.com"))
await tx.commit()
result = await conn.execute(users.select())
async for row in result:
print(row.id, row.name)
engine.close()
await engine.wait_closed()
asyncio.run(main())
The SAConnection wraps aiomysql.Connection and provides .begin(), .begin_nested() (SAVEPOINT), .scalar(), and .execute() with SQLAlchemy expression support.
Common Mistakes
Forgetting await on cursor close: cur.close() is a coroutine — omitting await silently skips cleanup. Prefer async with conn.cursor() as cur to avoid this entirely.
Not recycling stale connections: Without pool_recycle, connections held longer than MySQL's wait_timeout (default 8 hours) become invalid. Set pool_recycle to a value shorter than the server's timeout.
Using fetchall() on large tables: Loads the entire result set into memory. Use fetchmany(size=N) or SSCursor for large datasets.
Mixing autocommit modes: Setting autocommit=True on the pool and calling await conn.commit() is harmless but redundant. Be explicit and consistent per workload.
Passing loop parameter in Python 3.10+: The loop parameter is deprecated and ignored in modern Python. Remove it from all connect() and create_pool() calls.
Additional Resources
Reference Files
references/connection-pool.md— Pool configuration,pool_recycle, sizing guidelines, and graceful shutdown patternsreferences/transactions-cursors.md— Transaction isolation levels, SAVEPOINT nesting, cursor type comparison, andcallprocusage
External Documentation
More from the-perfect-developer/the-perfect-opencode
json-style
This skill should be used when the user asks to "format JSON", "design JSON API", "write JSON response", "structure JSON data", or needs guidance on JSON naming conventions and best practices based on Google's JSON Style Guide.
3implementation
This skill should be used when the user asks to "implement a plan", "execute implementation", "build from plan", "implement feature", or needs to orchestrate execution of an implementation plan with specialized engineering agents.
3ideation
This skill should be used when the user asks to "ideate on", "brainstorm an idea", "let's argue this through", "challenge my thinking", "help me refine this concept", "let's stress-test this", or needs a structured framework for collaborative idea generation, adversarial refinement, and convergence toward the best possible solution. Load this skill before running any ideation or brainstorming session.
3capacitor
This skill should be used when the user asks to "build a Capacitor app", "add Capacitor to a web project", "use Capacitor plugins", "configure Capacitor for iOS or Android", or needs guidance on Capacitor best practices, security, storage, deep links, or the development workflow.
2python-mypy
This skill should be used when the user asks to "add mypy to a project", "set up static type checking", "fix mypy errors", "configure mypy", "annotate Python code with types", or needs guidance on Python type checking best practices with mypy.
2command-creation
This skill should be used when the user asks to "create a command", "add a custom command", "make a slash command", "create /command", or needs guidance on creating custom commands in OpenCode.
2