Python Data Engineering Skill

Expert guidance for Python data engineering: DataFrame libraries (Polars, Pandas, PySpark), dbt Python models, API extraction, and data validation. Assumes Python proficiency.

Scope Constraints

  • SQL transforms in dbt: hand off to dbt-transforms
  • DLT pipeline config: hand off to data-integration
  • Kafka/Flink streaming: hand off to event-streaming
  • Dagster/Airflow orchestration: hand off to data-pipelines
  • General Python or web development: out of scope

When to Use

Activate when: choosing between DataFrame libraries, writing Polars/Pandas/PySpark transforms, building dbt Python models, building API extraction scripts, implementing data validation (Pydantic/Pandera/GX), optimizing DataFrame memory, or converting between DataFrame formats.

Model Routing

| reasoning_demand | preferred | acceptable | minimum |
| --- | --- | --- | --- |
| medium | Sonnet | Sonnet, Opus | Sonnet |

Core Principles

1. Type Safety First

Annotate all functions. Data pipelines process untrusted data — types catch errors before production.

from pydantic import BaseModel
from datetime import date
from decimal import Decimal
import polars as pl

class Order(BaseModel):
    order_id: str
    customer_id: str
    amount: Decimal
    order_date: date

def transform_orders(raw: pl.LazyFrame) -> pl.LazyFrame:
    return (
        raw.filter(pl.col("amount") > 0)
        .with_columns(
            pl.col("order_date").str.to_date("%Y-%m-%d"),
            pl.col("amount").cast(pl.Decimal(10, 2)),
        )
        .unique(subset=["order_id"])
    )

2. Immutable Transforms

Never mutate DataFrames in place. Return new DataFrames from every transformation for reproducibility and testability.
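
A minimal sketch of the pattern in Pandas, where in-place mutation is the most common trap (the frame and column names are hypothetical):

import pandas as pd

raw_orders = pd.DataFrame({"order_id": ["o1"], "amount": [100.0], "tax_rate": [0.08]})

def add_tax_amount(orders: pd.DataFrame) -> pd.DataFrame:
    # Return a new frame; avoid inplace=True and avoid assigning into the argument.
    return orders.assign(tax_amount=(orders["amount"] * orders["tax_rate"]).round(2))

clean = add_tax_amount(raw_orders)  # raw_orders is untouched and reusable in tests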

3. Lazy Evaluation When Possible

Prefer lazy evaluation (Polars LazyFrame, Spark DataFrame): the engine sees the whole query before executing it, so filters and column selections can be pushed down to the scan and redundant work eliminated.
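
A minimal sketch (the parquet path and column names are hypothetical) showing that nothing executes until collect and that the optimized plan can be inspected first:

import polars as pl

lf = (
    pl.scan_parquet("raw/events/*.parquet")  # no I/O yet
    .filter(pl.col("event_type") == "click")
    .select("user_id", "event_ts")
)
print(lf.explain())    # optimized plan: filter and projection are pushed into the scan
clicks = lf.collect()  # execution happens here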

4. Memory Efficiency

Use appropriate dtypes (Int32 not Int64 when range allows). Stream/scan instead of loading entire files. Prefer columnar formats (Parquet, Arrow) over CSV/JSON.
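
A minimal sketch, assuming a hypothetical events dataset with a bounded integer column and a low-cardinality string column:

import polars as pl

lf = pl.scan_parquet("raw/events/*.parquet").with_columns(
    pl.col("quantity").cast(pl.Int32),            # value range fits in 32 bits
    pl.col("country_code").cast(pl.Categorical),  # low-cardinality strings
)
lf.sink_parquet("clean/events.parquet")  # stream to Parquet without materializing the full dataset in memory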

5. Test Data Pipelines

Test transforms with small, representative fixtures. Use polars.testing.assert_frame_equal or pandas.testing.assert_frame_equal.
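
A minimal pytest sketch for the transform_orders function from Principle 1, using a tiny in-memory fixture (the fixture values are hypothetical):

import polars as pl
from polars.testing import assert_frame_equal

def test_transform_orders_filters_and_dedupes():
    raw = pl.LazyFrame({
        "order_id": ["o1", "o1", "o2"],
        "customer_id": ["c1", "c1", "c2"],
        "amount": [10.0, 10.0, -5.0],
        "order_date": ["2024-01-01", "2024-01-01", "2024-01-02"],
    })
    expected = pl.DataFrame({
        "order_id": ["o1"],
        "customer_id": ["c1"],
        "amount": [10.0],
        "order_date": ["2024-01-01"],
    }).with_columns(
        pl.col("order_date").str.to_date("%Y-%m-%d"),
        pl.col("amount").cast(pl.Decimal(10, 2)),
    )
    assert_frame_equal(transform_orders(raw).collect(), expected)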


DataFrame Library Decision Matrix

| Factor | Polars | Pandas | PySpark | DuckDB (Python) |
| --- | --- | --- | --- | --- |
| Data size | Single machine (GB-TB via streaming) | Single machine (MB-GB) | Distributed cluster (TB-PB) | Single machine (GB-TB) |
| Speed | Very fast (Rust, multi-threaded) | Moderate (single-threaded) | Fast at scale (distributed) | Very fast (vectorized) |
| Memory | Efficient (Arrow-native, streaming) | Inefficient (copies, object dtype) | Efficient (distributed) | Efficient (out-of-core) |
| API style | Expression-based, method chaining | Index-based, mixed paradigms | SQL-like DataFrame API | SQL-first, DataFrame bridge |
| Lazy eval | Yes (LazyFrame) | No (eager only) | Yes (execution plan) | Yes (query plan) |
| dbt support | Via DataFrame return | Native (dbt-core) | Via dbt-spark | Via dbt-duckdb |
| Best for | New projects, performance-critical | Legacy code, ML integration | Big data, Databricks | Analytics, local dev |

Polars (Primary)

import polars as pl

orders = (
    pl.scan_parquet("raw/orders/*.parquet")
    .filter(
        (pl.col("status").is_in(["completed", "shipped"]))
        & (pl.col("amount") > 0)
    )
    .with_columns(
        pl.col("order_date").str.to_date("%Y-%m-%d").alias("order_date_parsed"),
        pl.col("amount").cast(pl.Decimal(10, 2)),
        (pl.col("amount") * pl.col("tax_rate")).round(2).alias("tax_amount"),
    )
    .rename({"order_date_parsed": "order_date"})
    .unique(subset=["order_id"])
    .sort("order_date")
    .collect()
)

For aggregations, joins, window functions, streaming, Arrow interop, DuckDB bridge, and performance tuning, see Polars Patterns Reference.

Pandas (Legacy/Compatibility)

Use when integrating with ML libraries or maintaining existing codebases. Prefer method chaining and vectorized operations.
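
A minimal method-chaining sketch (the file path and column names are hypothetical):

import pandas as pd

orders = (
    pd.read_parquet("raw/orders.parquet")
    .query("amount > 0")
    .assign(
        order_date=lambda d: pd.to_datetime(d["order_date"]),
        tax_amount=lambda d: (d["amount"] * d["tax_rate"]).round(2),
    )
    .drop_duplicates(subset=["order_id"])
    .sort_values("order_date")
    .reset_index(drop=True)
)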

For method chaining, Arrow backend, memory optimization, chunked processing, and anti-patterns, see Pandas Patterns Reference.

PySpark (Distributed)

Use when data exceeds single-machine memory or running on Databricks/Spark infrastructure.
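
A minimal sketch (the storage paths and column names are hypothetical):

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

orders = (
    spark.read.parquet("s3://lake/raw/orders/")
    .filter((F.col("status").isin("completed", "shipped")) & (F.col("amount") > 0))
    .withColumn("order_date", F.to_date("order_date", "yyyy-MM-dd"))
    .dropDuplicates(["order_id"])
)
orders.write.mode("overwrite").parquet("s3://lake/clean/orders/")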

For DataFrame API, Pandas UDFs, Spark Connect, Delta Lake, partitioning, and caching, see PySpark Patterns Reference.


dbt Python Models

Use Python models for transforms that are hard to express in SQL: complex statistics, ML scoring, API calls, complex string parsing, or external library integration. Use SQL for joins, filters, aggregations, window functions, CTEs, and standard ELT.

# models/intermediate/int_customer_rfm.py
def model(dbt, session):
    """RFM scoring — requires Pandas groupby + qcut."""
    dbt.config(materialized="table", packages=["scikit-learn==1.4.0"])
    orders = dbt.ref("stg_orders").to_pandas()
    # ... transform with Pandas/sklearn, return DataFrame
    return result  # dbt writes to warehouse automatically

On Snowflake, session is a Snowpark Session and dbt.ref() returns a Snowpark DataFrame (not Pandas).


API Extraction

Build typed clients with Pydantic models, pagination, retry (tenacity), and rate limiting. Use httpx for sync/async HTTP.
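
A minimal sketch of the pieces working together; the endpoint, response envelope, and field names are hypothetical:

import httpx
from pydantic import BaseModel
from tenacity import retry, stop_after_attempt, wait_exponential

class Customer(BaseModel):
    id: str
    email: str

@retry(stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, max=30))
def fetch_customers(client: httpx.Client, page: int) -> list[Customer]:
    resp = client.get("/customers", params={"page": page})
    resp.raise_for_status()  # failures are retried by tenacity
    return [Customer.model_validate(item) for item in resp.json()["data"]]

with httpx.Client(base_url="https://api.example.com", timeout=30.0) as client:
    customers = fetch_customers(client, page=1)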

For typed client patterns, pagination (cursor/offset/link-header), rate limiting, async extraction, and complete pipeline examples, see Extraction Patterns Reference.

Data Validation

  • Pydantic for row-level validation (API responses, individual records)
  • Pandera for DataFrame-level validation (column types, constraints)
  • Great Expectations for suite-level validation (warehouse tables, CI gates)
  • dbt tests for model-level assertions in dbt projects
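
For example, a minimal Pandera sketch for DataFrame-level checks (column names and constraints are illustrative):

import pandas as pd
import pandera as pa

order_schema = pa.DataFrameSchema({
    "order_id": pa.Column(str, unique=True),
    "amount": pa.Column(float, pa.Check.gt(0)),
    "order_date": pa.Column(pa.DateTime, nullable=False),
})

orders_df = pd.DataFrame({
    "order_id": ["o1", "o2"],
    "amount": [10.0, 20.0],
    "order_date": pd.to_datetime(["2024-01-01", "2024-01-02"]),
})
validated = order_schema.validate(orders_df)  # raises SchemaError if any check fails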

For Pydantic v2 patterns, Pandera schemas, Great Expectations checkpoints, and contract testing, see Data Validation Patterns Reference.


DataFrame Interoperability

| Conversion | Method | Notes |
| --- | --- | --- |
| Polars -> Pandas | polars_df.to_pandas() | Copies data |
| Pandas -> Polars | pl.from_pandas(df) | Zero-copy when possible |
| Polars <-> Arrow | to_arrow() / pl.from_arrow() | Zero-copy |
| Polars <-> DuckDB | duckdb.sql("SELECT ... FROM df").pl() | Zero-copy via Arrow |
| Spark -> Pandas | spark_df.toPandas() | Pulls to driver; use limit() for large data |
| Pandas -> Spark | spark.createDataFrame(df) | Enable Arrow: spark.sql.execution.arrow.pyspark.enabled=true |
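
For example, a minimal sketch of the Polars <-> DuckDB bridge (the frame and query are illustrative):

import duckdb
import polars as pl

orders = pl.DataFrame({"order_id": ["o1", "o2"], "amount": [10.0, 20.0]})

# DuckDB scans the in-scope Polars frame via Arrow; .pl() returns the result as a Polars DataFrame.
totals = duckdb.sql("SELECT count(*) AS n, sum(amount) AS total FROM orders").pl()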

Security

  • Credentials: always os.environ["KEY"], never inline.
  • Document required variables in .env.example.
  • Build connection strings from environment variables.
  • Close httpx clients in finally blocks.
  • Never store credentials in notebook cells.
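
A minimal sketch (the environment variable name and endpoint are hypothetical):

import os
import httpx

API_KEY = os.environ["VENDOR_API_KEY"]  # documented in .env.example, never hard-coded

client = httpx.Client(
    base_url="https://api.example.com",
    headers={"Authorization": f"Bearer {API_KEY}"},
)
try:
    resp = client.get("/orders")
    resp.raise_for_status()
finally:
    client.close()  # always release the connection pool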

See Security & Compliance Patterns for the full framework including security tiers.


Reference Files

  • Polars Patterns — LazyFrame, expressions, aggregations, joins, windows, streaming, Arrow interop, performance
  • Pandas Patterns — Arrow backend, method chaining, memory optimization, anti-patterns
  • PySpark Patterns — DataFrame API, Pandas UDFs, Spark Connect, Delta Lake, Databricks
  • Data Validation Patterns — Pydantic v2, Pandera, Great Expectations, contract testing
  • Extraction Patterns — httpx clients, async extraction, pagination, rate limiting, retry