multi-tenancy

SKILL.md

Multi-Tenancy Patterns

Implement multi-tenant SaaS architectures with subdomain routing, schema isolation, tenant middleware, and security verification.

When to Use This Skill

  • Building multi-tenant SaaS applications
  • Implementing tenant resolution from subdomains or headers
  • Designing schema-per-tenant database isolation
  • Creating tenant-aware middleware in FastAPI
  • Implementing Row-Level Security as an alternative to schema isolation
  • Testing cross-tenant data isolation

Tenant Resolution

Subdomain Extraction Middleware (FastAPI)

import re
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request
from starlette.responses import Response


class TenantMiddleware(BaseHTTPMiddleware):
    """Attach a tenant slug to each request before it reaches the route.

    The slug is read from the request's subdomain or, failing that, from the
    ``X-Tenant-ID`` header, and stored on ``request.state.tenant_slug``.
    When neither source yields a value the attribute is set to ``None`` so a
    later dependency can fall back to another source (e.g. a JWT claim).
    """

    # Paths served without tenant resolution (exact match on the URL path).
    SKIP_PATHS = {"/docs", "/redoc", "/openapi.json", "/health", "/v1/auth"}
    # Captures the leading label of hosts shaped like "<slug>.app.<domain>".
    SUBDOMAIN_PATTERN = re.compile(r"^([a-z][a-z0-9-]{1,62})\.app\.")

    async def dispatch(self, request: Request, call_next) -> Response:
        """Resolve the tenant, store it on ``request.state``, then continue.

        Requests whose path is in ``SKIP_PATHS`` are forwarded untouched;
        ``request.state.tenant_slug`` is not set for them.
        """
        if request.url.path in self.SKIP_PATHS:
            return await call_next(request)

        request.state.tenant_slug = self._resolve_tenant(request)
        return await call_next(request)

    def _resolve_tenant(self, request: Request) -> str | None:
        """Resolve tenant using priority: subdomain > header > JWT claim.

        Returns the lower-cased slug, or ``None`` when neither the host nor
        the header provides one.
        """
        # 1. Subdomain: acme.app.easyfactu.es → "acme"
        # Host names are case-insensitive, so normalise before matching the
        # lower-case-only pattern.
        host = request.headers.get("host", "").strip().lower()
        match = self.SUBDOMAIN_PATTERN.match(host)
        if match:
            return match.group(1)

        # 2. Header: X-Tenant-ID
        # A blank or whitespace-only header counts as "not provided".
        header_tenant = request.headers.get("X-Tenant-ID", "").strip().lower()
        if header_tenant:
            return header_tenant

        # 3. JWT claim (resolved later in auth dependency)
        return None

Tenant Context Dependency

from fastapi import Depends, HTTPException, Request
from pydantic import BaseModel


class TenantContext(BaseModel, frozen=True):
    """Immutable tenant context for the current request.

    ``frozen=True`` makes pydantic reject attribute assignment after
    construction, so the context cannot be re-pointed at another tenant
    midway through a request.
    """

    # Tenant identifier as resolved for the current request.
    slug: str
    # Database schema that holds this tenant's tables.
    schema_name: str


async def get_current_tenant(
    request: Request,
    user: AuthUser = Depends(get_current_user),
) -> TenantContext:
    """Resolve and validate the current tenant.

    The slug stored on ``request.state.tenant_slug`` takes priority; the
    ``tenant_id`` of the authenticated user is the fallback.

    Raises:
        HTTPException: 400 when no tenant can be determined or the slug is
            malformed; 403 when the requested tenant differs from the tenant
            the user's token is bound to.
    """
    # Priority: middleware-resolved slug > JWT tenant_id
    slug = getattr(request.state, "tenant_slug", None) or user.tenant_id

    if not slug:
        raise HTTPException(
            status_code=400,
            detail="Tenant context required. Use subdomain or X-Tenant-ID header.",
        )

    # Validate slug format. fullmatch (unlike match with "$") also rejects a
    # trailing newline, which matters because the slug ends up inside a
    # schema name.
    if not isinstance(slug, str) or not re.fullmatch(r"[a-z][a-z0-9-]{1,62}", slug):
        raise HTTPException(status_code=400, detail="Invalid tenant identifier")

    # The subdomain and the X-Tenant-ID header are caller-controlled. Without
    # this check, a user holding a token for one tenant could reach another
    # tenant's schema simply by changing the header.
    # NOTE(review): tokens without a tenant_id claim are still accepted for
    # any requested tenant; confirm membership is verified elsewhere.
    if user.tenant_id and slug != user.tenant_id:
        raise HTTPException(status_code=403, detail="Tenant access denied")

    return TenantContext(
        slug=slug,
        schema_name=f"tenant_{slug}",
    )

