skills/lobbi-docs/claude/FastAPI Observability

FastAPI Observability

SKILL.md

FastAPI Observability

This skill provides production-ready observability patterns including structured logging, Prometheus metrics, and OpenTelemetry tracing.

Structured Logging

Configuration with structlog

# app/core/logging.py
import structlog
import logging
import sys
from typing import Any

def setup_logging(log_level: str = "INFO", json_logs: bool = True):
    """Configure structured logging."""

    # Shared processors
    shared_processors = [
        structlog.contextvars.merge_contextvars,
        structlog.processors.add_log_level,
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.StackInfoRenderer(),
    ]

    if json_logs:
        # JSON format for production
        processors = shared_processors + [
            structlog.processors.format_exc_info,
            structlog.processors.JSONRenderer()
        ]
    else:
        # Console format for development
        processors = shared_processors + [
            structlog.dev.ConsoleRenderer()
        ]

    structlog.configure(
        processors=processors,
        wrapper_class=structlog.make_filtering_bound_logger(
            getattr(logging, log_level.upper())
        ),
        context_class=dict,
        logger_factory=structlog.PrintLoggerFactory(),
        cache_logger_on_first_use=True,
    )

def get_logger(name: str = None) -> structlog.BoundLogger:
    return structlog.get_logger(name)

Request Logging Middleware

# app/middleware/logging.py
import time
import uuid
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request
import structlog

logger = structlog.get_logger()

class RequestLoggingMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next):
        request_id = str(uuid.uuid4())
        start_time = time.perf_counter()

        # Bind context for all logs in this request
        structlog.contextvars.clear_contextvars()
        structlog.contextvars.bind_contextvars(
            request_id=request_id,
            method=request.method,
            path=request.url.path,
            client_ip=request.client.host if request.client else None
        )

        # Add request ID to response headers
        response = await call_next(request)
        response.headers["X-Request-ID"] = request_id

        # Calculate duration
        duration_ms = (time.perf_counter() - start_time) * 1000

        # Log request completion
        logger.info(
            "request_completed",
            status_code=response.status_code,
            duration_ms=round(duration_ms, 2),
            content_length=response.headers.get("content-length")
        )

        return response

Application Logging

from app.core.logging import get_logger

logger = get_logger(__name__)

async def create_user(data: UserCreate) -> User:
    logger.info("creating_user", email=data.email)

    try:
        user = await User(**data.model_dump()).insert()
        logger.info("user_created", user_id=str(user.id))
        return user
    except Exception as e:
        logger.error("user_creation_failed", error=str(e), email=data.email)
        raise

Prometheus Metrics

Setup with prometheus-fastapi-instrumentator

# app/core/metrics.py
from prometheus_fastapi_instrumentator import Instrumentator
from prometheus_client import Counter, Histogram, Gauge
from functools import wraps

# Custom metrics
REQUEST_COUNT = Counter(
    "app_requests_total",
    "Total request count",
    ["method", "endpoint", "status"]
)

REQUEST_LATENCY = Histogram(
    "app_request_latency_seconds",
    "Request latency",
    ["method", "endpoint"],
    buckets=[0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0]
)

ACTIVE_REQUESTS = Gauge(
    "app_active_requests",
    "Number of active requests"
)

DB_QUERY_LATENCY = Histogram(
    "app_db_query_latency_seconds",
    "Database query latency",
    ["operation", "collection"]
)

CACHE_HITS = Counter(
    "app_cache_hits_total",
    "Cache hit count",
    ["cache_name"]
)

CACHE_MISSES = Counter(
    "app_cache_misses_total",
    "Cache miss count",
    ["cache_name"]
)

def setup_metrics(app):
    """Setup Prometheus metrics instrumentation."""
    Instrumentator().instrument(app).expose(app, endpoint="/metrics")

Custom Metric Decorators

import time
from functools import wraps

def track_db_query(operation: str, collection: str):
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            start = time.perf_counter()
            try:
                return await func(*args, **kwargs)
            finally:
                duration = time.perf_counter() - start
                DB_QUERY_LATENCY.labels(
                    operation=operation,
                    collection=collection
                ).observe(duration)
        return wrapper
    return decorator

def track_cache(cache_name: str):
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            result = await func(*args, **kwargs)
            if result is not None:
                CACHE_HITS.labels(cache_name=cache_name).inc()
            else:
                CACHE_MISSES.labels(cache_name=cache_name).inc()
            return result
        return wrapper
    return decorator

# Usage
class UserRepository:
    @track_db_query("find", "users")
    async def find_by_id(self, user_id: str):
        return await User.get(user_id)

    @track_cache("users")
    async def get_cached(self, user_id: str):
        return await cache.get(f"user:{user_id}")

OpenTelemetry Tracing

Setup

# app/core/tracing.py
from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor
from opentelemetry.instrumentation.redis import RedisInstrumentor
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.sdk.resources import Resource

