etl-designer
ETL Designer
Design robust ETL/ELT pipelines for data processing.
Quick Start
Use Airflow for orchestration, make every operation idempotent, add error handling, and monitor pipeline health.
Instructions
Airflow DAG Structure
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator

# extract_from_source, transform_data, and load_to_warehouse are assumed
# to be defined or imported elsewhere in this module.

default_args = {
    'owner': 'data-team',
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'email_on_failure': True,
    'email': ['alerts@company.com'],
}

with DAG(
    'etl_pipeline',
    default_args=default_args,
    # Daily at 2 AM. Airflow 2.4+ names this parameter `schedule`;
    # `schedule_interval` is deprecated there.
    schedule_interval='0 2 * * *',
    start_date=datetime(2024, 1, 1),
    catchup=False,
) as dag:
    extract = PythonOperator(
        task_id='extract_data',
        python_callable=extract_from_source,
    )
    transform = PythonOperator(
        task_id='transform_data',
        python_callable=transform_data,
    )
    load = PythonOperator(
        task_id='load_to_warehouse',
        python_callable=load_to_warehouse,
    )

    # Run the tasks in order: extract, then transform, then load.
    extract >> transform >> load
Incremental Processing
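import pandas as pd
from sqlalchemy import text

def extract_incremental(conn, last_run_date):
    # conn is assumed to be a SQLAlchemy engine or connection.
    # Bind the cutoff as a parameter instead of formatting it into the SQL string.
    query = text("""
        SELECT * FROM source_table
        WHERE updated_at > :last_run_date
    """)
    return pd.read_sql(query, conn, params={"last_run_date": last_run_date})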
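The query above needs a `last_run_date` from somewhere, and that value has to survive between runs. The following is a minimal sketch of one way to track it, using the standard-library `sqlite3` module so it runs on its own. The `etl_watermark` table, the `EPOCH` floor value, and the helper names are assumptions made for illustration; `source_table` is reduced to two columns.

```python
import sqlite3

# Hypothetical schema: a cut-down source_table plus a bookkeeping table
# that stores one high-water mark per pipeline.
SCHEMA = """
CREATE TABLE IF NOT EXISTS source_table (id INTEGER PRIMARY KEY, updated_at TEXT);
CREATE TABLE IF NOT EXISTS etl_watermark (pipeline TEXT PRIMARY KEY, last_run_date TEXT);
"""

# Floor value used on the very first run, when no watermark is stored yet.
EPOCH = "1970-01-01 00:00:00"


def get_watermark(conn, pipeline):
    """Return the stored watermark for a pipeline, or EPOCH if none exists."""
    row = conn.execute(
        "SELECT last_run_date FROM etl_watermark WHERE pipeline = ?",
        (pipeline,),
    ).fetchone()
    return row[0] if row else EPOCH


def extract_since(conn, last_run_date):
    """Fetch rows changed after the watermark, oldest first."""
    return conn.execute(
        "SELECT id, updated_at FROM source_table "
        "WHERE updated_at > ? ORDER BY updated_at",
        (last_run_date,),
    ).fetchall()


def set_watermark(conn, pipeline, new_mark):
    """Store the new watermark; the write commits when the block exits."""
    with conn:
        conn.execute(
            "INSERT OR REPLACE INTO etl_watermark (pipeline, last_run_date) "
            "VALUES (?, ?)",
            (pipeline, new_mark),
        )
```

In this sketch the caller advances the watermark to the largest `updated_at` in the batch, and only after the load step has succeeded, so a failed run re-extracts the same rows on retry. Timestamps are stored as ISO-style strings, which compare correctly as text.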
Error Handling
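import logging

logger = logging.getLogger(__name__)

def safe_transform(data):
    # transform_data and send_alert are assumed to be defined elsewhere.
    try:
        return transform_data(data)
    except Exception as e:
        # Log with the traceback, send an alert, then re-raise so the
        # task is marked failed and the configured retries apply.
        logger.exception(f"Transform failed: {e}")
        send_alert(f"Pipeline failed: {e}")
        raise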
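The same log, alert, and re-raise pattern can be applied to every step without repeating the try/except. The sketch below wraps it in a decorator; `alert_on_failure` is a hypothetical name, and `send_alert` is passed in as any callable that accepts a message string, since the original does not define it.

```python
import functools
import logging

logger = logging.getLogger("etl")


def alert_on_failure(send_alert):
    """Build a decorator that logs, alerts, and re-raises on any exception."""
    def decorator(step):
        @functools.wraps(step)
        def wrapper(*args, **kwargs):
            try:
                return step(*args, **kwargs)
            except Exception as exc:
                # Record the traceback, notify, then let the error propagate
                # so the orchestrator still sees the failure.
                logger.exception("%s failed", step.__name__)
                send_alert(f"{step.__name__} failed: {exc}")
                raise
        return wrapper
    return decorator
```

A step would then be declared as `@alert_on_failure(send_alert)` above its `def`. Re-raising matters here: swallowing the exception would make a failed task look successful and skip its retries.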
Best Practices
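- Make operations idempotent so retries and reruns do not duplicate data
- Use incremental processing instead of reloading full tables
- Log, alert, and re-raise on errors so failures stay visible
- Add monitoring and alerts
- Run data quality checks before loading
- Document pipeline logic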
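Two of the practices above, idempotent operations and data quality checks, can be sketched together in a load step. This is a minimal illustration using the standard-library `sqlite3` module, not a definitive implementation. The `warehouse_orders` table, its `id` and `amount` columns, and both function names are assumptions, and the upsert syntax requires SQLite 3.24 or later.

```python
import sqlite3


def check_quality(rows):
    """Return a list of problems; an empty list means the batch passes."""
    problems = []
    seen = set()
    for i, row in enumerate(rows):
        if row.get("id") is None:
            problems.append(f"row {i}: missing id")
        elif row["id"] in seen:
            problems.append(f"row {i}: duplicate id {row['id']}")
        else:
            seen.add(row["id"])
        if row.get("amount") is None or row["amount"] < 0:
            problems.append(f"row {i}: invalid amount {row.get('amount')!r}")
    return problems


def load_idempotent(conn, rows):
    """Validate the batch, then upsert it in a single transaction."""
    problems = check_quality(rows)
    if problems:
        # Reject the whole batch before anything is written.
        raise ValueError("; ".join(problems))
    with conn:
        # Upsert keyed on id: loading the same batch twice leaves the
        # table in the same state as loading it once.
        conn.executemany(
            """
            INSERT INTO warehouse_orders (id, amount)
            VALUES (:id, :amount)
            ON CONFLICT(id) DO UPDATE SET amount = excluded.amount
            """,
            rows,
        )
```

Because the write is keyed on `id`, a retried task overwrites rows rather than appending duplicates. The target table is assumed to declare `id` as its primary key.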