fastapi-patterns
FastAPI Patterns Skill
Provides comprehensive FastAPI development patterns and best practices for building modern, production-ready Python APIs.
When to Use This Skill
Activate this skill when working with:
- FastAPI application architecture and design
- Pydantic models and data validation
- SQLAlchemy ORM patterns and relationships
- Async/await patterns and performance optimization
- Dependency injection and middleware
- JWT authentication and authorization
- API documentation and OpenAPI schemas
Quick Reference
Development Commands
# Development server with auto-reload
uvicorn main:app --reload --host 0.0.0.0 --port 8000
# Production with workers
uvicorn main:app --host 0.0.0.0 --port 8000 --workers 4
# With Gunicorn (recommended for production)
gunicorn main:app -w 4 -k uvicorn.workers.UvicornWorker -b 0.0.0.0:8000
# Install dependencies
pip install fastapi uvicorn[standard] sqlalchemy pydantic-settings python-jose passlib bcrypt
# Run tests
pytest tests/ -v --cov=app
Application Structure
app/
├── main.py # Application entry point
├── config.py # Settings and configuration
├── dependencies.py # Dependency injection
├── database.py # Database connection
├── models/ # SQLAlchemy models
│ ├── __init__.py
│ ├── user.py
│ ├── organization.py
│ └── base.py
├── schemas/ # Pydantic schemas
│ ├── __init__.py
│ ├── user.py
│ └── organization.py
├── routers/ # API endpoints
│ ├── __init__.py
│ ├── auth.py
│ ├── users.py
│ └── organizations.py
├── services/ # Business logic
│ ├── __init__.py
│ ├── auth_service.py
│ └── user_service.py
├── middleware/ # Custom middleware
│ └── auth.py
└── utils/ # Utility functions
├── security.py
└── validators.py
Application Setup with Lifespan
# main.py
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from contextlib import asynccontextmanager
from sqlalchemy.ext.asyncio import create_async_engine
from app.config import settings
from app.routers import auth, users, organizations
from app.database import init_db
@asynccontextmanager
async def lifespan(app: FastAPI):
# Startup: Initialize database connection
await init_db()
print("Database initialized")
yield
# Shutdown: Close connections
print("Shutting down...")
app = FastAPI(
title=settings.PROJECT_NAME,
version=settings.VERSION,
openapi_url=f"{settings.API_V1_STR}/openapi.json",
lifespan=lifespan
)
# CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=settings.ALLOWED_ORIGINS,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Include routers
app.include_router(auth.router, prefix=f"{settings.API_V1_STR}/auth", tags=["auth"])
app.include_router(users.router, prefix=f"{settings.API_V1_STR}/users", tags=["users"])
app.include_router(organizations.router, prefix=f"{settings.API_V1_STR}/organizations", tags=["organizations"])
@app.get("/health")
async def health_check():
return {"status": "healthy", "version": settings.VERSION}
Pydantic Settings and Configuration
# config.py
from pydantic_settings import BaseSettings, SettingsConfigDict
from typing import List
class Settings(BaseSettings):
# Application
PROJECT_NAME: str = "Zenith API"
VERSION: str = "1.0.0"
API_V1_STR: str = "/api/v1"
# Database
DATABASE_URL: str
DB_POOL_SIZE: int = 10
DB_MAX_OVERFLOW: int = 20
# Security
SECRET_KEY: str
ALGORITHM: str = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES: int = 30
REFRESH_TOKEN_EXPIRE_DAYS: int = 7
# CORS
ALLOWED_ORIGINS: List[str] = ["http://localhost:3000"]
# Redis
REDIS_URL: str = "redis://localhost:6379"
model_config = SettingsConfigDict(
env_file=".env",
case_sensitive=True
)
settings = Settings()
SQLAlchemy Models with Async
# models/base.py
from sqlalchemy.ext.asyncio import AsyncAttrs
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
from datetime import datetime
class Base(AsyncAttrs, DeclarativeBase):
pass
class TimestampMixin:
created_at: Mapped[datetime] = mapped_column(default=datetime.utcnow)
updated_at: Mapped[datetime] = mapped_column(default=datetime.utcnow, onupdate=datetime.utcnow)
# models/user.py
from sqlalchemy import String, Boolean
from sqlalchemy.orm import Mapped, mapped_column, relationship
from app.models.base import Base, TimestampMixin
class User(Base, TimestampMixin):
__tablename__ = "users"
id: Mapped[int] = mapped_column(primary_key=True, index=True)
email: Mapped[str] = mapped_column(String(255), unique=True, index=True)
hashed_password: Mapped[str] = mapped_column(String(255))
full_name: Mapped[str] = mapped_column(String(255))
is_active: Mapped[bool] = mapped_column(Boolean, default=True)
is_superuser: Mapped[bool] = mapped_column(Boolean, default=False)
# Relationships
organizations = relationship("Organization", secondary="user_organizations", back_populates="users")
Pydantic Schemas with Validation
# schemas/user.py
from pydantic import BaseModel, EmailStr, Field, ConfigDict
from datetime import datetime
from typing import Optional
class UserBase(BaseModel):
email: EmailStr
full_name: str = Field(..., min_length=1, max_length=255)
is_active: bool = True
class UserCreate(UserBase):
password: str = Field(..., min_length=8, max_length=100)
class UserUpdate(BaseModel):
email: Optional[EmailStr] = None
full_name: Optional[str] = Field(None, min_length=1, max_length=255)
password: Optional[str] = Field(None, min_length=8, max_length=100)
class UserInDB(UserBase):
id: int
created_at: datetime
updated_at: datetime
model_config = ConfigDict(from_attributes=True)
class UserResponse(UserInDB):
pass
Router with Dependency Injection
# routers/users.py
from fastapi import APIRouter, Depends, HTTPException, status, Query
from sqlalchemy.ext.asyncio import AsyncSession
from typing import List
from app.schemas.user import UserCreate, UserUpdate, UserResponse
from app.services.user_service import UserService
from app.dependencies import get_current_user, get_db
from app.models.user import User
router = APIRouter()
@router.post("/", response_model=UserResponse, status_code=status.HTTP_201_CREATED)
async def create_user(
user_in: UserCreate,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""Create a new user."""
if not current_user.is_superuser:
raise HTTPException(status_code=403, detail="Not enough permissions")
service = UserService(db)
return await service.create(user_in)
@router.get("/", response_model=List[UserResponse])
async def list_users(
skip: int = Query(0, ge=0),
limit: int = Query(100, ge=1, le=1000),
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""List all users with pagination."""
service = UserService(db)
return await service.list(skip=skip, limit=limit)
@router.get("/{user_id}", response_model=UserResponse)
async def get_user(
user_id: int,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""Get user by ID."""
service = UserService(db)
user = await service.get(user_id)
if not user:
raise HTTPException(status_code=404, detail="User not found")
return user
@router.patch("/{user_id}", response_model=UserResponse)
async def update_user(
user_id: int,
user_update: UserUpdate,
db: AsyncSession = Depends(get_db),
current_user: User = Depends(get_current_user)
):
"""Update user."""
if current_user.id != user_id and not current_user.is_superuser:
raise HTTPException(status_code=403, detail="Not enough permissions")
service = UserService(db)
updated_user = await service.update(user_id, user_update)
if not updated_user:
raise HTTPException(status_code=404, detail="User not found")
return updated_user
Service Layer Pattern
# services/user_service.py
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from typing import List, Optional
from app.models.user import User
from app.schemas.user import UserCreate, UserUpdate
from app.utils.security import get_password_hash
class UserService:
def __init__(self, db: AsyncSession):
self.db = db
async def create(self, user_in: UserCreate) -> User:
"""Create a new user."""
db_user = User(
email=user_in.email,
full_name=user_in.full_name,
hashed_password=get_password_hash(user_in.password),
is_active=user_in.is_active
)
self.db.add(db_user)
await self.db.commit()
await self.db.refresh(db_user)
return db_user
async def get(self, user_id: int) -> Optional[User]:
"""Get user by ID."""
result = await self.db.execute(select(User).where(User.id == user_id))
return result.scalar_one_or_none()
async def get_by_email(self, email: str) -> Optional[User]:
"""Get user by email."""
result = await self.db.execute(select(User).where(User.email == email))
return result.scalar_one_or_none()
async def list(self, skip: int = 0, limit: int = 100) -> List[User]:
"""List users with pagination."""
result = await self.db.execute(select(User).offset(skip).limit(limit))
return list(result.scalars().all())
async def update(self, user_id: int, user_update: UserUpdate) -> Optional[User]:
"""Update user."""
user = await self.get(user_id)
if not user:
return None
update_data = user_update.model_dump(exclude_unset=True)
if "password" in update_data:
update_data["hashed_password"] = get_password_hash(update_data.pop("password"))
for field, value in update_data.items():
setattr(user, field, value)
await self.db.commit()
await self.db.refresh(user)
return user
JWT Authentication
# utils/security.py
from datetime import datetime, timedelta
from typing import Optional
from jose import JWTError, jwt
from passlib.context import CryptContext
from app.config import settings
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def verify_password(plain_password: str, hashed_password: str) -> bool:
return pwd_context.verify(plain_password, hashed_password)
def get_password_hash(password: str) -> str:
return pwd_context.hash(password)
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
return encoded_jwt
def decode_access_token(token: str) -> Optional[dict]:
try:
payload = jwt.decode(token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM])
return payload
except JWTError:
return None
# dependencies.py
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from sqlalchemy.ext.asyncio import AsyncSession
from app.database import get_db
from app.services.user_service import UserService
from app.utils.security import decode_access_token
from app.models.user import User
security = HTTPBearer()
async def get_current_user(
credentials: HTTPAuthorizationCredentials = Depends(security),
db: AsyncSession = Depends(get_db)
) -> User:
"""Get current authenticated user from JWT token."""
token = credentials.credentials
payload = decode_access_token(token)
if payload is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials"
)
email: str = payload.get("sub")
if email is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials"
)
service = UserService(db)
user = await service.get_by_email(email)
if user is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="User not found"
)
if not user.is_active:
raise HTTPException(status_code=400, detail="Inactive user")
return user
Database Connection Pool
# database.py
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from app.config import settings
from app.models.base import Base
engine = create_async_engine(
settings.DATABASE_URL,
echo=True,
pool_size=settings.DB_POOL_SIZE,
max_overflow=settings.DB_MAX_OVERFLOW,
pool_pre_ping=True
)
AsyncSessionLocal = async_sessionmaker(
engine,
class_=AsyncSession,
expire_on_commit=False
)
async def init_db():
"""Initialize database tables."""
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
async def get_db() -> AsyncSession:
"""Dependency for database sessions."""
async with AsyncSessionLocal() as session:
try:
yield session
finally:
await session.close()
Background Tasks
from fastapi import BackgroundTasks
import asyncio
async def send_email_notification(email: str, message: str):
"""Background task to send email."""
await asyncio.sleep(2) # Simulate email sending
print(f"Email sent to {email}: {message}")
@router.post("/register")
async def register_user(
user_in: UserCreate,
background_tasks: BackgroundTasks,
db: AsyncSession = Depends(get_db)
):
"""Register user and send welcome email in background."""
service = UserService(db)
user = await service.create(user_in)
# Add background task
background_tasks.add_task(
send_email_notification,
user.email,
"Welcome to Zenith!"
)
return user
Exception Handling
from fastapi import Request
from fastapi.responses import JSONResponse
from sqlalchemy.exc import IntegrityError
@app.exception_handler(IntegrityError)
async def integrity_error_handler(request: Request, exc: IntegrityError):
return JSONResponse(
status_code=400,
content={"detail": "Database integrity error. Resource may already exist."}
)
@app.exception_handler(ValueError)
async def value_error_handler(request: Request, exc: ValueError):
return JSONResponse(
status_code=400,
content={"detail": str(exc)}
)
Best Practices
- Structure: Use clear separation of concerns (routers, services, models, schemas)
- Async: Leverage async/await for all I/O operations
- Dependencies: Use dependency injection for testability and reusability
- Validation: Utilize Pydantic for request/response validation
- Security: Implement JWT authentication with proper token validation
- Database: Use connection pooling and proper session management
- Error Handling: Define custom exception handlers for common errors
- Documentation: FastAPI auto-generates OpenAPI docs at
/docs - Testing: Write unit tests for services and integration tests for endpoints
- Configuration: Use Pydantic Settings for environment-based config
Performance Tips
- Use
asyncdatabase drivers (asyncpg for PostgreSQL) - Implement caching with Redis for frequently accessed data
- Use background tasks for non-critical operations
- Leverage connection pooling for database efficiency
- Implement pagination for list endpoints
- Use
SELECTspecific columns instead of loading entire models when possible - Consider using
lazy loadingfor relationships - Monitor with middleware for request timing and logging
More from lobbi-docs/claude
vision-multimodal
Vision and multimodal capabilities for Claude including image analysis, PDF processing, and document understanding. Activate for image input, base64 encoding, multiple images, and visual analysis.
242design-system
Apply and manage the AI-powered design system with 50+ curated styles
126complex-reasoning
Multi-step reasoning patterns and frameworks for systematic problem solving. Activate for Chain-of-Thought, Tree-of-Thought, hypothesis-driven debugging, and structured analytical approaches that leverage extended thinking.
105gcp
Google Cloud Platform services including GKE, Cloud Run, Cloud Storage, BigQuery, and Pub/Sub. Activate for GCP infrastructure, Google Cloud deployment, and GCP integration.
73kanban
Kanban methodology including boards, WIP limits, flow metrics, and continuous delivery. Activate for Kanban boards, workflow visualization, and lean project management.
62debugging
Debugging techniques for Python, JavaScript, and distributed systems. Activate for troubleshooting, error analysis, log investigation, and performance debugging. Includes extended thinking integration for complex debugging scenarios.
59