skills/borghei/claude-skills/agent-workflow-designer

agent-workflow-designer

SKILL.md

Agent Workflow Designer

Tier: POWERFUL Category: Engineering / AI Systems Maintainer: Claude Skills Team

Overview

Design production-grade multi-agent orchestration systems from requirements. Covers five core patterns (sequential pipeline, parallel fan-out/fan-in, hierarchical delegation, event-driven reactor, consensus validation), agent routing strategies, structured handoff protocols, persistent state management, error recovery with circuit breakers, context window budgeting, and cost optimization. Includes framework-specific implementations for LangGraph, CrewAI, AutoGen, and native Claude Code agent teams.

Keywords

agent workflow, multi-agent orchestration, workflow DAG, agent routing, fan-out fan-in, hierarchical delegation, handoff protocol, state management, agent pipeline, LangGraph, CrewAI, AutoGen, context budgeting

Core Capabilities

1. Pattern Selection and Design

  • Sequential pipelines with typed handoffs
  • Parallel fan-out/fan-in with merge strategies
  • Hierarchical delegation with dynamic subtask discovery
  • Event-driven reactors with pub/sub agent triggers
  • Consensus validation with voting and arbitration

2. Agent Routing

  • Intent-based routing with classifier agents
  • Skill-based routing using capability matching
  • Cost-aware routing (cheap models for simple tasks)
  • Load-balanced routing across agent pools
  • Fallback chains with graceful degradation

3. State and Context Management

  • Persistent workflow state across agent hops
  • Context window budgeting and summarization
  • Checkpoint/resume for long-running workflows
  • Conflict resolution for parallel state updates

4. Reliability Engineering

  • Circuit breakers for failing agents
  • Retry with exponential backoff and model fallback
  • Dead letter queues for unprocessable tasks
  • Timeout enforcement at every agent boundary
  • Idempotent operations for safe retries

When to Use

  • Building multi-step AI pipelines that exceed one agent's capability
  • Parallelizing research, analysis, or generation tasks
  • Creating specialist agent teams with defined roles and contracts
  • Designing fault-tolerant AI workflows for production deployment
  • Optimizing cost across workflows with mixed model tiers

Pattern Selection Decision Tree

What does the workflow look like?
├─ Linear: step A feeds step B feeds step C
│  └─ SEQUENTIAL PIPELINE
│     Best for: content pipelines, code review chains, data transformation
├─ Parallel: N independent tasks, then combine
│  └─ FAN-OUT / FAN-IN
│     Best for: competitive research, multi-source analysis, parallel code gen
├─ Tree: orchestrator breaks work into subtasks dynamically
│  └─ HIERARCHICAL DELEGATION
│     Best for: complex projects, open-ended research, code generation with planning
├─ Reactive: agents respond to events/triggers
│  └─ EVENT-DRIVEN REACTOR
│     Best for: monitoring, alerting, continuous integration, chat workflows
└─ Verification: multiple agents must agree on output
   └─ CONSENSUS VALIDATION
      Best for: high-stakes decisions, code review, fact checking, safety-critical output

Pattern 1: Sequential Pipeline

Each stage transforms input and passes structured output to the next. Type-safe handoffs prevent data loss between stages.

LangGraph Implementation

from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
from langchain_anthropic import ChatAnthropic

class PipelineState(TypedDict):
    topic: str
    research: str
    draft: str
    final: str
    stage_costs: Annotated[list[dict], "append"]  # accumulates cost per stage

def research_stage(state: PipelineState) -> dict:
    model = ChatAnthropic(model="claude-sonnet-4-20250514", max_tokens=2048)
    result = model.invoke(
        f"Research the following topic thoroughly. Provide key facts, statistics, "
        f"and expert perspectives:\n\n{state['topic']}"
    )
    return {
        "research": result.content,
        "stage_costs": [{"stage": "research", "tokens": result.usage_metadata["total_tokens"]}],
    }

