# data-pipelines
You are building data pipelines. The general pattern is ingest (get data in) → transform (clean, model, join) → query (analyze) → explore (notebooks, apps, visualizations).
The specific tools for each step depend on the project. Preferred defaults:
| Step | Preferred Tool | Alternatives |
|---|---|---|
| Ingest | dlt | Plain Python scripts, shell + curl, custom connectors |
| Transform | sqlmesh | Plain SQL scripts, dbt, Python scripts |
| Query engine | DuckDB / MotherDuck | — |
| DataFrames | polars | — |
| Notebooks | marimo | — |
| Project mgmt | uv | — |
## Language Preference

SQL first (DuckDB dialect), then Python, then bash. Use the simplest language that gets the job done.
## Project Layout

Data projects should follow this structure:

```
project/
├── ingest/           # Extraction and loading scripts
│   ├── <source>.py   # One file per data source
│   └── .dlt/         # dlt config (if using dlt)
├── transform/        # Transformation logic
│   ├── models/       # SQL models (sqlmesh or plain SQL)
│   └── config.yaml   # sqlmesh config (if using sqlmesh)
├── notebooks/        # Exploration and analysis
│   └── *.py          # marimo notebooks (plain .py files)
├── data/             # Local data files (gitignored)
├── pyproject.toml    # Dependencies
├── uv.lock           # Locked dependencies (committed)
└── *.duckdb          # Local database (gitignored)
```

Not every project needs all directories — a simple analysis might only have `notebooks/` and a DuckDB file. Scale up as needed.
## uv — Project Management

Never use pip directly. All Python work goes through uv.

```shell
uv init my-project                 # New project
uv add polars duckdb               # Add dependencies
uv sync                            # Install into .venv
uv run python script.py            # Run in project venv
uv run --with requests script.py   # Ad-hoc dependency
```

Inline script dependencies (PEP 723) for standalone scripts:

```python
# /// script
# dependencies = ["dlt[duckdb]", "polars"]
# requires-python = ">=3.12"
# ///
```

Run with `uv run script.py` — dependencies are resolved automatically.

Always commit `uv.lock`. Use `pyproject.toml` for dependency declarations, never `requirements.txt`.
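A minimal `pyproject.toml` sketch for a project like the layout above (the project name and dependency list are placeholders — `uv init` generates the real skeleton, and `uv add` keeps this list and `uv.lock` in sync):

```toml
[project]
name = "my-project"
version = "0.1.0"
requires-python = ">=3.12"
dependencies = [
    "duckdb",
    "polars",
]
```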
## DuckDB — Query Engine

DuckDB is the shared SQL engine across the entire stack. Use DuckDB-specific syntax freely.

### CLI

```shell
duckdb                  # In-memory
duckdb my_data.db       # Persistent local
duckdb md:my_db         # MotherDuck
duckdb -c "SELECT 42"   # One-shot
```
### DuckDB SQL Syntax

Friendly SQL:

```sql
FROM my_table;                                          -- Implicit SELECT *
FROM my_table SELECT col1, col2 WHERE col3 > 5;         -- FROM-first
SELECT * EXCLUDE (internal_id) FROM events;             -- Drop columns
SELECT * REPLACE (amount / 100.0 AS amount) FROM txns;  -- Transform in-place
SELECT category, SUM(amount) FROM sales GROUP BY ALL;   -- Infer GROUP BY
```
Read files directly (no import step):

```sql
SELECT * FROM 'data.parquet';
SELECT * FROM read_csv('data.csv', header=true);
SELECT * FROM 's3://bucket/path/*.parquet';
COPY (SELECT * FROM events) TO 'output.parquet' (FORMAT PARQUET);
```

Nested types:

```sql
SELECT {'name': 'Alice', 'age': 30} AS person;
SELECT [1, 2, 3] AS nums;
SELECT list_filter([1, 2, 3, 4], x -> x > 2);
```

Useful commands:

```sql
DESCRIBE SELECT * FROM events;
SUMMARIZE events;
```
### MotherDuck

```sql
ATTACH 'md:';       -- All databases
ATTACH 'md:my_db';  -- Specific database
```

Auth is via the `motherduck_token` env var. Cross-database queries work once both databases are attached — reference them by catalog name: `SELECT * FROM local_db.main.t1 JOIN cloud_db.main.t2 USING (id)`.
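A hedged sketch of a local-to-cloud join, assuming a local file `local.duckdb` and a MotherDuck database named `cloud_db` (both names are placeholders):

```sql
-- Attach both sides; 'md:' requires motherduck_token in the environment
ATTACH 'local.duckdb' AS local_db;
ATTACH 'md:cloud_db';

-- Join across the attached catalogs by qualified name
SELECT *
FROM local_db.main.t1
JOIN cloud_db.main.t2 USING (id);
```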
## polars — DataFrames

Use polars when Python logic is needed — complex string transforms, ML features, row-level conditionals. For joins, aggregations, and window functions, prefer SQL.

### Key Patterns

```python
import polars as pl

# Lazy evaluation (always prefer for production)
lf = pl.scan_parquet("events/*.parquet")
result = (
    lf.filter(pl.col("event_date") >= "2024-01-01")
    .group_by("user_id")
    .agg(pl.col("amount").sum().alias("total_spend"))
    .sort("total_spend", descending=True)
    .collect()
)

# Three contexts
df.select(...)        # Pick/transform columns (output has ONLY these)
df.with_columns(...)  # Add/overwrite columns (keeps all originals)
df.filter(...)        # Keep rows matching condition
```
DuckDB interop (zero-copy via Arrow):

```python
import duckdb
import polars as pl

df = pl.DataFrame({"amount": [50, 150]})  # any in-scope DataFrame works
# DuckDB resolves "df" from the local Python scope; .pl() returns polars
result = duckdb.sql("SELECT * FROM df WHERE amount > 100").pl()
```
## marimo — Notebooks

Reactive Python notebooks stored as plain .py files. Cells auto-re-execute when their dependencies change.

```shell
marimo edit notebook.py                  # Create/edit
marimo run notebook.py                   # Serve as app
marimo convert notebook.ipynb -o out.py  # From Jupyter
```

SQL cells use DuckDB by default and return polars DataFrames:

```python
result = mo.sql(f"""
    SELECT * FROM events
    WHERE event_date >= '{start_date}'
""")
```

Python variables and polars DataFrames are queryable from SQL cells, and SQL results are usable from Python cells.
## dlt — Ingestion

Applies when a project uses dlt for ingestion. dlt handles API calls, pagination, schema inference, incremental loading, and state management.

### Scaffold and Run

```shell
dlt init rest_api duckdb     # Scaffold pipeline
uv run python pipeline.py    # Run extraction
dlt pipeline <name> info     # Inspect state
dlt pipeline <name> schema   # View inferred schema
```
### Pipeline Patterns

Minimal pipeline:

```python
import dlt

pipeline = dlt.pipeline(
    pipeline_name="my_pipeline",
    destination="duckdb",
    dataset_name="raw",
)
info = pipeline.run(data, table_name="events")
```

Incremental loading:

```python
@dlt.resource(write_disposition="merge", primary_key="id")
def users(updated_at=dlt.sources.incremental("updated_at")):
    yield from fetch_users(since=updated_at.last_value)
```
REST API source (declarative):

```python
from dlt.sources.rest_api import rest_api_source

source = rest_api_source({
    "client": {"base_url": "https://api.example.com/v1"},
    "resource_defaults": {"primary_key": "id", "write_disposition": "merge"},
    "resources": [
        "users",
        {
            "name": "events",
            "write_disposition": "append",
            "endpoint": {
                "path": "events",
                "incremental": {"cursor_path": "created_at", "initial_value": "2024-01-01"},
            },
        },
    ],
})
```
Write dispositions:

| Disposition | Behavior | Use For |
|---|---|---|
| `append` | Insert rows (default) | Immutable events, logs |
| `replace` | Drop and recreate | Small lookup tables |
| `merge` | Upsert by `primary_key` | Mutable records |

Destinations: duckdb (local file), motherduck (cloud). Set the `motherduck_token` env var or configure credentials in `.dlt/secrets.toml`.
## sqlmesh — Transformation

Applies when a project uses sqlmesh for transformations. SQL-first, with a plan/apply workflow — no accidental production changes.

### Scaffold and Run

```shell
sqlmesh init duckdb                              # New project
sqlmesh init -t dlt --dlt-pipeline <name>        # From dlt schema
sqlmesh plan                                     # Preview + apply (dev)
sqlmesh plan prod                                # Promote to production
sqlmesh fetchdf "SELECT * FROM analytics.users"  # Ad-hoc query
sqlmesh test                                     # Run unit tests
sqlmesh ui                                       # Web interface
```
### Model Kinds

| Kind | Behavior | Use For |
|---|---|---|
| `FULL` | Rewrite entire table | Small dimension tables |
| `INCREMENTAL_BY_TIME_RANGE` | Process new time intervals | Facts, events, logs |
| `INCREMENTAL_BY_UNIQUE_KEY` | Upsert by key | Mutable dimensions |
| `SEED` | Static CSV data | Reference/lookup data |
| `VIEW` | SQL view | Simple pass-throughs |
| `SCD_TYPE_2` | Slowly changing dimensions | Historical tracking |
### Model Example

```sql
MODEL (
  name analytics.stg_events,
  kind INCREMENTAL_BY_TIME_RANGE (time_column event_date),
  cron '@daily',
  grain (event_id),
  audits (NOT_NULL(columns=[event_id]))
);

SELECT
  event_id,
  user_id,
  event_type,
  event_date
FROM raw.events
WHERE event_date BETWEEN @start_date AND @end_date
```
### Config (config.yaml)

```yaml
gateways:
  local:
    connection:
      type: duckdb
      database: db.duckdb

default_gateway: local

model_defaults:
  dialect: duckdb
```
### dlt Integration

`sqlmesh init -t dlt` auto-generates external models and incremental staging models from dlt's inferred schema. Schema changes from dlt are detected by `sqlmesh plan`.