Beanie ODM
SKILL.md
Beanie ODM for MongoDB
This skill provides patterns for MongoDB integration using Beanie ODM with async Motor driver, optimized for FastAPI applications.
Database Initialization
Connection Setup
from beanie import init_beanie
from motor.motor_asyncio import AsyncIOMotorClient
from app.domains.users.models import User
from app.domains.products.models import Product
async def init_database(settings: Settings):
client = AsyncIOMotorClient(settings.mongodb_url)
await init_beanie(
database=client[settings.database_name],
document_models=[
User,
Product,
# Add all document models
]
)
Settings Configuration
from pydantic_settings import BaseSettings
class Settings(BaseSettings):
mongodb_url: str = "mongodb://localhost:27017"
database_name: str = "app_db"
class Config:
env_file = ".env"
Document Models
Basic Document
from beanie import Document, Indexed
from pydantic import Field, EmailStr
from datetime import datetime
from typing import Optional
class User(Document):
email: Indexed(EmailStr, unique=True)
name: str
hashed_password: str
is_active: bool = True
created_at: datetime = Field(default_factory=datetime.utcnow)
updated_at: datetime = Field(default_factory=datetime.utcnow)
class Settings:
name = "users" # Collection name
use_state_management = True
class Config:
json_schema_extra = {
"example": {
"email": "user@example.com",
"name": "John Doe"
}
}
Document with Relationships
from beanie import Document, Link, BackLink
from typing import List, Optional
class Author(Document):
name: str
books: List[BackLink["Book"]] = Field(original_field="author")
class Settings:
name = "authors"
class Book(Document):
title: str
author: Link[Author]
categories: List[Link["Category"]] = []
class Settings:
name = "books"
class Category(Document):
name: str
books: List[BackLink[Book]] = Field(original_field="categories")
class Settings:
name = "categories"
Embedded Documents
from beanie import Document
from pydantic import BaseModel
from typing import List
class Address(BaseModel):
street: str
city: str
country: str
postal_code: str
class Contact(BaseModel):
type: str # "email", "phone"
value: str
is_primary: bool = False
class Customer(Document):
name: str
addresses: List[Address] = []
contacts: List[Contact] = []
class Settings:
name = "customers"
Query Patterns
Basic CRUD Operations
# Create
user = User(email="user@example.com", name="John")
await user.insert()
# Create with validation
user = await User.insert_one(
User(email="user@example.com", name="John")
)
# Read by ID
user = await User.get(user_id)
# Read with filter
users = await User.find(User.is_active == True).to_list()
# Update
user.name = "Jane"
await user.save()
# Partial update
await user.set({User.name: "Jane", User.updated_at: datetime.utcnow()})
# Delete
await user.delete()
Advanced Queries
from beanie.operators import In, RegEx, And, Or
# Find with operators
active_users = await User.find(
And(
User.is_active == True,
User.created_at >= start_date
)
).to_list()
# Regex search
users = await User.find(
RegEx(User.name, "^John", options="i")
).to_list()
# In operator
users = await User.find(
In(User.email, ["a@test.com", "b@test.com"])
).to_list()
# Pagination
users = await User.find_all().skip(20).limit(10).to_list()
# Sorting
users = await User.find_all().sort(-User.created_at).to_list()
# Projection (select specific fields)
users = await User.find_all().project(UserSummary).to_list()
Aggregation Pipelines
from beanie import PydanticObjectId
class UserStats(BaseModel):
total_users: int
active_users: int
avg_age: float
# Aggregation pipeline
pipeline = [
{"$match": {"is_active": True}},
{"$group": {
"_id": None,
"total": {"$sum": 1},
"avg_age": {"$avg": "$age"}
}}
]
result = await User.aggregate(pipeline).to_list()
# Using Beanie aggregation
from beanie.odm.queries.aggregation import AggregationQuery
stats = await User.find(User.is_active == True).aggregate([
{"$group": {
"_id": "$department",
"count": {"$sum": 1}
}}
]).to_list()
Indexes
from beanie import Document, Indexed
from pymongo import IndexModel, ASCENDING, DESCENDING, TEXT
class Product(Document):
# Single field index
sku: Indexed(str, unique=True)
# Compound index defined in Settings
name: str
category: str
price: float
description: str
class Settings:
name = "products"
indexes = [
# Compound index
IndexModel(
[("category", ASCENDING), ("price", DESCENDING)],
name="category_price_idx"
),
# Text index
IndexModel(
[("name", TEXT), ("description", TEXT)],
name="search_idx"
),
# TTL index
IndexModel(
[("expires_at", ASCENDING)],
expireAfterSeconds=0,
name="ttl_idx"
)
]
Transactions
from beanie import Document
from motor.motor_asyncio import AsyncIOMotorClientSession
async def transfer_funds(
from_account_id: str,
to_account_id: str,
amount: float,
session: AsyncIOMotorClientSession
):
async with await session.start_transaction():
from_account = await Account.get(from_account_id, session=session)
to_account = await Account.get(to_account_id, session=session)
if from_account.balance < amount:
raise ValueError("Insufficient funds")
await from_account.set(
{Account.balance: from_account.balance - amount},
session=session
)
await to_account.set(
{Account.balance: to_account.balance + amount},
session=session
)
Service Layer Pattern
from typing import List, Optional
from beanie import PydanticObjectId
class UserService:
async def get_by_id(self, user_id: str) -> Optional[User]:
return await User.get(PydanticObjectId(user_id))
async def get_by_email(self, email: str) -> Optional[User]:
return await User.find_one(User.email == email)
async def get_all(
self,
skip: int = 0,
limit: int = 100,
is_active: Optional[bool] = None
) -> List[User]:
query = User.find_all()
if is_active is not None:
query = User.find(User.is_active == is_active)
return await query.skip(skip).limit(limit).to_list()
async def create(self, data: UserCreate) -> User:
user = User(**data.model_dump())
await user.insert()
return user
async def update(self, user_id: str, data: UserUpdate) -> Optional[User]:
user = await self.get_by_id(user_id)
if not user:
return None
update_data = data.model_dump(exclude_unset=True)
update_data["updated_at"] = datetime.utcnow()
await user.set(update_data)
return user
Additional Resources
Reference Files
For detailed patterns and migration guides:
references/migrations.md- Database migration strategiesreferences/performance.md- Query optimization tipsreferences/relationships.md- Link and BackLink patterns
Example Files
Working examples in examples/:
examples/document_models.py- Complete document definitionsexamples/aggregations.py- Aggregation pipeline examplesexamples/service.py- Service layer implementation