using-flowerpower
# FlowerPower Pipeline Framework
🌸 Build configuration-driven data pipelines using Hamilton DAGs. Lightweight, modular, and perfect for batch ETL, data transformation, and ML workflows.
FlowerPower is ideal for:
- Simple to medium complexity data pipelines (not full production orchestration)
- Teams wanting code-first DAG definitions (vs. YAML-heavy Airflow)
- Projects needing configurable parameters and multiple executors
- Rapid prototyping and batch processing
For production orchestration with scheduling, state persistence, and reliability features, see orchestrating-data-pipelines (Prefect, Dagster, dbt).
## Skill Boundaries

### vs. building-data-pipelines

| building-data-pipelines | @using-flowerpower |
|---|---|
| Raw ETL patterns with Polars/DuckDB/PyArrow | Framework-wrapped ETL with Hamilton DAGs |
| Individual transformation functions | Orchestrated multi-node pipeline definitions |
| Manual pipeline glue code | Configuration-driven parameters and executors |
| General-purpose ETL guidance | FlowerPower-specific setup and best practices |
Use building-data-pipelines for learning ETL patterns, tool selection, and writing raw Polars/DuckDB code. Use @using-flowerpower when you want to structure those operations into reusable, configurable Hamilton DAG pipelines.
### vs. orchestrating-data-pipelines

| orchestrating-data-pipelines | @using-flowerpower |
|---|---|
| Production orchestration (Prefect, Dagster, dbt) | Lightweight batch scripts, no infrastructure |
| Scheduling with cron, intervals, sensors | No built-in scheduler (use cron/systemd) |
| State persistence across runs via database | Ephemeral execution, no state tracking |
| Rich observability dashboards, alerts | Basic Hamilton UI only |
| Retries, SLA guarantees, team-scale workflows | Simple batch jobs, single-team pipelines |
Rule of thumb: Start with @using-flowerpower for batch ETL prototypes and lightweight DAGs. Move to orchestrating-data-pipelines when you need scheduling, production reliability, multi-team coordination, or SLA guarantees.
## Skill Dependencies

This skill assumes familiarity with:

- @building-data-pipelines - Polars, DuckDB, PyArrow basics; medallion architecture, partitioning, incremental loads
- @designing-data-storage - Delta Lake, Iceberg table formats
- @accessing-cloud-storage - Cloud storage (S3, GCS) and fsspec
- @assuring-data-pipelines - Data validation with Pandera, Great Expectations
## When to Use FlowerPower vs. Prefect/Dagster
| Scenario | Recommended Tool | Why |
|---|---|---|
| Simple batch ETL, data transformation scripts | FlowerPower | Lightweight, no infrastructure, Hamilton DAG elegance |
| Production workflows with scheduling, retries, SLA | Prefect | Orchestrator with cloud, robust error handling |
| Asset-based pipelines, data observability | Dagster | Asset lineage, materialization, sensors |
| SQL transformations, dbt ecosystem | dbt | SQL-first, built-in testing, documentation |
| Complex dependency graphs, multi-team | Airflow | Mature, scalable, operator ecosystem |
FlowerPower limitations:
- No built-in scheduler (use cron/systemd)
- No state persistence across restarts
- Limited observability (Hamilton UI is basic)
- Not designed for long-running, fault-tolerant production workflows
## Quick Start

```bash
# Install (quote the extras so zsh doesn't expand the brackets)
uv pip install "flowerpower[io,ui]"

# Initialize project
flowerpower init --name my-pipeline-project

# Create pipeline
flowerpower pipeline new bronze_ingestion

# Run
flowerpower pipeline run bronze_ingestion
```
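Each pipeline gets a YAML config under `conf/pipelines/`. A minimal config for `bronze_ingestion` might look like the sketch below; the field names (`params`, `run.final_vars`, `run.executor`) follow the examples later in this document, and the paths are illustrative placeholders:

```yaml
# conf/pipelines/bronze_ingestion.yml (illustrative sketch)
params:
  source:
    uri: "s3://raw/orders/"
  output_path: "s3://lakehouse/bronze/orders/"
run:
  final_vars:
    - write_bronze
  executor:
    type: threadpool
    max_workers: 4
```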
## Advanced Patterns

### Medallion Architecture with FlowerPower

Create three pipelines following the bronze-silver-gold pattern:
`bronze_ingest.py` (raw ingestion, append-only):

```python
from datetime import datetime, timezone
from pathlib import Path

import polars as pl
from hamilton.function_modifiers import parameterize

from flowerpower.cfg import Config

PARAMS = Config.load(Path(__file__).parents[1], "bronze_ingest").pipeline.h_params

@parameterize(**PARAMS.source)
def source_uri(uri: str) -> str:
    """Source data location (S3, local, etc.)."""
    return uri

def bronze_table(source_uri: str) -> pl.LazyFrame:
    """Read raw data as-is, add ingestion metadata."""
    now = datetime.now(timezone.utc)
    return pl.scan_parquet(source_uri).with_columns(
        pl.lit(now).alias("_ingestion_timestamp"),
        pl.lit(now.date()).alias("_ingestion_date"),  # partition column
        pl.lit(source_uri).alias("_source_file"),
    )

def write_bronze(bronze_table: pl.LazyFrame, output_path: str) -> str:
    """Write to the Delta Lake bronze layer (partitioned by ingestion date)."""
    df = bronze_table.collect()  # write_delta operates on a materialized DataFrame
    df.write_delta(
        output_path,
        mode="append",
        delta_write_options={"partition_by": ["_ingestion_date"]},
    )
    return f"Wrote {df.height} rows to {output_path}"
```
`silver_clean.py` (validation, standardization):

```python
import pandera as pa
import polars as pl
from pandera.polars import Column, DataFrameSchema

def validate_schema(bronze_table: pl.LazyFrame) -> pl.LazyFrame:
    """Apply schema checks using Pandera."""
    schema = DataFrameSchema({
        "order_id": Column(pl.Int64, nullable=False, unique=True),
        "customer_id": Column(pl.Int32, nullable=False),
        "amount": Column(pl.Float64, pa.Check.ge(0)),
        "_ingestion_timestamp": Column(pl.Datetime),
    })
    # Validate (raises if invalid)
    validated_df = schema.validate(bronze_table.collect())
    return validated_df.lazy()

def standardize_data(validate_schema: pl.LazyFrame) -> pl.LazyFrame:
    """Standardize dates, currencies, etc.

    Note the parameter name matches the upstream node (Hamilton wires the
    DAG by parameter name).
    """
    return validate_schema.with_columns(
        pl.col("order_date").str.strptime(pl.Date, format="%Y-%m-%d"),
        pl.col("amount").round(2),
    )

def write_silver(standardize_data: pl.LazyFrame, silver_path: str) -> str:
    """Write to the Silver Delta table, replacing only the affected partition."""
    df = standardize_data.collect()
    df.write_delta(
        silver_path,
        mode="overwrite",
        # replaceWhere-style predicate scopes the overwrite to one partition;
        # pass the date in as a pipeline parameter rather than hardcoding it.
        delta_write_options={"predicate": "_ingestion_date = '2024-01-01'"},
    )
    return f"Wrote {df.height} rows to {silver_path}"
```
`gold_aggregate.py` (business-ready aggregates):

```python
import polars as pl

def daily_sales(silver_table: pl.LazyFrame) -> pl.DataFrame:
    """Aggregate sales by day and region."""
    return (
        silver_table
        .group_by(["order_date", "region"])
        .agg(
            pl.sum("amount").alias("total_sales"),
            pl.len().alias("order_count"),
        )
        .collect()
    )

def write_gold(daily_sales: pl.DataFrame, gold_path: str) -> str:
    """Write the Gold table (Parquet, no ACID needed)."""
    daily_sales.write_parquet(
        gold_path,
        compression="zstd",
        statistics=True,  # Column stats enable predicate pushdown on read
    )
    return f"Wrote {daily_sales.height} rows to {gold_path}"
```
### Delta Lake Integration with Schema Evolution

Use `scan_delta()` / `write_delta()` with `schema_mode="merge"`:
```yaml
# conf/pipelines/delta_incremental.yml
params:
  delta_table: "s3://lakehouse/silver/orders/"
  source_parquet: "s3://raw/orders/"
run:
  final_vars:
    - write_result
  executor:
    type: threadpool
    max_workers: 4
```
```python
# pipelines/delta_incremental.py
import polars as pl

def source_data(source_parquet: str) -> pl.LazyFrame:
    return pl.scan_parquet(source_parquet)

def merge_delta(source_data: pl.LazyFrame, delta_table: str) -> str:
    """Append with schema evolution."""
    df = source_data.collect()  # collect once; avoid re-materializing for the row count
    df.write_delta(
        delta_table,
        mode="append",
        delta_write_options={"schema_mode": "merge"},  # Auto-add new columns
    )
    return f"Appended {df.height} rows"
```
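Conceptually, `schema_mode="merge"` unions the incoming schema with the table's existing schema instead of rejecting the write. A stdlib-only toy sketch of that merge rule (the helper name is hypothetical; deltalake additionally performs type-compatibility checks this sketch skips):

```python
def merge_schemas(existing: dict[str, str], incoming: dict[str, str]) -> dict[str, str]:
    """Union two column-name -> type mappings; the existing type wins on conflict.

    Illustrative sketch of merge semantics only, not the deltalake implementation.
    """
    merged = dict(existing)
    for name, dtype in incoming.items():
        merged.setdefault(name, dtype)  # keep the known type, add unknown columns
    return merged

existing = {"order_id": "bigint", "amount": "double"}
incoming = {"order_id": "bigint", "amount": "double", "discount": "double"}
print(merge_schemas(existing, incoming))
# The new "discount" column is added; existing columns are untouched
```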
### Watermark/Incremental Load Pattern

```python
# Use DuckDB to manage watermarks
from datetime import datetime

import duckdb
import polars as pl

def get_last_watermark(con: duckdb.DuckDBPyConnection, table_name: str) -> datetime:
    """Query the watermark table (parameterized query, not string interpolation)."""
    result = con.execute(
        "SELECT watermark_value FROM watermark_table WHERE table_name = ?",
        [table_name],
    ).fetchone()
    return result[0] if result else datetime(1970, 1, 1)

def incremental_load(source: str, target: str, timestamp_col: str = "updated_at") -> None:
    """Load only new/updated records."""
    # Use a file-backed DB: an in-memory DB would lose watermarks between runs
    con = duckdb.connect("watermarks.duckdb")
    con.execute("""
        CREATE TABLE IF NOT EXISTS watermark_table (
            table_name TEXT PRIMARY KEY,
            watermark_value TIMESTAMP
        )
    """)
    old_wm = get_last_watermark(con, target)
    new_wm = pl.scan_parquet(source).select(pl.max(timestamp_col)).collect()[0, 0]
    df = (
        pl.scan_parquet(source)
        .filter((pl.col(timestamp_col) > old_wm) & (pl.col(timestamp_col) <= new_wm))
        .collect()
    )
    df.write_delta(target, mode="append")
    # Update the watermark only after a successful write
    con.execute(
        "INSERT OR REPLACE INTO watermark_table (table_name, watermark_value) VALUES (?, ?)",
        [target, new_wm],
    )
```
### Data Quality Validation (Pandera)

```python
from datetime import date

import pandera as pa
import polars as pl
from pandera.polars import Column, DataFrameSchema

def validate_silver(silver_df: pl.DataFrame) -> pl.DataFrame:
    """Validate against a Pandera schema."""
    schema = DataFrameSchema({
        "customer_id": Column(pl.Int32, pa.Check.gt(0)),
        "email": Column(pl.Utf8, pa.Check.str_contains("@")),
        "signup_date": Column(pl.Date, pa.Check.ge(date(2020, 1, 1))),
    })
    try:
        # lazy=True collects all failures instead of stopping at the first
        return schema.validate(silver_df, lazy=True)
    except pa.errors.SchemaErrors as e:
        # Log failures; write failing rows to a quarantine table
        print(f"Validation failed: {e.failure_cases}")
        raise
```
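The quarantine idea mentioned above can be sketched without any validation framework: partition records on a predicate and route failures to a separate sink rather than failing the whole batch. All names below are illustrative:

```python
from typing import Callable, Iterable

Record = dict

def partition_records(
    records: Iterable[Record], is_valid: Callable[[Record], bool]
) -> tuple[list[Record], list[Record]]:
    """Split records into (valid, quarantined) lists."""
    valid: list[Record] = []
    quarantined: list[Record] = []
    for rec in records:
        (valid if is_valid(rec) else quarantined).append(rec)
    return valid, quarantined

rows = [
    {"customer_id": 1, "email": "a@example.com"},
    {"customer_id": -5, "email": "bad-email"},
]
ok, bad = partition_records(
    rows, lambda r: r["customer_id"] > 0 and "@" in r["email"]
)
print(len(ok), len(bad))  # 1 1
```

The valid list continues downstream; the quarantined list gets written to its own table for inspection and replay.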
### Cloud Storage (S3) with fsspec

```yaml
# conf/pipelines/s3_ingest.yml
params:
  s3_path: "s3://my-bucket/raw/orders/"
  local_cache: "/tmp/cache"
run:
  executor:
    type: threadpool
    max_workers: 8
```

```python
# pipelines/s3_ingest.py
import fsspec
import polars as pl

def list_s3_files(s3_path: str) -> list[str]:
    """List files to process."""
    fs = fsspec.filesystem("s3")
    return fs.glob(f"{s3_path}*.parquet")

def read_s3_file(file_path: str) -> pl.DataFrame:
    """Read an individual file with fsspec."""
    fs = fsspec.filesystem("s3")
    with fs.open(file_path, "rb") as f:
        # Eager read: the file handle closes when the block exits,
        # so a LazyFrame over it would fail on later collection.
        return pl.read_parquet(f)

def process_files(list_s3_files: list[str]) -> pl.LazyFrame:
    """Read all files and union them."""
    frames = [read_s3_file(f) for f in list_s3_files]
    return pl.concat(frames).lazy()
```
## Best Practices for FlowerPower
- Use lazy evaluation (Polars LazyFrame) for large datasets
- Set appropriate executor: threadpool for I/O, processpool for CPU
- Add retries in config for external API calls
- Use configuration for all parameters (no hardcoded paths)
- Log strategically - Hamilton captures node outputs
- Implement idempotency - pipelines should be re-runnable
- Monitor with Hamilton UI for DAG visualization
- Use partitioning for Delta Lake tables (by date/tenant)
- Validate at Silver layer with Pandera/Great Expectations
- Handle schema evolution with `schema_mode="merge"` for appends
## Limitations & Gotchas

- No built-in scheduling - pair with cron/systemd/Prefect
- No state persistence - track watermarks externally (DuckDB)
- No SLA alerts - implement custom `on_failure` hooks
- Hamilton cache can grow indefinitely - configure `cache: false` or prune
- Multi-node execution requires Ray/Dask setup (advanced)
## See Also

- building-data-pipelines - General ETL patterns, tool selection, and raw Polars/DuckDB code
- orchestrating-data-pipelines - Production orchestration (Prefect, Dagster, dbt) when you need scheduling, retries, and SLA guarantees
- @building-data-pipelines - Medallion architecture, incremental loads, partitioning, file sizing
- @designing-data-storage - Delta Lake, Iceberg table formats and operations
- @accessing-cloud-storage - Cloud storage backends (S3, GCS) and libraries
- @assuring-data-pipelines - Data validation frameworks (Pandera, Great Expectations)
- @managing-data-catalogs - Data catalog systems for discovery and governance
## References

- FlowerPower Documentation
- Hamilton Framework
- FlowerPower GitHub
- orchestrating-data-pipelines - Comparison of orchestration tools