data-pipelines

You are building data pipelines. The general pattern is ingest (get data in) → transform (clean, model, join) → query (analyze) → explore (notebooks, apps, visualizations).

The specific tools for each step depend on the project. Preferred defaults:

| Step | Preferred Tool | Alternatives |
|------|----------------|--------------|
| Ingest | dlt | Plain Python scripts, shell + curl, custom connectors |
| Transform | sqlmesh | Plain SQL scripts, dbt, Python scripts |
| Query engine | DuckDB / MotherDuck | |
| DataFrames | polars | |
| Notebooks | marimo | |
| Project mgmt | uv | |

Language Preference

SQL first (DuckDB dialect), then Python, then bash. Use the simplest language that gets the job done.

Project Layout

Data projects should follow this structure:

project/
├── ingest/              # Extraction and loading scripts
│   ├── <source>.py      # One file per data source
│   └── .dlt/            # dlt config (if using dlt)
├── transform/           # Transformation logic
│   ├── models/          # SQL models (sqlmesh or plain SQL)
│   └── config.yaml      # sqlmesh config (if using sqlmesh)
├── notebooks/           # Exploration and analysis
│   └── *.py             # marimo notebooks (plain .py files)
├── data/                # Local data files (gitignored)
├── pyproject.toml       # Dependencies
├── uv.lock              # Locked dependencies (committed)
└── *.duckdb             # Local database (gitignored)

Not every project needs all directories — a simple analysis might only have notebooks/ and a DuckDB file. Scale up as needed.

uv — Project Management

Never use pip directly. All Python work goes through uv.

uv init my-project                    # New project
uv add polars duckdb                  # Add dependencies
uv sync                               # Install into .venv
uv run python script.py               # Run in project venv
uv run --with requests script.py      # Ad-hoc dependency

Inline script dependencies (PEP 723) for standalone scripts:

# /// script
# dependencies = ["dlt[duckdb]", "polars"]
# requires-python = ">=3.12"
# ///

Run with uv run script.py — deps are resolved automatically.

Always commit uv.lock. Use pyproject.toml for dependency declarations, never requirements.txt.

DuckDB — Query Engine

DuckDB is the shared SQL engine across the entire stack. Use DuckDB-specific syntax freely.

CLI

duckdb                              # In-memory
duckdb my_data.db                   # Persistent local
duckdb md:my_db                     # MotherDuck
duckdb -c "SELECT 42"              # One-shot

DuckDB SQL Syntax

Friendly SQL:

FROM my_table;                                          -- Implicit SELECT *
FROM my_table SELECT col1, col2 WHERE col3 > 5;        -- FROM-first
SELECT * EXCLUDE (internal_id) FROM events;             -- Drop columns
SELECT * REPLACE (amount / 100.0 AS amount) FROM txns;  -- Transform in-place
SELECT category, SUM(amount) FROM sales GROUP BY ALL;    -- Infer GROUP BY

Read files directly (no import step):

SELECT * FROM 'data.parquet';
SELECT * FROM read_csv('data.csv', header=true);
SELECT * FROM 's3://bucket/path/*.parquet';
COPY (SELECT * FROM events) TO 'output.parquet' (FORMAT PARQUET);

Nested types:

SELECT {'name': 'Alice', 'age': 30} AS person;
SELECT [1, 2, 3] AS nums;
SELECT list_filter([1, 2, 3, 4], x -> x > 2);

Useful commands:

DESCRIBE SELECT * FROM events;
SUMMARIZE events;

MotherDuck

ATTACH 'md:';              -- All databases
ATTACH 'md:my_db';         -- Specific database

Auth via the motherduck_token env var. Once attached, cross-database queries just work: SELECT * FROM local_db.main.t1 JOIN cloud_db.main.t2 USING (id).

polars — DataFrames

Use polars when Python logic is needed — complex string transforms, ML features, row-level conditionals. For joins, aggregations, and window functions, prefer SQL.

Key Patterns

import polars as pl

# Lazy evaluation (always prefer for production)
lf = pl.scan_parquet("events/*.parquet")
result = (
    lf.filter(pl.col("event_date") >= "2024-01-01")
    .group_by("user_id")
    .agg(pl.col("amount").sum().alias("total_spend"))
    .sort("total_spend", descending=True)
    .collect()
)