JWT Claim-Based Tenant

from jose import jwt

# When creating JWT claims (during login/signup)
def create_tenant_jwt_claims(user_id: str, tenant_id: str, role: str) -> dict:
    """Build the claim set for a tenant-scoped token.

    Args:
        user_id: Stored as the ``sub`` claim.
        tenant_id: Stored as the ``tenant_id`` claim.
        role: Stored as the ``role`` claim.

    Returns:
        A new dict with ``sub``, ``tenant_id``, ``role`` and a fixed ``aud``
        of ``"authenticated"``.
    """
    claims = dict(sub=user_id, tenant_id=tenant_id, role=role)
    claims["aud"] = "authenticated"
    return claims

# In auth dependency, extract tenant from JWT
async def get_current_user(
    credentials: HTTPAuthorizationCredentials = Depends(security),
) -> AuthUser:
    """Decode the bearer token and build the authenticated user.

    Returns:
        An ``AuthUser`` carrying the token's ``sub``, its optional
        ``tenant_id`` and its ``role`` (defaulting to ``"user"``).

    Raises:
        HTTPException: 401 when the token cannot be verified or lacks a
            ``sub`` claim.
    """
    from fastapi import HTTPException
    from jose import JWTError

    try:
        # The claims are issued with aud="authenticated"; python-jose rejects
        # a token that carries an "aud" claim unless the expected audience is
        # passed explicitly.
        payload = jwt.decode(
            credentials.credentials,
            SECRET,
            algorithms=["HS256"],
            audience="authenticated",
        )
        user_id = payload["sub"]
    except (JWTError, KeyError) as exc:
        # Report bad credentials as 401 rather than an unhandled 500.
        raise HTTPException(
            status_code=401,
            detail="Invalid or expired token",
            headers={"WWW-Authenticate": "Bearer"},
        ) from exc

    return AuthUser(
        id=user_id,
        tenant_id=payload.get("tenant_id"),
        role=payload.get("role", "user"),
    )

Schema-per-Tenant (PostgreSQL)

Dynamic Schema Creation

from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession


async def create_tenant_schema(
    session: AsyncSession,
    tenant_slug: str,
) -> None:
    """Create a new schema for a tenant with standard tables.

    Creates schema ``tenant_<slug>`` plus its ``invoices`` and ``customers``
    tables (all ``IF NOT EXISTS``) and commits the session.

    Args:
        session: Session used to run the DDL; it is committed on success.
        tenant_slug: Tenant identifier. It must start with a lower-case
            letter and otherwise contain only lower-case letters, digits
            and hyphens.

    Raises:
        ValueError: If ``tenant_slug`` does not have that format.
    """
    import re

    # Identifiers cannot be sent as bind parameters, so the slug is spliced
    # into the DDL text. Refuse anything outside the slug alphabet first.
    if not isinstance(tenant_slug, str) or not re.fullmatch(
        r"[a-z][a-z0-9-]{1,62}", tenant_slug
    ):
        raise ValueError(f"Invalid tenant slug: {tenant_slug!r}")

    # Double-quote the identifier: slugs may contain "-", which is not legal
    # in an unquoted PostgreSQL identifier. For all-lower-case names the
    # quoted and unquoted forms refer to the same schema.
    # NOTE(review): PostgreSQL truncates identifiers longer than 63 bytes by
    # default, so very long slugs could map to the same schema name.
    schema_name = f'"tenant_{tenant_slug}"'

    # Create schema
    await session.execute(text(f"CREATE SCHEMA IF NOT EXISTS {schema_name}"))

    # Create standard tables
    await session.execute(text(f"""
        CREATE TABLE IF NOT EXISTS {schema_name}.invoices (
            id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
            invoice_number TEXT NOT NULL,
            customer_id UUID NOT NULL,
            amount DECIMAL(12,2) NOT NULL CHECK (amount > 0),
            currency TEXT NOT NULL DEFAULT 'EUR',
            status TEXT NOT NULL DEFAULT 'draft',
            created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
            updated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
            UNIQUE(invoice_number)
        )
    """))

    await session.execute(text(f"""
        CREATE TABLE IF NOT EXISTS {schema_name}.customers (
            id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
            name TEXT NOT NULL,
            nif TEXT NOT NULL,
            email TEXT,
            created_at TIMESTAMPTZ NOT NULL DEFAULT now(),
            UNIQUE(nif)
        )
    """))

    await session.commit()

