Senior Data Engineer

The agent generates pipeline configurations (Airflow, Prefect, Dagster), validates data quality with profiling and anomaly detection, and optimizes SQL/Spark performance with actionable recommendations.


Quick Start

# Generate an Airflow DAG for incremental PostgreSQL -> Snowflake
python scripts/pipeline_orchestrator.py generate \
  --type airflow --source postgres --destination snowflake \
  --tables orders,customers --mode incremental --schedule "0 5 * * *"

# Validate data quality against a schema
python scripts/data_quality_validator.py validate data.csv \
  --schema schema.json --detect-anomalies --json

# Profile a dataset
python scripts/data_quality_validator.py profile data.csv --json

# Optimize a slow SQL query
python scripts/etl_performance_optimizer.py analyze-sql query.sql \
  --warehouse snowflake --json

# Estimate query cost
python scripts/etl_performance_optimizer.py estimate-cost query.sql \
  --warehouse bigquery --stats data_stats.json --json

Tools Overview

| Tool | Subcommands | Purpose |
| --- | --- | --- |
| pipeline_orchestrator.py | generate, validate, template | Generate Airflow/Prefect/Dagster pipeline code, validate DAGs |
| data_quality_validator.py | validate, profile, generate-suite, contract, schema | Schema validation, profiling, anomaly detection, Great Expectations |
| etl_performance_optimizer.py | analyze-sql, analyze-spark, optimize-partition, estimate-cost, template | SQL/Spark optimization, partition strategy, cost estimation |

All subcommands support --json for machine-readable output and --output for writing results to a file.


Workflow 1: Batch ETL Pipeline (PostgreSQL -> dbt -> Snowflake)

Step 1 -- Generate extraction config.

python scripts/pipeline_orchestrator.py generate \
  --type airflow --source postgres --tables orders,customers,products \
  --mode incremental --watermark updated_at --output dags/extract_source.py

Step 2 -- Create dbt staging model.

-- models/staging/stg_orders.sql
WITH source AS (
    SELECT * FROM {{ source('postgres', 'orders') }}
)
SELECT order_id, customer_id, order_date, total_amount, status, _extracted_at
FROM source
WHERE order_date >= DATEADD(day, -3, CURRENT_DATE)

Step 3 -- Create incremental mart model.

-- models/marts/fct_orders.sql
{{ config(materialized='incremental', unique_key='order_id', cluster_by=['order_date']) }}

SELECT o.order_id, o.customer_id, c.customer_segment, o.order_date, o.total_amount, o.status
FROM {{ ref('stg_orders') }} o
LEFT JOIN {{ ref('dim_customers') }} c ON o.customer_id = c.customer_id
{% if is_incremental() %}
WHERE o._extracted_at > (SELECT MAX(_extracted_at) FROM {{ this }})
{% endif %}

Step 4 -- Wire into Airflow DAG.

from datetime import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG('daily_etl', start_date=datetime(2024, 1, 1), schedule_interval='0 5 * * *',
         catchup=False, tags=['etl']) as dag:
    extract = BashOperator(task_id='extract', bash_command='python scripts/extract.py --date {{ ds }}')
    transform = BashOperator(task_id='dbt_run', bash_command='dbt run --select marts.*')
    test = BashOperator(task_id='dbt_test', bash_command='dbt test --select marts.*')
    extract >> transform >> test

Step 5 -- Validate.

python scripts/data_quality_validator.py validate --table fct_orders --checks all --output report.json

Validation checkpoint: DAG runs end-to-end. Data quality report shows 0 failures on uniqueness, completeness, and freshness.


Workflow 2: Real-Time Streaming (Kafka -> Spark -> Delta Lake)

Step 1 -- Define event schema and Kafka topic.

kafka-topics.sh --create --bootstrap-server localhost:9092 \
  --topic user-events --partitions 12 --replication-factor 3 \
  --config retention.ms=604800000
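
Step 2 parses the Kafka message value with a schema object. A minimal sketch of that schema follows; the field names are assumptions chosen to match the columns referenced below (event_id, user_id, event_type, event_timestamp), so adjust them to the real producer contract.

from pyspark.sql.types import StringType, StructField, StructType, TimestampType

# Assumed event schema for the user-events topic; field names and types are
# illustrative, not a definitive contract.
schema = StructType([
    StructField("event_id", StringType(), nullable=False),
    StructField("user_id", StringType(), nullable=True),
    StructField("event_type", StringType(), nullable=True),
    StructField("event_timestamp", TimestampType(), nullable=True),
])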

Step 2 -- Implement Spark Structured Streaming.

from pyspark.sql.functions import approx_count_distinct, col, count, from_json, window

# Assumes an active SparkSession named `spark` and the event schema from Step 1.
events_df = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "user-events") \
    .option("startingOffsets", "latest").load()

parsed_df = events_df.select(from_json(col("value").cast("string"), schema).alias("data")).select("data.*")

aggregated_df = parsed_df \
    .withWatermark("event_timestamp", "10 minutes") \
    .groupBy(window(col("event_timestamp"), "5 minutes"), col("event_type")) \
    .agg(count("*").alias("event_count"), approx_count_distinct("user_id").alias("unique_users"))

query = aggregated_df.writeStream.format("delta").outputMode("append") \
    .option("checkpointLocation", "/checkpoints/user-events") \
    .trigger(processingTime="1 minute").start()
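
The query handle captured above can be polled to confirm the stream is healthy, which feeds the consumer-lag checkpoint below. A minimal monitoring sketch, assuming the job runs as a standalone script:

# Inspect the streaming query, then block the driver so the job keeps running.
print(query.status)        # current state of the stream (e.g. waiting for data)
print(query.lastProgress)  # metrics from the most recent micro-batch
query.awaitTermination()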

Step 3 -- Handle errors with dead letter queue.

from pyspark.sql.functions import col, lit

def process_with_dlq(batch_df, batch_id):
    # Split each micro-batch into valid rows and rows missing a primary identifier.
    valid_df = batch_df.filter(col("event_id").isNotNull())
    invalid_df = batch_df.filter(col("event_id").isNull())
    valid_df.write.format("delta").mode("append").save("/data/lake/user_events")
    if invalid_df.count() > 0:
        invalid_df.withColumn("error_reason", lit("missing_event_id")) \
            .write.format("delta").mode("append").save("/data/lake/dlq/user_events")

Validation checkpoint: Consumer lag stays under threshold. DLQ table has < 0.1% of total events.


Workflow 3: Data Quality Framework

Step 1 -- Generate a Great Expectations suite from data.

python scripts/data_quality_validator.py generate-suite data.csv --output expectations.json

Step 2 -- Validate against a data contract.

# contracts/orders_contract.yaml
contract:
  name: orders_data_contract
  version: "1.0.0"
schema:
  properties:
    order_id: { type: string, format: uuid }
    total_amount: { type: decimal, minimum: 0 }
    status: { type: string, enum: [pending, confirmed, shipped, delivered, cancelled] }
sla:
  freshness: { max_delay_hours: 1 }
  completeness: { min_percentage: 99.9 }
  accuracy: { duplicate_tolerance: 0.01 }

python scripts/data_quality_validator.py contract data.csv --contract orders_contract.yaml --json

Step 3 -- Add dbt tests for ongoing validation.

models:
  - name: fct_orders
    columns:
      - name: order_id
        tests: [unique, not_null]
      - name: total_amount
        tests:
          - not_null
          - dbt_utils.accepted_range: { min_value: 0, max_value: 1000000 }

Validation checkpoint: Quality score >= 95%. Zero duplicates. Freshness under SLA threshold.


Architecture Decision Framework

| Question | Batch | Streaming |
| --- | --- | --- |
| Latency requirement | Hours to days | Seconds to minutes |
| Processing complexity | Complex transforms, ML | Simple aggregations |
| Cost sensitivity | More cost-effective | Higher infra cost |
| Error handling | Easy reprocessing | Requires careful DLQ design |

Decision tree:

Real-time insight needed?
  Yes -> Exactly-once needed?
    Yes -> Kafka + Flink/Spark Structured Streaming
    No  -> Kafka + consumer groups
  No  -> Daily volume > 1TB?
    Yes -> Spark/Databricks
    No  -> dbt + warehouse compute

| Feature | Warehouse (Snowflake/BigQuery) | Lakehouse (Delta/Iceberg) |
| --- | --- | --- |
| Best for | BI, SQL analytics | ML, unstructured data |
| Storage cost | Higher (proprietary) | Lower (open formats) |
| Flexibility | Schema-on-write | Schema-on-read |

Anti-Patterns

  1. Full table reload on every run -- use incremental loads with watermark columns.
  2. No dead letter queue -- failed records silently dropped. Always route failures to a DLQ.
  3. Timezone mismatch -- normalize all timestamps to UTC at extraction.
  4. Missing freshness checks -- add dbt source freshness before transforms start.
  5. Skipping schema drift detection -- use the mergeSchema option or data contracts to catch new columns (see the sketch after this list).
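
For anti-pattern 5, a minimal sketch of a schema-drift-tolerant write, assuming a Delta Lake destination, an events_df DataFrame, and the lake path used in Workflow 2:

# mergeSchema lets new source columns evolve the Delta table schema instead of
# failing the write; pair it with data contract checks so drift stays visible.
events_df.write \
    .format("delta") \
    .mode("append") \
    .option("mergeSchema", "true") \
    .save("/data/lake/user_events")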

Troubleshooting

| Problem | Cause | Solution |
| --- | --- | --- |
| Pipeline silently produces zero rows | Timezone mismatch on the watermark column | Normalize to UTC; add a row-count assertion |
| Spark shuffle 10x slower than expected | Data skew on the join key | Salt the key or broadcast the smaller table |
| Airflow shows "no tasks to run" | Circular dependency or import error | Run airflow dags list-import-errors and fix the import |
| dbt succeeds but dashboards stay stale | Source freshness not checked | Add dbt source freshness as a prerequisite task |
| Kafka consumer lag grows unbounded | Consumer throughput < producer rate | Increase partitions, scale consumers, tune max.poll.records |
| Quality validator flags false-positive anomalies | Z-score threshold too tight | Raise the threshold or switch to IQR mode |

References

| Guide | Path |
| --- | --- |
| Pipeline Architecture | references/data_pipeline_architecture.md |
| Data Modeling Patterns | references/data_modeling_patterns.md |
| DataOps Best Practices | references/dataops_best_practices.md |

Integration Points

| Skill | Integration |
| --- | --- |
| senior-data-scientist | Feature engineering consumes curated mart data |
| senior-ml-engineer | ML pipelines depend on feature store tables |
| senior-devops | CI/CD for dbt, Airflow deployment, container orchestration |
| senior-architect | Architecture reviews for lakehouse vs warehouse decisions |
| code-reviewer | Pipeline code reviews for DAGs, dbt models, Spark jobs |

Last Updated: April 2026 | Version: 1.1.0