def writing_stage(state: PipelineState) -> dict:
    model = ChatAnthropic(model="claude-sonnet-4-20250514", max_tokens=4096)
    result = model.invoke(
        f"Using this research, write a compelling 800-word blog post with a hook, "
        f"3 main sections, and a CTA:\n\n{state['research']}"
    )
    return {
        "draft": result.content,
        "stage_costs": [{"stage": "writing", "tokens": result.usage_metadata["total_tokens"]}],
    }

def editing_stage(state: PipelineState) -> dict:
    model = ChatAnthropic(model="claude-haiku-4-20250514", max_tokens=4096)
    result = model.invoke(
        f"Edit this draft for clarity, flow, and grammar. Return only the improved "
        f"version:\n\n{state['draft']}"
    )
    return {
        "final": result.content,
        "stage_costs": [{"stage": "editing", "tokens": result.usage_metadata["total_tokens"]}],
    }

# Build the graph
graph = StateGraph(PipelineState)
graph.add_node("research", research_stage)
graph.add_node("write", writing_stage)
graph.add_node("edit", editing_stage)
graph.add_edge("research", "write")
graph.add_edge("write", "edit")
graph.add_edge("edit", END)
graph.set_entry_point("research")

pipeline = graph.compile()

# Execute
result = pipeline.invoke({"topic": "The future of AI agents in enterprise software"})
print(f"Total cost: {sum(s['tokens'] for s in result['stage_costs'])} tokens")

Pattern 2: Parallel Fan-Out / Fan-In

Independent tasks run concurrently. A merge function combines results.

import asyncio
from dataclasses import dataclass

@dataclass
class FanOutTask:
    name: str
    system_prompt: str
    user_message: str
    model: str = "claude-sonnet-4-20250514"

@dataclass
class FanOutResult:
    task_name: str
    output: str
    tokens_used: int
    success: bool
    error: str | None = None

async def fan_out_fan_in(
    tasks: list[FanOutTask],
    merge_prompt: str,
    max_concurrent: int = 5,
    timeout_seconds: float = 60.0,
) -> dict:
    """Execute tasks in parallel with concurrency limit and timeout."""
    import anthropic

    client = anthropic.AsyncAnthropic()
    semaphore = asyncio.Semaphore(max_concurrent)

    async def run_one(task: FanOutTask) -> FanOutResult:
        async with semaphore:
            try:
                response = await asyncio.wait_for(
                    client.messages.create(
                        model=task.model,
                        max_tokens=2048,
                        system=task.system_prompt,
                        messages=[{"role": "user", "content": task.user_message}],
                    ),
                    timeout=timeout_seconds,
                )
                return FanOutResult(
                    task_name=task.name,
                    output=response.content[0].text,
                    tokens_used=response.usage.input_tokens + response.usage.output_tokens,
                    success=True,
                )
            except Exception as e:
                return FanOutResult(
                    task_name=task.name, output="", tokens_used=0,
                    success=False, error=str(e),
                )

    # FAN-OUT: run all tasks concurrently
    results = await asyncio.gather(*[run_one(t) for t in tasks])
    successful = [r for r in results if r.success]
    failed = [r for r in results if not r.success]

    if not successful:
        raise RuntimeError(f"All {len(tasks)} fan-out tasks failed: {[f.error for f in failed]}")

    # FAN-IN: merge results
    combined = "\n\n---\n\n".join(
        f"## {r.task_name}\n{r.output}" for r in successful
    )

    merge_response = await client.messages.create(
        model="claude-sonnet-4-20250514",
        max_tokens=4096,
        system="Synthesize the following parallel analyses into a unified report.",
        messages=[{"role": "user", "content": f"{merge_prompt}\n\n{combined}"}],
    )

    return {
        "synthesis": merge_response.content[0].text,
        "individual_results": successful,
        "failures": failed,
        "total_tokens": sum(r.tokens_used for r in results) + merge_response.usage.input_tokens + merge_response.usage.output_tokens,
    }