Schema Migration Strategy (Alembic)

# alembic/env.py
import os
from alembic import context
from sqlalchemy import text


def run_migrations_online():
    """Apply migrations against the schema named by ``TENANT_SCHEMA``.

    The target schema is read from the ``TENANT_SCHEMA`` environment
    variable and defaults to ``public``. The schema is placed first on the
    connection's ``search_path`` and is also passed to Alembic as the
    schema of its version table.
    """
    target_schema = os.environ.get("TENANT_SCHEMA", "public")
    engine = engine_from_config(config.get_section("alembic"))

    with engine.connect() as conn:
        # Unqualified table names in migrations resolve to the target schema
        # first, then to public.
        conn.execute(text(f"SET search_path TO {target_schema}, public"))

        context.configure(
            connection=conn,
            target_metadata=metadata,
            version_table_schema=target_schema,
        )

        with context.begin_transaction():
            context.run_migrations()
# Run migration for a specific tenant
TENANT_SCHEMA=tenant_acme alembic upgrade head

# Run for all tenants.
# -A/-t make psql print bare schema names, one per line. The backslash makes
# "_" literal: an unescaped "_" in LIKE matches any single character, so
# 'tenant_%' would also select schemas such as "tenants_archive".
for schema in $(psql -At -c "SELECT schema_name FROM information_schema.schemata WHERE schema_name LIKE 'tenant\_%'"); do
    TENANT_SCHEMA="$schema" alembic upgrade head
done

Connection with search_path Switching

from collections.abc import AsyncIterator
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker


async def get_tenant_session(
    session_factory: async_sessionmaker,
    tenant: TenantContext,
) -> AsyncIterator[AsyncSession]:
    """Provide a database session scoped to a tenant's schema.

    Opens a session, points ``search_path`` at the tenant's schema (then
    ``public``), yields the session, and commits when the consumer finishes
    normally. If the consumer raises, the work is rolled back and the
    exception is re-raised. In every case ``search_path`` is reset to
    ``public`` before the session is closed.
    """
    # Quote the identifier so schema names containing "-" stay valid SQL.
    # NOTE(review): quoting disables case folding; this assumes schema names
    # are lower-case, as produced by f"tenant_{slug}".
    quoted_schema = '"' + tenant.schema_name.replace('"', '""') + '"'

    async with session_factory() as session:
        await session.execute(
            text(f"SET search_path TO {quoted_schema}, public")
        )
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise
        finally:
            # Discard anything still pending (e.g. when the generator is
            # closed early) so the commit below cannot persist partial work.
            await session.rollback()
            # Reset search_path. SET is transactional, so the reset must be
            # committed; otherwise closing the session rolls it back and the
            # tenant's search_path stays on the connection.
            await session.execute(text("SET search_path TO public"))
            await session.commit()

FastAPI Dependency

from fastapi import Depends, Request


async def get_db(
    request: Request,
    tenant: TenantContext = Depends(get_current_tenant),
) -> AsyncIterator[AsyncSession]:
    """Database session scoped to the current tenant.

    Takes the session factory from ``request.app.state.session_factory`` and
    yields the session produced by ``get_tenant_session`` for ``tenant``.
    """
    from contextlib import asynccontextmanager

    session_factory = request.app.state.session_factory

    # Drive the inner generator as a context manager rather than with
    # ``async for``: an exception raised while the request is being handled
    # is then thrown into get_tenant_session (so its rollback/cleanup runs
    # immediately) instead of leaving that generator suspended until it is
    # garbage-collected.
    tenant_session = asynccontextmanager(get_tenant_session)
    async with tenant_session(session_factory, tenant) as session:
        yield session

Row-Level Security (Alternative)

