prefect

SKILL.md

Prefect

Prefect turns Python functions into observable, schedulable workflows with minimal boilerplate. Add @flow and @task decorators to get retries, logging, caching, and a monitoring UI.

Installation

# Install Prefect with pip
pip install prefect

# Start the local Prefect server (UI + API)
prefect server start
# Once it is running, the UI is at http://localhost:4200

# Or use Prefect Cloud (managed) instead of a local server
prefect cloud login

Basic Flow

# flows/hello.py: Simple flow with tasks
from prefect import flow, task, get_run_logger
from datetime import timedelta

@task(retries=3, retry_delay_seconds=10)
def fetch_data(url: str) -> dict:
    """Download ``url`` and return its decoded JSON body.

    Any exception raised by ``raise_for_status()`` for an error response
    propagates to the caller. The task is declared with 3 retries and a
    10-second delay between them.
    """
    import httpx

    get_run_logger().info(f"Fetching {url}")
    reply = httpx.get(url)
    # Fail loudly on an error status instead of parsing an error body.
    reply.raise_for_status()
    payload = reply.json()
    return payload

# NOTE(review): cache_expiration presumably only matters when a cache key
# function / cache policy is in effect for this task -- confirm against the
# installed Prefect version.
@task(cache_expiration=timedelta(hours=1))
def transform(data: dict) -> list:
    """Reshape the raw payload into ``{"id", "value"}`` records.

    Each entry of ``data["results"]`` produces one record whose ``value``
    is the entry's ``amount`` multiplied by 100.
    """
    records = []
    for entry in data["results"]:
        records.append({"id": entry["id"], "value": entry["amount"] * 100})
    return records

@task
def load(records: list) -> int:
    """Log how many records are being loaded and return that count."""
    run_logger = get_run_logger()
    run_logger.info(f"Loading {len(records)} records")
    # Insert into database...
    loaded = len(records)
    return loaded

@flow(name="etl-pipeline", log_prints=True)
def etl_pipeline(api_url: str = "https://api.example.com/data"):
    """Run fetch -> transform -> load for ``api_url`` and return the record count."""
    # Each stage feeds directly into the next one.
    count = load(transform(fetch_data(api_url)))
    print(f"Processed {count} records")
    return count


if __name__ == "__main__":
    # Run the flow once with its default API URL.
    etl_pipeline()

Scheduling and Deployments

# flows/deploy.py: Create a deployment with schedule
from prefect import flow
from prefect.deployments import Deployment
from prefect.server.schemas.schedules import CronSchedule

@flow
def daily_report(param1: str = "value1"):
    """Generate the daily report.

    Args:
        param1: Example flow parameter. It is declared so that the
            ``parameters`` passed to ``serve()`` below match the flow's
            signature; passing parameters the flow does not accept makes
            runs fail parameter validation.
    """
    print("Generating daily report...")


if __name__ == "__main__":
    # Deploy via Python: serve() registers the deployment with its schedule.
    daily_report.serve(
        name="daily-report-deployment",
        cron="0 8 * * *",  # Every day at 8 AM
        tags=["reporting"],
        # Keys here must be parameter names of daily_report.
        parameters={"param1": "value1"},
    )
# deploy.sh: Deploy and manage via CLI
# Create a deployment named "etl-prod" for the etl_pipeline flow in
# flows/hello.py, assigned to the "default-agent-pool" work pool and
# scheduled with the cron expression below (every 30 minutes)
prefect deploy flows/hello.py:etl_pipeline \
  --name etl-prod \
  --pool default-agent-pool \
  --cron "*/30 * * * *"

# Start a worker to execute deployments from the same pool
prefect worker start --pool default-agent-pool

# Trigger a deployment run, addressed as "<flow name>/<deployment name>",
# overriding the api_url flow parameter
prefect deployment run "etl-pipeline/etl-prod" --param api_url=https://api.example.com

Error Handling and Concurrency

# flows/advanced.py: Concurrent tasks, error handling, and sub-flows
from prefect import flow, task
from prefect.tasks import task_input_hash
import asyncio

@task(
    cache_key_fn=task_input_hash,
    timeout_seconds=300,
    retries=2,
    retry_delay_seconds=[10, 60],  # One delay per retry: 10s, then 60s
)
def process_item(item_id: int) -> dict:
    """Process a single item and report it as done."""
    outcome = {"id": item_id, "status": "done"}
    return outcome

@flow
def batch_process(item_ids: list[int]):
    """Submit one ``process_item`` task per id, wait for all, and print a summary."""
    # Submit every task before waiting on any result.
    pending = []
    for item_id in item_ids:
        pending.append(process_item.submit(item_id))

    results = [future.result() for future in pending]

    succeeded = [r for r in results if r["status"] == "done"]
    print(f"Processed {len(succeeded)}/{len(item_ids)} items")

@task
async def fetch_from_api(source: str) -> dict:
    """Fetch data for one named source.

    Placeholder implementation so the example is runnable: it yields to the
    event loop once and returns a dict naming the source. Replace the body
    with a real asynchronous client call.
    """
    await asyncio.sleep(0)
    return {"source": source}


@flow
async def async_pipeline():
    """Fetch from two sources concurrently and return both results.

    Returns:
        The list produced by ``asyncio.gather``, in the order the calls
        were passed (``source_a`` first, then ``source_b``).
    """
    # Async flow for I/O-bound work: both fetches are awaited together.
    results = await asyncio.gather(
        fetch_from_api("source_a"),
        fetch_from_api("source_b"),
    )
    return results

Blocks and Infrastructure

# flows/blocks.py: Use blocks for reusable configuration
from prefect import flow
from prefect.blocks.system import Secret, JSON
from prefect_sqlalchemy import SqlAlchemyConnector

# Store secrets (set via UI or CLI)
# prefect block register -m prefect_sqlalchemy
# Then configure in UI at http://localhost:4200/blocks

# Use in flows
@flow
def db_flow():
    """Load settings from saved blocks and print a user count from the database."""
    # Block values are loaded by name; they are shown for illustration and
    # not used further in this example.
    secret_block = Secret.load("my-api-key")
    api_key = secret_block.get()
    config_block = JSON.load("pipeline-config")
    config = config_block.value

    connector = SqlAlchemyConnector.load("prod-db")
    with connector as conn:
        result = conn.fetch_all("SELECT count(*) FROM users")
        print(result)

Notifications

# flows/notifications.py: Send alerts on failure
from prefect import flow
from prefect.blocks.notifications import SlackWebhook

@flow
def monitored_flow():
    """Run the flow body and send a Slack alert if it raises."""
    try:
        # ... do work
        pass
    except Exception as e:
        # Notify through the saved "alerts-channel" webhook block, then
        # re-raise so the exception still propagates.
        SlackWebhook.load("alerts-channel").notify(f"❌ Pipeline failed: {e}")
        raise

# Or use automations in Prefect UI:
# Automations → Create → Trigger: Flow run failed → Action: Send Slack notification

CLI Reference

# cli.sh: Common Prefect CLI commands
# Show the installed version and the active configuration
prefect version
prefect config view

# List flow runs and deployments
prefect flow-run ls
prefect deployment ls

# View logs for a specific flow run
prefect flow-run logs <flow-run-id>

# Manage work pools: create a process-type pool, then list pools
prefect work-pool create my-pool --type process
prefect work-pool ls
Weekly Installs
1
GitHub Stars
15
First Seen
3 days ago
Installed on
amp1
cline1
augment1
opencode1
cursor1
kimi-cli1