def setup_tracing(app, service_name: str, otlp_endpoint: str):
    """Setup OpenTelemetry tracing."""

    # Create resource
    resource = Resource.create({
        "service.name": service_name,
        "service.version": "1.0.0",
    })

    # Setup tracer provider
    provider = TracerProvider(resource=resource)

    # Add OTLP exporter
    otlp_exporter = OTLPSpanExporter(endpoint=otlp_endpoint)
    processor = BatchSpanProcessor(otlp_exporter)
    provider.add_span_processor(processor)

    trace.set_tracer_provider(provider)

    # Instrument FastAPI
    FastAPIInstrumentor.instrument_app(app)

    # Instrument HTTP client
    HTTPXClientInstrumentor().instrument()

    # Instrument Redis
    RedisInstrumentor().instrument()

def get_tracer(name: str) -> trace.Tracer:
    return trace.get_tracer(name)

Custom Spans

from opentelemetry import trace
from opentelemetry.trace import Status, StatusCode

tracer = trace.get_tracer(__name__)

async def process_order(order_id: str):
    with tracer.start_as_current_span("process_order") as span:
        span.set_attribute("order.id", order_id)

        try:
            # Validate order
            with tracer.start_as_current_span("validate_order"):
                order = await validate_order(order_id)
                span.set_attribute("order.total", order.total)

            # Process payment
            with tracer.start_as_current_span("process_payment"):
                payment = await process_payment(order)
                span.set_attribute("payment.id", payment.id)

            # Update inventory
            with tracer.start_as_current_span("update_inventory"):
                await update_inventory(order.items)

            span.set_status(Status(StatusCode.OK))
            return order

        except Exception as e:
            span.set_status(Status(StatusCode.ERROR, str(e)))
            span.record_exception(e)
            raise

Health Check Endpoints

# app/routes/health.py
from fastapi import APIRouter, Response
from typing import Dict, Any
from enum import Enum

class HealthStatus(str, Enum):
    HEALTHY = "healthy"
    DEGRADED = "degraded"
    UNHEALTHY = "unhealthy"

router = APIRouter(tags=["Health"])

@router.get("/health")
async def health() -> Dict[str, str]:
    """Kubernetes liveness probe."""
    return {"status": "ok"}

@router.get("/health/ready")
async def ready(
    db: Database = Depends(get_db),
    cache: RedisCache = Depends(get_cache)
) -> Response:
    """Kubernetes readiness probe with dependency checks."""
    checks = {}
    status = HealthStatus.HEALTHY

    # MongoDB check
    try:
        await db.command("ping")
        checks["mongodb"] = {"status": "ok", "latency_ms": 0}
    except Exception as e:
        checks["mongodb"] = {"status": "error", "error": str(e)}
        status = HealthStatus.UNHEALTHY

    # Redis check
    try:
        start = time.perf_counter()
        await cache.client.ping()
        latency = (time.perf_counter() - start) * 1000
        checks["redis"] = {"status": "ok", "latency_ms": round(latency, 2)}
    except Exception as e:
        checks["redis"] = {"status": "error", "error": str(e)}
        status = HealthStatus.DEGRADED  # Cache failure = degraded

    response_data = {
        "status": status.value,
        "checks": checks,
        "timestamp": datetime.utcnow().isoformat()
    }

    status_code = 200 if status != HealthStatus.UNHEALTHY else 503
    return Response(
        content=json.dumps(response_data),
        status_code=status_code,
        media_type="application/json"
    )

@router.get("/health/live")
async def live() -> Dict[str, str]:
    """Simple liveness check."""
    return {"status": "alive"}

Application Integration

from fastapi import FastAPI
from app.core.logging import setup_logging
from app.core.metrics import setup_metrics
from app.core.tracing import setup_tracing

def create_app() -> FastAPI:
    # Setup logging first
    setup_logging(
        log_level=settings.log_level,
        json_logs=settings.environment == "production"
    )

    app = FastAPI(title="API Service")

    # Setup metrics
    setup_metrics(app)

    # Setup tracing
    if settings.otlp_endpoint:
        setup_tracing(
            app,
            service_name="api-service",
            otlp_endpoint=settings.otlp_endpoint
        )

    # Add middleware
    app.add_middleware(RequestLoggingMiddleware)

    return app

Additional Resources

Reference Files

For detailed configuration:

  • references/grafana-dashboards.md - Grafana dashboard JSON
  • references/alerting.md - Prometheus alerting rules
  • references/elk-setup.md - Elasticsearch/Kibana log aggregation

Example Files

Working examples in examples/:

  • examples/logging_config.py - Complete logging setup
  • examples/metrics_middleware.py - Custom metrics middleware
  • examples/tracing_service.py - Service with tracing
Weekly Installs
0
GitHub Stars
9
First Seen
Jan 1, 1970