RLS Policy Creation

-- Enable RLS
-- NOTE: the table owner is exempt from these policies unless
-- ALTER TABLE invoices FORCE ROW LEVEL SECURITY is also applied.
ALTER TABLE invoices ENABLE ROW LEVEL SECURITY;

-- Tenant isolation policy
-- Declared RESTRICTIVE so it is ANDed with the policies below. Policies are
-- PERMISSIVE by default and permissive policies are ORed together, so a
-- permissive tenant policy would grant every row of the tenant to every
-- role and make the role-based policies meaningless.
CREATE POLICY "tenant_isolation" ON invoices
    AS RESTRICTIVE
    FOR ALL
    USING (tenant_id = current_setting('app.current_tenant', true))
    WITH CHECK (tenant_id = current_setting('app.current_tenant', true));

-- Role-based access within a tenant
-- (permissive: a row is reachable when at least one of these grants it AND
-- the restrictive tenant policy above passes)
CREATE POLICY "admin_full_access" ON invoices
    FOR ALL
    USING (
        tenant_id = current_setting('app.current_tenant', true)
        AND current_setting('app.current_role', true) = 'admin'
    );

CREATE POLICY "user_read_own" ON invoices
    FOR SELECT
    USING (
        tenant_id = current_setting('app.current_tenant', true)
        AND created_by = current_setting('app.current_user', true)
    );

Setting Session Variables

async def set_tenant_context(
    session: AsyncSession,
    tenant_id: str,
    user_id: str,
    role: str,
) -> None:
    """Set session variables for RLS evaluation.

    Stores the tenant, user and role in the ``app.current_tenant``,
    ``app.current_user`` and ``app.current_role`` settings, where policies
    can read them with ``current_setting(...)``.
    """
    settings = {
        "app.current_tenant": tenant_id,
        "app.current_user": user_id,
        "app.current_role": role,
    }
    for name, value in settings.items():
        # set_config() takes the setting name and value as ordinary bind
        # parameters, so values are never spliced into SQL text. It also
        # avoids writing the reserved words current_user / current_role as
        # part of a SET variable name. The final ``false`` keeps the value
        # session-scoped, like a plain SET.
        await session.execute(
            text("SELECT set_config(:name, :value, false)"),
            {"name": name, "value": str(value)},
        )

Performance Considerations

| Aspect | Schema-per-Tenant | RLS |
| --- | --- | --- |
| Query overhead | None (schema isolation) | Per-query policy evaluation |
| Index efficiency | Smaller indexes per schema | Larger shared indexes |
| Migration complexity | Migrate each schema | One migration for all |
| Number of tenants | Dozens (schema limit) | Thousands (no limit) |
| Connection pooling | Needs SET search_path per request | Needs SET session vars |
| Backup/restore | Per-schema possible | Whole database only |

Frontend Integration

Subdomain Detection (React)

/**
 * Determine the tenant for the page currently loaded in the browser.
 *
 * Hosts with at least four labels whose second label is "app"
 * (acme.app.easyfactu.es) yield their first label. On localhost the
 * `tenant` query parameter is used, falling back to the `dev_tenant`
 * localStorage entry. Any other host yields null.
 */
function getTenantFromSubdomain(): string | null {
  const { hostname, search } = window.location;

  // acme.app.easyfactu.es → ["acme", "app", "easyfactu", "es"]
  const labels = hostname.split(".");
  const isTenantHost = labels.length >= 4 && labels[1] === "app";
  if (isTenantHost) {
    return labels[0];
  }

  if (hostname !== "localhost") {
    return null;
  }

  // Local development: check URL params or localStorage
  const fromQuery = new URLSearchParams(search).get("tenant");
  return fromQuery ?? localStorage.getItem("dev_tenant");
}

Tenant-Aware API Client

import { api } from "@/lib/api";

/** API client that sends the tenant id as `X-Tenant-ID` on every call. */
class TenantApiClient {
  constructor(private tenantId: string) {}

  /** Request options carrying the tenant header. */
  private requestOptions() {
    return { headers: { "X-Tenant-ID": this.tenantId } };
  }

  /** GET /v1/invoices for this tenant. */
  async getInvoices() {
    return api.get("/v1/invoices", this.requestOptions());
  }

