flowing

Installation
SKILL.md

Flowing — DAG Workflow Runner

Batch independent operations into one python3 invocation. Declare steps, wire dependencies, run once.

Quick Start

from flowing import task, Flow

@task
def fetch_data():
    return {"items": [1, 2, 3]}

@task(depends_on=[fetch_data])
def process(fetch_data):
    return sum(fetch_data["items"])

@task(depends_on=[process])
def store(process):
    print(f"Result: {process}")

Flow(store).run()

Core API

@task decorator

@task(
    depends_on=[other_task],  # DAG edges
    retry=2,                  # retry count (0 = no retry)
    retry_backoff_base_ms=1000,
    retry_max_backoff_ms=30_000,
    timeout_s=60.0,
    detached=True,            # non-blocking side-effect
    name="custom_name",       # override function name
)
def my_step(other_task):      # param name = dependency task name
    return result

Flow class

flow = Flow(terminal_task, max_workers=5, fail_fast=True)
results = flow.run()          # execute full DAG
flow.summary()                # human-readable status
flow.value(some_task)         # get succeeded task's return value

Resume from failure

When a step fails mid-pipeline, fix the issue and continue without re-running succeeded steps:

flow = Flow(terminal)
results = flow.run()                    # step_3 fails
flow.override(step_3, corrected_value)  # inject fix
results = flow.resume()                 # step_1, step_2 cached; step_4+ runs
  • flow.resume(): Resets FAILED/SKIPPED tasks, keeps SUCCEEDED results cached
  • flow.override(task_def, value): Manually inject a succeeded result

Detached tasks (non-blocking side-effects)

@task(depends_on=[create_issue], detached=True)
def store_memory(create_issue):
    remember(create_issue["url"], ...)
  • Run in a final layer after the main DAG completes
  • Failures collected in flow.detached_failures, never trigger fail_fast
  • Dependencies must all be SUCCEEDED (same as normal tasks)

When to use

  • 3+ independent operations (recall, SQL, web search) that can parallelize
  • Multi-step pipelines where late failures shouldn't waste early work
  • Side-effects (memory storage, notifications) that shouldn't block the critical path

When NOT to use

  • Next step depends on reasoning about prior result (use a think loop)
  • Single sequential operation
  • Async/distributed workflows (this is single-container, ThreadPoolExecutor)
Related skills

More from oaustegard/claude-skills

Installs
3
GitHub Stars
119
First Seen
Mar 29, 2026