FastAPI Observability
SKILL.md
FastAPI Observability
This skill provides production-ready observability patterns including structured logging, Prometheus metrics, and OpenTelemetry tracing.
Structured Logging
Configuration with structlog
# app/core/logging.py
import structlog
import logging
import sys
from typing import Any
def setup_logging(log_level: str = "INFO", json_logs: bool = True):
    """Configure structlog for the whole process.

    Args:
        log_level: Minimum level name (e.g. "INFO", "DEBUG").
        json_logs: Emit JSON lines when True (production); otherwise use
            the human-friendly console renderer (development).
    """
    # Processors applied in both output modes.
    common = [
        structlog.contextvars.merge_contextvars,
        structlog.processors.add_log_level,
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.StackInfoRenderer(),
    ]

    if json_logs:
        # Machine-readable output for log-aggregation pipelines.
        tail = [
            structlog.processors.format_exc_info,
            structlog.processors.JSONRenderer(),
        ]
    else:
        # Pretty output for local development (renders exceptions itself).
        tail = [structlog.dev.ConsoleRenderer()]

    structlog.configure(
        processors=common + tail,
        wrapper_class=structlog.make_filtering_bound_logger(
            getattr(logging, log_level.upper())
        ),
        context_class=dict,
        logger_factory=structlog.PrintLoggerFactory(),
        cache_logger_on_first_use=True,
    )
def get_logger(name: str | None = None) -> structlog.BoundLogger:
    """Return a structlog logger, optionally named after the calling module.

    Fix: the previous annotation was ``name: str = None``, which is wrong —
    ``None`` is the default, so the parameter is ``str | None``.
    """
    return structlog.get_logger(name)
Request Logging Middleware
# app/middleware/logging.py
import time
import uuid
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request
import structlog
logger = structlog.get_logger()
class RequestLoggingMiddleware(BaseHTTPMiddleware):
    """Assign a request ID to each request and emit one structured log line per request."""

    async def dispatch(self, request: Request, call_next):
        request_id = str(uuid.uuid4())
        start_time = time.perf_counter()

        # Bind context so every log emitted while handling this request
        # carries the request id and basic request metadata.
        structlog.contextvars.clear_contextvars()
        structlog.contextvars.bind_contextvars(
            request_id=request_id,
            method=request.method,
            path=request.url.path,
            client_ip=request.client.host if request.client else None
        )

        try:
            response = await call_next(request)
        except Exception:
            # Fix: previously only successful responses were logged — an
            # exception escaped with no access-log entry at all.
            duration_ms = (time.perf_counter() - start_time) * 1000
            logger.exception("request_failed", duration_ms=round(duration_ms, 2))
            raise

        # Propagate the request ID so clients can correlate logs.
        response.headers["X-Request-ID"] = request_id

        duration_ms = (time.perf_counter() - start_time) * 1000
        logger.info(
            "request_completed",
            status_code=response.status_code,
            duration_ms=round(duration_ms, 2),
            content_length=response.headers.get("content-length")
        )
        return response
Application Logging
from app.core.logging import get_logger
logger = get_logger(__name__)
async def create_user(data: UserCreate) -> User:
    """Create and persist a user, emitting structured audit events."""
    logger.info("creating_user", email=data.email)
    try:
        new_user = await User(**data.model_dump()).insert()
    except Exception as exc:
        # Log the failure with context, then let the caller handle it.
        logger.error("user_creation_failed", error=str(exc), email=data.email)
        raise
    logger.info("user_created", user_id=str(new_user.id))
    return new_user
Prometheus Metrics
Setup with prometheus-fastapi-instrumentator
# app/core/metrics.py
from prometheus_fastapi_instrumentator import Instrumentator
from prometheus_client import Counter, Histogram, Gauge
from functools import wraps
# Custom metrics ------------------------------------------------------------

# Total HTTP requests, labelled by method, route and response status.
REQUEST_COUNT = Counter(
    "app_requests_total",
    "Total request count",
    ["method", "endpoint", "status"]
)

# End-to-end request latency histogram; buckets span 10 ms to 10 s.
REQUEST_LATENCY = Histogram(
    "app_request_latency_seconds",
    "Request latency",
    ["method", "endpoint"],
    buckets=[0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0]
)

# In-flight request gauge. NOTE(review): nothing in this file inc()/dec()s
# this gauge — confirm it is wired up in middleware elsewhere.
ACTIVE_REQUESTS = Gauge(
    "app_active_requests",
    "Number of active requests"
)