  /** POST /v1/invoices with `data` for this tenant. */
  async createInvoice(data: InvoiceCreate) {
    return api.post("/v1/invoices", data, this.requestOptions());
  }
}

// Usage with React context
const TenantApiContext = createContext<TenantApiClient | null>(null);

/**
 * Provide a tenant-bound API client to the component tree.
 *
 * Redirects to /select-tenant when no tenant can be determined.
 */
export function TenantApiProvider({ children }: { children: ReactNode }) {
  const tenantId = getTenantFromSubdomain();

  // Hooks must run on every render in the same order, so useMemo is called
  // before the early return rather than after it.
  const client = useMemo(
    () => (tenantId ? new TenantApiClient(tenantId) : null),
    [tenantId],
  );

  if (!client) {
    return <Navigate to="/select-tenant" />;
  }

  return (
    <TenantApiContext.Provider value={client}>
      {children}
    </TenantApiContext.Provider>
  );
}

/**
 * Return the TenantApiClient supplied by the nearest TenantApiProvider.
 * Throws when called outside of a provider.
 */
export function useTenantApi() {
  const client = useContext(TenantApiContext);
  if (client) {
    return client;
  }
  throw new Error("useTenantApi must be used within TenantApiProvider");
}

Tenant-Scoped State

import { create } from "zustand";
import { persist } from "zustand/middleware";

/** State and actions held by the tenant store. */
interface TenantStore {
  /** Selected tenant, or null when none is selected. */
  currentTenant: string | null;
  /** Select `tenant` as the current tenant. */
  setTenant: (tenant: string) => void;
  /** Reset the current tenant to null. */
  clearTenant: () => void;
}

// Wrapped in `persist` under the storage key "tenant-store".
export const useTenantStore = create<TenantStore>()(
  persist(
    (set) => ({
      currentTenant: null,
      setTenant: (tenant) => {
        set({ currentTenant: tenant });
      },
      clearTenant: () => {
        set({ currentTenant: null });
      },
    }),
    { name: "tenant-store" },
  ),
);

Data Isolation Testing

Cross-Tenant Access Prevention Tests

import pytest
from httpx import AsyncClient


@pytest.mark.anyio
async def test_tenant_isolation(
    client: AsyncClient,
    acme_auth_headers: dict,
    globex_auth_headers: dict,
) -> None:
    """An invoice created by one tenant must not be readable by another."""
    # Acme creates an invoice.
    new_invoice = {"invoice_number": "INV-001", "amount": "100.00"}
    created = await client.post(
        "/v1/invoices",
        json=new_invoice,
        headers=acme_auth_headers,
    )
    assert created.status_code == 201
    invoice_id = created.json()["id"]

    # Globex asks for the same id; in its context the invoice does not
    # exist, so the API is expected to answer 404.
    lookup = await client.get(
        f"/v1/invoices/{invoice_id}",
        headers=globex_auth_headers,
    )
    assert lookup.status_code == 404


@pytest.mark.anyio
async def test_tenant_list_isolation(
    client: AsyncClient,
    acme_auth_headers: dict,
    globex_auth_headers: dict,
) -> None:
    """Verify tenant list only shows own data.

    Each tenant creates one invoice; each tenant's listing must then contain
    its own invoice and nothing carrying the other tenant's prefix.
    """
    # Create invoices for Acme
    response = await client.post(
        "/v1/invoices",
        json={"invoice_number": "ACME-001", "amount": "50.00"},
        headers=acme_auth_headers,
    )
    assert response.status_code == 201

    # Create invoices for Globex
    response = await client.post(
        "/v1/invoices",
        json={"invoice_number": "GLOB-001", "amount": "75.00"},
        headers=globex_auth_headers,
    )
    assert response.status_code == 201

    # Acme should only see their own. The membership assertion keeps the
    # all() check from passing vacuously on an empty list.
    response = await client.get("/v1/invoices", headers=acme_auth_headers)
    acme_numbers = [inv["invoice_number"] for inv in response.json()["data"]]
    assert "ACME-001" in acme_numbers
    assert all(number.startswith("ACME") for number in acme_numbers)

    # Globex should only see their own
    response = await client.get("/v1/invoices", headers=globex_auth_headers)
    globex_numbers = [inv["invoice_number"] for inv in response.json()["data"]]
    assert "GLOB-001" in globex_numbers
    assert all(number.startswith("GLOB") for number in globex_numbers)