Pattern 3: Hierarchical Delegation

An orchestrator agent dynamically decomposes work and delegates to specialists.

from typing import Literal

SPECIALISTS = {
    "researcher": "Find accurate information with sources. Be thorough and cite evidence.",
    "coder": "Write clean, tested code. Include error handling and type hints.",
    "writer": "Create clear, engaging content. Match the requested tone and format.",
    "analyst": "Analyze data and produce evidence-backed conclusions with visualizations.",
    "reviewer": "Review work product for quality, accuracy, and completeness.",
}

@dataclass
class SubTask:
    id: str
    agent: Literal["researcher", "coder", "writer", "analyst", "reviewer"]
    task: str
    depends_on: list[str]
    priority: int = 0  # higher = run first when deps are equal

class HierarchicalOrchestrator:
    def __init__(self, client):
        self.client = client

    async def plan(self, request: str) -> list[SubTask]:
        """Orchestrator creates an execution plan with dependencies."""
        response = await self.client.messages.create(
            model="claude-sonnet-4-20250514",
            max_tokens=2048,
            system=f"""You are a task orchestrator. Break down the request into subtasks.
Available specialists: {', '.join(SPECIALISTS.keys())}
Respond with JSON: {{"subtasks": [{{"id": "1", "agent": "researcher", "task": "...", "depends_on": []}}]}}
Rules:
- Minimize the number of subtasks (prefer fewer, more substantial tasks)
- Only add dependencies when output is genuinely needed
- Independent tasks should have empty depends_on for parallel execution""",
            messages=[{"role": "user", "content": request}],
        )
        import json
        plan = json.loads(response.content[0].text)
        return [SubTask(**st) for st in plan["subtasks"]]

    async def execute(self, request: str) -> str:
        """Plan, execute with dependency resolution, and synthesize."""
        subtasks = await self.plan(request)
        results = {}

        # Execute in dependency order, parallelize where possible
        for batch in self._batch_by_dependencies(subtasks):
            batch_results = await asyncio.gather(*[
                self._run_specialist(st, results) for st in batch
            ])
            for st, result in zip(batch, batch_results):
                results[st.id] = result

        # Final synthesis
        all_outputs = "\n\n".join(f"### {k}\n{v}" for k, v in results.items())
        synthesis = await self.client.messages.create(
            model="claude-sonnet-4-20250514",
            max_tokens=4096,
            system="Synthesize specialist outputs into a coherent final response.",
            messages=[{"role": "user", "content": f"Request: {request}\n\nOutputs:\n{all_outputs}"}],
        )
        return synthesis.content[0].text

    def _batch_by_dependencies(self, subtasks: list[SubTask]) -> list[list[SubTask]]:
        """Group subtasks into batches that can run in parallel."""
        completed = set()
        remaining = list(subtasks)
        batches = []
        while remaining:
            batch = [t for t in remaining if all(d in completed for d in t.depends_on)]
            if not batch:
                raise ValueError("Circular dependency detected in subtask plan")
            batches.append(sorted(batch, key=lambda t: -t.priority))
            completed.update(t.id for t in batch)
            remaining = [t for t in remaining if t.id not in completed]
        return batches

Pattern 4: Event-Driven Reactor

Agents react to events from a message bus. Decoupled and scalable.

from collections import defaultdict
from typing import Callable, Any

class AgentEventBus:
    """Simple event bus for agent-to-agent communication."""

    def __init__(self):
        self._handlers: dict[str, list[Callable]] = defaultdict(list)
        self._history: list[dict] = []

    def subscribe(self, event_type: str, handler: Callable):
        self._handlers[event_type].append(handler)

    async def publish(self, event_type: str, payload: Any, source: str):
        event = {"type": event_type, "payload": payload, "source": source}
        self._history.append(event)
        handlers = self._handlers.get(event_type, [])
        results = await asyncio.gather(
            *[h(event) for h in handlers],
            return_exceptions=True,
        )
        errors = [(h, r) for h, r in zip(handlers, results) if isinstance(r, Exception)]
        if errors:
            for handler, error in errors:
                print(f"Handler {handler.__name__} failed: {error}")
        return results