# Three contexts
df.select(...)         # Pick/transform columns (output has ONLY these)
df.with_columns(...)   # Add/overwrite columns (keeps all originals)
df.filter(...)         # Keep rows matching condition

DuckDB interop (zero-copy via Arrow):

import duckdb
result = duckdb.sql("SELECT * FROM df WHERE amount > 100").pl()

marimo — Notebooks

Reactive Python notebooks stored as plain .py files. Cells auto-re-execute when dependencies change.

marimo edit notebook.py              # Create/edit
marimo run notebook.py               # Serve as app
marimo convert notebook.ipynb -o out.py  # From Jupyter

SQL cells use DuckDB by default and return polars DataFrames:

result = mo.sql(f"""
    SELECT * FROM events
    WHERE event_date >= '{start_date}'
""")

Python variables and polars DataFrames are queryable from SQL cells and vice versa.

dlt — Ingestion

Applies when a project uses dlt for ingestion. dlt handles API calls, pagination, schema inference, incremental loading, and state management.

Scaffold and Run

dlt init rest_api duckdb             # Scaffold pipeline
uv run python pipeline.py           # Run extraction
dlt pipeline <name> info             # Inspect state
dlt pipeline <name> schema           # View inferred schema

Pipeline Patterns

Minimal pipeline:

import dlt

pipeline = dlt.pipeline(
    pipeline_name="my_pipeline",
    destination="duckdb",
    dataset_name="raw",
)
info = pipeline.run(data, table_name="events")

Incremental loading:

@dlt.resource(write_disposition="merge", primary_key="id")
def users(updated_at=dlt.sources.incremental("updated_at")):
    yield from fetch_users(since=updated_at.last_value)

REST API source (declarative):

from dlt.sources.rest_api import rest_api_source

source = rest_api_source({
    "client": {"base_url": "https://api.example.com/v1"},
    "resource_defaults": {"primary_key": "id", "write_disposition": "merge"},
    "resources": [
        "users",
        {
            "name": "events",
            "write_disposition": "append",
            "endpoint": {
                "path": "events",
                "incremental": {"cursor_path": "created_at", "initial_value": "2024-01-01"},
            },
        },
    ],
})

Write dispositions:

| Disposition | Behavior | Use For |
|-------------|----------|---------|
| append | Insert rows (default) | Immutable events, logs |
| replace | Drop and recreate | Small lookup tables |
| merge | Upsert by primary_key | Mutable records |

Destinations: duckdb (local file), motherduck (cloud). Set motherduck_token env var or configure in .dlt/secrets.toml.

sqlmesh — Transformation

Applies when a project uses sqlmesh for transformations. sqlmesh is SQL-first, with a plan/apply workflow that prevents accidental production changes.

Scaffold and Run

sqlmesh init duckdb                              # New project
sqlmesh init -t dlt --dlt-pipeline <name>        # From dlt schema
sqlmesh plan                                     # Preview + apply (dev)
sqlmesh plan prod                                # Promote to production
sqlmesh fetchdf "SELECT * FROM analytics.users"  # Ad-hoc query
sqlmesh test                                     # Run unit tests
sqlmesh ui                                       # Web interface

Model Kinds

| Kind | Behavior | Use For |
|------|----------|---------|
| FULL | Rewrite entire table | Small dimension tables |
| INCREMENTAL_BY_TIME_RANGE | Process new time intervals | Facts, events, logs |
| INCREMENTAL_BY_UNIQUE_KEY | Upsert by key | Mutable dimensions |
| SEED | Static CSV data | Reference/lookup data |
| VIEW | SQL view | Simple pass-throughs |
| SCD_TYPE_2 | Slowly changing dimensions | Historical tracking |

Model Example

MODEL (
    name analytics.stg_events,
    kind INCREMENTAL_BY_TIME_RANGE (time_column event_date),
    cron '@daily',
    grain (event_id),
    audits (NOT_NULL(columns=[event_id]))
);

SELECT
    event_id,
    user_id,
    event_type,
    event_date
FROM raw.events
WHERE event_date BETWEEN @start_date AND @end_date

Config (config.yaml)

gateways:
  local:
    connection:
      type: duckdb
      database: db.duckdb
default_gateway: local
model_defaults:
  dialect: duckdb

dlt Integration

sqlmesh init -t dlt auto-generates external models and incremental staging models from dlt's inferred schema. Schema changes from dlt are detected by sqlmesh plan.
