agent-workflow-designer
Agent Workflow Designer
Tier: POWERFUL | Category: Engineering / AI Systems | Maintainer: Claude Skills Team
Overview
Design production-grade multi-agent orchestration systems from requirements. Covers five core patterns (sequential pipeline, parallel fan-out/fan-in, hierarchical delegation, event-driven reactor, consensus validation), agent routing strategies, structured handoff protocols, persistent state management, error recovery with circuit breakers, context window budgeting, and cost optimization. Includes framework-specific implementations for LangGraph, CrewAI, AutoGen, and native Claude Code agent teams.
Keywords
agent workflow, multi-agent orchestration, workflow DAG, agent routing, fan-out fan-in, hierarchical delegation, handoff protocol, state management, agent pipeline, LangGraph, CrewAI, AutoGen, context budgeting
Core Capabilities
1. Pattern Selection and Design
- Sequential pipelines with typed handoffs
- Parallel fan-out/fan-in with merge strategies
- Hierarchical delegation with dynamic subtask discovery
- Event-driven reactors with pub/sub agent triggers
- Consensus validation with voting and arbitration
2. Agent Routing
- Intent-based routing with classifier agents
- Skill-based routing using capability matching
- Cost-aware routing (cheap models for simple tasks)
- Load-balanced routing across agent pools
- Fallback chains with graceful degradation
3. State and Context Management
- Persistent workflow state across agent hops
- Context window budgeting and summarization
- Checkpoint/resume for long-running workflows
- Conflict resolution for parallel state updates
4. Reliability Engineering
- Circuit breakers for failing agents
- Retry with exponential backoff and model fallback
- Dead letter queues for unprocessable tasks
- Timeout enforcement at every agent boundary
- Idempotent operations for safe retries
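Retry with exponential backoff and model fallback (listed above) can be sketched provider-agnostically; the `call` signature and the model names in the comments are illustrative assumptions:

```python
import asyncio
import random


async def call_with_fallback(
    call,                # hypothetical: async fn(model: str) -> str
    models: list[str],   # ordered best-first, e.g. ["sonnet", "haiku"]
    max_retries: int = 3,
    base_delay: float = 1.0,
) -> str:
    """Retry each model with exponential backoff, then fall back to the next model."""
    last_error: Exception | None = None
    for model in models:
        for attempt in range(max_retries):
            try:
                return await call(model)
            except Exception as e:
                last_error = e
                # Exponential backoff with +/-25% jitter: ~1s, ~2s, ~4s
                delay = base_delay * (2 ** attempt) * random.uniform(0.75, 1.25)
                await asyncio.sleep(delay)
    raise RuntimeError(f"All models exhausted; last error: {last_error}")
```

Because retries may re-run a stage, pair this with the idempotency requirement above.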
When to Use
- Building multi-step AI pipelines that exceed one agent's capability
- Parallelizing research, analysis, or generation tasks
- Creating specialist agent teams with defined roles and contracts
- Designing fault-tolerant AI workflows for production deployment
- Optimizing cost across workflows with mixed model tiers
Pattern Selection Decision Tree
```
What does the workflow look like?
│
├─ Linear: step A feeds step B feeds step C
│  └─ SEQUENTIAL PIPELINE
│     Best for: content pipelines, code review chains, data transformation
│
├─ Parallel: N independent tasks, then combine
│  └─ FAN-OUT / FAN-IN
│     Best for: competitive research, multi-source analysis, parallel code gen
│
├─ Tree: orchestrator breaks work into subtasks dynamically
│  └─ HIERARCHICAL DELEGATION
│     Best for: complex projects, open-ended research, code generation with planning
│
├─ Reactive: agents respond to events/triggers
│  └─ EVENT-DRIVEN REACTOR
│     Best for: monitoring, alerting, continuous integration, chat workflows
│
└─ Verification: multiple agents must agree on output
   └─ CONSENSUS VALIDATION
      Best for: high-stakes decisions, code review, fact checking, safety-critical output
```
Pattern 1: Sequential Pipeline
Each stage transforms input and passes structured output to the next. Type-safe handoffs prevent data loss between stages.
LangGraph Implementation
```python
import operator
from typing import TypedDict, Annotated

from langgraph.graph import StateGraph, END
from langchain_anthropic import ChatAnthropic


class PipelineState(TypedDict):
    topic: str
    research: str
    draft: str
    final: str
    stage_costs: Annotated[list[dict], operator.add]  # reducer accumulates cost per stage


def research_stage(state: PipelineState) -> dict:
    model = ChatAnthropic(model="claude-sonnet-4-20250514", max_tokens=2048)
    result = model.invoke(
        f"Research the following topic thoroughly. Provide key facts, statistics, "
        f"and expert perspectives:\n\n{state['topic']}"
    )
    return {
        "research": result.content,
        "stage_costs": [{"stage": "research", "tokens": result.usage_metadata["total_tokens"]}],
    }


def writing_stage(state: PipelineState) -> dict:
    model = ChatAnthropic(model="claude-sonnet-4-20250514", max_tokens=4096)
    result = model.invoke(
        f"Using this research, write a compelling 800-word blog post with a hook, "
        f"3 main sections, and a CTA:\n\n{state['research']}"
    )
    return {
        "draft": result.content,
        "stage_costs": [{"stage": "writing", "tokens": result.usage_metadata["total_tokens"]}],
    }


def editing_stage(state: PipelineState) -> dict:
    model = ChatAnthropic(model="claude-haiku-4-20250514", max_tokens=4096)
    result = model.invoke(
        f"Edit this draft for clarity, flow, and grammar. Return only the improved "
        f"version:\n\n{state['draft']}"
    )
    return {
        "final": result.content,
        "stage_costs": [{"stage": "editing", "tokens": result.usage_metadata["total_tokens"]}],
    }


# Build the graph
graph = StateGraph(PipelineState)
graph.add_node("research", research_stage)
graph.add_node("write", writing_stage)
graph.add_node("edit", editing_stage)
graph.add_edge("research", "write")
graph.add_edge("write", "edit")
graph.add_edge("edit", END)
graph.set_entry_point("research")
pipeline = graph.compile()

# Execute
result = pipeline.invoke({"topic": "The future of AI agents in enterprise software"})
print(f"Total cost: {sum(s['tokens'] for s in result['stage_costs'])} tokens")
```
Pattern 2: Parallel Fan-Out / Fan-In
Independent tasks run concurrently. A merge function combines results.
```python
import asyncio
from dataclasses import dataclass

import anthropic


@dataclass
class FanOutTask:
    name: str
    system_prompt: str
    user_message: str
    model: str = "claude-sonnet-4-20250514"


@dataclass
class FanOutResult:
    task_name: str
    output: str
    tokens_used: int
    success: bool
    error: str | None = None


async def fan_out_fan_in(
    tasks: list[FanOutTask],
    merge_prompt: str,
    max_concurrent: int = 5,
    timeout_seconds: float = 60.0,
) -> dict:
    """Execute tasks in parallel with concurrency limit and timeout."""
    client = anthropic.AsyncAnthropic()
    semaphore = asyncio.Semaphore(max_concurrent)

    async def run_one(task: FanOutTask) -> FanOutResult:
        async with semaphore:
            try:
                response = await asyncio.wait_for(
                    client.messages.create(
                        model=task.model,
                        max_tokens=2048,
                        system=task.system_prompt,
                        messages=[{"role": "user", "content": task.user_message}],
                    ),
                    timeout=timeout_seconds,
                )
                return FanOutResult(
                    task_name=task.name,
                    output=response.content[0].text,
                    tokens_used=response.usage.input_tokens + response.usage.output_tokens,
                    success=True,
                )
            except Exception as e:
                return FanOutResult(
                    task_name=task.name, output="", tokens_used=0,
                    success=False, error=str(e),
                )

    # FAN-OUT: run all tasks concurrently
    results = await asyncio.gather(*[run_one(t) for t in tasks])
    successful = [r for r in results if r.success]
    failed = [r for r in results if not r.success]
    if not successful:
        raise RuntimeError(f"All {len(tasks)} fan-out tasks failed: {[f.error for f in failed]}")

    # FAN-IN: merge results
    combined = "\n\n---\n\n".join(
        f"## {r.task_name}\n{r.output}" for r in successful
    )
    merge_response = await client.messages.create(
        model="claude-sonnet-4-20250514",
        max_tokens=4096,
        system="Synthesize the following parallel analyses into a unified report.",
        messages=[{"role": "user", "content": f"{merge_prompt}\n\n{combined}"}],
    )
    return {
        "synthesis": merge_response.content[0].text,
        "individual_results": successful,
        "failures": failed,
        "total_tokens": sum(r.tokens_used for r in results)
        + merge_response.usage.input_tokens
        + merge_response.usage.output_tokens,
    }
```
Pattern 3: Hierarchical Delegation
An orchestrator agent dynamically decomposes work and delegates to specialists.
```python
import asyncio
import json
from dataclasses import dataclass
from typing import Literal

SPECIALISTS = {
    "researcher": "Find accurate information with sources. Be thorough and cite evidence.",
    "coder": "Write clean, tested code. Include error handling and type hints.",
    "writer": "Create clear, engaging content. Match the requested tone and format.",
    "analyst": "Analyze data and produce evidence-backed conclusions with visualizations.",
    "reviewer": "Review work product for quality, accuracy, and completeness.",
}


@dataclass
class SubTask:
    id: str
    agent: Literal["researcher", "coder", "writer", "analyst", "reviewer"]
    task: str
    depends_on: list[str]
    priority: int = 0  # higher = run first when deps are equal


class HierarchicalOrchestrator:
    def __init__(self, client):
        self.client = client

    async def plan(self, request: str) -> list[SubTask]:
        """Orchestrator creates an execution plan with dependencies."""
        response = await self.client.messages.create(
            model="claude-sonnet-4-20250514",
            max_tokens=2048,
            system=f"""You are a task orchestrator. Break down the request into subtasks.
Available specialists: {', '.join(SPECIALISTS.keys())}
Respond with JSON: {{"subtasks": [{{"id": "1", "agent": "researcher", "task": "...", "depends_on": []}}]}}
Rules:
- Minimize the number of subtasks (prefer fewer, more substantial tasks)
- Only add dependencies when output is genuinely needed
- Independent tasks should have empty depends_on for parallel execution""",
            messages=[{"role": "user", "content": request}],
        )
        plan = json.loads(response.content[0].text)
        return [SubTask(**st) for st in plan["subtasks"]]

    async def execute(self, request: str) -> str:
        """Plan, execute with dependency resolution, and synthesize."""
        subtasks = await self.plan(request)
        results = {}
        # Execute in dependency order, parallelize where possible.
        # _run_specialist (not shown) calls one specialist with its task
        # plus the outputs of its dependencies.
        for batch in self._batch_by_dependencies(subtasks):
            batch_results = await asyncio.gather(*[
                self._run_specialist(st, results) for st in batch
            ])
            for st, result in zip(batch, batch_results):
                results[st.id] = result

        # Final synthesis
        all_outputs = "\n\n".join(f"### {k}\n{v}" for k, v in results.items())
        synthesis = await self.client.messages.create(
            model="claude-sonnet-4-20250514",
            max_tokens=4096,
            system="Synthesize specialist outputs into a coherent final response.",
            messages=[{"role": "user", "content": f"Request: {request}\n\nOutputs:\n{all_outputs}"}],
        )
        return synthesis.content[0].text

    def _batch_by_dependencies(self, subtasks: list[SubTask]) -> list[list[SubTask]]:
        """Group subtasks into batches that can run in parallel."""
        completed = set()
        remaining = list(subtasks)
        batches = []
        while remaining:
            batch = [t for t in remaining if all(d in completed for d in t.depends_on)]
            if not batch:
                raise ValueError("Circular dependency detected in subtask plan")
            batches.append(sorted(batch, key=lambda t: -t.priority))
            completed.update(t.id for t in batch)
            remaining = [t for t in remaining if t.id not in completed]
        return batches
```
Pattern 4: Event-Driven Reactor
Agents react to events from a message bus. Decoupled and scalable.
```python
import asyncio
from collections import defaultdict
from typing import Callable, Any


class AgentEventBus:
    """Simple event bus for agent-to-agent communication."""

    def __init__(self):
        self._handlers: dict[str, list[Callable]] = defaultdict(list)
        self._history: list[dict] = []

    def subscribe(self, event_type: str, handler: Callable):
        self._handlers[event_type].append(handler)

    async def publish(self, event_type: str, payload: Any, source: str):
        event = {"type": event_type, "payload": payload, "source": source}
        self._history.append(event)
        handlers = self._handlers.get(event_type, [])
        results = await asyncio.gather(
            *[h(event) for h in handlers],
            return_exceptions=True,
        )
        errors = [(h, r) for h, r in zip(handlers, results) if isinstance(r, Exception)]
        if errors:
            for handler, error in errors:
                print(f"Handler {handler.__name__} failed: {error}")
        return results


# Usage: code review pipeline triggered by PR events
bus = AgentEventBus()


async def on_pr_opened(event):
    """Security agent scans PR for vulnerabilities."""
    diff = event["payload"]["diff"]
    # ... scan and publish results
    await bus.publish("security_scan_complete", {"findings": findings}, "security-agent")


async def on_security_complete(event):
    """Review agent incorporates security findings into review."""
    # ... generate review with security context


bus.subscribe("pr_opened", on_pr_opened)
bus.subscribe("security_scan_complete", on_security_complete)
```
Pattern 5: Consensus Validation
Multiple agents independently evaluate the same input. A quorum determines the final output.
```python
import asyncio
from dataclasses import dataclass


@dataclass
class Vote:
    agent: str
    verdict: str  # "approve" | "reject" | "revise"
    confidence: float  # 0.0 - 1.0
    reasoning: str


async def consensus_validate(
    content: str,
    validators: list[dict],  # [{"name": "...", "system": "..."}]
    quorum: float = 0.66,
    confidence_threshold: float = 0.7,
) -> dict:
    """Run content through multiple validators and determine consensus."""
    # Collect independent votes (no agent sees another's vote).
    # get_agent_vote queries one validator and parses its reply into a Vote.
    vote_tasks = [get_agent_vote(v["name"], v["system"], content) for v in validators]
    raw_votes = await asyncio.gather(*vote_tasks)
    votes: list[Vote] = [v for v in raw_votes if v is not None]

    # Calculate consensus
    approvals = [v for v in votes if v.verdict == "approve"]
    approval_rate = len(approvals) / len(votes) if votes else 0
    avg_confidence = sum(v.confidence for v in votes) / len(votes) if votes else 0

    if approval_rate >= quorum and avg_confidence >= confidence_threshold:
        return {"decision": "approved", "approval_rate": approval_rate, "votes": votes}
    elif any(v.verdict == "reject" for v in votes):
        rejections = [v for v in votes if v.verdict == "reject"]
        return {"decision": "rejected", "reasons": [r.reasoning for r in rejections], "votes": votes}
    else:
        return {"decision": "needs_revision", "feedback": [v.reasoning for v in votes], "votes": votes}
```
Agent Routing Strategies
Intent-Based Router
```python
class IntentRouter:
    """Route requests to specialized agents based on intent classification."""

    ROUTING_TABLE = {
        "code_generation": {"agent": "coder", "model": "claude-sonnet-4-20250514"},
        "code_review": {"agent": "reviewer", "model": "claude-sonnet-4-20250514"},
        "research": {"agent": "researcher", "model": "claude-sonnet-4-20250514"},
        "simple_question": {"agent": "assistant", "model": "claude-haiku-4-20250514"},
        "creative_writing": {"agent": "writer", "model": "claude-sonnet-4-20250514"},
        "complex_analysis": {"agent": "analyst", "model": "claude-sonnet-4-20250514"},
    }

    def __init__(self, client):
        self.client = client

    async def route(self, message: str) -> dict:
        # Use a fast, cheap model for classification
        classification = await self.client.messages.create(
            model="claude-haiku-4-20250514",
            max_tokens=50,
            system="Classify the user intent. Respond with ONLY one of: code_generation, code_review, research, simple_question, creative_writing, complex_analysis",
            messages=[{"role": "user", "content": message}],
        )
        intent = classification.content[0].text.strip().lower()
        return self.ROUTING_TABLE.get(intent, self.ROUTING_TABLE["simple_question"])
```
Context Window Budgeting
```python
MODEL_LIMITS = {
    "claude-sonnet-4-20250514": 200_000,
    "claude-haiku-4-20250514": 200_000,
    "claude-opus-4-20250514": 200_000,
    "gpt-4o": 128_000,
}


class ContextBudget:
    def __init__(self, model: str, pipeline_stages: int, reserve_pct: float = 0.15):
        self.total = MODEL_LIMITS.get(model, 128_000)
        self.reserve = int(self.total * reserve_pct)
        self.per_stage = (self.total - self.reserve) // pipeline_stages
        self.used = 0

    def allocate(self, stage: str) -> int:
        available = self.total - self.reserve - self.used
        allocation = min(self.per_stage, int(available * 0.6))
        return max(allocation, 1000)  # minimum 1000 tokens per stage

    def consume(self, tokens: int):
        self.used += tokens

    def summarize_if_needed(self, text: str, budget: int) -> str:
        # Rough heuristic: ~4 characters per token for English text
        estimated_tokens = len(text) // 4
        if estimated_tokens <= budget:
            return text
        # Truncate to budget with marker
        char_limit = budget * 4
        return text[:char_limit] + "\n\n[Content truncated to fit context budget]"
```
Cost Optimization Matrix
| Strategy | Cost Reduction | Quality Impact | When to Use |
|---|---|---|---|
| Haiku for routing/classification | 85-90% | Minimal | Always for intent routing |
| Haiku for editing/formatting | 60-70% | Low | Mechanical tasks |
| Sonnet for most stages | Baseline | Baseline | Default choice |
| Opus only for final synthesis | +50% on that stage | Higher quality | High-stakes output |
| Prompt caching (system prompts) | 50-90% per call | None | Repeated system prompts |
| Truncate intermediate outputs | 20-40% | May lose detail | Long pipelines |
| Parallel + early termination | 30-50% | None if threshold met | Search/validation tasks |
| Batch similar requests | Up to 50% | Increased latency | Non-real-time workloads |
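The prompt-caching row maps to the Anthropic API's `cache_control` marker on system blocks; a sketch of building such a payload (the prompt text is a placeholder, and cache TTL/pricing are provider-defined):

```python
LONG_SYSTEM_PROMPT = "You are a task orchestrator. ..."  # placeholder; imagine ~2k tokens


def cached_request(user_message: str) -> dict:
    """Build a messages.create payload with the shared system prompt marked cacheable.

    The first call writes the cache; later calls reusing the identical prefix
    read it back at reduced input-token cost.
    """
    return {
        "model": "claude-sonnet-4-20250514",
        "max_tokens": 1024,
        "system": [
            {
                "type": "text",
                "text": LONG_SYSTEM_PROMPT,
                "cache_control": {"type": "ephemeral"},
            }
        ],
        "messages": [{"role": "user", "content": user_message}],
    }
```

Caching only pays off when the cached prefix is byte-identical across calls, so keep per-request content out of the system prompt.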
Reliability Patterns
Circuit Breaker
```python
import time


class CircuitBreaker:
    """Prevent cascading failures when an agent/model is down."""

    def __init__(self, failure_threshold: int = 5, recovery_time: float = 60.0):
        self.failure_threshold = failure_threshold
        self.recovery_time = recovery_time
        self.failures = 0
        self.state = "closed"  # closed = healthy, open = failing, half-open = testing
        self.last_failure_time = 0.0

    def can_execute(self) -> bool:
        if self.state == "closed":
            return True
        if self.state == "open":
            if time.time() - self.last_failure_time > self.recovery_time:
                self.state = "half-open"
                return True
            return False
        return True  # half-open: allow one test request

    def record_success(self):
        self.failures = 0
        self.state = "closed"

    def record_failure(self):
        self.failures += 1
        self.last_failure_time = time.time()
        if self.failures >= self.failure_threshold:
            self.state = "open"
```
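Dead letter queues (from the Reliability Engineering list) pair naturally with the breaker: tasks that exhaust their retries get parked instead of blocking the pipeline. A minimal in-memory sketch; a production system would back this with a durable queue or database:

```python
from dataclasses import dataclass, field


@dataclass
class DeadLetterQueue:
    max_attempts: int = 3
    _entries: list[dict] = field(default_factory=list)

    def record_failure(self, task_id: str, payload: dict, error: str, attempt: int) -> bool:
        """Return True if the task was dead-lettered (retries exhausted)."""
        if attempt >= self.max_attempts:
            self._entries.append({"task_id": task_id, "payload": payload, "error": error})
            return True
        return False  # caller should retry

    def drain(self) -> list[dict]:
        """Hand parked tasks to an operator or repair job and clear the queue."""
        entries, self._entries = self._entries, []
        return entries
```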
Common Pitfalls
- Over-orchestration — if a single prompt can handle it, adding agents adds cost and latency, not value
- Circular dependencies in subtask graphs causing infinite loops; always validate DAG structure before execution
- Context bleed — passing entire previous outputs to every stage; summarize or extract only what is needed
- No timeout enforcement — a stuck agent blocks the entire pipeline; set wall-clock timeouts at every boundary
- Silent failures — agent returns plausible but incorrect output; add validation stages for critical paths
- Ignoring cost — 10 parallel Opus calls is expensive; model selection is a cost decision, not just a quality one
- Stateless retries on stateful operations — ensure idempotency before enabling automatic retries
- Single point of failure in orchestrator — if the orchestrator agent fails, the entire workflow fails
Best Practices
- Start with a single prompt — only add agents when you prove one cannot handle the task
- Type your handoffs — use dataclasses or TypedDicts for inter-agent data, not raw strings
- Budget context upfront — calculate token allocations before running the pipeline
- Use cheap models for routing — Haiku classification costs roughly a tenth as much as Sonnet
- Validate DAG structure at build time, not runtime
- Log every agent call with input hash, output hash, tokens, latency, and cost
- Set SLAs per stage — if research takes >30s, timeout and use cached results
- Test with production-scale inputs — a pipeline that works on 100 words may fail on 10,000
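The logging practice above can be sketched as a decorator; the token and cost fields would be filled from the provider response, which is omitted here:

```python
import functools
import hashlib
import time

CALL_LOG: list[dict] = []  # swap for structured logging in production


def logged_agent_call(fn):
    """Record input/output hashes and wall-clock latency for every agent call."""
    @functools.wraps(fn)
    def wrapper(prompt: str, *args, **kwargs):
        start = time.monotonic()
        output = fn(prompt, *args, **kwargs)
        CALL_LOG.append({
            "agent": fn.__name__,
            "input_hash": hashlib.sha256(prompt.encode()).hexdigest()[:12],
            "output_hash": hashlib.sha256(str(output).encode()).hexdigest()[:12],
            "latency_s": round(time.monotonic() - start, 3),
        })
        return output
    return wrapper
```

Hashing inputs rather than storing them keeps logs compact and avoids persisting sensitive prompt content.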