# Flowing — DAG Workflow Runner
Batch independent operations into one python3 invocation. Declare steps, wire dependencies, run once.
## Quick Start

```python
from flowing import task, Flow

@task
def fetch_data():
    return {"items": [1, 2, 3]}

@task(depends_on=[fetch_data])
def process(fetch_data):
    return sum(fetch_data["items"])

@task(depends_on=[process])
def store(process):
    print(f"Result: {process}")

Flow(store).run()
```
## Core API

### `@task` decorator

```python
@task(
    depends_on=[other_task],      # DAG edges
    retry=2,                      # retry count (0 = no retry)
    retry_backoff_base_ms=1000,
    retry_max_backoff_ms=30_000,
    timeout_s=60.0,
    detached=True,                # non-blocking side-effect
    name="custom_name",           # override function name
)
def my_step(other_task):  # param name = dependency task name
    return result
```
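The `retry_backoff_base_ms` / `retry_max_backoff_ms` knobs suggest capped exponential backoff between attempts. A minimal sketch of that delay schedule, assuming the base is the first delay and the max is a hard cap (flowing's actual schedule may differ):

```python
# Hypothetical backoff schedule; not flowing's actual implementation.
def backoff_ms(attempt: int, base_ms: int = 1000, max_ms: int = 30_000) -> int:
    """Delay before retry `attempt` (1-based): base * 2**(attempt - 1), capped."""
    return min(base_ms * 2 ** (attempt - 1), max_ms)

# First six retries: 1000, 2000, 4000, 8000, 16000, then capped at 30000 ms.
print([backoff_ms(a) for a in range(1, 7)])
```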
### `Flow` class

```python
flow = Flow(terminal_task, max_workers=5, fail_fast=True)
results = flow.run()   # execute full DAG
flow.summary()         # human-readable status
flow.value(some_task)  # get a succeeded task's return value
```
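Since the runner is a single-container `ThreadPoolExecutor` (see "When NOT to use"), scheduling amounts to running the DAG layer by layer. A toy sketch of that idea, with hypothetical task names; this is not flowing's internal code:

```python
# Toy sketch of DAG layering (not flowing's internals): tasks whose
# dependencies are all done run together in one ThreadPoolExecutor batch.
from concurrent.futures import ThreadPoolExecutor

def layers(deps: dict[str, set[str]]) -> list[list[str]]:
    """deps maps task -> set of prerequisite tasks; returns parallel layers."""
    done, out = set(), []
    while len(done) < len(deps):
        layer = [t for t, d in deps.items() if t not in done and d <= done]
        if not layer:
            raise ValueError("dependency cycle")
        out.append(layer)
        done.update(layer)
    return out

deps = {"fetch": set(), "clean": {"fetch"}, "stats": {"fetch"},
        "report": {"clean", "stats"}}
for layer in layers(deps):  # [['fetch'], ['clean', 'stats'], ['report']]
    with ThreadPoolExecutor(max_workers=5) as pool:
        list(pool.map(print, layer))  # each layer's tasks run in parallel
```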
## Resume from failure

When a step fails mid-pipeline, fix the issue and continue without re-running succeeded steps:

```python
flow = Flow(terminal)
results = flow.run()                    # step_3 fails
flow.override(step_3, corrected_value)  # inject fix
results = flow.resume()                 # step_1, step_2 cached; step_4+ runs
```
- `flow.resume()`: resets FAILED/SKIPPED tasks, keeps SUCCEEDED results cached
- `flow.override(task_def, value)`: manually injects a succeeded result
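The resume rule above can be pictured as a small state reset. An illustrative sketch only; flowing's real state model is internal and may differ:

```python
# Illustrative only: keep SUCCEEDED results cached, re-run everything else.
def reset_for_resume(states: dict[str, str]) -> dict[str, str]:
    return {task: ("SUCCEEDED" if s == "SUCCEEDED" else "PENDING")
            for task, s in states.items()}

states = {"step_1": "SUCCEEDED", "step_2": "SUCCEEDED",
          "step_3": "FAILED", "step_4": "SKIPPED"}
states["step_3"] = "SUCCEEDED"  # what flow.override(step_3, value) achieves
print(reset_for_resume(states))
# {'step_1': 'SUCCEEDED', 'step_2': 'SUCCEEDED', 'step_3': 'SUCCEEDED', 'step_4': 'PENDING'}
```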
## Detached tasks (non-blocking side-effects)

```python
@task(depends_on=[create_issue], detached=True)
def store_memory(create_issue):
    remember(create_issue["url"], ...)
```

- Run in a final layer after the main DAG completes
- Failures are collected in `flow.detached_failures` and never trigger `fail_fast`
- Dependencies must all be SUCCEEDED (same as normal tasks)
## When to use
- 3+ independent operations (recall, SQL, web search) that can parallelize
- Multi-step pipelines where late failures shouldn't waste early work
- Side-effects (memory storage, notifications) that shouldn't block the critical path
## When NOT to use
- Next step depends on reasoning about prior result (use a think loop)
- Single sequential operation
- Async/distributed workflows (this is single-container, ThreadPoolExecutor)