# Pipeline Observability
Tracing and metrics for data pipelines using OpenTelemetry and Prometheus. Instrument code for visibility into performance, errors, and data lineage.
## Quick Reference
| Tool | Purpose | What it Measures |
|---|---|---|
| OpenTelemetry | Distributed tracing | Pipeline stages, latency, dependencies |
| Prometheus | Metrics | Throughput, error rates, resource utilization |
| Grafana | Visualization | Dashboards combining traces + metrics |
## Why Observability?
- Debugging: Trace failed records through pipeline stages
- Performance: Identify bottlenecks, optimize slow transformations
- Reliability: Set alerts on error rates, SLA breaches
- Cost: Track resource usage, optimize expensive operations
- Compliance: Audit trail of data transformations
## Skill Dependencies

- @data-engineering-core - Pipeline structure to instrument
- @data-engineering-orchestration - Prefect/Dagster have built-in observability
- @data-engineering-streaming - Stream processing patterns need tracing
## OpenTelemetry Integration
OpenTelemetry (OTel) provides a vendor-neutral standard for distributed tracing, metrics, and logs.
### Installation

```bash
pip install opentelemetry-api opentelemetry-sdk opentelemetry-exporter-otlp
```
### Basic Tracing
```python
import duckdb
import polars as pl

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter

# Set up the tracer provider with an OTLP exporter
provider = TracerProvider()
exporter = OTLPSpanExporter(endpoint="http://localhost:4317")
provider.add_span_processor(BatchSpanProcessor(exporter))
trace.set_tracer_provider(provider)

tracer = trace.get_tracer("data_pipeline")
conn = duckdb.connect("warehouse.duckdb")

def run_pipeline():
    with tracer.start_as_current_span("extract") as span:
        span.set_attribute("source", "sales.parquet")
        span.set_attribute("format", "parquet")
        df = pl.scan_parquet("data/sales.parquet").collect()
        span.set_attribute("rows_read", len(df))

    with tracer.start_as_current_span("transform") as span:
        span.set_attribute("operation", "aggregation")
        result = df.group_by("category").agg(pl.col("value").sum())

    with tracer.start_as_current_span("load") as span:
        span.set_attribute("target", "duckdb.summary")
        # DuckDB can query the in-scope Polars DataFrame directly
        conn.execute("CREATE OR REPLACE TABLE summary AS SELECT * FROM result")
        span.set_attribute("rows_written", len(result))

if __name__ == "__main__":
    run_pipeline()
```
### Trace Context Propagation
For multi-service pipelines, pass trace context:
```python
import json

from opentelemetry import trace
from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator

tracer = trace.get_tracer("data_pipeline")
propagator = TraceContextTextMapPropagator()

# Producer: inject the current trace context into message headers
carrier = {}
propagator.inject(carrier)
producer.produce(
    topic="events",
    key=key,
    value=json.dumps(data),
    headers=list(carrier.items()),
)

# Consumer: rebuild the carrier from message headers, then extract
carrier = {k: v.decode() for k, v in message.headers()}
ctx = propagator.extract(carrier=carrier)
with tracer.start_as_current_span("process_message", context=ctx):
    process(data)
```
## Prometheus Metrics
Prometheus stores numeric time series. It normally pulls (scrapes) metrics from an HTTP endpoint your application exposes; short-lived jobs can push to a Pushgateway instead.
### Installation

```bash
pip install prometheus-client
```
### Basic Instrumentation
```python
import psutil
from prometheus_client import Counter, Histogram, Gauge, start_http_server

# Define metrics
ROWS_PROCESSED = Counter(
    'etl_rows_processed_total',
    'Total rows processed by ETL',
    ['source', 'stage'],
)
PROCESSING_TIME = Histogram(
    'etl_processing_seconds',
    'Time spent processing',
    ['operation'],
    buckets=[0.1, 0.5, 1.0, 5.0, 10.0, 30.0, 60.0],
)
PIPELINE_ERRORS = Counter(
    'etl_errors_total',
    'Total pipeline errors',
    ['stage', 'error_type'],
)
MEMORY_USAGE = Gauge(
    'etl_memory_bytes',
    'Process memory usage in bytes',
)

# Start metrics server (Prometheus scrapes this endpoint)
start_http_server(8000)

def process_batch(stage: str, batch_id: int):
    # Label with the stage, not the batch id: an unbounded label value
    # (one per batch) would explode metric cardinality.
    with PROCESSING_TIME.labels(operation=stage).time():
        try:
            rows = extract_and_process(batch_id)
            ROWS_PROCESSED.labels(source="kafka", stage=stage).inc(rows)
            return rows
        except Exception as e:
            PIPELINE_ERRORS.labels(stage=stage, error_type=type(e).__name__).inc()
            raise

# Periodic gauge update
def update_memory():
    process = psutil.Process()
    MEMORY_USAGE.set(process.memory_info().rss)
```
### Custom Collector
```python
from prometheus_client import CollectorRegistry, Gauge

registry = CollectorRegistry()

# Custom gauge whose value is computed on demand
queue_size = Gauge(
    'kafka_queue_size',
    'Number of messages in queue',
    registry=registry,
)

def collect_queue_size():
    # The exact metrics path depends on the Kafka client library in use
    size = kafka_consumer.metrics()['fetch-metrics']['records-lag-max']
    queue_size.set(size)

# Register with a push gateway or expose the registry for scraping
```
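For batch jobs that exit before Prometheus can scrape them, the client library can push a registry to a Pushgateway instead. A sketch assuming a gateway at `localhost:9091` (hypothetical address):

```python
from prometheus_client import CollectorRegistry, Gauge, push_to_gateway

registry = CollectorRegistry()
last_success = Gauge(
    'etl_last_success_timestamp_seconds',
    'Unix time of the last successful ETL run',
    registry=registry,
)

def report_success():
    last_success.set_to_current_time()
    # Push the whole registry to the Pushgateway under job name "etl"
    push_to_gateway('localhost:9091', job='etl', registry=registry)
```

Alert on staleness of `etl_last_success_timestamp_seconds` to detect runs that silently stopped happening.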
## Integration with Orchestration

### Prefect Built-in Observability
Prefect automatically records:
- Task run status (success/failure)
- Duration
- Retry counts
- Parameters
Enable Prefect Cloud/Server for UI:
```bash
prefect cloud login   # or: prefect server start
prefect agent start -q 'default'
```
### Dagster Observability

The Dagster Dagit UI shows:
- Asset materialization history
- Run duration and status
- Asset lineage graph
- Resource usage
Dagster records run and materialization metadata for any asset automatically:

```python
from dagster import asset

@asset
def monitored_asset():
    # Run status, duration, and materialization events are recorded automatically
    ...
```
## Dashboards & Alerting

### Grafana Dashboard Example
Create a dashboard with panels such as:

- Throughput: `rate(etl_rows_processed_total[5m])`
- Latency (p95): `histogram_quantile(0.95, rate(etl_processing_seconds_bucket[5m]))`
- Error Rate: `rate(etl_errors_total[5m])`
- Memory (MiB): `etl_memory_bytes / 1024 / 1024`
### Alert Rules (Prometheus Alertmanager)
```yaml
groups:
  - name: etl-alerts
    rules:
      - alert: HighErrorRate
        expr: rate(etl_errors_total[5m]) > 0.1
        for: 2m
        labels:
          severity: critical
        annotations:
          summary: "ETL error rate elevated"
          description: "{{ $labels.stage }} stage error rate: {{ $value }} errors/sec"
```
## Best Practices

### Instrumentation
- ✅ Span every pipeline stage - extract, transform, load, validate
- ✅ Add attributes - dataset names, row counts, file paths
- ✅ Propagate context across async boundaries (threads, processes, network)
- ✅ Record errors in spans with `span.record_exception()`
- ✅ Sample judiciously - 100% in dev, lower in prod (set a sampling policy)
### Metrics
- ✅ Use counters for events (rows processed, errors)
- ✅ Use histograms for durations (processing time, latency)
- ✅ Use gauges for state (queue size, memory usage)
- ✅ Label dimensions (stage, source, status) but avoid cardinality explosion
- ✅ Expose the metrics endpoint on a separate port (e.g. 8000) from the application port
### Production
- ✅ Centralized logs - send structured logs to ELK/Datadog
- ✅ Correlation IDs - Include trace IDs in log entries
- ✅ Alert on SLA breaches - latency > threshold, error rate > X%
- ✅ Test observability - Simulate failures, verify traces/metrics
- ✅ Document schema - Define metric names and label values in README
## References

- OpenTelemetry Python
- Prometheus Python Client
- Grafana Dashboarding
- @data-engineering-orchestration - Prefect/Dagster observability features