SQLAlchemy 2.0+
Installation
SKILL.md
SQLAlchemy 2.0+ Skill
Quick Start
Basic Setup
from sqlalchemy.ext.asyncio import AsyncAttrs, async_sessionmaker, create_async_engine, AsyncSession
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
import asyncio
# Base class for models
class Base(AsyncAttrs, DeclarativeBase):
pass
# Async engine
engine = create_async_engine("postgresql+asyncpg://user:pass@localhost/db")
# Session factory
async_session = async_sessionmaker(engine, expire_on_commit=False)
# Example model
class User(Base):
__tablename__ = "users"
id: Mapped[int] = mapped_column(primary_key=True)
name: Mapped[str] = mapped_column(String(50))
email: Mapped[str] = mapped_column(String(100))
Basic CRUD Operations
async def create_user(name: str, email: str) -> User:
async with async_session() as session:
async with session.begin():
user = User(name=name, email=email)
session.add(user)
await session.flush() # Get the ID
return user
async def get_user(user_id: int) -> User | None:
async with async_session() as session:
result = await session.execute(select(User).where(User.id == user_id))
return result.scalar_one_or_none()
async def update_user_email(user_id: int, new_email: str) -> bool:
async with async_session() as session:
result = await session.execute(
update(User).where(User.id == user_id).values(email=new_email)
)
await session.commit()
return result.rowcount > 0
Common Patterns
Models
Annotated Type-Safe Models (Recommended)
from typing_extensions import Annotated
from typing import List, Optional
# Reusable column types
intpk = Annotated[int, mapped_column(primary_key=True)]
str50 = Annotated[str, mapped_column(String(50))]
created_at = Annotated[datetime, mapped_column(insert_default=func.now())]
class Post(Base):
__tablename__ = "posts"
id: Mapped[intpk]
title: Mapped[str50]
content: Mapped[str] = mapped_column(Text)
author_id: Mapped[int] = mapped_column(ForeignKey("users.id"))
created: Mapped[created_at]
# Relationships
author: Mapped["User"] = relationship(back_populates="posts")
tags: Mapped[List["Tag"]] = relationship(secondary="post_tags")
Classic Style Models
class Post(Base):
__tablename__ = "posts"
id = mapped_column(Integer, primary_key=True)
title = mapped_column(String(50))
content = mapped_column(Text)
author_id = mapped_column(ForeignKey("users.id"))
author = relationship("User", back_populates="posts")
Relationships
One-to-Many
class User(Base):
__tablename__ = "users"
id: Mapped[int] = mapped_column(primary_key=True)
posts: Mapped[List["Post"]] = relationship(
back_populates="author",
cascade="all, delete-orphan"
)
class Post(Base):
__tablename__ = "posts"
id: Mapped[int] = mapped_column(primary_key=True)
author_id: Mapped[int] = mapped_column(ForeignKey("users.id"))
author: Mapped["User"] = relationship(back_populates="posts")
Many-to-Many
association_table = Table(
"post_tags",
Base.metadata,
Column("post_id", ForeignKey("posts.id"), primary_key=True),
Column("tag_id", ForeignKey("tags.id"), primary_key=True)
)
class Post(Base):
__tablename__ = "posts"
id: Mapped[int] = mapped_column(primary_key=True)
tags: Mapped[List["Tag"]] = relationship(
secondary=association_table,
back_populates="posts"
)
class Tag(Base):
__tablename__ = "tags"
id: Mapped[int] = mapped_column(primary_key=True)
name: Mapped[str] = mapped_column(String(50), unique=True)
posts: Mapped[List["Post"]] = relationship(
secondary=association_table,
back_populates="tags"
)
Queries
Basic Select
from sqlalchemy import select, and_, or_
# Get all users
async def get_all_users():
async with async_session() as session:
result = await session.execute(select(User))
return result.scalars().all()
# Filter with conditions
async def get_users_by_name(name: str):
async with async_session() as session:
stmt = select(User).where(User.name.ilike(f"%{name}%"))
result = await session.execute(stmt)
return result.scalars().all()
# Complex conditions
async def search_users(name: str = None, email: str = None):
async with async_session() as session:
conditions = []
if name:
conditions.append(User.name.ilike(f"%{name}%"))
if email:
conditions.append(User.email.ilike(f"%{email}%"))
if conditions:
stmt = select(User).where(and_(*conditions))
else:
stmt = select(User)
result = await session.execute(stmt)
return result.scalars().all()
Relationship Loading
from sqlalchemy.orm import selectinload, joinedload
# Eager load relationships
async def get_posts_with_author():
async with async_session() as session:
stmt = select(Post).options(selectinload(Post.author))
result = await session.execute(stmt)
return result.scalars().all()
# Joined loading for single relationships
async def get_post_with_tags(post_id: int):
async with async_session() as session:
stmt = select(Post).options(
joinedload(Post.author),
selectinload(Post.tags)
).where(Post.id == post_id)
result = await session.execute(stmt)
return result.scalar_one_or_none()
Pagination
async def get_posts_paginated(page: int, size: int):
async with async_session() as session:
offset = (page - 1) * size
stmt = select(Post).offset(offset).limit(size).order_by(Post.created.desc())
result = await session.execute(stmt)
return result.scalars().all()
Aggregations
from sqlalchemy import func
async def get_user_post_count():
async with async_session() as session:
stmt = (
select(User.name, func.count(Post.id).label("post_count"))
.join(Post)
.group_by(User.id, User.name)
.order_by(func.count(Post.id).desc())
)
result = await session.execute(stmt)
return result.all()
Sessions Management
Context Manager Pattern
async def create_post(title: str, content: str, author_id: int):
async with async_session() as session:
async with session.begin():
post = Post(title=title, content=content, author_id=author_id)
session.add(post)
return post
Dependency Injection (FastAPI)
from fastapi import Depends
async def get_db_session():
async with async_session() as session:
try:
yield session
finally:
await session.close()
async def create_user_endpoint(
user_data: UserCreate,
session: AsyncSession = Depends(get_db_session)
):
user = User(**user_data.dict())
session.add(user)
await session.commit()
await session.refresh(user)
return user
Scoped Sessions
from sqlalchemy.ext.asyncio import async_scoped_session
import asyncio
# Create scoped session
async_session_scope = async_scoped_session(
async_sessionmaker(engine, expire_on_commit=False),
scopefunc=asyncio.current_task
)
# Use in application
async def some_function():
session = async_session_scope()
# Use session normally
await session.commit()
Advanced Patterns
Write-Only Relationships (Memory Efficient)
from sqlalchemy.orm import WriteOnlyMapped
class User(Base):
__tablename__ = "users"
id: Mapped[int] = mapped_column(primary_key=True)
posts: WriteOnlyMapped["Post"] = relationship()
async def get_user_posts(user_id: int):
async with async_session() as session:
user = await session.get(User, user_id)
if user:
# Explicit select for collection
stmt = select(Post).where(Post.author_id == user_id)
result = await session.execute(stmt)
return result.scalars().all()
return []
Custom Session Classes
class AsyncSessionWithDefaults(AsyncSession):
async def execute_with_defaults(self, statement, **kwargs):
# Add default options
return await self.execute(statement, **kwargs)
# Use custom session
async_session = async_sessionmaker(
engine,
class_=AsyncSessionWithDefaults,
expire_on_commit=False
)
Connection Routing
class RoutingSession(Session):
def get_bind(self, mapper=None, clause=None, **kw):
if mapper and issubclass(mapper.class_, ReadOnlyModel):
return read_engine
return write_engine
class AsyncRoutingSession(AsyncSession):
sync_session_class = RoutingSession
Raw SQL
from sqlalchemy import text
async def run_raw_sql():
async with async_session() as session:
result = await session.execute(text("SELECT COUNT(*) FROM users"))
count = result.scalar()
return count
async def run_parameterized_query(user_id: int):
async with async_session() as session:
stmt = text("SELECT * FROM posts WHERE author_id = :user_id")
result = await session.execute(stmt, {"user_id": user_id})
return result.fetchall()
Performance Tips
- Use selectinload for collections: More efficient than lazy loading
- Batch operations: Use
add_all()for bulk inserts - Connection pooling: Configure pool size based on load
- Index columns: Add indexes for frequently queried columns
- Use streaming: For large result sets, use
stream()
# Streaming large results
async def process_all_users():
async with async_session() as session:
result = await session.stream(select(User))
async for user in result.scalars():
# Process user without loading all into memory
await process_user(user)
Requirements
uv add sqlalchemy[asyncio] # Core SQLAlchemy
uv add asyncpg # PostgreSQL async driver
# or
uv add aiosqlite # SQLite async driver
# or
uv add aiomysql # MySQL async driver
Database URLs
- PostgreSQL:
postgresql+asyncpg://user:pass@localhost/db - SQLite:
sqlite+aiosqlite:///database.db - MySQL:
mysql+aiomysql://user:pass@localhost/db
Migration Integration
Use Alembic for database migrations:
# Generate migration
uv run alembic revision --autogenerate -m "Add users table"
# Apply migrations
uv run alembic upgrade head
Related skills
More from bossjones/boss-file-utils
httpx
A next-generation HTTP client for Python with both sync and async support, perfect for modern Python applications
1pydantic
Data validation and settings management using Python type annotations with Pydantic v2
1pytest
Python testing framework for writing simple, scalable, and powerful tests
1alembic
Database migration management for SQLAlchemy projects using Alembic
1uvicorn
ASGI server for Python web applications - Fast, production-ready server for async frameworks
1apscheduler
Advanced Python Scheduler - Task scheduling and job queue system
1