using-flowerpower
FlowerPower Pipeline Framework
🌸 Build configuration-driven data pipelines using Hamilton DAGs. Lightweight, modular, and perfect for batch ETL, data transformation, and ML workflows.
FlowerPower is ideal for:
- Simple to medium complexity data pipelines (not full production orchestration)
- Teams wanting code-first DAG definitions (vs. YAML-heavy Airflow)
- Projects needing configurable parameters and multiple executors
- Rapid prototyping and batch processing
For production orchestration with scheduling, state persistence, and reliability features, see orchestrating-data-pipelines (Prefect, Dagster, dbt).
Skill Boundaries
vs. building-data-pipelines
| building-data-pipelines | @using-flowerpower |
|---|---|
| Raw ETL patterns with Polars/DuckDB/PyArrow | Framework-wrapped ETL with Hamilton DAGs |
| Individual transformation functions | Orchestrated multi-node pipeline definitions |
| Manual pipeline glue code | Configuration-driven parameters and executors |
| General-purpose ETL guidance | FlowerPower-specific setup and best practices |
Use building-data-pipelines for learning ETL patterns, tool selection, and writing raw Polars/DuckDB code. Use @using-flowerpower when you want to structure those operations into reusable, configurable Hamilton DAG pipelines.
vs. orchestrating-data-pipelines
| orchestrating-data-pipelines | @using-flowerpower |
|---|---|
| Production orchestration (Prefect, Dagster, dbt) | Lightweight batch scripts, no infrastructure |
| Scheduling with cron, intervals, sensors | No built-in scheduler (use cron/systemd) |
| State persistence across runs via database | Ephemeral execution, no state tracking |
| Rich observability dashboards, alerts | Basic Hamilton UI only |
| Retries, SLA guarantees, team-scale workflows | Simple batch jobs, single-team pipelines |
Rule of thumb: Start with @using-flowerpower for batch ETL prototypes and lightweight DAGs. Move to orchestrating-data-pipelines when you need scheduling, production reliability, multi-team coordination, or SLA guarantees.
Skill Dependencies
This skill assumes familiarity with:
- @building-data-pipelines - Polars, DuckDB, PyArrow basics
- @designing-data-storage - Delta Lake, Iceberg table formats
- @accessing-cloud-storage - Cloud storage (S3, GCS) and fsspec
- @assuring-data-pipelines - Data validation with Pandera, Great Expectations
- @building-data-pipelines - Medallion architecture, partitioning, incremental loads
When to Use FlowerPower vs. Prefect/Dagster
| Scenario | Recommended Tool | Why |
|---|---|---|
| Simple batch ETL, data transformation scripts | FlowerPower | Lightweight, no infrastructure, Hamilton DAG elegance |
| Production workflows with scheduling, retries, SLA | Prefect | Orchestrator with cloud, robust error handling |
| Asset-based pipelines, data observability | Dagster | Asset lineage, materialization, sensors |
| SQL transformations, dbt ecosystem | dbt | SQL-first, built-in testing, documentation |
| Complex dependency graphs, multi-team | Airflow | Mature, scalable, operator ecosystem |
FlowerPower limitations:
- No built-in scheduler (use cron/systemd; see the wrapper sketch after this list)
- No state persistence across restarts
- Limited observability (Hamilton UI is basic)
- Not designed for long-running, fault-tolerant production workflows
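Because FlowerPower provides no scheduler or retry layer of its own, a common pattern is to wrap the CLI run (shown in the Quick Start below) in a small script that cron or a systemd timer invokes. A minimal sketch; the pipeline name, retry count, and backoff are illustrative and not FlowerPower features:

```python
# run_pipeline.py - invoked by cron/systemd; retries are handled here, not by FlowerPower.
import subprocess
import sys
import time

PIPELINE = "bronze_ingestion"  # illustrative pipeline name
MAX_ATTEMPTS = 3

def main() -> int:
    for attempt in range(1, MAX_ATTEMPTS + 1):
        result = subprocess.run(["flowerpower", "pipeline", "run", PIPELINE])
        if result.returncode == 0:
            return 0
        print(f"Attempt {attempt} failed (exit {result.returncode})", file=sys.stderr)
        time.sleep(30 * attempt)  # simple backoff between retries
    return 1

if __name__ == "__main__":
    sys.exit(main())
```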
Quick Start
```bash
# Install
uv pip install "flowerpower[io,ui]"

# Initialize project
flowerpower init --name my-pipeline-project

# Create pipeline
flowerpower pipeline new bronze_ingestion

# Run
flowerpower pipeline run bronze_ingestion
```
Advanced Patterns
Medallion Architecture with FlowerPower
Create three pipelines following bronze-silver-gold pattern:
bronze_ingest.py (raw ingestion, append-only)
```python
from datetime import datetime, timezone
from pathlib import Path

import polars as pl
from hamilton.function_modifiers import parameterize

from flowerpower.cfg import Config

PARAMS = Config.load(Path(__file__).parents[1], "bronze_ingest").pipeline.h_params

@parameterize(**PARAMS.source)
def source_uri(uri: str) -> str:
    """Source data location (S3, local, etc.)."""
    return uri

def bronze_table(source_uri: str) -> pl.LazyFrame:
    """Read raw data as-is, add ingestion metadata."""
    now = datetime.now(timezone.utc)
    return pl.scan_parquet(source_uri).with_columns([
        pl.lit(now).alias("_ingestion_timestamp"),
        pl.lit(now.date()).alias("_ingestion_date"),  # partition column
        pl.lit(source_uri).alias("_source_file"),
    ])

def write_bronze(bronze_table: pl.LazyFrame, output_path: str) -> str:
    """Write to the Delta Lake bronze layer (partitioned by ingestion date)."""
    df = bronze_table.collect()  # write_delta operates on a DataFrame
    df.write_delta(
        output_path,
        mode="append",
        delta_write_options={"partition_by": ["_ingestion_date"]},
    )
    return f"Wrote {df.height} rows to {output_path}"
```
silver_clean.py (validation, standardization)
```python
import pandera as pa
import polars as pl
from pandera.polars import Column, DataFrameSchema

def validate_schema(bronze_table: pl.LazyFrame) -> pl.LazyFrame:
    """Apply schema checks using Pandera."""
    schema = DataFrameSchema({
        "order_id": Column(pl.Int64, nullable=False, unique=True),
        "customer_id": Column(pl.Int32, nullable=False),
        "amount": Column(pl.Float64, pa.Check.ge(0)),
        "_ingestion_timestamp": Column(pl.Datetime),
    })
    # Validate (raises if invalid)
    validated_df = schema.validate(bronze_table.collect())
    return validated_df.lazy()

def standardize_data(validate_schema: pl.LazyFrame) -> pl.LazyFrame:
    """Standardize dates, currencies, etc. (parameter name matches the upstream node)."""
    return validate_schema.with_columns([
        pl.col("order_date").str.strptime(pl.Date, format="%Y-%m-%d"),
        pl.col("amount").round(2),
    ])

def write_silver(standardize_data: pl.LazyFrame, silver_path: str) -> str:
    """Write to the Silver Delta table, replacing the target partition."""
    df = standardize_data.collect()
    df.write_delta(
        silver_path,
        mode="overwrite",
        delta_write_options={
            # Replace only the matching partition (passed through to deltalake)
            "partition_filters": [("ingestion_date", "=", "2024-01-01")],
        },
    )
    return f"Wrote {df.height} rows to {silver_path}"
```
gold_aggregate.py (business-ready aggregates)
```python
import polars as pl

def silver_table(silver_path: str) -> pl.LazyFrame:
    """Scan the Silver Delta table produced by the previous pipeline."""
    return pl.scan_delta(silver_path)

def daily_sales(silver_table: pl.LazyFrame) -> pl.DataFrame:
    """Aggregate sales by day, region."""
    return silver_table.group_by(["order_date", "region"]).agg([
        pl.sum("amount").alias("total_sales"),
        pl.len().alias("order_count"),
    ]).collect()

def write_gold(daily_sales: pl.DataFrame, gold_path: str) -> str:
    """Write Gold table (Parquet, no ACID needed)."""
    daily_sales.write_parquet(
        gold_path,
        compression="zstd",
        statistics=True,  # min/max statistics enable predicate pushdown
    )
    return f"Wrote {daily_sales.height} rows to {gold_path}"
```
Delta Lake Integration with Schema Evolution
Use write_delta() with schema merging on append; for reading, Polars offers scan_delta() (DuckDB has an analogous delta_scan() table function), shown after the pipeline code:
```yaml
# conf/pipelines/delta_incremental.yml
params:
  delta_table: "s3://lakehouse/silver/orders/"
  source_parquet: "s3://raw/orders/"
run:
  final_vars:
    - merge_delta
  executor:
    type: threadpool
    max_workers: 4
```
```python
# pipelines/delta_incremental.py
import polars as pl

def source_data(source_parquet: str) -> pl.LazyFrame:
    return pl.scan_parquet(source_parquet)

def merge_delta(source_data: pl.LazyFrame, delta_table: str) -> str:
    """Append with schema evolution."""
    df = source_data.collect()
    df.write_delta(
        delta_table,
        mode="append",
        delta_write_options={"schema_mode": "merge"},  # auto-add new columns
    )
    return f"Appended {df.height} rows"
```
Watermark/Incremental Load Pattern
```python
# Use DuckDB to persist watermarks between runs
from datetime import datetime

import duckdb
import polars as pl

def get_last_watermark(con: duckdb.DuckDBPyConnection, table_name: str) -> datetime:
    """Query the watermark table; fall back to the epoch on a first run."""
    result = con.execute(
        "SELECT watermark_value FROM watermark_table WHERE table_name = ?",
        [table_name],
    ).fetchone()
    return result[0] if result else datetime(1970, 1, 1)

def incremental_load(source: str, target: str, timestamp_col: str = "updated_at") -> None:
    """Load only new/updated records."""
    # File-backed DuckDB database so watermarks survive between runs
    con = duckdb.connect("watermarks.duckdb")
    old_wm = get_last_watermark(con, target)
    new_wm = pl.scan_parquet(source).select(pl.max(timestamp_col)).collect()[0, 0]
    df = (
        pl.scan_parquet(source)
        .filter((pl.col(timestamp_col) > old_wm) & (pl.col(timestamp_col) <= new_wm))
        .collect()
    )
    df.write_delta(target, mode="append")
    # Update watermark
    con.execute(
        "INSERT OR REPLACE INTO watermark_table (table_name, watermark_value) VALUES (?, ?)",
        [target, new_wm],
    )
```
Data Quality Validation (Pandera)
```python
from datetime import date

import pandera as pa
import polars as pl
from pandera.polars import Column, DataFrameSchema

def validate_silver(silver_df: pl.DataFrame) -> pl.DataFrame:
    """Validate against a Pandera schema."""
    schema = DataFrameSchema({
        "customer_id": Column(pl.Int32, pa.Check.gt(0)),
        "email": Column(pl.Utf8, pa.Check.str_contains("@")),
        "signup_date": Column(pl.Date, pa.Check.ge(date(2020, 1, 1))),
    })
    try:
        return schema.validate(silver_df, lazy=True)
    except pa.errors.SchemaErrors as e:
        # Log failures, write to quarantine (see the sketch below)
        print(f"Validation failed: {e.failure_cases}")
        raise
```
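Making the "write to quarantine" step explicit keeps a bad batch from silently blocking the whole run. A minimal sketch that wraps the validator above; the quarantine_path and wrapper function are illustrative, not part of FlowerPower or Pandera:

```python
import pandera as pa
import polars as pl

def validate_or_quarantine(silver_df: pl.DataFrame, quarantine_path: str) -> pl.DataFrame:
    """Validate; on failure, park the rejected batch for later inspection."""
    try:
        return validate_silver(silver_df)  # schema check from the example above
    except pa.errors.SchemaErrors as exc:
        # Persist the whole rejected batch so it can be inspected and replayed
        # once the data issue is fixed.
        silver_df.write_parquet(quarantine_path)
        raise RuntimeError(f"Batch quarantined at {quarantine_path}") from exc
```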
Cloud Storage (S3) with fsspec
```yaml
# conf/pipelines/s3_ingest.yml
params:
  s3_path: "s3://my-bucket/raw/orders/"
  local_cache: "/tmp/cache"
run:
  executor:
    type: threadpool
    max_workers: 8
```
```python
# pipelines/s3_ingest.py
import fsspec
import polars as pl

def list_s3_files(s3_path: str) -> list[str]:
    """List files to process."""
    fs = fsspec.filesystem("s3")
    return fs.glob(f"{s3_path}*.parquet")

def read_s3_file(file_path: str) -> pl.DataFrame:
    """Read an individual file via fsspec."""
    fs = fsspec.filesystem("s3")
    with fs.open(file_path, "rb") as f:
        return pl.read_parquet(f)

def process_files(list_s3_files: list[str]) -> pl.DataFrame:
    """Read all files and union them."""
    frames = [read_s3_file(f) for f in list_s3_files]
    return pl.concat(frames)
```
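If per-file control isn't needed, Polars can also scan the S3 prefix directly and keep the read lazy, with credentials supplied via storage_options or the environment. A minimal sketch; the bucket, prefix, columns, and region are illustrative:

```python
import polars as pl

# Lazy scan over every Parquet file under the prefix; predicate and projection
# pushdown avoid downloading data the query doesn't need.
orders = (
    pl.scan_parquet(
        "s3://my-bucket/raw/orders/*.parquet",
        storage_options={"aws_region": "us-east-1"},  # or rely on env credentials
    )
    .filter(pl.col("amount") > 0)
    .select(["order_id", "customer_id", "amount"])
    .collect()
)
```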
Best Practices for FlowerPower
- Use lazy evaluation (Polars LazyFrame) for large datasets
- Set appropriate executor: threadpool for I/O, processpool for CPU
- Add retries in config for external API calls
- Use configuration for all parameters (no hardcoded paths)
- Log strategically - Hamilton captures node outputs
- Implement idempotency - pipelines should be re-runnable (see the merge sketch after this list)
- Monitor with Hamilton UI for DAG visualization
- Use partitioning for Delta Lake tables (by date/tenant)
- Validate at Silver layer with Pandera/Great Expectations
- Handle schema evolution with `schema_mode="merge"` for appends
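One way to make re-runs idempotent is to upsert by a business key instead of blindly appending, so running the same batch twice does not duplicate rows. A minimal sketch using the deltalake package directly; the table path and key column are illustrative:

```python
import polars as pl
from deltalake import DeltaTable

def upsert_orders(batch: pl.DataFrame, table_path: str) -> None:
    """Merge a batch into an existing Delta table keyed on order_id."""
    (
        DeltaTable(table_path)
        .merge(
            source=batch.to_arrow(),
            predicate="target.order_id = source.order_id",
            source_alias="source",
            target_alias="target",
        )
        .when_matched_update_all()
        .when_not_matched_insert_all()
        .execute()
    )
```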
Limitations & Gotchas
- No built-in scheduling - pair with cron/systemd/Prefect
- No state persistence - track watermarks externally (DuckDB)
- No SLA alerts - implement custom `on_failure` hooks
- Hamilton cache can grow indefinitely - configure `cache: false` or prune
- Multi-node execution requires Ray/Dask setup (advanced)
See Also
- building-data-pipelines - General ETL patterns, tool selection, and raw Polars/DuckDB code
- orchestrating-data-pipelines - Production orchestration (Prefect, Dagster, dbt) when you need scheduling, retries, and SLA guarantees
- @building-data-pipelines - Medallion architecture, incremental loads, partitioning, file sizing
- @designing-data-storage - Delta Lake, Iceberg table formats and operations
- @accessing-cloud-storage - Cloud storage backends (S3, GCS) and libraries
- @assuring-data-pipelines - Data validation frameworks (Pandera, Great Expectations)
- @managing-data-catalogs - Data catalog systems for discovery and governance
References
- FlowerPower Documentation
- Hamilton Framework
- FlowerPower GitHub
- orchestrating-data-pipelines - Comparison of orchestration tools