# Per-query database latency, labelled by operation and collection
# (observed by the track_db_query decorator).
DB_QUERY_LATENCY = Histogram(
    "app_db_query_latency_seconds",
    "Database query latency",
    ["operation", "collection"]
)

# Cache hit counter, labelled by logical cache name
# (incremented by the track_cache decorator on a non-None result).
CACHE_HITS = Counter(
    "app_cache_hits_total",
    "Cache hit count",
    ["cache_name"]
)

# Cache miss counter, labelled by logical cache name
# (incremented by the track_cache decorator on a None result).
CACHE_MISSES = Counter(
    "app_cache_misses_total",
    "Cache miss count",
    ["cache_name"]
)
def setup_metrics(app):
    """Instrument *app* with default HTTP metrics and serve them at /metrics."""
    instrumentator = Instrumentator()
    instrumentator.instrument(app)
    instrumentator.expose(app, endpoint="/metrics")
Custom Metric Decorators
import time
from functools import wraps
def track_db_query(operation: str, collection: str):
    """Decorator factory: time an async DB call into DB_QUERY_LATENCY.

    The observation happens in a ``finally`` block, so failed queries
    are timed as well.
    """
    def decorator(func):
        @wraps(func)
        async def timed(*args, **kwargs):
            started = time.perf_counter()
            try:
                return await func(*args, **kwargs)
            finally:
                elapsed = time.perf_counter() - started
                DB_QUERY_LATENCY.labels(
                    operation=operation, collection=collection
                ).observe(elapsed)
        return timed
    return decorator
def track_cache(cache_name: str):
    """Decorator factory: count cache hits and misses for *cache_name*.

    A non-None return value counts as a hit; None counts as a miss.
    """
    def decorator(func):
        @wraps(func)
        async def counted(*args, **kwargs):
            value = await func(*args, **kwargs)
            counter = CACHE_MISSES if value is None else CACHE_HITS
            counter.labels(cache_name=cache_name).inc()
            return value
        return counted
    return decorator
# Usage
class UserRepository:
    """Example repository showing how to apply the metric decorators."""

    @track_db_query("find", "users")
    async def find_by_id(self, user_id: str):
        # Latency of this lookup is observed in DB_QUERY_LATENCY
        # with labels operation="find", collection="users".
        return await User.get(user_id)

    @track_cache("users")
    async def get_cached(self, user_id: str):
        # A None result is counted as a miss, anything else as a hit.
        return await cache.get(f"user:{user_id}")
OpenTelemetry Tracing
Setup
# app/core/tracing.py
from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor
from opentelemetry.instrumentation.redis import RedisInstrumentor
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.sdk.resources import Resource
def setup_tracing(app, service_name: str, otlp_endpoint: str,
                  service_version: str = "1.0.0"):
    """Configure OpenTelemetry tracing and instrument the app and its clients.

    Args:
        app: FastAPI application to instrument.
        service_name: Logical service name reported on every span.
        otlp_endpoint: OTLP/gRPC collector endpoint spans are exported to.
        service_version: Version string in the trace resource. Generalized
            from the previously hard-coded "1.0.0" (same default, so
            existing callers are unaffected).
    """
    # Resource attributes identify this service in the tracing backend.
    resource = Resource.create({
        "service.name": service_name,
        "service.version": service_version,
    })

    provider = TracerProvider(resource=resource)

    # Batch spans before export to reduce collector round-trips.
    provider.add_span_processor(BatchSpanProcessor(OTLPSpanExporter(endpoint=otlp_endpoint)))
    trace.set_tracer_provider(provider)

    # Auto-instrument the web framework and outbound clients.
    FastAPIInstrumentor.instrument_app(app)
    HTTPXClientInstrumentor().instrument()
    RedisInstrumentor().instrument()
def get_tracer(name: str) -> trace.Tracer:
    """Return a tracer scoped to *name* (typically the caller's __name__)."""
    return trace.get_tracer(name)
Custom Spans
from opentelemetry import trace
from opentelemetry.trace import Status, StatusCode
tracer = trace.get_tracer(__name__)
async def process_order(order_id: str):
    """Process an order end-to-end, recording each stage as a child span."""
    with tracer.start_as_current_span("process_order") as root_span:
        root_span.set_attribute("order.id", order_id)
        try:
            # Stage 1: validation.
            with tracer.start_as_current_span("validate_order"):
                order = await validate_order(order_id)
                root_span.set_attribute("order.total", order.total)

            # Stage 2: payment.
            with tracer.start_as_current_span("process_payment"):
                payment = await process_payment(order)
                root_span.set_attribute("payment.id", payment.id)

            # Stage 3: inventory.
            with tracer.start_as_current_span("update_inventory"):
                await update_inventory(order.items)

            root_span.set_status(Status(StatusCode.OK))
            return order
        except Exception as exc:
            # Mark the root span failed and attach the exception event
            # before re-raising to the caller.
            root_span.set_status(Status(StatusCode.ERROR, str(exc)))
            root_span.record_exception(exc)
            raise