# Usage: code review pipeline triggered by PR events
bus = AgentEventBus()

async def on_pr_opened(event):
    """Security agent scans PR for vulnerabilities."""
    diff = event["payload"]["diff"]
    # ... scan and publish results
    await bus.publish("security_scan_complete", {"findings": findings}, "security-agent")

async def on_security_complete(event):
    """Review agent incorporates security findings into review."""
    # ... generate review with security context

bus.subscribe("pr_opened", on_pr_opened)
bus.subscribe("security_scan_complete", on_security_complete)

Pattern 5: Consensus Validation

Multiple agents independently evaluate the same input. A quorum determines the final output.

@dataclass
class Vote:
    agent: str
    verdict: str  # "approve" | "reject" | "revise"
    confidence: float  # 0.0 - 1.0
    reasoning: str

async def consensus_validate(
    content: str,
    validators: list[dict],  # [{"name": "...", "system": "..."}]
    quorum: float = 0.66,
    confidence_threshold: float = 0.7,
) -> dict:
    """Run content through multiple validators and determine consensus."""
    votes: list[Vote] = []

    # Collect independent votes (no agent sees another's vote)
    vote_tasks = []
    for v in validators:
        vote_tasks.append(get_agent_vote(v["name"], v["system"], content))
    raw_votes = await asyncio.gather(*vote_tasks)
    votes = [v for v in raw_votes if v is not None]

    # Calculate consensus
    approvals = [v for v in votes if v.verdict == "approve"]
    approval_rate = len(approvals) / len(votes) if votes else 0
    avg_confidence = sum(v.confidence for v in votes) / len(votes) if votes else 0

    if approval_rate >= quorum and avg_confidence >= confidence_threshold:
        return {"decision": "approved", "approval_rate": approval_rate, "votes": votes}
    elif any(v.verdict == "reject" for v in votes):
        rejections = [v for v in votes if v.verdict == "reject"]
        return {"decision": "rejected", "reasons": [r.reasoning for r in rejections], "votes": votes}
    else:
        return {"decision": "needs_revision", "feedback": [v.reasoning for v in votes], "votes": votes}

Agent Routing Strategies

Intent-Based Router

class IntentRouter:
    """Route requests to specialized agents based on intent classification."""

    ROUTING_TABLE = {
        "code_generation": {"agent": "coder", "model": "claude-sonnet-4-20250514"},
        "code_review": {"agent": "reviewer", "model": "claude-sonnet-4-20250514"},
        "research": {"agent": "researcher", "model": "claude-sonnet-4-20250514"},
        "simple_question": {"agent": "assistant", "model": "claude-haiku-4-20250514"},
        "creative_writing": {"agent": "writer", "model": "claude-sonnet-4-20250514"},
        "complex_analysis": {"agent": "analyst", "model": "claude-sonnet-4-20250514"},
    }

    async def route(self, message: str) -> dict:
        # Use a fast, cheap model for classification
        classification = await self.client.messages.create(
            model="claude-haiku-4-20250514",
            max_tokens=50,
            system="Classify the user intent. Respond with ONLY one of: code_generation, code_review, research, simple_question, creative_writing, complex_analysis",
            messages=[{"role": "user", "content": message}],
        )
        intent = classification.content[0].text.strip().lower()
        return self.ROUTING_TABLE.get(intent, self.ROUTING_TABLE["simple_question"])

Context Window Budgeting

MODEL_LIMITS = {
    "claude-sonnet-4-20250514": 200_000,
    "claude-haiku-4-20250514": 200_000,
    "claude-opus-4-20250514": 200_000,
    "gpt-4o": 128_000,
}

