FastAPI Observability
FastAPI Observability
This skill provides production-ready observability patterns including structured logging, Prometheus metrics, and OpenTelemetry tracing.
Structured Logging
Configuration with structlog
# app/core/logging.py
import structlog
import logging
import sys
from typing import Any
def setup_logging(log_level: str = "INFO", json_logs: bool = True):
"""Configure structured logging."""
# Shared processors
shared_processors = [
structlog.contextvars.merge_contextvars,
structlog.processors.add_log_level,
structlog.processors.TimeStamper(fmt="iso"),
structlog.processors.StackInfoRenderer(),
]
if json_logs:
# JSON format for production
processors = shared_processors + [
structlog.processors.format_exc_info,
structlog.processors.JSONRenderer()
]
else:
# Console format for development
processors = shared_processors + [
structlog.dev.ConsoleRenderer()
]
structlog.configure(
processors=processors,
wrapper_class=structlog.make_filtering_bound_logger(
getattr(logging, log_level.upper())
),
context_class=dict,
logger_factory=structlog.PrintLoggerFactory(),
cache_logger_on_first_use=True,
)
def get_logger(name: str = None) -> structlog.BoundLogger:
return structlog.get_logger(name)
Request Logging Middleware
# app/middleware/logging.py
import time
import uuid
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request
import structlog
logger = structlog.get_logger()
class RequestLoggingMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request: Request, call_next):
request_id = str(uuid.uuid4())
start_time = time.perf_counter()
# Bind context for all logs in this request
structlog.contextvars.clear_contextvars()
structlog.contextvars.bind_contextvars(
request_id=request_id,
method=request.method,
path=request.url.path,
client_ip=request.client.host if request.client else None
)
# Add request ID to response headers
response = await call_next(request)
response.headers["X-Request-ID"] = request_id
# Calculate duration
duration_ms = (time.perf_counter() - start_time) * 1000
# Log request completion
logger.info(
"request_completed",
status_code=response.status_code,
duration_ms=round(duration_ms, 2),
content_length=response.headers.get("content-length")
)
return response
Application Logging
from app.core.logging import get_logger
logger = get_logger(__name__)
async def create_user(data: UserCreate) -> User:
logger.info("creating_user", email=data.email)
try:
user = await User(**data.model_dump()).insert()
logger.info("user_created", user_id=str(user.id))
return user
except Exception as e:
logger.error("user_creation_failed", error=str(e), email=data.email)
raise
Prometheus Metrics
Setup with prometheus-fastapi-instrumentator
# app/core/metrics.py
from prometheus_fastapi_instrumentator import Instrumentator
from prometheus_client import Counter, Histogram, Gauge
from functools import wraps
# Custom metrics
REQUEST_COUNT = Counter(
"app_requests_total",
"Total request count",
["method", "endpoint", "status"]
)
REQUEST_LATENCY = Histogram(
"app_request_latency_seconds",
"Request latency",
["method", "endpoint"],
buckets=[0.01, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0]
)
ACTIVE_REQUESTS = Gauge(
"app_active_requests",
"Number of active requests"
)
DB_QUERY_LATENCY = Histogram(
"app_db_query_latency_seconds",
"Database query latency",
["operation", "collection"]
)
CACHE_HITS = Counter(
"app_cache_hits_total",
"Cache hit count",
["cache_name"]
)
CACHE_MISSES = Counter(
"app_cache_misses_total",
"Cache miss count",
["cache_name"]
)
def setup_metrics(app):
"""Setup Prometheus metrics instrumentation."""
Instrumentator().instrument(app).expose(app, endpoint="/metrics")
Custom Metric Decorators
import time
from functools import wraps
def track_db_query(operation: str, collection: str):
def decorator(func):
@wraps(func)
async def wrapper(*args, **kwargs):
start = time.perf_counter()
try:
return await func(*args, **kwargs)
finally:
duration = time.perf_counter() - start
DB_QUERY_LATENCY.labels(
operation=operation,
collection=collection
).observe(duration)
return wrapper
return decorator
def track_cache(cache_name: str):
def decorator(func):
@wraps(func)
async def wrapper(*args, **kwargs):
result = await func(*args, **kwargs)
if result is not None:
CACHE_HITS.labels(cache_name=cache_name).inc()
else:
CACHE_MISSES.labels(cache_name=cache_name).inc()
return result
return wrapper
return decorator
# Usage
class UserRepository:
@track_db_query("find", "users")
async def find_by_id(self, user_id: str):
return await User.get(user_id)
@track_cache("users")
async def get_cached(self, user_id: str):
return await cache.get(f"user:{user_id}")
OpenTelemetry Tracing
Setup
# app/core/tracing.py
from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor
from opentelemetry.instrumentation.redis import RedisInstrumentor
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.sdk.resources import Resource
def setup_tracing(app, service_name: str, otlp_endpoint: str):
"""Setup OpenTelemetry tracing."""
# Create resource
resource = Resource.create({
"service.name": service_name,
"service.version": "1.0.0",
})
# Setup tracer provider
provider = TracerProvider(resource=resource)
# Add OTLP exporter
otlp_exporter = OTLPSpanExporter(endpoint=otlp_endpoint)
processor = BatchSpanProcessor(otlp_exporter)
provider.add_span_processor(processor)
trace.set_tracer_provider(provider)
# Instrument FastAPI
FastAPIInstrumentor.instrument_app(app)
# Instrument HTTP client
HTTPXClientInstrumentor().instrument()
# Instrument Redis
RedisInstrumentor().instrument()
def get_tracer(name: str) -> trace.Tracer:
return trace.get_tracer(name)
Custom Spans
from opentelemetry import trace
from opentelemetry.trace import Status, StatusCode
tracer = trace.get_tracer(__name__)
async def process_order(order_id: str):
with tracer.start_as_current_span("process_order") as span:
span.set_attribute("order.id", order_id)
try:
# Validate order
with tracer.start_as_current_span("validate_order"):
order = await validate_order(order_id)
span.set_attribute("order.total", order.total)
# Process payment
with tracer.start_as_current_span("process_payment"):
payment = await process_payment(order)
span.set_attribute("payment.id", payment.id)
# Update inventory
with tracer.start_as_current_span("update_inventory"):
await update_inventory(order.items)
span.set_status(Status(StatusCode.OK))
return order
except Exception as e:
span.set_status(Status(StatusCode.ERROR, str(e)))
span.record_exception(e)
raise
Health Check Endpoints
# app/routes/health.py
from fastapi import APIRouter, Response
from typing import Dict, Any
from enum import Enum
class HealthStatus(str, Enum):
HEALTHY = "healthy"
DEGRADED = "degraded"
UNHEALTHY = "unhealthy"
router = APIRouter(tags=["Health"])
@router.get("/health")
async def health() -> Dict[str, str]:
"""Kubernetes liveness probe."""
return {"status": "ok"}
@router.get("/health/ready")
async def ready(
db: Database = Depends(get_db),
cache: RedisCache = Depends(get_cache)
) -> Response:
"""Kubernetes readiness probe with dependency checks."""
checks = {}
status = HealthStatus.HEALTHY
# MongoDB check
try:
await db.command("ping")
checks["mongodb"] = {"status": "ok", "latency_ms": 0}
except Exception as e:
checks["mongodb"] = {"status": "error", "error": str(e)}
status = HealthStatus.UNHEALTHY
# Redis check
try:
start = time.perf_counter()
await cache.client.ping()
latency = (time.perf_counter() - start) * 1000
checks["redis"] = {"status": "ok", "latency_ms": round(latency, 2)}
except Exception as e:
checks["redis"] = {"status": "error", "error": str(e)}
status = HealthStatus.DEGRADED # Cache failure = degraded
response_data = {
"status": status.value,
"checks": checks,
"timestamp": datetime.utcnow().isoformat()
}
status_code = 200 if status != HealthStatus.UNHEALTHY else 503
return Response(
content=json.dumps(response_data),
status_code=status_code,
media_type="application/json"
)
@router.get("/health/live")
async def live() -> Dict[str, str]:
"""Simple liveness check."""
return {"status": "alive"}
Application Integration
from fastapi import FastAPI
from app.core.logging import setup_logging
from app.core.metrics import setup_metrics
from app.core.tracing import setup_tracing
def create_app() -> FastAPI:
# Setup logging first
setup_logging(
log_level=settings.log_level,
json_logs=settings.environment == "production"
)
app = FastAPI(title="API Service")
# Setup metrics
setup_metrics(app)
# Setup tracing
if settings.otlp_endpoint:
setup_tracing(
app,
service_name="api-service",
otlp_endpoint=settings.otlp_endpoint
)
# Add middleware
app.add_middleware(RequestLoggingMiddleware)
return app
Additional Resources
Reference Files
For detailed configuration:
references/grafana-dashboards.md- Grafana dashboard JSONreferences/alerting.md- Prometheus alerting rulesreferences/elk-setup.md- Elasticsearch/Kibana log aggregation
Example Files
Working examples in examples/:
examples/logging_config.py- Complete logging setupexamples/metrics_middleware.py- Custom metrics middlewareexamples/tracing_service.py- Service with tracing
More from lobbi-docs/claude
vision-multimodal
Vision and multimodal capabilities for Claude including image analysis, PDF processing, and document understanding. Activate for image input, base64 encoding, multiple images, and visual analysis.
242design-system
Apply and manage the AI-powered design system with 50+ curated styles
126complex-reasoning
Multi-step reasoning patterns and frameworks for systematic problem solving. Activate for Chain-of-Thought, Tree-of-Thought, hypothesis-driven debugging, and structured analytical approaches that leverage extended thinking.
105gcp
Google Cloud Platform services including GKE, Cloud Run, Cloud Storage, BigQuery, and Pub/Sub. Activate for GCP infrastructure, Google Cloud deployment, and GCP integration.
73kanban
Kanban methodology including boards, WIP limits, flow metrics, and continuous delivery. Activate for Kanban boards, workflow visualization, and lean project management.
62debugging
Debugging techniques for Python, JavaScript, and distributed systems. Activate for troubleshooting, error analysis, log investigation, and performance debugging. Includes extended thinking integration for complex debugging scenarios.
59