observability-instrument-with-otel
Instrument with OpenTelemetry
Purpose
Add OpenTelemetry (OTEL) instrumentation to service methods for distributed tracing, structured logging, and observability across Clean Architecture layers.
When to Use
Use this skill when:
- Adding new service methods that need observability
- Debugging complex execution flows across layers
- Investigating performance issues or bottlenecks
- Need distributed tracing for multi-service operations
- Adding structured logging with trace correlation
- Instrumenting Application, Infrastructure, or Interface layers
- Replacing print() statements with proper logging
- Tracking operations across method boundaries
Table of Contents
Core Sections
- Quick Start
- Simple example to get started immediately
- Instructions
- Examples
Advanced Topics
- Common Patterns
- Observability Best Practices
- Guidelines for effective instrumentation
- Troubleshooting
- Requirements
- Dependencies and prerequisites
Supporting Resources
- references/reference.md - Advanced OTEL configuration
- ARCHITECTURE.md - Clean Architecture layers
- Core monitoring module:
src/project_watch_mcp/core/monitoring/
Utility Scripts
- scripts/add_tracing.py - Auto-add OpenTelemetry instrumentation to service files
- scripts/analyze_traces.py - Analyze trace coverage in Python service files
- scripts/validate_instrumentation.py - Validate OTEL instrumentation patterns
Quick Start
Add OTEL tracing to a service method:
from project_watch_mcp.core.monitoring import get_logger, traced
logger = get_logger(__name__)
class MyService:
@traced
async def my_method(self, param: str) -> ServiceResult[Data]:
logger.info(f"Processing {param}")
# Method implementation
return ServiceResult.ok(result)
Instructions
Step 1: Import OTEL Components
Add imports at the top of your file (fail-fast principle):
from project_watch_mcp.core.monitoring import get_logger, traced
For advanced instrumentation (context management, correlation IDs):
from project_watch_mcp.core.monitoring import (
get_logger,
traced,
trace_span,
correlation_context,
add_context,
)
Step 2: Get Logger Instance
Replace any existing logger initialization with OTEL logger:
logger = get_logger(__name__)
Why: OTEL logger automatically includes trace_id, span_id, and correlation IDs in all log messages.
Step 3: Apply @traced Decorator
Add @traced decorator to methods you want to trace:
For Application Layer (Commands/Queries):
class IndexFileHandler(CommandHandler[IndexFileCommand]):
@traced
async def handle(self, command: IndexFileCommand) -> ServiceResult[None]:
logger.info(f"Indexing file: {command.file_path}")
# Implementation
For Infrastructure Services:
class EmbeddingService:
@traced
async def generate_embeddings(self, texts: list[str]) -> ServiceResult[list[float]]:
logger.info(f"Generating embeddings for {len(texts)} texts")
# Implementation
For MCP Tools (Interface Layer):
@traced
async def search_code(query: str, search_type: str = "semantic") -> dict:
logger.info(f"Search request: query='{query}', type={search_type}")
# Implementation
Step 4: Add Structured Logging
Use logger with contextual information (avoid magic numbers, use clear descriptions):
Good Examples:
logger.info(f"Processing file: {file_path}")
logger.debug(f"Query execution time: {elapsed_ms}ms")
logger.warning(f"Retry attempt {attempt}/{max_retries} failed")
logger.error(f"Failed to connect to Neo4j: {str(e)}")
What @traced Automatically Adds:
trace_id: Distributed tracing ID (propagated across services)span_id: Current operation ID (unique per method call)- Function name as span name (e.g.,
IndexFileHandler.handle) - Function arguments as span attributes (primitives only)
Step 5: Add Manual Spans for Complex Operations (Optional)
For fine-grained tracing within a method:
@traced
async def complex_operation(self, data: Data) -> ServiceResult[Result]:
logger.info("Starting complex operation")
# Manual span for specific sub-operation
with trace_span("validate_data", data_size=len(data)) as span:
logger.info("Validating data")
validation_result = await self._validate(data)
span.set_attribute("validation_passed", validation_result.success)
# Another manual span
with trace_span("process_chunks", chunk_count=10) as span:
logger.info("Processing chunks")
results = await self._process_chunks(data)
span.set_attribute("chunks_processed", len(results))
return ServiceResult.ok(results)
Step 6: Add Correlation Context for Request Tracking (Optional)
For tracking operations across multiple method calls:
async def index_repository(self, repo_path: str) -> ServiceResult[None]:
with correlation_context() as cid:
logger.info(f"Starting repository indexing: {repo_path}")
# All subsequent logs will include this correlation_id
for file_path in files:
await self.index_file(file_path) # Logs include same correlation_id
logger.info("Repository indexing complete")
Step 7: Verify Instrumentation
Run tests to ensure tracing works:
uv run pytest tests/path/to/test.py -v
Check logs for trace/span IDs:
tail -f logs/project-watch-mcp.log | grep "trace:"
Expected log format:
2025-10-18 14:30:45 - [trace:a1b2c3d4 | span:e5f6g7h8] - module.name - INFO - Processing file
Examples
Example 1: Add Tracing to Command Handler
Before:
class IndexFileHandler(CommandHandler[IndexFileCommand]):
async def handle(self, command: IndexFileCommand) -> ServiceResult[None]:
print(f"Indexing {command.file_path}")
return await self._index(command)
After:
from project_watch_mcp.core.monitoring import get_logger, traced
logger = get_logger(__name__)
class IndexFileHandler(CommandHandler[IndexFileCommand]):
@traced
async def handle(self, command: IndexFileCommand) -> ServiceResult[None]:
logger.info(f"Indexing file: {command.file_path}")
return await self._index(command)
Example 2: Add Tracing to Infrastructure Service
Before:
class Neo4jCodeRepository:
async def save_file(self, file: File) -> bool:
logging.info("Saving file")
# Implementation
After:
from project_watch_mcp.core.monitoring import get_logger, traced
logger = get_logger(__name__)
class Neo4jCodeRepository:
@traced
async def save_file(self, file: File) -> ServiceResult[None]:
logger.info(f"Saving file to Neo4j: {file.path}")
# Implementation
return ServiceResult.ok(None)
Example 3: Add Manual Spans for Performance Tracking
from project_watch_mcp.core.monitoring import get_logger, traced, trace_span
logger = get_logger(__name__)
class EmbeddingService:
@traced
async def batch_generate(self, texts: list[str]) -> ServiceResult[list[Embedding]]:
logger.info(f"Batch generating embeddings for {len(texts)} texts")
# Track API call performance
with trace_span("voyage_api_call", text_count=len(texts)) as span:
logger.debug("Calling Voyage AI API")
embeddings = await self.client.embed(texts)
span.set_attribute("embeddings_generated", len(embeddings))
# Track storage performance
with trace_span("store_embeddings", embedding_count=len(embeddings)) as span:
logger.debug("Storing embeddings in Neo4j")
await self.repository.save_embeddings(embeddings)
span.set_attribute("storage_success", True)
return ServiceResult.ok(embeddings)
Example 4: Add Correlation Context for Multi-File Operations
from project_watch_mcp.core.monitoring import get_logger, traced, correlation_context
logger = get_logger(__name__)
class RepositoryIndexer:
@traced
async def index_repository(self, repo_path: str) -> ServiceResult[None]:
# Generate correlation ID for this indexing operation
with correlation_context() as cid:
logger.info(f"Starting repository indexing: {repo_path} (correlation_id: {cid})")
files = await self.discover_files(repo_path)
logger.info(f"Discovered {len(files)} files to index")
for file_path in files:
# All logs from index_file will include same correlation_id
await self.index_file_handler.handle(IndexFileCommand(file_path))
logger.info(f"Repository indexing complete: {len(files)} files processed")
return ServiceResult.ok(None)
Requirements
- OpenTelemetry initialized (handled by
initialize_otel_logger()in core/monitoring) - Python 3.11+ (for modern type hints)
- Async/await support for async methods
- ServiceResult pattern used for return types
- Access to
project_watch_mcp.core.monitoringmodule
Common Patterns
Pattern 1: Service Method Instrumentation
from project_watch_mcp.core.monitoring import get_logger, traced
logger = get_logger(__name__)
class MyService:
@traced
async def operation(self, param: str) -> ServiceResult[Data]:
logger.info(f"Starting operation with param: {param}")
result = await self._execute(param)
logger.info(f"Operation completed successfully")
return ServiceResult.ok(result)
Pattern 2: Error Logging with Trace Context
@traced
async def risky_operation(self) -> ServiceResult[Data]:
try:
logger.info("Attempting risky operation")
result = await self._risky_call()
return ServiceResult.ok(result)
except Exception as e:
logger.error(f"Operation failed: {str(e)}")
return ServiceResult.fail(f"Operation failed: {str(e)}")
Pattern 3: Multi-Step Operation Tracking
@traced
async def multi_step_process(self, data: Data) -> ServiceResult[Result]:
with trace_span("step_1_validation") as span:
logger.info("Step 1: Validating input")
validation = await self._validate(data)
span.set_attribute("valid", validation.success)
with trace_span("step_2_processing") as span:
logger.info("Step 2: Processing data")
processed = await self._process(data)
span.set_attribute("items_processed", len(processed))
with trace_span("step_3_storage") as span:
logger.info("Step 3: Storing results")
await self._store(processed)
span.set_attribute("items_stored", len(processed))
return ServiceResult.ok(processed)
Observability Best Practices
- Always use @traced on public methods - Commands, Queries, Services
- Never use print() statements - Use logger instead (fail-fast principle)
- Add context to logs - Include relevant parameters, counts, durations
- Use trace_span for expensive operations - API calls, database queries, file I/O
- Use correlation_context for request tracking - Multi-file operations, batch processing
- Log at appropriate levels - DEBUG (detailed), INFO (milestones), WARNING (degraded), ERROR (failures)
- Avoid logging sensitive data - API keys, passwords, PII
- Set span attributes for metrics - Counts, durations, success/failure indicators
Troubleshooting
Logs Missing Trace IDs
Issue: Logs don't show [trace:... | span:...]
Solution: Ensure OTEL is initialized before any logging:
from project_watch_mcp.core.monitoring import initialize_otel_logger
initialize_otel_logger() # Call once at application startup
@traced Not Working on Async Methods
Issue: Decorator doesn't capture spans for async methods
Solution: The @traced decorator handles both sync and async automatically. Ensure you're using async def:
@traced
async def my_method(self): # ✅ Works
pass
@traced
def my_method(self): # ✅ Also works for sync
pass
Span Attributes Not Appearing
Issue: Custom attributes not visible in traces
Solution: Use span.set_attribute() within the span context:
with trace_span("operation") as span:
span.set_attribute("key", "value") # ✅ Correct
See Also
- references/reference.md - Advanced OTEL configuration
- ARCHITECTURE.md - Clean Architecture layers
- Core monitoring module:
src/project_watch_mcp/core/monitoring/