# Data Engineering Guide

Data pipelines, warehousing, and the modern data stack.
## When to Use

- Building data pipelines
- Designing data warehouses
- Implementing ETL/ELT processes
- Setting up data lakes
- Optimizing data infrastructure
## Modern Data Stack

### Components

```
Sources → Ingestion → Storage → Transform → Serve → Consume
```

| Layer | Tools |
|---|---|
| Ingestion | Fivetran, Airbyte, Stitch |
| Storage | S3, GCS, Snowflake, BigQuery |
| Transform | dbt, Spark |
| Orchestration | Airflow, Dagster, Prefect |
| Serving | Looker, Tableau, Metabase |
## Data Pipeline Patterns

### Batch Processing

```python
# Airflow DAG example
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

dag = DAG(
    'daily_etl',
    schedule_interval='0 6 * * *',   # every day at 06:00
    start_date=datetime(2024, 1, 1),
    catchup=False,                   # skip backfill runs for past dates
)

def extract():
    # Extract from source
    pass

def transform():
    # Transform data
    pass

def load():
    # Load to warehouse
    pass

extract_task = PythonOperator(
    task_id='extract',
    python_callable=extract,
    dag=dag,
)
transform_task = PythonOperator(
    task_id='transform',
    python_callable=transform,
    dag=dag,
)
load_task = PythonOperator(
    task_id='load',
    python_callable=load,
    dag=dag,
)

extract_task >> transform_task >> load_task
```
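
The same pipeline can be written more compactly with Airflow's TaskFlow API. A minimal sketch; the `schedule` argument assumes Airflow 2.4+ (earlier 2.x versions use `schedule_interval`), and the task bodies are placeholders:

```python
# Equivalent DAG using the TaskFlow API
from airflow.decorators import dag, task
from datetime import datetime

@dag(schedule='0 6 * * *', start_date=datetime(2024, 1, 1), catchup=False)
def daily_etl_taskflow():
    @task
    def extract():
        # Extract from source; the return value is passed downstream via XCom
        return []

    @task
    def transform(raw):
        # Transform data
        return raw

    @task
    def load(clean):
        # Load to warehouse
        pass

    # Dependencies are inferred from the data flow
    load(transform(extract()))

daily_etl_taskflow()
```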
### Stream Processing

```python
# Kafka consumer example (kafka-python)
from kafka import KafkaConsumer
import json

def process_event(event: dict) -> None:
    # Placeholder handler; replace with real processing logic
    print(event)

consumer = KafkaConsumer(
    'events',
    bootstrap_servers=['localhost:9092'],
    value_deserializer=lambda x: json.loads(x.decode('utf-8')),
)

for message in consumer:
    process_event(message.value)
```
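
For at-least-once delivery, disable auto-commit and commit offsets only after processing succeeds. A sketch using kafka-python; the `etl-consumers` group id is illustrative, and `process_event` must be idempotent since messages can be re-delivered:

```python
# At-least-once consumption: commit offsets after successful processing
from kafka import KafkaConsumer
import json

consumer = KafkaConsumer(
    'events',
    bootstrap_servers=['localhost:9092'],
    group_id='etl-consumers',      # offsets are tracked per consumer group
    enable_auto_commit=False,      # disable auto-commit
    value_deserializer=lambda x: json.loads(x.decode('utf-8')),
)

for message in consumer:
    process_event(message.value)   # idempotent handler (see above)
    consumer.commit()              # commit only after the event is processed
```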
## dbt Patterns

### Model Structure

```
models/
├── staging/          # 1:1 with source
│   ├── stg_orders.sql
│   ├── stg_order_items.sql
│   └── stg_customers.sql
├── intermediate/     # Business logic
│   └── int_order_items.sql
└── marts/            # Final models
    ├── dim_customers.sql
    └── fct_orders.sql
```
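
A staging model typically just renames, casts, and lightly cleans one source table. A minimal sketch, assuming a `shop` source has already been declared in a sources file:

```sql
-- models/staging/stg_orders.sql
select
    id as order_id,
    customer_id,
    created_at as order_date,
    status
from {{ source('shop', 'orders') }}  -- 'shop' is a hypothetical source name
```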
### Example Model

```sql
-- models/marts/fct_orders.sql
{{
    config(
        materialized='incremental',
        unique_key='order_id'
    )
}}

select
    o.order_id,
    o.customer_id,
    o.order_date,
    sum(oi.quantity * oi.unit_price) as order_total
from {{ ref('stg_orders') }} o
join {{ ref('stg_order_items') }} oi
    on o.order_id = oi.order_id
{% if is_incremental() %}
where o.order_date > (select max(order_date) from {{ this }})
{% endif %}
group by 1, 2, 3
```
## Data Modeling

### Dimensional Modeling

```
Fact tables (events/transactions)
├── fct_orders
├── fct_page_views
└── fct_transactions

Dimension tables (context)
├── dim_customers
├── dim_products
├── dim_dates
└── dim_locations
```
### Star Schema

```
            dim_customers
                  │
dim_dates ── fct_orders ── dim_products
                  │
            dim_locations
```
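
Queries against a star schema join the fact table to whichever dimensions the analysis needs. An illustrative example; the surrogate-key and column names are assumed:

```sql
-- Revenue by month and customer region
select
    d.calendar_month,
    c.region,
    sum(f.order_total) as revenue
from fct_orders f
join dim_dates d on f.order_date_key = d.date_key
join dim_customers c on f.customer_key = c.customer_key
group by 1, 2
```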
## Data Quality

### Validation Rules

```yaml
# dbt schema tests (e.g. models/marts/schema.yml)
models:
  - name: fct_orders
    columns:
      - name: order_id
        tests:
          - unique
          - not_null
      - name: order_total
        tests:
          - not_null
          - positive_value   # custom generic test; see below
```
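
`unique` and `not_null` ship with dbt; `positive_value` does not, so it has to be defined as a custom generic test. A minimal sketch:

```sql
-- tests/generic/positive_value.sql
-- The test passes when this query returns zero rows
{% test positive_value(model, column_name) %}

select *
from {{ model }}
where {{ column_name }} <= 0

{% endtest %}
```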
### Quality Metrics

| Metric | Description |
|---|---|
| Completeness | % of non-null values |
| Uniqueness | % of distinct values |
| Timeliness | Freshness relative to expected arrival |
| Accuracy | Agreement with the source system |
| Consistency | Agreement across systems |
## Performance Optimization

### Partitioning

```sql
-- BigQuery: partitioned and clustered table
CREATE TABLE orders
PARTITION BY DATE(order_date)
CLUSTER BY customer_id
AS SELECT * FROM staging.orders;
```
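
Pruning only happens when queries filter on the partitioning column. An illustrative query against the table above, assuming `order_date` is a TIMESTAMP and `order_total` exists in the source:

```sql
-- Only the January 2024 partitions are scanned
SELECT customer_id, SUM(order_total) AS total
FROM orders
WHERE DATE(order_date) BETWEEN '2024-01-01' AND '2024-01-31'
GROUP BY customer_id;
```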
### Query Optimization

| Technique | Impact |
|---|---|
| Partitioning | Reduces data scanned per query |
| Clustering | Speeds up filters on clustered columns |
| Materialization | Pre-computes expensive joins and aggregations |
| Caching | Avoids re-running repeated queries |
## Monitoring

### Pipeline Metrics

| Metric | Alert Threshold |
|---|---|
| Runtime | >2x normal duration |
| Row count | ±20% variance from baseline |
| Freshness | Data older than the SLA |
| Failures | Any failure |
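
A freshness check can be as simple as comparing the newest timestamp in a table against the SLA. A minimal sketch; `run_query` stands in for whatever warehouse client you use and is assumed to return a timezone-aware datetime:

```python
from datetime import datetime, timedelta, timezone

FRESHNESS_SLA = timedelta(hours=6)

def is_fresh(run_query, table: str, ts_column: str) -> bool:
    # run_query is a placeholder for your warehouse client
    latest = run_query(f"select max({ts_column}) from {table}")
    lag = datetime.now(timezone.utc) - latest
    return lag <= FRESHNESS_SLA
```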
### Data Observability

```yaml
# Illustrative observability config (Monte Carlo / Elementary style;
# exact syntax varies by tool)
monitors:
  - table: fct_orders
    tests:
      - freshness:
          threshold: 6 hours
      - volume:
          threshold: 10%
      - schema_change: true
```
## Best Practices

### Pipeline Design

- Idempotent operations (re-runs produce the same result; see the MERGE sketch below)
- Incremental processing instead of full reloads
- Clear data lineage
- Automated testing
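
Idempotency usually comes from writing with MERGE/upsert (or delete-and-insert by partition) rather than blind INSERTs. A sketch in BigQuery/Snowflake-style SQL; table and column names are illustrative:

```sql
-- Re-running this load leaves the target in the same state
MERGE INTO orders AS t
USING staging_orders AS s
  ON t.order_id = s.order_id
WHEN MATCHED THEN
  UPDATE SET order_total = s.order_total
WHEN NOT MATCHED THEN
  INSERT (order_id, customer_id, order_total)
  VALUES (s.order_id, s.customer_id, s.order_total)
```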
### Data Governance

- Document all models
- Track data lineage
- Implement access controls
- Version control SQL
### Cost Management

- Monitor query costs
- Use partitioning to limit scans
- Schedule heavy jobs off-peak
- Archive or expire old data