Health Check Endpoints
# app/routes/health.py
import json
import time
from datetime import datetime, timezone
from enum import Enum
from typing import Any, Dict

from fastapi import APIRouter, Depends, Response
class HealthStatus(str, Enum):
    """Overall service health reported by the readiness probe."""
    HEALTHY = "healthy"      # all dependency checks passed
    DEGRADED = "degraded"    # cache (Redis) failing — non-critical
    UNHEALTHY = "unhealthy"  # database (MongoDB) failing — critical

# Router grouping the health endpoints under the "Health" tag.
router = APIRouter(tags=["Health"])
@router.get("/health")
async def health() -> Dict[str, str]:
    """Kubernetes liveness probe: reports OK whenever the process is up."""
    return dict(status="ok")
@router.get("/health/ready")
async def ready(
    db: Database = Depends(get_db),
    cache: RedisCache = Depends(get_cache)
) -> Response:
    """Kubernetes readiness probe with dependency checks.

    Returns 200 while healthy or degraded, 503 when unhealthy. A MongoDB
    failure is critical (unhealthy); a Redis failure only degrades the
    service since the cache is best-effort.
    """
    checks = {}
    status = HealthStatus.HEALTHY

    # MongoDB check.
    # Fix: latency_ms was previously hard-coded to 0 — measure it like
    # the Redis check does.
    try:
        start = time.perf_counter()
        await db.command("ping")
        latency = (time.perf_counter() - start) * 1000
        checks["mongodb"] = {"status": "ok", "latency_ms": round(latency, 2)}
    except Exception as e:
        checks["mongodb"] = {"status": "error", "error": str(e)}
        status = HealthStatus.UNHEALTHY

    # Redis check
    try:
        start = time.perf_counter()
        await cache.client.ping()
        latency = (time.perf_counter() - start) * 1000
        checks["redis"] = {"status": "ok", "latency_ms": round(latency, 2)}
    except Exception as e:
        checks["redis"] = {"status": "error", "error": str(e)}
        # Fix: only downgrade from HEALTHY. Previously a Redis failure
        # overwrote an earlier UNHEALTHY with DEGRADED, flipping the
        # intended 503 back to 200.
        if status is HealthStatus.HEALTHY:
            status = HealthStatus.DEGRADED  # Cache failure = degraded

    response_data = {
        "status": status.value,
        "checks": checks,
        # datetime.utcnow() is deprecated; use an aware UTC timestamp.
        "timestamp": datetime.now(timezone.utc).isoformat()
    }

    status_code = 200 if status != HealthStatus.UNHEALTHY else 503
    return Response(
        content=json.dumps(response_data),
        status_code=status_code,
        media_type="application/json"
    )
@router.get("/health/live")
async def live() -> Dict[str, str]:
    """Minimal liveness check: replies as long as the event loop runs."""
    return dict(status="alive")
Application Integration
from fastapi import FastAPI
from app.core.logging import setup_logging
from app.core.metrics import setup_metrics
from app.core.tracing import setup_tracing
def create_app() -> FastAPI:
    """Application factory wiring logging, metrics, tracing and middleware.

    Ordering matters: logging is configured before anything else so the
    remaining setup steps themselves log correctly.
    """
    # JSON logs only in production; pretty console logs elsewhere.
    setup_logging(
        log_level=settings.log_level,
        json_logs=settings.environment == "production"
    )

    app = FastAPI(title="API Service")

    # Prometheus instrumentation (exposes /metrics).
    setup_metrics(app)

    # Tracing is optional — enabled only when a collector endpoint is configured.
    if settings.otlp_endpoint:
        setup_tracing(
            app,
            service_name="api-service",
            otlp_endpoint=settings.otlp_endpoint
        )

    # Request-ID binding + structured access logging.
    app.add_middleware(RequestLoggingMiddleware)
    return app
Additional Resources
Reference Files
For detailed configuration:
- `references/grafana-dashboards.md` - Grafana dashboard JSON
- `references/alerting.md` - Prometheus alerting rules
- `references/elk-setup.md` - Elasticsearch/Kibana log aggregation
Example Files
Working examples in examples/:
- `examples/logging_config.py` - Complete logging setup
- `examples/metrics_middleware.py` - Custom metrics middleware
- `examples/tracing_service.py` - Service with tracing