class ContextBudget:
    def __init__(self, model: str, pipeline_stages: int, reserve_pct: float = 0.15):
        self.total = MODEL_LIMITS.get(model, 128_000)
        self.reserve = int(self.total * reserve_pct)
        self.per_stage = (self.total - self.reserve) // pipeline_stages
        self.used = 0

    def allocate(self, stage: str) -> int:
        available = self.total - self.reserve - self.used
        allocation = min(self.per_stage, int(available * 0.6))
        return max(allocation, 1000)  # minimum 1000 tokens per stage

    def consume(self, tokens: int):
        self.used += tokens

    def summarize_if_needed(self, text: str, budget: int) -> str:
        estimated_tokens = len(text) // 4
        if estimated_tokens <= budget:
            return text
        # Truncate to budget with marker
        char_limit = budget * 4
        return text[:char_limit] + "\n\n[Content truncated to fit context budget]"

Cost Optimization Matrix

Strategy Cost Reduction Quality Impact When to Use
Haiku for routing/classification 85-90% Minimal Always for intent routing
Haiku for editing/formatting 60-70% Low Mechanical tasks
Sonnet for most stages Baseline Baseline Default choice
Opus only for final synthesis +50% on that stage Higher quality High-stakes output
Prompt caching (system prompts) 50-90% per call None Repeated system prompts
Truncate intermediate outputs 20-40% May lose detail Long pipelines
Parallel + early termination 30-50% None if threshold met Search/validation tasks
Batch similar requests Up to 50% Increased latency Non-real-time workloads

Reliability Patterns

Circuit Breaker

import time

class CircuitBreaker:
    """Prevent cascading failures when an agent/model is down."""

    def __init__(self, failure_threshold: int = 5, recovery_time: float = 60.0):
        self.failure_threshold = failure_threshold
        self.recovery_time = recovery_time
        self.failures = 0
        self.state = "closed"  # closed = healthy, open = failing, half-open = testing
        self.last_failure_time = 0.0

    def can_execute(self) -> bool:
        if self.state == "closed":
            return True
        if self.state == "open":
            if time.time() - self.last_failure_time > self.recovery_time:
                self.state = "half-open"
                return True
            return False
        return True  # half-open: allow one test request

    def record_success(self):
        self.failures = 0
        self.state = "closed"

    def record_failure(self):
        self.failures += 1
        self.last_failure_time = time.time()
        if self.failures >= self.failure_threshold:
            self.state = "open"

Common Pitfalls

  • Over-orchestration — if a single prompt can handle it, adding agents adds cost and latency, not value
  • Circular dependencies in subtask graphs causing infinite loops; always validate DAG structure before execution
  • Context bleed — passing entire previous outputs to every stage; summarize or extract only what is needed
  • No timeout enforcement — a stuck agent blocks the entire pipeline; set wall-clock timeouts at every boundary
  • Silent failures — agent returns plausible but incorrect output; add validation stages for critical paths
  • Ignoring cost — 10 parallel Opus calls is expensive; model selection is a cost decision, not just a quality one
  • Stateless retries on stateful operations — ensure idempotency before enabling automatic retries
  • Single point of failure in orchestrator — if the orchestrator agent fails, the entire workflow fails

Best Practices

  1. Start with a single prompt — only add agents when you prove one cannot handle the task
  2. Type your handoffs — use dataclasses or TypedDicts for inter-agent data, not raw strings
  3. Budget context upfront — calculate token allocations before running the pipeline
  4. Use cheap models for routing — Haiku for classification costs 10x less than Sonnet
  5. Validate DAG structure at build time, not runtime
  6. Log every agent call with input hash, output hash, tokens, latency, and cost
  7. Set SLAs per stage — if research takes >30s, timeout and use cached results
  8. Test with production-scale inputs — a pipeline that works on 100 words may fail on 10,000
Weekly Installs
10
GitHub Stars
38
First Seen
6 days ago
Installed on
opencode10
gemini-cli10
amp10
cline10
github-copilot10
codex10