multi-tenancy
SKILL.md
Multi-Tenancy Patterns
Implement multi-tenant SaaS architectures with subdomain routing, schema isolation, tenant middleware, and security verification.
When to Use This Skill
- Building multi-tenant SaaS applications
- Implementing tenant resolution from subdomains or headers
- Designing schema-per-tenant database isolation
- Creating tenant-aware middleware in FastAPI
- Implementing Row-Level Security as an alternative to schema isolation
- Testing cross-tenant data isolation
Tenant Resolution
Subdomain Extraction Middleware (FastAPI)
import re
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request
from starlette.responses import Response
class TenantMiddleware(BaseHTTPMiddleware):
"""Resolve tenant from subdomain, header, or JWT claim."""
SKIP_PATHS = {"/docs", "/redoc", "/openapi.json", "/health", "/v1/auth"}
SUBDOMAIN_PATTERN = re.compile(r"^([a-z][a-z0-9-]{1,62})\.app\.")
async def dispatch(self, request: Request, call_next) -> Response:
if request.url.path in self.SKIP_PATHS:
return await call_next(request)
tenant_slug = self._resolve_tenant(request)
request.state.tenant_slug = tenant_slug
return await call_next(request)
def _resolve_tenant(self, request: Request) -> str | None:
"""Resolve tenant using priority: subdomain > header > JWT claim."""
# 1. Subdomain: acme.app.easyfactu.es → "acme"
host = request.headers.get("host", "")
match = self.SUBDOMAIN_PATTERN.match(host)
if match:
return match.group(1)
# 2. Header: X-Tenant-ID
header_tenant = request.headers.get("X-Tenant-ID")
if header_tenant:
return header_tenant.lower().strip()
# 3. JWT claim (resolved later in auth dependency)
return None
Tenant Context Dependency
from fastapi import Depends, HTTPException, Request
from pydantic import BaseModel
class TenantContext(BaseModel):
"""Immutable tenant context for the current request."""
slug: str
schema_name: str
async def get_current_tenant(
request: Request,
user: AuthUser = Depends(get_current_user),
) -> TenantContext:
"""Resolve and validate the current tenant."""
# Priority: middleware-resolved slug > JWT tenant_id
slug = getattr(request.state, "tenant_slug", None) or user.tenant_id
if not slug:
raise HTTPException(
status_code=400,
detail="Tenant context required. Use subdomain or X-Tenant-ID header.",
)
# Validate slug format
if not re.match(r"^[a-z][a-z0-9-]{1,62}$", slug):
raise HTTPException(status_code=400, detail="Invalid tenant identifier")
return TenantContext(
slug=slug,
schema_name=f"tenant_{slug}",
)
JWT Claim-Based Tenant
from jose import jwt
# When creating JWT claims (during login/signup)
def create_tenant_jwt_claims(user_id: str, tenant_id: str, role: str) -> dict:
"""Add tenant context to Supabase user metadata."""
return {
"sub": user_id,
"tenant_id": tenant_id,
"role": role,
"aud": "authenticated",
}
# In auth dependency, extract tenant from JWT
async def get_current_user(
credentials: HTTPAuthorizationCredentials = Depends(security),
) -> AuthUser:
payload = jwt.decode(credentials.credentials, SECRET, algorithms=["HS256"])
return AuthUser(
id=payload["sub"],
tenant_id=payload.get("tenant_id"),
role=payload.get("role", "user"),
)
Schema-per-Tenant (PostgreSQL)
Dynamic Schema Creation
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession
async def create_tenant_schema(
session: AsyncSession,
tenant_slug: str,
) -> None:
"""Create a new schema for a tenant with standard tables."""
schema_name = f"tenant_{tenant_slug}"
# Create schema
await session.execute(text(f"CREATE SCHEMA IF NOT EXISTS {schema_name}"))
# Create standard tables
await session.execute(text(f"""
CREATE TABLE IF NOT EXISTS {schema_name}.invoices (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
invoice_number TEXT NOT NULL,
customer_id UUID NOT NULL,
amount DECIMAL(12,2) NOT NULL CHECK (amount > 0),
currency TEXT NOT NULL DEFAULT 'EUR',
status TEXT NOT NULL DEFAULT 'draft',
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
UNIQUE(invoice_number)
)
"""))
await session.execute(text(f"""
CREATE TABLE IF NOT EXISTS {schema_name}.customers (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
name TEXT NOT NULL,
nif TEXT NOT NULL,
email TEXT,
created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
UNIQUE(nif)
)
"""))
await session.commit()
Schema Migration Strategy (Alembic)
# alembic/env.py
import os
from alembic import context
from sqlalchemy import text
def run_migrations_online():
"""Run migrations for a specific tenant schema."""
schema = os.environ.get("TENANT_SCHEMA", "public")
connectable = engine_from_config(config.get_section("alembic"))
with connectable.connect() as connection:
# Set search_path to target schema
connection.execute(text(f"SET search_path TO {schema}, public"))
context.configure(
connection=connection,
target_metadata=metadata,
version_table_schema=schema,
)
with context.begin_transaction():
context.run_migrations()
# Run migration for a specific tenant
TENANT_SCHEMA=tenant_acme alembic upgrade head
# Run for all tenants
for schema in $(psql -t -c "SELECT schema_name FROM information_schema.schemata WHERE schema_name LIKE 'tenant_%'"); do
TENANT_SCHEMA=$schema alembic upgrade head
done
Connection with search_path Switching
from collections.abc import AsyncIterator
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker
async def get_tenant_session(
session_factory: async_sessionmaker,
tenant: TenantContext,
) -> AsyncIterator[AsyncSession]:
"""Provide a database session scoped to a tenant's schema."""
async with session_factory() as session:
await session.execute(
text(f"SET search_path TO {tenant.schema_name}, public")
)
try:
yield session
await session.commit()
except Exception:
await session.rollback()
raise
finally:
# Reset search_path
await session.execute(text("SET search_path TO public"))
FastAPI Dependency
from fastapi import Depends, Request
async def get_db(
request: Request,
tenant: TenantContext = Depends(get_current_tenant),
) -> AsyncIterator[AsyncSession]:
"""Database session scoped to the current tenant."""
session_factory = request.app.state.session_factory
async for session in get_tenant_session(session_factory, tenant):
yield session
Row-Level Security (Alternative)
RLS Policy Creation
-- Enable RLS
ALTER TABLE invoices ENABLE ROW LEVEL SECURITY;
-- Tenant isolation policy
CREATE POLICY "tenant_isolation" ON invoices
FOR ALL
USING (tenant_id = current_setting('app.current_tenant', true))
WITH CHECK (tenant_id = current_setting('app.current_tenant', true));
-- Role-based access within a tenant
CREATE POLICY "admin_full_access" ON invoices
FOR ALL
USING (
tenant_id = current_setting('app.current_tenant', true)
AND current_setting('app.current_role', true) = 'admin'
);
CREATE POLICY "user_read_own" ON invoices
FOR SELECT
USING (
tenant_id = current_setting('app.current_tenant', true)
AND created_by = current_setting('app.current_user', true)
);
Setting Session Variables
async def set_tenant_context(
session: AsyncSession,
tenant_id: str,
user_id: str,
role: str,
) -> None:
"""Set session variables for RLS evaluation."""
await session.execute(text(f"SET app.current_tenant = '{tenant_id}'"))
await session.execute(text(f"SET app.current_user = '{user_id}'"))
await session.execute(text(f"SET app.current_role = '{role}'"))
Performance Considerations
| Aspect | Schema-per-Tenant | RLS |
|---|---|---|
| Query overhead | None (schema isolation) | Per-query policy evaluation |
| Index efficiency | Smaller indexes per schema | Larger shared indexes |
| Migration complexity | Migrate each schema | One migration for all |
| Number of tenants | Dozens (schema limit) | Thousands (no limit) |
| Connection pooling | Needs SET search_path per request |
Needs SET session vars |
| Backup/restore | Per-schema possible | Whole database only |
Frontend Integration
Subdomain Detection (React)
function getTenantFromSubdomain(): string | null {
const hostname = window.location.hostname;
const parts = hostname.split(".");
// acme.app.easyfactu.es → ["acme", "app", "easyfactu", "es"]
if (parts.length >= 4 && parts[1] === "app") {
return parts[0];
}
// Local development: check URL params or localStorage
if (hostname === "localhost") {
return new URLSearchParams(window.location.search).get("tenant")
?? localStorage.getItem("dev_tenant");
}
return null;
}
Tenant-Aware API Client
import { api } from "@/lib/api";
class TenantApiClient {
private tenantId: string;
constructor(tenantId: string) {
this.tenantId = tenantId;
}
private get headers() {
return { "X-Tenant-ID": this.tenantId };
}
async getInvoices() {
return api.get("/v1/invoices", { headers: this.headers });
}
async createInvoice(data: InvoiceCreate) {
return api.post("/v1/invoices", data, { headers: this.headers });
}
}
// Usage with React context
const TenantApiContext = createContext<TenantApiClient | null>(null);
export function TenantApiProvider({ children }: { children: ReactNode }) {
const tenantId = getTenantFromSubdomain();
if (!tenantId) {
return <Navigate to="/select-tenant" />;
}
const client = useMemo(() => new TenantApiClient(tenantId), [tenantId]);
return (
<TenantApiContext.Provider value={client}>
{children}
</TenantApiContext.Provider>
);
}
export function useTenantApi() {
const ctx = useContext(TenantApiContext);
if (!ctx) throw new Error("useTenantApi must be used within TenantApiProvider");
return ctx;
}
Tenant-Scoped State
import { create } from "zustand";
import { persist } from "zustand/middleware";
interface TenantStore {
currentTenant: string | null;
setTenant: (tenant: string) => void;
clearTenant: () => void;
}
export const useTenantStore = create<TenantStore>()(
persist(
(set) => ({
currentTenant: null,
setTenant: (tenant) => set({ currentTenant: tenant }),
clearTenant: () => set({ currentTenant: null }),
}),
{
name: "tenant-store",
},
),
);
Data Isolation Testing
Cross-Tenant Access Prevention Tests
import pytest
from httpx import AsyncClient
@pytest.mark.anyio
async def test_tenant_isolation(
client: AsyncClient,
acme_auth_headers: dict,
globex_auth_headers: dict,
) -> None:
"""Verify tenants cannot access each other's data."""
# Create invoice as Acme tenant
response = await client.post(
"/v1/invoices",
json={"invoice_number": "INV-001", "amount": "100.00"},
headers=acme_auth_headers,
)
assert response.status_code == 201
invoice_id = response.json()["id"]
# Try to access as Globex tenant — should NOT be visible
response = await client.get(
f"/v1/invoices/{invoice_id}",
headers=globex_auth_headers,
)
assert response.status_code == 404 # Not found in Globex's context
@pytest.mark.anyio
async def test_tenant_list_isolation(
client: AsyncClient,
acme_auth_headers: dict,
globex_auth_headers: dict,
) -> None:
"""Verify tenant list only shows own data."""
# Create invoices for Acme
await client.post(
"/v1/invoices",
json={"invoice_number": "ACME-001", "amount": "50.00"},
headers=acme_auth_headers,
)
# Create invoices for Globex
await client.post(
"/v1/invoices",
json={"invoice_number": "GLOB-001", "amount": "75.00"},
headers=globex_auth_headers,
)
# Acme should only see their own
response = await client.get("/v1/invoices", headers=acme_auth_headers)
invoices = response.json()["data"]
assert all(inv["invoice_number"].startswith("ACME") for inv in invoices)
# Globex should only see their own
response = await client.get("/v1/invoices", headers=globex_auth_headers)
invoices = response.json()["data"]
assert all(inv["invoice_number"].startswith("GLOB") for inv in invoices)
Schema Isolation Verification
@pytest.mark.anyio
async def test_schema_isolation(db_engine) -> None:
"""Verify schemas are physically isolated."""
async with db_engine.connect() as conn:
# Set to Acme's schema
await conn.execute(text("SET search_path TO tenant_acme, public"))
acme_invoices = (await conn.execute(text("SELECT count(*) FROM invoices"))).scalar()
# Set to Globex's schema
await conn.execute(text("SET search_path TO tenant_globex, public"))
globex_invoices = (await conn.execute(text("SELECT count(*) FROM invoices"))).scalar()
# They should have independent data
# (exact counts depend on test fixtures)
assert isinstance(acme_invoices, int)
assert isinstance(globex_invoices, int)
Tenant Onboarding Flow
from fastapi import APIRouter, Depends
router = APIRouter(prefix="/v1/tenants")
@router.post("", status_code=201)
async def create_tenant(
data: TenantCreate,
user: AuthUser = Depends(require_admin),
db: AsyncSession = Depends(get_admin_db), # Admin session, no tenant scope
) -> TenantResponse:
"""Create a new tenant with its own schema."""
# 1. Validate tenant slug uniqueness
existing = await db.execute(
text("SELECT 1 FROM shared.tenants WHERE slug = :slug"),
{"slug": data.slug},
)
if existing.scalar():
raise HTTPException(status_code=409, detail="Tenant already exists")
# 2. Register tenant in shared schema
await db.execute(
text("""
INSERT INTO shared.tenants (slug, name, owner_id, created_at)
VALUES (:slug, :name, :owner_id, now())
"""),
{"slug": data.slug, "name": data.name, "owner_id": user.id},
)
# 3. Create tenant schema with standard tables
await create_tenant_schema(db, data.slug)
# 4. Run migrations for the new schema
# (In production, queue this as a background task)
await run_tenant_migrations(data.slug)
await db.commit()
return TenantResponse(slug=data.slug, name=data.name)
Guidelines
- Use subdomain routing as the primary tenant resolution method
- Fall back to X-Tenant-ID header for API clients and local development
- Use schema-per-tenant for strict isolation (compliance, billing)
- Use RLS for simpler setups with fewer tenants
- Always validate tenant slug format to prevent SQL injection
- Write cross-tenant isolation tests for every data endpoint
- Set search_path per request — never reuse connections with stale context
- Reset search_path after each request in the connection pool
- Use a shared schema for cross-tenant data (tenant registry, plans)
Weekly Installs
1
Repository
franciscosanche…factu-esFirst Seen
13 days ago
Security Audits
Installed on
mcpjam1
claude-code1
junie1
windsurf1
zencoder1
crush1