# Data Engineering Guide

Data pipelines, warehousing, and the modern data stack.

## When to Use

- Building data pipelines
- Designing data warehouses
- Implementing ETL/ELT processes
- Setting up data lakes
- Optimizing data infrastructure

## Modern Data Stack

### Components

```
Sources → Ingestion → Storage → Transform → Serve → Consume
```

| Layer | Tools |
| --- | --- |
| Ingestion | Fivetran, Airbyte, Stitch |
| Storage | S3, GCS, Snowflake, BigQuery |
| Transform | dbt, Spark, Airflow |
| Orchestration | Airflow, Dagster, Prefect |
| Serving | Looker, Tableau, Metabase |

## Data Pipeline Patterns

### Batch Processing

```python
# Airflow DAG example (Airflow 2.x style)
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

dag = DAG(
    'daily_etl',
    schedule_interval='0 6 * * *',   # daily at 06:00
    start_date=datetime(2024, 1, 1),
    catchup=False,                   # don't backfill runs between start_date and now
)

def extract():
    # Extract from source
    pass

def transform():
    # Transform data
    pass

def load():
    # Load to warehouse
    pass

extract_task = PythonOperator(
    task_id='extract',
    python_callable=extract,
    dag=dag,
)

transform_task = PythonOperator(
    task_id='transform',
    python_callable=transform,
    dag=dag,
)

load_task = PythonOperator(
    task_id='load',
    python_callable=load,
    dag=dag,
)

# Dependencies: extract -> transform -> load
extract_task >> transform_task >> load_task
```

### Streaming Processing

```python
# Kafka consumer example (kafka-python)
import json

from kafka import KafkaConsumer

def process_event(event):
    # Handle one decoded event (stub)
    print(event)

consumer = KafkaConsumer(
    'events',
    bootstrap_servers=['localhost:9092'],
    value_deserializer=lambda x: json.loads(x.decode('utf-8')),
)

# Blocks and yields messages as they arrive
for message in consumer:
    process_event(message.value)
```

## dbt Patterns

### Model Structure

```
models/
├── staging/           # 1:1 with source
│   ├── stg_orders.sql
│   └── stg_customers.sql
├── intermediate/      # Business logic
│   └── int_order_items.sql
└── marts/             # Final models
    ├── dim_customers.sql
    └── fct_orders.sql
```
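
A staging model usually does nothing but rename, cast, and lightly clean one source table. A minimal sketch of `stg_orders.sql`, assuming a raw source `raw.orders` (declared in a `sources:` block) with illustrative column names:

```sql
-- models/staging/stg_orders.sql (column names are assumptions)
select
    id                        as order_id,
    customer_id,
    cast(created_at as date)  as order_date,
    status
from {{ source('raw', 'orders') }}
```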

### Example Model

```sql
-- models/marts/fct_orders.sql
{{
    config(
        materialized='incremental',
        unique_key='order_id'
    )
}}

select
    o.order_id,
    o.customer_id,
    o.order_date,
    sum(oi.quantity * oi.unit_price) as order_total
from {{ ref('stg_orders') }} o
join {{ ref('stg_order_items') }} oi
    on o.order_id = oi.order_id
{% if is_incremental() %}
where o.order_date > (select max(order_date) from {{ this }})
{% endif %}
group by 1, 2, 3
```
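
One caveat: the strict `>` filter skips rows that arrive late for dates already loaded. A common mitigation is a short lookback window; since `unique_key` merges on `order_id`, reprocessing recent days stays idempotent. A sketch (interval syntax varies by warehouse; adjust to yours):

```sql
-- Illustrative variant: reprocess the last 3 days on each incremental run
{% if is_incremental() %}
where o.order_date > (select max(order_date) - interval '3 days' from {{ this }})
{% endif %}
```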

## Data Modeling

### Dimensional Modeling

```
Fact Tables (events/transactions)
├── fct_orders
├── fct_page_views
└── fct_transactions

Dimension Tables (context)
├── dim_customers
├── dim_products
├── dim_dates
└── dim_locations
```

### Star Schema

```
              dim_customers
                    │
dim_dates ─── fct_orders ─── dim_products
                    │
              dim_locations
```
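
The payoff of a star schema is that analytical queries stay flat: join the fact to the dimensions you need, filter, aggregate. A sketch, assuming conventional key and attribute columns on the tables above:

```sql
-- Monthly revenue by customer segment (column names are assumptions)
select
    d.year_month,
    c.segment,
    sum(f.order_total) as revenue
from fct_orders f
join dim_dates     d on f.order_date  = d.date_day
join dim_customers c on f.customer_id = c.customer_id
group by 1, 2
order by 1, 2
```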

## Data Quality

### Validation Rules

```yaml
# dbt tests (models/schema.yml)
version: 2

models:
  - name: fct_orders
    columns:
      - name: order_id
        tests:
          - unique
          - not_null
      - name: order_total
        tests:
          - not_null
          - positive_value   # custom generic test, defined below
```
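
`unique` and `not_null` ship with dbt, but `positive_value` does not; it would need to be defined as a custom generic test, e.g. in `tests/generic/positive_value.sql`. A minimal sketch:

```sql
-- tests/generic/positive_value.sql
-- The test fails if this query returns any rows.
{% test positive_value(model, column_name) %}

select {{ column_name }}
from {{ model }}
where {{ column_name }} <= 0

{% endtest %}
```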

### Quality Metrics

| Metric | Description |
| --- | --- |
| Completeness | % of non-null values |
| Uniqueness | % of distinct values |
| Timeliness | Data freshness |
| Accuracy | Matches the source of truth |
| Consistency | Values agree across systems |
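
Completeness and uniqueness reduce to plain aggregates; a sketch against `fct_orders`:

```sql
-- Completeness and uniqueness of order_id (1.0 = perfect)
select
    count(order_id) * 1.0 / count(*)                 as completeness,  -- share non-null
    count(distinct order_id) * 1.0 / count(order_id) as uniqueness     -- share distinct
from fct_orders
```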

## Performance Optimization

### Partitioning

```sql
-- BigQuery partitioned table
CREATE TABLE orders
PARTITION BY DATE(order_date)
CLUSTER BY customer_id
AS SELECT * FROM staging.orders;
```
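
Partitioning only pays off when queries filter on the partition column so the engine can prune. This sketch scans roughly a week of partitions instead of the full table (it assumes an `order_total` column exists in the source):

```sql
-- Prunes to ~7 daily partitions instead of a full scan
SELECT customer_id, SUM(order_total) AS total_spend
FROM orders
WHERE DATE(order_date) >= DATE_SUB(CURRENT_DATE(), INTERVAL 7 DAY)
GROUP BY customer_id;
```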

### Query Optimization

| Technique | Impact |
| --- | --- |
| Partitioning | Reduces scanned data |
| Clustering | Improves filter speed |
| Materialization | Pre-computes joins |
| Caching | Avoids repeat queries |
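
In BigQuery, materialization can be a single `CREATE MATERIALIZED VIEW` over the partitioned table above; a sketch, assuming `order_total` is a numeric (non-float) column, since materialized views restrict which aggregates are allowed:

```sql
-- BigQuery refreshes this incrementally as the base table changes
CREATE MATERIALIZED VIEW daily_order_totals AS
SELECT
  DATE(order_date) AS order_day,
  COUNT(*)         AS order_count,
  SUM(order_total) AS revenue
FROM orders
GROUP BY order_day;
```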

## Monitoring

### Pipeline Metrics

| Metric | Alert Threshold |
| --- | --- |
| Runtime | > 2x normal |
| Row count | ±20% variance |
| Freshness | Past SLA |
| Failures | Any failure |
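
The freshness and row-count checks come down to SQL a scheduler can run on a timer; a sketch, assuming a `loaded_at` timestamp column on `fct_orders` and the thresholds from the table above:

```sql
-- Freshness: alert when the newest row is older than the SLA (6h here)
SELECT TIMESTAMP_DIFF(CURRENT_TIMESTAMP(), MAX(loaded_at), HOUR) AS hours_stale
FROM fct_orders;

-- Volume: alert when the latest day is outside ±20% of the trailing 7-day average
WITH daily AS (
  SELECT DATE(order_date) AS day, COUNT(*) AS n
  FROM fct_orders
  GROUP BY day
)
SELECT
  day,
  n,
  AVG(n) OVER (ORDER BY day ROWS BETWEEN 7 PRECEDING AND 1 PRECEDING) AS avg_7d
FROM daily
ORDER BY day DESC
LIMIT 1;
```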

### Data Observability

```yaml
# Illustrative monitors-as-code config, in the style of Monte Carlo /
# Elementary (the exact schema differs per tool)
monitors:
  - table: fct_orders
    tests:
      - freshness:
          threshold: 6 hours
      - volume:
          threshold: 10%
      - schema_change: true
```

## Best Practices

### Pipeline Design

- Idempotent operations (safe to re-run; see the MERGE sketch below)
- Incremental processing
- Clear data lineage
- Automated testing
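
An idempotent load writes through a key, so a rerun converges to the same end state instead of duplicating rows. A MERGE sketch with assumed table and column names:

```sql
-- Re-running the same batch yields the same end state (no duplicates)
MERGE INTO warehouse.orders AS t
USING staging.orders_batch AS s
ON t.order_id = s.order_id
WHEN MATCHED THEN
  UPDATE SET status = s.status, order_total = s.order_total
WHEN NOT MATCHED THEN
  INSERT (order_id, customer_id, order_date, status, order_total)
  VALUES (s.order_id, s.customer_id, s.order_date, s.status, s.order_total);
```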

### Data Governance

- Document all models
- Track data lineage
- Implement access controls
- Version-control all SQL

### Cost Management

- Monitor query costs (see the query below)
- Use partitioning
- Schedule heavy jobs off-peak
- Archive old data
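
In BigQuery, query costs are themselves queryable via `INFORMATION_SCHEMA.JOBS`; a sketch that surfaces the most expensive queries of the past week (the `region-us` qualifier is an assumption, adjust to your region):

```sql
-- Top 10 queries by bytes billed in the last 7 days (on-demand pricing)
SELECT
  user_email,
  total_bytes_billed / POW(1024, 4) AS tib_billed,
  query
FROM `region-us`.INFORMATION_SCHEMA.JOBS
WHERE job_type = 'QUERY'
  AND creation_time > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 7 DAY)
ORDER BY total_bytes_billed DESC
LIMIT 10;
```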