flow-coordinator
Flow Coordinator
Lightweight workflow coordinator supporting two workflow formats:
- Legacy Templates: Command chains with slash-command execution
- Unified Workflows: DAG-based PromptTemplate nodes (spec:
spec/unified-workflow-spec.md)
Specification Reference
- Unified Workflow Spec: @spec/unified-workflow-spec.md
- Demo Workflow:
ccw/data/flows/demo-unified-workflow.json
Architecture
User Task → Detect Format → Select Workflow → Init Status → Execute → Complete
│ │
├─ Legacy Template │
│ └─ Sequential cmd execution │
│ │
└─ Unified Workflow │
└─ DAG traversal with contextRefs │
│
└──────────────── Resume (from status.json) ──────────────┘
Execution Modes:
├─ analysis → Read-only, CLI --mode analysis
├─ write → File changes, CLI --mode write
├─ mainprocess → Blocking, synchronous
└─ async → Background, ccw cli
Core Concepts
Dual Format Support:
- Legacy:
templates/*.jsonwithcmd,args,execution - Unified:
ccw/data/flows/*.jsonwithnodes,edges,contextRefs
Unified PromptTemplate Model: All workflow steps are natural language instructions with:
instruction: What to execute (natural language)slashCommand: Optional slash command name (e.g., "workflow:plan")slashArgs: Optional arguments for slash command (supports {{variable}})outputName: Name for output referencecontextRefs: References to previous step outputstool: Optional CLI tool (gemini/qwen/codex/claude)mode: Execution mode (analysis/write/mainprocess/async)
DAG Execution: Unified workflows execute as directed acyclic graphs with parallel branches and conditional edges.
Dynamic Discovery: Both formats discovered at runtime via Glob.
Execution Flow
async function execute(task) {
// 1. Discover and select template
const templates = await discoverTemplates();
const template = await selectTemplate(templates);
// 2. Init status
const sessionId = `fc-${timestamp()}`;
const statusPath = `.workflow/.flow-coordinator/${sessionId}/status.json`;
const status = initStatus(template, task);
write(statusPath, JSON.stringify(status, null, 2));
// 3. Execute steps based on execution config
await executeSteps(status, statusPath);
}
async function executeSteps(status, statusPath) {
for (let i = status.current; i < status.steps.length; i++) {
const step = status.steps[i];
status.current = i;
// Execute based on step mode (all steps use slash-command type)
const execConfig = step.execution || { type: 'slash-command', mode: 'mainprocess' };
if (execConfig.mode === 'async') {
// Async execution - stop and wait for hook callback
await executeSlashCommandAsync(step, status, statusPath);
break;
} else {
// Mainprocess execution - continue immediately
await executeSlashCommandSync(step, status);
step.status = 'done';
write(statusPath, JSON.stringify(status, null, 2));
}
}
// All steps complete
if (status.current >= status.steps.length) {
status.complete = true;
write(statusPath, JSON.stringify(status, null, 2));
}
}
Unified Workflow Execution
For workflows using the unified PromptTemplate format (ccw/data/flows/*.json):
async function executeUnifiedWorkflow(workflow, task) {
// 1. Initialize execution state
const sessionId = `ufc-${timestamp()}`;
const statusPath = `.workflow/.flow-coordinator/${sessionId}/status.json`;
const state = {
id: sessionId,
workflow: workflow.id,
goal: task,
nodeStates: {}, // nodeId -> { status, result, error }
outputs: {}, // outputName -> result
complete: false
};
// 2. Topological sort for execution order
const executionOrder = topologicalSort(workflow.nodes, workflow.edges);
// 3. Execute nodes respecting DAG dependencies
await executeDAG(workflow, executionOrder, state, statusPath);
}
async function executeDAG(workflow, order, state, statusPath) {
for (const nodeId of order) {
const node = workflow.nodes.find(n => n.id === nodeId);
const data = node.data;
// Check if all dependencies are satisfied
if (!areDependenciesSatisfied(nodeId, workflow.edges, state)) {
continue; // Will be executed when dependencies complete
}
// Build instruction from slashCommand or raw instruction
let instruction = buildNodeInstruction(data, state.outputs);
// Execute based on mode
state.nodeStates[nodeId] = { status: 'running' };
write(statusPath, JSON.stringify(state, null, 2));
const result = await executeNode(instruction, data.tool, data.mode);
// Store output for downstream nodes
state.nodeStates[nodeId] = { status: 'completed', result };
if (data.outputName) {
state.outputs[data.outputName] = result;
}
write(statusPath, JSON.stringify(state, null, 2));
}
state.complete = true;
write(statusPath, JSON.stringify(state, null, 2));
}
/**
* Build node instruction from slashCommand or raw instruction
* Handles slashCommand/slashArgs fields from frontend orchestrator
*/
function buildNodeInstruction(data, outputs) {
const refs = data.contextRefs || [];
// If slashCommand is set, construct instruction from it
if (data.slashCommand) {
// Resolve variables in slashArgs
const args = data.slashArgs
? resolveContextRefs(data.slashArgs, refs, outputs)
: '';
// Build slash command instruction
let instruction = `/${data.slashCommand}${args ? ' ' + args : ''}`;
// Append additional instruction if provided
if (data.instruction) {
const additionalInstruction = resolveContextRefs(data.instruction, refs, outputs);
instruction = `${instruction}\n\n${additionalInstruction}`;
}
return instruction;
}
// Fallback: use raw instruction with context refs resolved
return resolveContextRefs(data.instruction || '', refs, outputs);
}
function resolveContextRefs(instruction, refs, outputs) {
let resolved = instruction;
for (const ref of refs) {
const value = outputs[ref];
const placeholder = `{{${ref}}}`;
resolved = resolved.replace(new RegExp(placeholder, 'g'),
typeof value === 'object' ? JSON.stringify(value) : String(value));
}
return resolved;
}
async function executeNode(instruction, tool, mode) {
// Build CLI command based on tool and mode
const cliTool = tool || 'gemini';
const cliMode = mode === 'write' ? 'write' : 'analysis';
if (mode === 'async') {
// Background execution
return Bash(
`ccw cli -p "${escapePrompt(instruction)}" --tool ${cliTool} --mode ${cliMode}`,
{ run_in_background: true }
);
} else {
// Synchronous execution
return Bash(
`ccw cli -p "${escapePrompt(instruction)}" --tool ${cliTool} --mode ${cliMode}`
);
}
}
Unified Workflow Discovery
async function discoverUnifiedWorkflows() {
const files = Glob('*.json', { path: 'ccw/data/flows/' });
const workflows = [];
for (const file of files) {
const content = JSON.parse(Read(file));
// Detect unified format by checking for 'nodes' array
if (content.nodes && Array.isArray(content.nodes)) {
workflows.push({
id: content.id,
name: content.name,
description: content.description,
nodeCount: content.nodes.length,
format: 'unified',
file: file
});
}
}
return workflows;
}
Format Detection
function detectWorkflowFormat(content) {
if (content.nodes && content.edges) {
return 'unified'; // PromptTemplate DAG format
}
if (content.steps && content.steps[0]?.cmd) {
return 'legacy'; // Command chain format
}
throw new Error('Unknown workflow format');
}
Legacy Template Discovery
Dynamic query - never hardcode template list:
async function discoverTemplates() {
// Discover all JSON templates
const files = Glob('*.json', { path: 'templates/' });
// Parse each template
const templates = [];
for (const file of files) {
const content = JSON.parse(Read(file));
templates.push({
name: content.name,
description: content.description,
steps: content.steps.map(s => s.cmd).join(' → '),
file: file
});
}
return templates;
}
Template Selection
User chooses from discovered templates:
async function selectTemplate(templates) {
// Build options from discovered templates
const options = templates.slice(0, 4).map(t => ({
label: t.name,
description: t.steps
}));
const response = await AskUserQuestion({
questions: [{
question: 'Select workflow template:',
header: 'Template',
options: options,
multiSelect: false
}]
});
// Handle "Other" - show remaining templates or custom input
if (response.template === 'Other') {
return await selectFromRemainingTemplates(templates.slice(4));
}
return templates.find(t => t.name === response.template);
}
Status Schema
Creation: Copy template JSON → Update id, template, goal, set all steps status: "pending"
Location: .workflow/.flow-coordinator/{session-id}/status.json
Core Fields:
id: Session ID (fc-YYYYMMDD-HHMMSS)template: Template namegoal: User task descriptioncurrent: Current step indexsteps[]: Step array from template (with runtimestatus,session,taskId)complete: All steps done?
Step Status: pending → running → done | failed | skipped
Extended Template Schema
Templates stored in: templates/*.json (discovered at runtime via Glob)
TemplateStep Fields:
cmd: Full command path (e.g.,/workflow:lite-plan,/workflow:execute)args?: Arguments with{{goal}}and{{prev}}placeholdersunit?: Minimum execution unit name (groups related commands)optional?: Can be skipped by userexecution: Type and mode configurationtype: Always'slash-command'(for all workflow commands)mode:'mainprocess'(blocking) or'async'(background)
contextHint?: Natural language guidance for context assembly
Template Example:
{
"name": "rapid",
"steps": [
{
"cmd": "/workflow:lite-plan",
"args": "\"{{goal}}\"",
"unit": "quick-implementation",
"execution": { "type": "slash-command", "mode": "mainprocess" },
"contextHint": "Create lightweight implementation plan"
},
{
"cmd": "/workflow:lite-execute",
"args": "--in-memory",
"unit": "quick-implementation",
"execution": { "type": "slash-command", "mode": "async" },
"contextHint": "Execute plan from previous step"
}
]
}
Execution Implementation
Mainprocess Mode (Blocking)
async function executeSlashCommandSync(step, status) {
// Build command: /workflow:cmd -y args
const cmd = buildCommand(step, status);
const result = await SlashCommand({ command: cmd });
step.session = result.session_id;
step.status = 'done';
return result;
}
Async Mode (Background)
async function executeSlashCommandAsync(step, status, statusPath) {
// Build prompt: /workflow:cmd -y args + context
const prompt = buildCommandPrompt(step, status);
step.status = 'running';
write(statusPath, JSON.stringify(status, null, 2));
// Execute via ccw cli in background
const taskId = Bash(
`ccw cli -p "${escapePrompt(prompt)}" --tool claude --mode write`,
{ run_in_background: true }
).task_id;
step.taskId = taskId;
write(statusPath, JSON.stringify(status, null, 2));
console.log(`Executing: ${step.cmd} (async)`);
console.log(`Resume: /flow-coordinator --resume ${status.id}`);
}
Prompt Building
Prompts are built in format: /workflow:cmd -y args + context
function buildCommandPrompt(step, status) {
// step.cmd already contains full path: /workflow:lite-plan, /workflow:execute, etc.
let prompt = `${step.cmd} -y`;
// Add arguments (with placeholder replacement)
if (step.args) {
const args = step.args
.replace('{{goal}}', status.goal)
.replace('{{prev}}', getPreviousSessionId(status));
prompt += ` ${args}`;
}
// Add context based on contextHint
if (step.contextHint) {
const context = buildContextFromHint(step.contextHint, status);
prompt += `\n\nContext:\n${context}`;
} else {
// Default context: previous session IDs
const previousContext = collectPreviousResults(status);
if (previousContext) {
prompt += `\n\nPrevious results:\n${previousContext}`;
}
}
return prompt;
}
function buildContextFromHint(hint, status) {
// Parse contextHint instruction and build context accordingly
// Examples:
// "Summarize IMPL_PLAN.md" → read and summarize plan
// "List test coverage gaps" → analyze previous test results
// "Pass session ID" → just return session reference
return parseAndBuildContext(hint, status);
}
Example Prompt Output
/workflow:lite-plan -y "Implement user registration"
Context:
Task: Implement user registration
Previous results:
- None (first step)
/workflow:execute -y --in-memory
Context:
Task: Implement user registration
Previous results:
- lite-plan: WFS-plan-20250130 (planning-context.md)
User Interaction
Step 1: Select Template
Select workflow template:
○ rapid lite-plan → lite-execute → test-cycle-execute
○ coupled plan → plan-verify → execute → review → test
○ bugfix lite-fix → lite-execute → test-cycle-execute
○ tdd tdd-plan → execute → tdd-verify
○ Other (more templates or custom)
Step 2: Review Execution Plan
Template: coupled
Steps:
1. /workflow:plan (slash-command mainprocess)
2. /workflow:plan-verify (slash-command mainprocess)
3. /workflow:execute (slash-command async)
4. /workflow:review-session-cycle (slash-command mainprocess)
5. /workflow:review-cycle-fix (slash-command mainprocess)
6. /workflow:test-fix-gen (slash-command mainprocess)
7. /workflow:test-cycle-execute (slash-command async)
Proceed? [Confirm / Cancel]
Resume Capability
async function resume(sessionId) {
const statusPath = `.workflow/.flow-coordinator/${sessionId}/status.json`;
const status = JSON.parse(Read(statusPath));
// Find first incomplete step
status.current = status.steps.findIndex(s => s.status !== 'done');
if (status.current === -1) {
console.log('All steps complete');
return;
}
// Continue executing steps
await executeSteps(status, statusPath);
}
Available Templates
Templates discovered from templates/*.json:
| Template | Use Case | Steps |
|---|---|---|
| rapid | Simple feature | /workflow:lite-plan → /workflow:lite-execute → /workflow:test-cycle-execute |
| coupled | Complex feature | /workflow:plan → /workflow:plan-verify → /workflow:execute → /workflow:review-session-cycle → /workflow:test-fix-gen |
| bugfix | Bug fix | /workflow:lite-fix → /workflow:lite-execute → /workflow:test-cycle-execute |
| tdd | Test-driven | /workflow:tdd-plan → /workflow:execute → /workflow:tdd-verify |
| test-fix | Fix failing tests | /workflow:test-fix-gen → /workflow:test-cycle-execute |
| brainstorm | Exploration | /workflow:brainstorm-with-file |
| debug | Debug with docs | /workflow:debug-with-file |
| analyze | Collaborative analysis | /workflow:analyze-with-file |
| issue | Issue workflow | /workflow:issue:plan → /workflow:issue:queue → /workflow:issue:execute |
Design Principles
- Minimal fields: Only essential tracking data
- Flat structure: No nested objects beyond steps array
- Step-level execution: Each step defines how it's executed
- Resumable: Any step can be resumed from status
- Human readable: Clear JSON format
Reference Documents
| Document | Purpose |
|---|---|
| spec/unified-workflow-spec.md | Unified PromptTemplate workflow specification |
| ccw/data/flows/*.json | Unified workflows (DAG format, dynamic discovery) |
| templates/*.json | Legacy workflow templates (command chain format) |
Demo Workflows (Unified Format)
| File | Description | Nodes |
|---|---|---|
demo-unified-workflow.json |
Auth implementation | 7 nodes: Analyze → Plan → Implement → Review → Tests → Report |
parallel-ci-workflow.json |
CI/CD pipeline | 8 nodes: Parallel checks → Merge → Conditional notify |
simple-analysis-workflow.json |
Analysis pipeline | 3 nodes: Explore → Analyze → Report |
More from catlog22/claude-code-workflow
skill-generator
Meta-skill for creating new Claude Code skills with configurable execution modes. Supports sequential (fixed order) and autonomous (stateless) phase patterns. Use for skill scaffolding, skill creation, or building new workflows. Triggers on "create skill", "new skill", "skill generator".
127review-code
Multi-dimensional code review with structured reports. Analyzes correctness, readability, performance, security, testing, and architecture. Triggers on "review code", "code review", "审查代码", "代码审查".
102skill-tuning
Universal skill diagnosis and optimization tool. Detect and fix skill execution issues including context explosion, long-tail forgetting, data flow disruption, and agent coordination failures. Supports Gemini CLI for deep analysis. Triggers on "skill tuning", "tune skill", "skill diagnosis", "optimize skill", "skill debug".
71compact
Compact current session memory into structured text for session recovery. Supports custom descriptions and tagging.
71spec-generator
Specification generator - 7 phase document chain producing product brief, PRD, architecture, epics, and issues with Codex review gates. Triggers on generate spec, create specification, spec generator, workflow:spec.
69brainstorm-with-file
Interactive brainstorming with documented thought evolution, multi-perspective analysis, and iterative refinement. Serial execution with no agent delegation.
67