Schema Isolation Verification

@pytest.mark.anyio
async def test_schema_isolation(db_engine) -> None:
    """Verify schemas are physically isolated.

    Inserts a row while ``search_path`` points at Acme's schema and checks
    that Acme's invoice count grows by one while Globex's stays unchanged.
    Nothing is committed, so the insert is discarded when the connection
    closes.
    """
    count_invoices = text("SELECT count(*) FROM invoices")

    async with db_engine.connect() as conn:
        # Baseline for Globex's schema
        await conn.execute(text("SET search_path TO tenant_globex, public"))
        globex_before = (await conn.execute(count_invoices)).scalar()

        # Write through Acme's search_path.
        # NOTE(review): assumes invoices has only the required columns of the
        # standard tenant table (invoice_number, customer_id, amount); adjust
        # if migrations add further NOT NULL columns.
        await conn.execute(text("SET search_path TO tenant_acme, public"))
        acme_before = (await conn.execute(count_invoices)).scalar()
        await conn.execute(text(
            "INSERT INTO invoices (invoice_number, customer_id, amount) "
            "VALUES ('ISOLATION-' || gen_random_uuid()::text, gen_random_uuid(), 1.00)"
        ))
        acme_after = (await conn.execute(count_invoices)).scalar()
        assert acme_after == acme_before + 1

        # The write must not be visible through Globex's schema
        await conn.execute(text("SET search_path TO tenant_globex, public"))
        globex_after = (await conn.execute(count_invoices)).scalar()
        assert globex_after == globex_before

Tenant Onboarding Flow

from fastapi import APIRouter, Depends

router = APIRouter(prefix="/v1/tenants")


@router.post("", status_code=201)
async def create_tenant(
    data: TenantCreate,
    user: AuthUser = Depends(require_admin),
    db: AsyncSession = Depends(get_admin_db),  # Admin session, no tenant scope
) -> TenantResponse:
    """Create a new tenant with its own schema.

    Registers the tenant in ``shared.tenants``, creates its schema and
    standard tables, runs the tenant migrations and commits.

    Raises:
        HTTPException: 400 when the slug is malformed; 409 when a tenant
            with the same slug is already registered.
    """
    import re

    # 0. Validate slug format before it is used to build a schema name.
    #    Schema identifiers cannot be bound as SQL parameters, so this check
    #    is what keeps arbitrary text out of the DDL issued below.
    if not isinstance(data.slug, str) or not re.fullmatch(
        r"[a-z][a-z0-9-]{1,62}", data.slug
    ):
        raise HTTPException(status_code=400, detail="Invalid tenant identifier")

    # 1. Validate tenant slug uniqueness
    existing = await db.execute(
        text("SELECT 1 FROM shared.tenants WHERE slug = :slug"),
        {"slug": data.slug},
    )
    if existing.scalar():
        raise HTTPException(status_code=409, detail="Tenant already exists")

    # 2. Register tenant in shared schema
    await db.execute(
        text("""
            INSERT INTO shared.tenants (slug, name, owner_id, created_at)
            VALUES (:slug, :name, :owner_id, now())
        """),
        {"slug": data.slug, "name": data.name, "owner_id": user.id},
    )

    # 3. Create tenant schema with standard tables
    await create_tenant_schema(db, data.slug)

    # 4. Run migrations for the new schema
    # (In production, queue this as a background task)
    await run_tenant_migrations(data.slug)

    await db.commit()

    return TenantResponse(slug=data.slug, name=data.name)

Guidelines

  • Use subdomain routing as the primary tenant resolution method
  • Fall back to X-Tenant-ID header for API clients and local development
  • Use schema-per-tenant for strict isolation (compliance, billing)
  • Use RLS for simpler setups or when the tenant count is large (one shared schema, one migration for all tenants)
  • Always validate tenant slug format to prevent SQL injection
  • Write cross-tenant isolation tests for every data endpoint
  • Set search_path per request — never reuse connections with stale context
  • Reset search_path after each request in the connection pool
  • Use a shared schema for cross-tenant data (tenant registry, plans)
Weekly Installs
1
First Seen
13 days ago
Installed on
mcpjam1
claude-code1
junie1
windsurf1
zencoder1
crush1