agent-observability
Agent Observability
Monitor AI agent behavior with logs, traces, metrics, and cost telemetry. This skill covers the full observability stack for LLM-powered applications: from raw Prometheus counters to Grafana dashboards, OpenTelemetry tracing, structured logging, cost tracking, SLO definition, and PII redaction.
When to Use
Apply this skill whenever you operate:
- Autonomous AI agents that make multi-step tool calls (e.g., coding agents, support agents, data-pipeline agents).
- LLM-backed APIs serving chat completions, summarisation, or classification behind a REST or gRPC gateway.
- RAG pipelines where a retriever fetches context from a vector store before prompting a model.
- Multi-agent orchestrations (crew-style or graph-based) where several agents collaborate on a single task.
- Batch inference jobs that process thousands of prompts against a model endpoint.
Key signals that you need this skill:
- You cannot answer "what is p95 latency for agent responses this week?"
- You have no per-request cost attribution.
- Debugging a bad agent response requires grepping raw application logs.
- You have no alerting on token-usage spikes or elevated error rates.
Core Metrics
Define these metrics at the application layer. All examples use the Python prometheus_client library and follow Prometheus metric-naming conventions.
Latency
from prometheus_client import Histogram
# Total end-to-end latency for a full agent turn (user prompt -> final response)
AGENT_LATENCY = Histogram(
"agent_request_duration_seconds",
"End-to-end latency of an agent request",
labelnames=["agent_name", "model", "status"],
buckets=(0.25, 0.5, 1, 2, 5, 10, 30, 60, 120),
)
# Latency of a single LLM API call (one completion request)
LLM_CALL_LATENCY = Histogram(
"llm_call_duration_seconds",
"Latency of an individual LLM API call",
labelnames=["model", "provider", "stream"],
buckets=(0.1, 0.25, 0.5, 1, 2, 5, 10, 30),
)
# Latency of tool/function calls executed by the agent
TOOL_CALL_LATENCY = Histogram(
"agent_tool_call_duration_seconds",
"Latency of a tool call executed by the agent",
labelnames=["tool_name", "agent_name", "status"],
buckets=(0.05, 0.1, 0.25, 0.5, 1, 2, 5, 10),
)
Token Usage
from prometheus_client import Counter, Histogram
PROMPT_TOKENS = Counter(
"llm_prompt_tokens_total",
"Total prompt tokens sent to the model",
labelnames=["model", "agent_name"],
)
COMPLETION_TOKENS = Counter(
"llm_completion_tokens_total",
"Total completion tokens received from the model",
labelnames=["model", "agent_name"],
)
CACHED_TOKENS = Counter(
"llm_cached_tokens_total",
"Prompt tokens served from KV-cache (provider-reported)",
labelnames=["model", "agent_name"],
)
TOKENS_PER_REQUEST = Histogram(
"llm_tokens_per_request",
"Total tokens (prompt + completion) per request",
labelnames=["model", "agent_name"],
buckets=(100, 500, 1000, 2000, 4000, 8000, 16000, 32000, 64000, 128000),
)
Cost
from prometheus_client import Counter
LLM_COST = Counter(
"llm_cost_dollars_total",
"Estimated cost in USD for LLM usage",
labelnames=["model", "agent_name", "cost_type"], # cost_type: prompt | completion
)
Tool Calls
from prometheus_client import Counter
TOOL_CALLS_TOTAL = Counter(
"agent_tool_calls_total",
"Total tool calls made by agents",
labelnames=["tool_name", "agent_name", "status"], # status: success | error | timeout
)
Errors and Retries
from prometheus_client import Counter, Gauge
LLM_ERRORS = Counter(
"llm_errors_total",
"Errors returned by the LLM provider",
labelnames=["model", "provider", "error_type"], # error_type: rate_limit | timeout | 5xx | auth
)
LLM_RETRIES = Counter(
"llm_retries_total",
"Retried LLM API calls",
labelnames=["model", "provider", "retry_reason"],
)
AGENT_ACTIVE_REQUESTS = Gauge(
"agent_active_requests",
"Number of agent requests currently in flight",
labelnames=["agent_name"],
)
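The definitions above only declare the metrics; a minimal sketch of wiring them around a full agent turn is shown below. It assumes the metrics are importable from a local metrics module (the module name is an assumption) and uses a placeholder run_agent_turn for the real agent loop; prometheus_client's start_http_server exposes the /metrics endpoint for scraping.
# agent_metrics_usage.py -- illustrative sketch, not the only way to wire this up
import time
from prometheus_client import start_http_server
# Metrics defined in the Core Metrics section; the module name is an assumption
from metrics import AGENT_LATENCY, AGENT_ACTIVE_REQUESTS

def run_agent_turn(user_input: str) -> str:
    """Placeholder for the real agent loop."""
    return "..."

def handle_request(user_input: str, agent_name: str = "default", model: str = "gpt-4o") -> str:
    """Wrap one agent turn with latency, in-flight, and status instrumentation."""
    AGENT_ACTIVE_REQUESTS.labels(agent_name=agent_name).inc()
    start = time.perf_counter()
    status = "success"
    try:
        return run_agent_turn(user_input)
    except Exception:
        status = "error"
        raise
    finally:
        elapsed = time.perf_counter() - start
        AGENT_LATENCY.labels(agent_name=agent_name, model=model, status=status).observe(elapsed)
        AGENT_ACTIVE_REQUESTS.labels(agent_name=agent_name).dec()

if __name__ == "__main__":
    start_http_server(8000)  # expose /metrics on port 8000 for Prometheus to scrape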
OpenTelemetry Integration
Use the OpenTelemetry Python SDK to create traces that capture every step of an agent turn: the top-level request, each LLM call, each tool execution, and retrieval operations.
Setup
# otel_setup.py
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import Resource
def init_tracing(service_name: str, otlp_endpoint: str = "http://localhost:4317"):
resource = Resource.create({
"service.name": service_name,
"service.version": "1.0.0",
"deployment.environment": "production",
})
provider = TracerProvider(resource=resource)
exporter = OTLPSpanExporter(endpoint=otlp_endpoint, insecure=True)
provider.add_span_processor(BatchSpanProcessor(exporter))
trace.set_tracer_provider(provider)
return trace.get_tracer(service_name)
Tracing LLM Calls
# llm_tracing.py
import time
from opentelemetry import trace
from opentelemetry.trace import StatusCode
# Prometheus metrics from the Core Metrics section; the module name is an assumption
from metrics import PROMPT_TOKENS, COMPLETION_TOKENS, LLM_CALL_LATENCY, LLM_ERRORS
tracer = trace.get_tracer("agent.llm")
def traced_llm_call(client, messages, model="gpt-4o", **kwargs):
"""Wrap an LLM completion call with a full OpenTelemetry span."""
with tracer.start_as_current_span("llm.chat_completion") as span:
span.set_attribute("llm.model", model)
span.set_attribute("llm.provider", "openai")
span.set_attribute("llm.message_count", len(messages))
span.set_attribute("llm.temperature", kwargs.get("temperature", 1.0))
span.set_attribute("llm.max_tokens", kwargs.get("max_tokens", 0))
start = time.perf_counter()
try:
response = client.chat.completions.create(
model=model, messages=messages, **kwargs
)
elapsed = time.perf_counter() - start
usage = response.usage
span.set_attribute("llm.prompt_tokens", usage.prompt_tokens)
span.set_attribute("llm.completion_tokens", usage.completion_tokens)
span.set_attribute("llm.total_tokens", usage.total_tokens)
span.set_attribute("llm.duration_seconds", elapsed)
span.set_attribute("llm.finish_reason", response.choices[0].finish_reason)
span.set_status(StatusCode.OK)
# Update Prometheus counters
PROMPT_TOKENS.labels(model=model, agent_name="default").inc(usage.prompt_tokens)
COMPLETION_TOKENS.labels(model=model, agent_name="default").inc(usage.completion_tokens)
LLM_CALL_LATENCY.labels(model=model, provider="openai", stream="false").observe(elapsed)
return response
except Exception as exc:
elapsed = time.perf_counter() - start
span.set_status(StatusCode.ERROR, str(exc))
span.record_exception(exc)
LLM_ERRORS.labels(model=model, provider="openai", error_type=type(exc).__name__).inc()
raise
Tracing Tool Execution
# tool_tracing.py
import functools
import time
from opentelemetry import trace
from opentelemetry.trace import StatusCode
# Prometheus metrics from the Core Metrics section; the module name is an assumption
from metrics import TOOL_CALLS_TOTAL, TOOL_CALL_LATENCY
tracer = trace.get_tracer("agent.tools")
def traced_tool(tool_name: str):
"""Decorator that wraps a tool function with an OTel span and Prometheus metrics."""
def decorator(func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
with tracer.start_as_current_span(f"tool.{tool_name}") as span:
span.set_attribute("tool.name", tool_name)
span.set_attribute("tool.args_count", len(args) + len(kwargs))
start = time.perf_counter()
try:
result = func(*args, **kwargs)
elapsed = time.perf_counter() - start
span.set_attribute("tool.duration_seconds", elapsed)
span.set_status(StatusCode.OK)
TOOL_CALLS_TOTAL.labels(
tool_name=tool_name, agent_name="default", status="success"
).inc()
TOOL_CALL_LATENCY.labels(
tool_name=tool_name, agent_name="default", status="success"
).observe(elapsed)
return result
except Exception as exc:
elapsed = time.perf_counter() - start
span.set_status(StatusCode.ERROR, str(exc))
span.record_exception(exc)
TOOL_CALLS_TOTAL.labels(
tool_name=tool_name, agent_name="default", status="error"
).inc()
TOOL_CALL_LATENCY.labels(
tool_name=tool_name, agent_name="default", status="error"
).observe(elapsed)
raise
return wrapper
return decorator
# Usage
@traced_tool("web_search")
def web_search(query: str) -> str:
# ... tool implementation ...
pass
@traced_tool("sql_query")
def sql_query(statement: str) -> list:
# ... tool implementation ...
pass
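The introduction above also lists retrieval operations; a possible shape for a retrieval span is sketched below, assuming a hypothetical vector_store client with a search(query, top_k=...) method. The attribute names mirror the ad-hoc llm./tool. style used above rather than an official semantic convention.
# retrieval_tracing.py -- sketch; vector_store.search is a hypothetical client call
from opentelemetry import trace

tracer = trace.get_tracer("agent.retrieval")

def traced_retrieval(vector_store, query: str, top_k: int = 5) -> list[str]:
    """Wrap a vector-store lookup in a span so RAG latency shows up in the trace."""
    with tracer.start_as_current_span("retrieval.vector_search") as span:
        span.set_attribute("retrieval.top_k", top_k)
        span.set_attribute("retrieval.query_length", len(query))
        documents = vector_store.search(query, top_k=top_k)  # hypothetical API
        span.set_attribute("retrieval.document_count", len(documents))
        span.set_attribute("retrieval.total_chars", sum(len(d) for d in documents))
        return documents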
Propagating Trace Context Across Services
# context_propagation.py
from opentelemetry import context
from opentelemetry.propagate import inject, extract
import httpx
def call_downstream_service(url: str, payload: dict) -> dict:
"""Propagate the current trace context to a downstream HTTP service."""
headers = {}
inject(headers) # injects traceparent + tracestate headers
response = httpx.post(url, json=payload, headers=headers)
response.raise_for_status()
return response.json()
def extract_context_from_request(request_headers: dict):
"""Extract trace context from incoming request headers (for the receiving service)."""
ctx = extract(request_headers)
token = context.attach(ctx)
return token # call context.detach(token) when done
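On the receiving side, one way to use extract_context_from_request is to attach the extracted context for the duration of the handler and detach it afterwards, so spans created there become children of the upstream agent span; handle_agent_task is an illustrative name.
# receiving_service.py -- sketch of continuing the trace in a downstream service
from opentelemetry import context, trace
from context_propagation import extract_context_from_request

tracer = trace.get_tracer("downstream.service")

def handle_incoming_request(request_headers: dict, payload: dict) -> dict:
    token = extract_context_from_request(request_headers)
    try:
        with tracer.start_as_current_span("downstream.handle_agent_task"):
            # ... do the work requested by the agent ...
            return {"status": "ok"}
    finally:
        context.detach(token)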
Structured Logging
Emit JSON logs for every agent action so they can be ingested by Loki, Elasticsearch, or Datadog.
Python Logging Configuration
# logging_config.py
import logging
import json
import sys
from datetime import datetime, timezone
class AgentJSONFormatter(logging.Formatter):
"""Structured JSON formatter for agent logs."""
def format(self, record: logging.LogRecord) -> str:
log_entry = {
"timestamp": datetime.now(timezone.utc).isoformat(),
"level": record.levelname,
"logger": record.name,
"message": record.getMessage(),
"module": record.module,
"function": record.funcName,
"line": record.lineno,
}
# Merge any extra fields attached to the record
for key in ("trace_id", "span_id", "agent_name", "model",
"tool_name", "request_id", "user_id",
"prompt_tokens", "completion_tokens", "cost_usd",
"duration_seconds", "status", "error_type"):
value = getattr(record, key, None)
if value is not None:
log_entry[key] = value
if record.exc_info and record.exc_info[0] is not None:
log_entry["exception"] = self.formatException(record.exc_info)
return json.dumps(log_entry, default=str)
def configure_logging(level: str = "INFO"):
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(AgentJSONFormatter())
root = logging.getLogger()
root.setLevel(getattr(logging, level))
root.handlers = [handler]
# Suppress noisy libraries
logging.getLogger("httpx").setLevel(logging.WARNING)
logging.getLogger("opentelemetry").setLevel(logging.WARNING)
Logging Agent Actions
# agent_logging.py
import logging
from opentelemetry import trace
logger = logging.getLogger("agent")
def log_llm_call(model: str, prompt_tokens: int, completion_tokens: int,
duration: float, cost: float, status: str = "ok"):
span = trace.get_current_span()
ctx = span.get_span_context() if span else None
logger.info(
"LLM call completed",
extra={
"trace_id": format(ctx.trace_id, "032x") if ctx else None,
"span_id": format(ctx.span_id, "016x") if ctx else None,
"model": model,
"prompt_tokens": prompt_tokens,
"completion_tokens": completion_tokens,
"duration_seconds": round(duration, 3),
"cost_usd": round(cost, 6),
"status": status,
"agent_name": "default",
},
)
def log_tool_call(tool_name: str, duration: float, status: str, error: str | None = None):
span = trace.get_current_span()
ctx = span.get_span_context() if span else None
extra = {
"trace_id": format(ctx.trace_id, "032x") if ctx else None,
"span_id": format(ctx.span_id, "016x") if ctx else None,
"tool_name": tool_name,
"duration_seconds": round(duration, 3),
"status": status,
"agent_name": "default",
}
if error:
extra["error_type"] = error
logger.info("Tool call completed", extra=extra)
Example log output:
{
"timestamp": "2026-03-24T14:22:01.337Z",
"level": "INFO",
"logger": "agent",
"message": "LLM call completed",
"module": "agent_logging",
"function": "log_llm_call",
"line": 12,
"trace_id": "0af7651916cd43dd8448eb211c80319c",
"span_id": "b7ad6b7169203331",
"model": "gpt-4o",
"prompt_tokens": 1842,
"completion_tokens": 356,
"duration_seconds": 2.417,
"cost_usd": 0.013770,
"status": "ok",
"agent_name": "support-agent"
}
Grafana Dashboards
Agent Overview Dashboard
Save this JSON as agent-overview.json and import it into Grafana.
{
"dashboard": {
"title": "AI Agent Overview",
"uid": "agent-overview-v1",
"tags": ["ai", "agent", "llm"],
"timezone": "browser",
"refresh": "30s",
"panels": [
{
"title": "Request Latency (p50 / p95 / p99)",
"type": "timeseries",
"gridPos": { "h": 8, "w": 12, "x": 0, "y": 0 },
"targets": [
{
"expr": "histogram_quantile(0.50, sum(rate(agent_request_duration_seconds_bucket[5m])) by (le))",
"legendFormat": "p50"
},
{
"expr": "histogram_quantile(0.95, sum(rate(agent_request_duration_seconds_bucket[5m])) by (le))",
"legendFormat": "p95"
},
{
"expr": "histogram_quantile(0.99, sum(rate(agent_request_duration_seconds_bucket[5m])) by (le))",
"legendFormat": "p99"
}
],
"fieldConfig": {
"defaults": {
"unit": "s",
"thresholds": {
"steps": [
{ "color": "green", "value": null },
{ "color": "yellow", "value": 5 },
{ "color": "red", "value": 15 }
]
}
}
}
},
{
"title": "Token Usage (prompt vs completion)",
"type": "timeseries",
"gridPos": { "h": 8, "w": 12, "x": 12, "y": 0 },
"targets": [
{
"expr": "sum(rate(llm_prompt_tokens_total[5m])) by (model)",
"legendFormat": "prompt - {{ model }}"
},
{
"expr": "sum(rate(llm_completion_tokens_total[5m])) by (model)",
"legendFormat": "completion - {{ model }}"
}
],
"fieldConfig": {
"defaults": { "unit": "short" }
}
},
{
"title": "Cost per Hour (USD)",
"type": "stat",
"gridPos": { "h": 4, "w": 6, "x": 0, "y": 8 },
"targets": [
{
"expr": "sum(rate(llm_cost_dollars_total[1h])) * 3600",
"legendFormat": "$/hr"
}
],
"fieldConfig": {
"defaults": {
"unit": "currencyUSD",
"thresholds": {
"steps": [
{ "color": "green", "value": null },
{ "color": "yellow", "value": 10 },
{ "color": "red", "value": 50 }
]
}
}
}
},
{
"title": "Error Rate (%)",
"type": "gauge",
"gridPos": { "h": 4, "w": 6, "x": 6, "y": 8 },
"targets": [
{
"expr": "sum(rate(llm_errors_total[5m])) / (sum(rate(llm_call_duration_seconds_count[5m])) + 1e-10) * 100",
"legendFormat": "error %"
}
],
"fieldConfig": {
"defaults": {
"unit": "percent",
"min": 0,
"max": 100,
"thresholds": {
"steps": [
{ "color": "green", "value": null },
{ "color": "yellow", "value": 1 },
{ "color": "red", "value": 5 }
]
}
}
}
},
{
"title": "Tool Call Success vs Failure",
"type": "timeseries",
"gridPos": { "h": 8, "w": 12, "x": 0, "y": 12 },
"targets": [
{
"expr": "sum(rate(agent_tool_calls_total{status='success'}[5m])) by (tool_name)",
"legendFormat": "ok - {{ tool_name }}"
},
{
"expr": "sum(rate(agent_tool_calls_total{status='error'}[5m])) by (tool_name)",
"legendFormat": "err - {{ tool_name }}"
}
]
},
{
"title": "Active Requests",
"type": "timeseries",
"gridPos": { "h": 8, "w": 12, "x": 12, "y": 12 },
"targets": [
{
"expr": "sum(agent_active_requests) by (agent_name)",
"legendFormat": "{{ agent_name }}"
}
]
}
]
}
}
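If you prefer to provision the dashboard programmatically rather than importing it by hand, Grafana's HTTP API accepts a payload of this shape at /api/dashboards/db. The sketch below assumes a service-account token in the GRAFANA_TOKEN environment variable and the JSON saved as agent-overview.json.
# push_dashboard.py -- sketch using Grafana's dashboard HTTP API
import json
import os
import httpx

def push_dashboard(grafana_url: str = "http://localhost:3000", path: str = "agent-overview.json") -> dict:
    with open(path) as fh:
        payload = json.load(fh)   # already shaped as {"dashboard": {...}}
    payload["overwrite"] = True   # replace any existing dashboard with the same uid
    response = httpx.post(
        f"{grafana_url}/api/dashboards/db",
        json=payload,
        headers={"Authorization": f"Bearer {os.environ['GRAFANA_TOKEN']}"},
    )
    response.raise_for_status()
    return response.json()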
Cost Tracking
Per-Model Cost Calculation
# cost_tracker.py
from dataclasses import dataclass
# Prometheus cost counter from the Core Metrics section; the module name is an assumption
from metrics import LLM_COST
@dataclass
class ModelPricing:
prompt_cost_per_1k: float # USD per 1,000 prompt tokens
completion_cost_per_1k: float # USD per 1,000 completion tokens
# Updated pricing as of early 2026 -- adjust to your negotiated rates
MODEL_PRICING: dict[str, ModelPricing] = {
"gpt-4o": ModelPricing(0.0025, 0.0100),
"gpt-4o-mini": ModelPricing(0.00015, 0.0006),
"gpt-4.1": ModelPricing(0.002, 0.008),
"gpt-4.1-mini": ModelPricing(0.0004, 0.0016),
"gpt-4.1-nano": ModelPricing(0.0001, 0.0004),
"claude-sonnet-4": ModelPricing(0.003, 0.015),
"claude-haiku-3.5": ModelPricing(0.0008, 0.004),
"claude-opus-4": ModelPricing(0.015, 0.075),
}
def calculate_cost(model: str, prompt_tokens: int, completion_tokens: int) -> float:
"""Return estimated cost in USD. Falls back to zero if model is unknown."""
pricing = MODEL_PRICING.get(model)
if pricing is None:
return 0.0
prompt_cost = (prompt_tokens / 1000) * pricing.prompt_cost_per_1k
completion_cost = (completion_tokens / 1000) * pricing.completion_cost_per_1k
return prompt_cost + completion_cost
def record_cost(model: str, prompt_tokens: int, completion_tokens: int, agent_name: str = "default"):
"""Calculate cost and record it in the Prometheus counter."""
pricing = MODEL_PRICING.get(model)
if pricing is None:
return
prompt_cost = (prompt_tokens / 1000) * pricing.prompt_cost_per_1k
completion_cost = (completion_tokens / 1000) * pricing.completion_cost_per_1k
LLM_COST.labels(model=model, agent_name=agent_name, cost_type="prompt").inc(prompt_cost)
LLM_COST.labels(model=model, agent_name=agent_name, cost_type="completion").inc(completion_cost)
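One possible way to tie the pieces together: compute cost from the provider-reported usage on each response, bump the cost counter, and emit the structured log line defined earlier. The module names follow the file headers used in this skill.
# cost_usage.py -- sketch combining cost calculation, metrics, and structured logs
from cost_tracker import calculate_cost, record_cost
from agent_logging import log_llm_call

def track_completion(response, model: str, duration: float, agent_name: str = "default") -> float:
    """Record cost metrics and a structured log line for one completion response."""
    usage = response.usage
    cost = calculate_cost(model, usage.prompt_tokens, usage.completion_tokens)
    record_cost(model, usage.prompt_tokens, usage.completion_tokens, agent_name=agent_name)
    log_llm_call(
        model=model,
        prompt_tokens=usage.prompt_tokens,
        completion_tokens=usage.completion_tokens,
        duration=duration,
        cost=cost,
    )
    return cost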
Budget Alerting -- Prometheus Rules
Save as agent-cost-alerts.yaml and load it into Prometheus or Cortex ruler.
# agent-cost-alerts.yaml
groups:
- name: agent_cost_alerts
interval: 1m
rules:
# Fire if hourly spend exceeds $25
- alert: AgentCostHourlyHigh
expr: sum(rate(llm_cost_dollars_total[1h])) * 3600 > 25
for: 5m
labels:
severity: warning
team: ai-platform
annotations:
summary: "Agent LLM spend exceeds $25/hr"
description: >
Current hourly spend is ${{ $value | printf "%.2f" }}.
Check for runaway loops, prompt-stuffing, or unexpected traffic.
# Fire if daily projected spend exceeds $500
- alert: AgentCostDailyProjectionHigh
expr: sum(rate(llm_cost_dollars_total[1h])) * 86400 > 500
for: 15m
labels:
severity: critical
team: ai-platform
annotations:
summary: "Projected daily agent spend exceeds $500"
description: >
Projected daily spend: ${{ $value | printf "%.2f" }}.
Consider throttling requests or switching to a cheaper model.
# Fire if a single agent's cost spikes 3x above its 24h average
- alert: AgentCostSpike
expr: >
sum(rate(llm_cost_dollars_total[5m])) by (agent_name)
/
(sum(rate(llm_cost_dollars_total[24h])) by (agent_name) + 1e-10)
> 3
for: 10m
labels:
severity: warning
team: ai-platform
annotations:
summary: "Agent {{ $labels.agent_name }} cost spiked 3x above 24h average"
Langfuse / Helicone Integration
Langfuse (Self-hosted or Cloud)
Langfuse provides trace-level visibility with prompt management and scoring. It can run alongside your existing OTel stack.
# langfuse_integration.py
from langfuse import Langfuse
from langfuse.decorators import observe, langfuse_context
# Initialize -- reads LANGFUSE_SECRET_KEY, LANGFUSE_PUBLIC_KEY, LANGFUSE_HOST from env
langfuse = Langfuse()
@observe(as_type="generation")
def call_llm(client, messages, model="gpt-4o", **kwargs):
"""Langfuse automatically captures input/output, tokens, latency, and cost."""
response = client.chat.completions.create(
model=model, messages=messages, **kwargs
)
langfuse_context.update_current_observation(
model=model,
usage={
"input": response.usage.prompt_tokens,
"output": response.usage.completion_tokens,
},
metadata={"temperature": kwargs.get("temperature", 1.0)},
)
return response
@observe()
def run_agent(user_input: str):
"""Top-level agent trace -- all nested @observe calls become child spans."""
langfuse_context.update_current_trace(
user_id="user-123",
session_id="session-abc",
tags=["production"],
)
# ... agent logic with nested call_llm() and tool calls ...
Environment variables for Langfuse:
export LANGFUSE_SECRET_KEY="sk-lf-..."
export LANGFUSE_PUBLIC_KEY="pk-lf-..."
export LANGFUSE_HOST="https://cloud.langfuse.com" # or your self-hosted URL
Helicone (Proxy-based)
Helicone acts as a logging proxy. Point your OpenAI base URL at Helicone and it captures everything automatically.
# helicone_integration.py
from openai import OpenAI
client = OpenAI(
base_url="https://oai.helicone.ai/v1",
default_headers={
"Helicone-Auth": "Bearer sk-helicone-...",
"Helicone-Property-Agent": "support-agent",
"Helicone-Property-Environment": "production",
"Helicone-User-Id": "user-123",
"Helicone-Session-Id": "session-abc",
"Helicone-Cache-Enabled": "true", # enable response caching
"Helicone-Rate-Limit-Policy": "100;w=60", # 100 req per 60s
},
)
# All calls through this client are automatically logged in Helicone
response = client.chat.completions.create(
model="gpt-4o",
messages=[{"role": "user", "content": "Summarise this document..."}],
)
SLO Definition
Define Service Level Objectives for your agents and enforce them with Prometheus recording and alerting rules.
Recording Rules
# agent-slo-recording-rules.yaml
groups:
- name: agent_slo_recording
interval: 30s
rules:
# Success rate (non-error responses / total responses)
- record: agent:success_rate:5m
expr: >
1 - (
sum(rate(llm_errors_total[5m]))
/
(sum(rate(llm_call_duration_seconds_count[5m])) + 1e-10)
)
# p95 latency
- record: agent:latency_p95:5m
expr: >
histogram_quantile(0.95,
sum(rate(agent_request_duration_seconds_bucket[5m])) by (le)
)
# p50 latency
- record: agent:latency_p50:5m
expr: >
histogram_quantile(0.50,
sum(rate(agent_request_duration_seconds_bucket[5m])) by (le)
)
SLO Alert Rules
# agent-slo-alerts.yaml
groups:
- name: agent_slo_alerts
rules:
# SLO: 99.5% success rate over a rolling 30-day window
- alert: AgentSuccessRateSLOBreach
expr: agent:success_rate:5m < 0.995
for: 10m
labels:
severity: critical
slo: agent-success-rate
annotations:
summary: "Agent success rate below 99.5% SLO"
description: >
Current success rate: {{ $value | printf "%.4f" }}.
SLO target: 0.995. Investigate elevated LLM errors or tool failures.
# SLO: p95 latency under 5 seconds
- alert: AgentLatencyP95SLOBreach
expr: agent:latency_p95:5m > 5
for: 10m
labels:
severity: warning
slo: agent-latency-p95
annotations:
summary: "Agent p95 latency exceeds 5s SLO"
description: >
Current p95 latency: {{ $value | printf "%.2f" }}s.
Check for slow LLM responses, long tool calls, or context-window bloat.
# SLO: p50 latency under 2 seconds
- alert: AgentLatencyP50SLOBreach
expr: agent:latency_p50:5m > 2
for: 15m
labels:
severity: warning
slo: agent-latency-p50
annotations:
summary: "Agent median latency exceeds 2s SLO"
description: >
Current p50 latency: {{ $value | printf "%.2f" }}s.
# Error budget: burn rate alert (multi-window)
- alert: AgentErrorBudgetFastBurn
expr: >
(
1 - (sum(rate(llm_errors_total[5m])) / (sum(rate(llm_call_duration_seconds_count[5m])) + 1e-10))
) < 0.99
for: 5m
labels:
severity: critical
slo: agent-error-budget
annotations:
summary: "Agent error budget burning fast -- success rate below 99% over 5m"
Sloth SLO Spec (Alternative)
If you use Sloth to manage SLOs declaratively:
# agent-slo-sloth.yaml
version: "prometheus/v1"
service: "ai-agent"
labels:
team: ai-platform
slos:
- name: "agent-availability"
objective: 99.5
description: "99.5% of agent requests should succeed"
sli:
events:
error_query: sum(rate(llm_errors_total{job="agent"}[{{.window}}]))
total_query: sum(rate(llm_call_duration_seconds_count{job="agent"}[{{.window}}]))
alerting:
name: AgentAvailability
labels:
team: ai-platform
page_alert:
labels:
severity: critical
ticket_alert:
labels:
severity: warning
Debugging Workflows
Slow Agent Responses
- Identify the bottleneck. Open the Grafana dashboard and check whether p95 latency is driven by LLM calls or tool calls.
  # Which tool calls are slow? Rank p95 latency by tool_name
  topk(5, histogram_quantile(0.95, sum(rate(agent_tool_call_duration_seconds_bucket[5m])) by (le, tool_name)))
- Check token counts. Bloated prompts cause proportionally slower responses.
  # Average prompt tokens per LLM call, by model
  sum(rate(llm_prompt_tokens_total[5m])) by (model) / (sum(rate(llm_call_duration_seconds_count[5m])) by (model) + 1e-10)
- Look for retries. Retries multiply latency.
  sum(rate(llm_retries_total[5m])) by (retry_reason)
- Inspect traces. In Jaeger or Tempo, filter for agent request spans longer than 10 s and expand the child spans to find the slow step.
- Common fixes:
  - Reduce system prompt length or move static context into a cached prefix.
  - Switch long-running tool calls to async execution with a timeout (a minimal sketch follows this list).
  - Use a faster/smaller model for subtasks that do not need the flagship model.
  - Enable streaming to reduce the time-to-first-token perceived by users.
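A minimal sketch of the async-timeout fix, assuming the tool has (or can be given) an async implementation; TOOL_CALLS_TOTAL is the counter from the Core Metrics section and the module name is an assumption.
# async_tool_timeout.py -- sketch of bounding a slow tool call
import asyncio
from metrics import TOOL_CALLS_TOTAL  # Core Metrics section; module name is an assumption

async def call_tool_with_timeout(tool_name: str, tool_coro, timeout_seconds: float = 10.0):
    """Await an async tool call but give up after timeout_seconds, recording the timeout."""
    try:
        return await asyncio.wait_for(tool_coro, timeout=timeout_seconds)
    except asyncio.TimeoutError:
        TOOL_CALLS_TOTAL.labels(tool_name=tool_name, agent_name="default", status="timeout").inc()
        raise

# Usage (fetch_report is a hypothetical async tool):
# result = await call_tool_with_timeout("fetch_report", fetch_report(query), timeout_seconds=5)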
High Token Usage
- Rank agents by token consumption:
  topk(10, sum(rate(llm_prompt_tokens_total[1h])) by (agent_name))
- Check for conversation-history bloat. Agents that append the full conversation history on every turn consume tokens quadratically over the life of a session.
- Verify RAG chunk sizes. Oversized retrieval chunks inflate prompt tokens without improving quality.
- Common fixes:
  - Implement sliding-window or summarisation-based memory (a sliding-window sketch follows this list).
  - Reduce the number of retrieved chunks (e.g., top-3 instead of top-10).
  - Use prompt caching (Anthropic prompt caching, OpenAI cached tokens) to reduce cost even if the token count stays high.
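A sliding-window memory sketch; the max_turns value and the OpenAI-style message format are assumptions.
# sliding_window_memory.py -- sketch; tune max_turns to your context budget
def trim_history(messages: list[dict], max_turns: int = 8) -> list[dict]:
    """Keep system messages plus only the most recent max_turns conversation messages."""
    system = [m for m in messages if m["role"] == "system"]
    conversation = [m for m in messages if m["role"] != "system"]
    return system + conversation[-max_turns:]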
Tool Failures
- Identify failing tools:
  sum(rate(agent_tool_calls_total{status="error"}[5m])) by (tool_name)
- Correlate with traces. Find traces where tool.<name> spans have ERROR status and read the recorded exception.
- Check for timeouts vs exceptions. Timeouts suggest the downstream service is slow; exceptions suggest a contract change or an auth issue.
- Common fixes:
  - Add circuit breakers around unreliable tools (a minimal breaker sketch follows this list).
  - Implement fallback tools (e.g., a cached search result when live search is down).
  - Add input validation before executing the tool to catch malformed agent arguments.
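A minimal circuit-breaker sketch; the thresholds are illustrative and you would normally keep one breaker instance per tool.
# circuit_breaker.py -- sketch; production code would add logging and metrics
import time

class CircuitBreaker:
    """Stop calling a tool after repeated failures; probe again after cooldown_seconds."""

    def __init__(self, failure_threshold: int = 5, cooldown_seconds: float = 30.0):
        self.failure_threshold = failure_threshold
        self.cooldown_seconds = cooldown_seconds
        self.failures = 0
        self.opened_at: float | None = None

    def call(self, func, *args, **kwargs):
        if self.opened_at is not None:
            if time.monotonic() - self.opened_at < self.cooldown_seconds:
                raise RuntimeError("circuit open: tool temporarily disabled")
            self.opened_at = None  # cooldown elapsed, allow a probe call
        try:
            result = func(*args, **kwargs)
        except Exception:
            self.failures += 1
            if self.failures >= self.failure_threshold:
                self.opened_at = time.monotonic()
            raise
        self.failures = 0
        return result

# Usage:
# search_breaker = CircuitBreaker()
# result = search_breaker.call(web_search, "observability best practices")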
PII Redaction in Traces
Scrub sensitive data before spans and logs leave the application boundary. This is critical for compliance with GDPR, HIPAA, and SOC 2.
Span Processor for PII Redaction
# pii_redactor.py
import re
from opentelemetry.sdk.trace import SpanProcessor, ReadableSpan
from opentelemetry.sdk.trace.export import SpanExporter
# Patterns for common PII
PII_PATTERNS = {
"email": re.compile(r"[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+"),
"ssn": re.compile(r"\b\d{3}-\d{2}-\d{4}\b"),
"phone_us": re.compile(r"\b(\+1[-.\s]?)?\(?\d{3}\)?[-.\s]?\d{3}[-.\s]?\d{4}\b"),
"credit_card": re.compile(r"\b\d{4}[-\s]?\d{4}[-\s]?\d{4}[-\s]?\d{4}\b"),
"ip_address": re.compile(r"\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b"),
"jwt": re.compile(r"eyJ[a-zA-Z0-9_-]{10,}\.[a-zA-Z0-9_-]{10,}\.[a-zA-Z0-9_-]{10,}"),
"api_key": re.compile(r"(sk-[a-zA-Z0-9]{20,}|pk-[a-zA-Z0-9]{20,})"),
}
REDACTED = "[REDACTED]"
def redact_string(text: str) -> str:
"""Replace all PII patterns in a string with [REDACTED]."""
if not isinstance(text, str):
return text
for pattern in PII_PATTERNS.values():
text = pattern.sub(REDACTED, text)
return text
class PIIRedactingSpanProcessor(SpanProcessor):
    """Redacts PII from span attributes before handing spans to the exporter.

    Note: this exports synchronously on span end (like SimpleSpanProcessor);
    add batching in front of the exporter if throughput matters.
    """

    def __init__(self, exporter: SpanExporter):
        self._exporter = exporter

    def on_start(self, span, parent_context=None):
        pass

    def on_end(self, span: ReadableSpan):
        # Build a sanitised copy of the attributes ...
        sanitised_attrs = {}
        for key, value in (span.attributes or {}).items():
            sanitised_attrs[key] = redact_string(value) if isinstance(value, str) else value
        # ... and write it back before export. ReadableSpan offers no public setter,
        # so this pragmatically overwrites the private _attributes field that the
        # .attributes property (and therefore the exporter) reads.
        span._attributes = sanitised_attrs
        self._exporter.export([span])

    def shutdown(self):
        self._exporter.shutdown()

    def force_flush(self, timeout_millis: int = 30000):
        return self._exporter.force_flush(timeout_millis)
Using the Redactor in Setup
# otel_setup_with_redaction.py
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.resources import Resource
from pii_redactor import PIIRedactingSpanProcessor
def init_tracing_with_redaction(service_name: str, otlp_endpoint: str = "http://localhost:4317"):
resource = Resource.create({"service.name": service_name})
provider = TracerProvider(resource=resource)
exporter = OTLPSpanExporter(endpoint=otlp_endpoint, insecure=True)
# Wrap the exporter with PII redaction
redacting_processor = PIIRedactingSpanProcessor(exporter)
provider.add_span_processor(redacting_processor)
trace.set_tracer_provider(provider)
return trace.get_tracer(service_name)
Redacting Logs
# log_redactor.py
import logging
from pii_redactor import redact_string
class PIIRedactingFilter(logging.Filter):
"""Logging filter that redacts PII from log messages and extra fields."""
def filter(self, record: logging.LogRecord) -> bool:
record.msg = redact_string(str(record.msg))
if record.args:
if isinstance(record.args, dict):
record.args = {k: redact_string(str(v)) for k, v in record.args.items()}
elif isinstance(record.args, tuple):
record.args = tuple(redact_string(str(a)) for a in record.args)
return True
# Attach to your logger
logger = logging.getLogger("agent")
logger.addFilter(PIIRedactingFilter())
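A quick sanity check that the JSON formatter and the PII filter work together; the email address and card number are made-up examples.
# redaction_check.py -- sanity check for the JSON formatter + PII filter
import logging
from logging_config import configure_logging
from log_redactor import PIIRedactingFilter

configure_logging(level="INFO")
logger = logging.getLogger("agent")
logger.addFilter(PIIRedactingFilter())

# Both values below appear as [REDACTED] in the emitted JSON log line
logger.info("User jane.doe@example.com paid with card 4111 1111 1111 1111")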
Best Practices
- Separate high-cardinality labels. Do not put user_id or request_id in Prometheus labels. Store those in traces and logs instead.
- Sample traces in production. Use a head-based sampler (e.g., 10% of requests) plus a tail-based sampler that keeps all error traces.
- Keep a replayable request envelope. Store the full prompt and response in a durable store (S3, GCS) keyed by trace ID for post-incident review (an S3 sketch follows this list).
- Alert on anomalies, not thresholds alone. Combine static thresholds (SLO breach) with anomaly detection (cost spike relative to baseline).
- Version your prompts. Tag each trace with the prompt template version so you can correlate quality regressions with prompt changes.
- Test observability in staging. Run synthetic agent requests in staging and verify that traces, metrics, and alerts fire correctly before shipping to production.
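A minimal S3 sketch of the request-envelope practice; the bucket layout is an assumption, and it presumes an active span so the trace ID is meaningful. Apply the same PII redaction before writing if the envelope leaves your trust boundary.
# request_envelope.py -- sketch of storing a replayable envelope keyed by trace ID
import json
import boto3
from opentelemetry import trace

s3 = boto3.client("s3")

def store_envelope(bucket: str, prompt: list[dict], response_text: str, metadata: dict) -> str:
    """Persist prompt + response under the current trace ID for later replay."""
    ctx = trace.get_current_span().get_span_context()
    key = f"agent-envelopes/{format(ctx.trace_id, '032x')}.json"  # assumed key layout
    body = json.dumps({"prompt": prompt, "response": response_text, "metadata": metadata})
    s3.put_object(Bucket=bucket, Key=key, Body=body.encode("utf-8"))
    return key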
Related Skills
- alerting-oncall - Alert workflows and on-call routing
- agent-evals - Quality verification and evaluation pipelines
- sre-dashboards - General SRE dashboard patterns