agent-workflow-designer
Agent Workflow Designer
The agent designs multi-agent orchestration systems using five core patterns: sequential pipeline, parallel fan-out/fan-in, hierarchical delegation, event-driven reactor, and consensus validation. It implements agent routing strategies, circuit breaker reliability patterns, context window budgeting, and cost optimization across LangGraph, CrewAI, AutoGen, and Claude Code agent teams.
Core Capabilities
1. Pattern Selection and Design
- Sequential pipelines with typed handoffs
- Parallel fan-out/fan-in with merge strategies
- Hierarchical delegation with dynamic subtask discovery
- Event-driven reactors with pub/sub agent triggers
- Consensus validation with voting and arbitration
2. Agent Routing
- Intent-based routing with classifier agents
- Skill-based routing using capability matching
- Cost-aware routing (cheap models for simple tasks)
- Load-balanced routing across agent pools
- Fallback chains with graceful degradation
3. State and Context Management
- Persistent workflow state across agent hops
- Context window budgeting and summarization
- Checkpoint/resume for long-running workflows
- Conflict resolution for parallel state updates
4. Reliability Engineering
- Circuit breakers for failing agents
- Retry with exponential backoff and model fallback
- Dead letter queues for unprocessable tasks
- Timeout enforcement at every agent boundary
- Idempotent operations for safe retries
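The retry-with-backoff-and-fallback bullet above can be sketched as a small helper. This is an illustrative sketch: `call_model` is a stand-in for whatever SDK call the workflow uses, and the model-tier ordering is the caller's choice, not a specific API.

```python
import asyncio
import random
from typing import Awaitable, Callable


async def call_with_fallback(
    call_model: Callable[[str], Awaitable[str]],
    models: list[str],  # ordered: preferred model first, fallback tiers after
    max_attempts_per_model: int = 3,
    base_delay: float = 0.5,
) -> str:
    """Try each model in order, retrying transient failures with exponential backoff."""
    last_error: Exception | None = None
    for model in models:
        for attempt in range(max_attempts_per_model):
            try:
                return await call_model(model)
            except Exception as e:
                last_error = e
                if attempt < max_attempts_per_model - 1:
                    # Exponential backoff with jitter: ~0.5s, ~1s, ~2s, ...
                    delay = base_delay * (2 ** attempt) * (1 + random.random() * 0.1)
                    await asyncio.sleep(delay)
        # All attempts on this model failed; fall through to the next tier
    raise RuntimeError(f"All models exhausted; last error: {last_error}")
```

Because each model gets its own retry budget, a transient rate limit on the primary tier degrades gracefully to the fallback tier instead of failing the whole stage.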
When to Use
- Building multi-step AI pipelines that exceed one agent's capability
- Parallelizing research, analysis, or generation tasks
- Creating specialist agent teams with defined roles and contracts
- Designing fault-tolerant AI workflows for production deployment
- Optimizing cost across workflows with mixed model tiers
Pattern Selection Decision Tree
```
What does the workflow look like?
│
├─ Linear: step A feeds step B feeds step C
│  └─ SEQUENTIAL PIPELINE
│     Best for: content pipelines, code review chains, data transformation
│
├─ Parallel: N independent tasks, then combine
│  └─ FAN-OUT / FAN-IN
│     Best for: competitive research, multi-source analysis, parallel code gen
│
├─ Tree: orchestrator breaks work into subtasks dynamically
│  └─ HIERARCHICAL DELEGATION
│     Best for: complex projects, open-ended research, code generation with planning
│
├─ Reactive: agents respond to events/triggers
│  └─ EVENT-DRIVEN REACTOR
│     Best for: monitoring, alerting, continuous integration, chat workflows
│
└─ Verification: multiple agents must agree on output
   └─ CONSENSUS VALIDATION
      Best for: high-stakes decisions, code review, fact checking, safety-critical output
```
Pattern 1: Sequential Pipeline
Each stage transforms input and passes structured output to the next. Type-safe handoffs prevent data loss between stages.
LangGraph Implementation
```python
import operator
from typing import Annotated, TypedDict

from langchain_anthropic import ChatAnthropic
from langgraph.graph import END, StateGraph


class PipelineState(TypedDict):
    topic: str
    research: str
    draft: str
    final: str
    stage_costs: Annotated[list[dict], operator.add]  # reducer appends each stage's cost


def research_stage(state: PipelineState) -> dict:
    model = ChatAnthropic(model="claude-sonnet-4-20250514", max_tokens=2048)
    result = model.invoke(
        f"Research the following topic thoroughly. Provide key facts, statistics, "
        f"and expert perspectives:\n\n{state['topic']}"
    )
    return {
        "research": result.content,
        "stage_costs": [{"stage": "research", "tokens": result.usage_metadata["total_tokens"]}],
    }


def writing_stage(state: PipelineState) -> dict:
    model = ChatAnthropic(model="claude-sonnet-4-20250514", max_tokens=4096)
    result = model.invoke(
        f"Using this research, write a compelling 800-word blog post with a hook, "
        f"3 main sections, and a CTA:\n\n{state['research']}"
    )
    return {
        "draft": result.content,
        "stage_costs": [{"stage": "writing", "tokens": result.usage_metadata["total_tokens"]}],
    }


def editing_stage(state: PipelineState) -> dict:
    model = ChatAnthropic(model="claude-haiku-4-20250514", max_tokens=4096)
    result = model.invoke(
        f"Edit this draft for clarity, flow, and grammar. Return only the improved "
        f"version:\n\n{state['draft']}"
    )
    return {
        "final": result.content,
        "stage_costs": [{"stage": "editing", "tokens": result.usage_metadata["total_tokens"]}],
    }


# Build the graph
graph = StateGraph(PipelineState)
graph.add_node("research", research_stage)
graph.add_node("write", writing_stage)
graph.add_node("edit", editing_stage)
graph.set_entry_point("research")
graph.add_edge("research", "write")
graph.add_edge("write", "edit")
graph.add_edge("edit", END)
pipeline = graph.compile()

# Execute
result = pipeline.invoke({"topic": "The future of AI agents in enterprise software"})
print(f"Total cost: {sum(s['tokens'] for s in result['stage_costs'])} tokens")
```
Pattern 2: Parallel Fan-Out / Fan-In
Independent tasks run concurrently. A merge function combines results.
```python
import asyncio
from dataclasses import dataclass

import anthropic


@dataclass
class FanOutTask:
    name: str
    system_prompt: str
    user_message: str
    model: str = "claude-sonnet-4-20250514"


@dataclass
class FanOutResult:
    task_name: str
    output: str
    tokens_used: int
    success: bool
    error: str | None = None


async def fan_out_fan_in(
    tasks: list[FanOutTask],
    merge_prompt: str,
    max_concurrent: int = 5,
    timeout_seconds: float = 60.0,
) -> dict:
    """Execute tasks in parallel with a concurrency limit and per-task timeout."""
    client = anthropic.AsyncAnthropic()
    semaphore = asyncio.Semaphore(max_concurrent)

    async def run_one(task: FanOutTask) -> FanOutResult:
        async with semaphore:
            try:
                response = await asyncio.wait_for(
                    client.messages.create(
                        model=task.model,
                        max_tokens=2048,
                        system=task.system_prompt,
                        messages=[{"role": "user", "content": task.user_message}],
                    ),
                    timeout=timeout_seconds,
                )
                return FanOutResult(
                    task_name=task.name,
                    output=response.content[0].text,
                    tokens_used=response.usage.input_tokens + response.usage.output_tokens,
                    success=True,
                )
            except Exception as e:
                return FanOutResult(
                    task_name=task.name, output="", tokens_used=0,
                    success=False, error=str(e),
                )

    # FAN-OUT: run all tasks concurrently
    results = await asyncio.gather(*[run_one(t) for t in tasks])
    successful = [r for r in results if r.success]
    failed = [r for r in results if not r.success]
    if not successful:
        raise RuntimeError(f"All {len(tasks)} fan-out tasks failed: {[f.error for f in failed]}")

    # FAN-IN: merge results
    combined = "\n\n---\n\n".join(
        f"## {r.task_name}\n{r.output}" for r in successful
    )
    merge_response = await client.messages.create(
        model="claude-sonnet-4-20250514",
        max_tokens=4096,
        system="Synthesize the following parallel analyses into a unified report.",
        messages=[{"role": "user", "content": f"{merge_prompt}\n\n{combined}"}],
    )
    return {
        "synthesis": merge_response.content[0].text,
        "individual_results": successful,
        "failures": failed,
        "total_tokens": (
            sum(r.tokens_used for r in results)
            + merge_response.usage.input_tokens
            + merge_response.usage.output_tokens
        ),
    }
```
Pattern 3: Hierarchical Delegation
An orchestrator agent dynamically decomposes work and delegates to specialists.
```python
import asyncio
import json
from dataclasses import dataclass
from typing import Literal

SPECIALISTS = {
    "researcher": "Find accurate information with sources. Be thorough and cite evidence.",
    "coder": "Write clean, tested code. Include error handling and type hints.",
    "writer": "Create clear, engaging content. Match the requested tone and format.",
    "analyst": "Analyze data and produce evidence-backed conclusions with visualizations.",
    "reviewer": "Review work product for quality, accuracy, and completeness.",
}


@dataclass
class SubTask:
    id: str
    agent: Literal["researcher", "coder", "writer", "analyst", "reviewer"]
    task: str
    depends_on: list[str]
    priority: int = 0  # higher = run first when deps are equal


class HierarchicalOrchestrator:
    def __init__(self, client):
        self.client = client

    async def plan(self, request: str) -> list[SubTask]:
        """Orchestrator creates an execution plan with dependencies."""
        response = await self.client.messages.create(
            model="claude-sonnet-4-20250514",
            max_tokens=2048,
            system=f"""You are a task orchestrator. Break down the request into subtasks.
Available specialists: {', '.join(SPECIALISTS.keys())}
Respond with JSON: {{"subtasks": [{{"id": "1", "agent": "researcher", "task": "...", "depends_on": []}}]}}
Rules:
- Minimize the number of subtasks (prefer fewer, more substantial tasks)
- Only add dependencies when output is genuinely needed
- Independent tasks should have empty depends_on for parallel execution""",
            messages=[{"role": "user", "content": request}],
        )
        plan = json.loads(response.content[0].text)
        return [SubTask(**st) for st in plan["subtasks"]]

    async def execute(self, request: str) -> str:
        """Plan, execute with dependency resolution, and synthesize."""
        subtasks = await self.plan(request)
        results: dict[str, str] = {}
        # Execute in dependency order, parallelize where possible
        for batch in self._batch_by_dependencies(subtasks):
            batch_results = await asyncio.gather(*[
                self._run_specialist(st, results) for st in batch
            ])
            for st, result in zip(batch, batch_results):
                results[st.id] = result
        # Final synthesis
        all_outputs = "\n\n".join(f"### {k}\n{v}" for k, v in results.items())
        synthesis = await self.client.messages.create(
            model="claude-sonnet-4-20250514",
            max_tokens=4096,
            system="Synthesize specialist outputs into a coherent final response.",
            messages=[{"role": "user", "content": f"Request: {request}\n\nOutputs:\n{all_outputs}"}],
        )
        return synthesis.content[0].text

    async def _run_specialist(self, st: SubTask, results: dict[str, str]) -> str:
        """Run one subtask, passing the outputs of its dependencies as context."""
        context = "\n\n".join(f"### Output of task {d}\n{results[d]}" for d in st.depends_on)
        response = await self.client.messages.create(
            model="claude-sonnet-4-20250514",
            max_tokens=2048,
            system=SPECIALISTS[st.agent],
            messages=[{"role": "user", "content": f"{context}\n\n{st.task}".strip()}],
        )
        return response.content[0].text

    def _batch_by_dependencies(self, subtasks: list[SubTask]) -> list[list[SubTask]]:
        """Group subtasks into batches that can run in parallel."""
        completed = set()
        remaining = list(subtasks)
        batches = []
        while remaining:
            batch = [t for t in remaining if all(d in completed for d in t.depends_on)]
            if not batch:
                raise ValueError("Circular dependency detected in subtask plan")
            batches.append(sorted(batch, key=lambda t: -t.priority))
            completed.update(t.id for t in batch)
            remaining = [t for t in remaining if t.id not in completed]
        return batches
```
Pattern 4: Event-Driven Reactor
Agents react to events from a message bus. Decoupled and scalable.
```python
import asyncio
from collections import defaultdict
from typing import Any, Callable


class AgentEventBus:
    """Simple event bus for agent-to-agent communication."""

    def __init__(self):
        self._handlers: dict[str, list[Callable]] = defaultdict(list)
        self._history: list[dict] = []

    def subscribe(self, event_type: str, handler: Callable):
        self._handlers[event_type].append(handler)

    async def publish(self, event_type: str, payload: Any, source: str):
        event = {"type": event_type, "payload": payload, "source": source}
        self._history.append(event)
        handlers = self._handlers.get(event_type, [])
        results = await asyncio.gather(
            *[h(event) for h in handlers],
            return_exceptions=True,
        )
        errors = [(h, r) for h, r in zip(handlers, results) if isinstance(r, Exception)]
        if errors:
            for handler, error in errors:
                print(f"Handler {handler.__name__} failed: {error}")
        return results


# Usage: code review pipeline triggered by PR events
bus = AgentEventBus()


async def on_pr_opened(event):
    """Security agent scans PR for vulnerabilities."""
    diff = event["payload"]["diff"]
    findings = []  # ... scan the diff and collect findings here
    await bus.publish("security_scan_complete", {"findings": findings}, "security-agent")


async def on_security_complete(event):
    """Review agent incorporates security findings into review."""
    # ... generate review with security context


bus.subscribe("pr_opened", on_pr_opened)
bus.subscribe("security_scan_complete", on_security_complete)
```
Pattern 5: Consensus Validation
Multiple agents independently evaluate the same input. A quorum determines the final output.
```python
import asyncio
import json
from dataclasses import dataclass

import anthropic


@dataclass
class Vote:
    agent: str
    verdict: str       # "approve" | "reject" | "revise"
    confidence: float  # 0.0 - 1.0
    reasoning: str


async def get_agent_vote(name: str, system: str, content: str) -> Vote | None:
    """Ask one validator for an independent vote; return None if the call fails."""
    client = anthropic.AsyncAnthropic()
    try:
        response = await client.messages.create(
            model="claude-sonnet-4-20250514",
            max_tokens=1024,
            system=f"{system}\nRespond with JSON: "
                   '{"verdict": "approve|reject|revise", "confidence": 0.0, "reasoning": "..."}',
            messages=[{"role": "user", "content": content}],
        )
        data = json.loads(response.content[0].text)
        return Vote(agent=name, verdict=data["verdict"],
                    confidence=float(data["confidence"]), reasoning=data["reasoning"])
    except Exception:
        return None


async def consensus_validate(
    content: str,
    validators: list[dict],  # [{"name": "...", "system": "..."}]
    quorum: float = 0.66,
    confidence_threshold: float = 0.7,
) -> dict:
    """Run content through multiple validators and determine consensus."""
    # Collect independent votes (no agent sees another's vote)
    raw_votes = await asyncio.gather(*[
        get_agent_vote(v["name"], v["system"], content) for v in validators
    ])
    votes: list[Vote] = [v for v in raw_votes if v is not None]

    # Calculate consensus
    approvals = [v for v in votes if v.verdict == "approve"]
    approval_rate = len(approvals) / len(votes) if votes else 0
    avg_confidence = sum(v.confidence for v in votes) / len(votes) if votes else 0

    if approval_rate >= quorum and avg_confidence >= confidence_threshold:
        return {"decision": "approved", "approval_rate": approval_rate, "votes": votes}
    elif any(v.verdict == "reject" for v in votes):
        rejections = [v for v in votes if v.verdict == "reject"]
        return {"decision": "rejected", "reasons": [r.reasoning for r in rejections], "votes": votes}
    else:
        return {"decision": "needs_revision", "feedback": [v.reasoning for v in votes], "votes": votes}
```
Agent Routing Strategies
Intent-Based Router
```python
import anthropic


class IntentRouter:
    """Route requests to specialized agents based on intent classification."""

    ROUTING_TABLE = {
        "code_generation": {"agent": "coder", "model": "claude-sonnet-4-20250514"},
        "code_review": {"agent": "reviewer", "model": "claude-sonnet-4-20250514"},
        "research": {"agent": "researcher", "model": "claude-sonnet-4-20250514"},
        "simple_question": {"agent": "assistant", "model": "claude-haiku-4-20250514"},
        "creative_writing": {"agent": "writer", "model": "claude-sonnet-4-20250514"},
        "complex_analysis": {"agent": "analyst", "model": "claude-sonnet-4-20250514"},
    }

    def __init__(self, client: anthropic.AsyncAnthropic):
        self.client = client

    async def route(self, message: str) -> dict:
        # Use a fast, cheap model for classification
        classification = await self.client.messages.create(
            model="claude-haiku-4-20250514",
            max_tokens=50,
            system=(
                "Classify the user intent. Respond with ONLY one of: "
                "code_generation, code_review, research, simple_question, "
                "creative_writing, complex_analysis"
            ),
            messages=[{"role": "user", "content": message}],
        )
        intent = classification.content[0].text.strip().lower()
        # Unknown intents fall back to the cheapest general-purpose route
        return self.ROUTING_TABLE.get(intent, self.ROUTING_TABLE["simple_question"])
```
Context Window Budgeting
```python
MODEL_LIMITS = {
    "claude-sonnet-4-20250514": 200_000,
    "claude-haiku-4-20250514": 200_000,
    "claude-opus-4-20250514": 200_000,
    "gpt-4o": 128_000,
}


class ContextBudget:
    def __init__(self, model: str, pipeline_stages: int, reserve_pct: float = 0.15):
        self.total = MODEL_LIMITS.get(model, 128_000)
        self.reserve = int(self.total * reserve_pct)
        self.per_stage = (self.total - self.reserve) // pipeline_stages
        self.used = 0

    def allocate(self, stage: str) -> int:
        available = self.total - self.reserve - self.used
        allocation = min(self.per_stage, int(available * 0.6))
        return max(allocation, 1000)  # minimum 1000 tokens per stage

    def consume(self, tokens: int):
        self.used += tokens

    def summarize_if_needed(self, text: str, budget: int) -> str:
        estimated_tokens = len(text) // 4  # rough heuristic: ~4 characters per token
        if estimated_tokens <= budget:
            return text
        # Truncate to budget with marker
        char_limit = budget * 4
        return text[:char_limit] + "\n\n[Content truncated to fit context budget]"
```
Cost Optimization Matrix
| Strategy | Cost Reduction | Quality Impact | When to Use |
|---|---|---|---|
| Haiku for routing/classification | 85-90% | Minimal | Always for intent routing |
| Haiku for editing/formatting | 60-70% | Low | Mechanical tasks |
| Sonnet for most stages | Baseline | Baseline | Default choice |
| Opus only for final synthesis | +50% on that stage | Higher quality | High-stakes output |
| Prompt caching (system prompts) | 50-90% per call | None | Repeated system prompts |
| Truncate intermediate outputs | 20-40% | May lose detail | Long pipelines |
| Parallel + early termination | 30-50% | None if threshold met | Search/validation tasks |
| Batch similar requests | Up to 50% | Increased latency | Non-real-time workloads |
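The prompt-caching row above works by marking a large, stable prefix, typically the system prompt, as cacheable so repeated calls pay the full input price only once per cache window. A sketch of the request shape, based on the Anthropic Messages API's `cache_control` parameter (verify field names against the current SDK docs; `LARGE_STABLE_INSTRUCTIONS` is a placeholder):

```python
# The stable part of the prompt is marked cacheable; only the user turn varies.
LARGE_STABLE_INSTRUCTIONS = "You are a research agent. " + "Detailed evaluation rubric... " * 100


def build_request(user_message: str) -> dict:
    """Build a Messages API request with a cacheable system-prompt prefix."""
    return {
        "model": "claude-sonnet-4-20250514",
        "max_tokens": 2048,
        "system": [
            {
                "type": "text",
                "text": LARGE_STABLE_INSTRUCTIONS,
                "cache_control": {"type": "ephemeral"},  # cache this prefix across calls
            }
        ],
        "messages": [{"role": "user", "content": user_message}],
    }


# client.messages.create(**build_request("Summarize today's findings"))
```

The savings compound in pipelines: every stage that reuses the same system prompt hits the cache after the first call.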
Reliability Patterns
Circuit Breaker
```python
import time


class CircuitBreaker:
    """Prevent cascading failures when an agent/model is down."""

    def __init__(self, failure_threshold: int = 5, recovery_time: float = 60.0):
        self.failure_threshold = failure_threshold
        self.recovery_time = recovery_time
        self.failures = 0
        self.state = "closed"  # closed = healthy, open = failing, half-open = testing
        self.last_failure_time = 0.0

    def can_execute(self) -> bool:
        if self.state == "closed":
            return True
        if self.state == "open":
            if time.time() - self.last_failure_time > self.recovery_time:
                self.state = "half-open"
                return True
            return False
        return True  # half-open: allow one test request

    def record_success(self):
        self.failures = 0
        self.state = "closed"

    def record_failure(self):
        self.failures += 1
        self.last_failure_time = time.time()
        if self.failures >= self.failure_threshold:
            self.state = "open"
```
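Dead Letter Queue

The dead letter queue listed under Reliability Engineering can be as simple as a capped store of tasks that exhausted their retries. A minimal in-process sketch; a production deployment would typically back this with SQS, Kafka, or a database table instead:

```python
from dataclasses import dataclass, field


@dataclass
class DeadLetterQueue:
    """Park tasks that exhausted their retries for later inspection or replay."""
    max_retries: int = 3
    _dead: list[dict] = field(default_factory=list)
    _attempts: dict[str, int] = field(default_factory=dict)

    def record_failure(self, task_id: str, payload: dict, error: str) -> bool:
        """Return True if the task should be retried, False if it was dead-lettered."""
        self._attempts[task_id] = self._attempts.get(task_id, 0) + 1
        if self._attempts[task_id] >= self.max_retries:
            self._dead.append({"task_id": task_id, "payload": payload, "error": error})
            return False
        return True

    def drain(self) -> list[dict]:
        """Hand dead tasks to an operator dashboard or replay job."""
        dead, self._dead = self._dead, []
        return dead
```

Dead-lettering keeps one unprocessable task from blocking the pipeline while preserving it for diagnosis, which pairs naturally with the circuit breaker above.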
Common Pitfalls
- Over-orchestration — if a single prompt can handle it, adding agents adds cost and latency, not value
- Circular dependencies in subtask graphs causing infinite loops; always validate DAG structure before execution
- Context bleed — passing entire previous outputs to every stage; summarize or extract only what is needed
- No timeout enforcement — a stuck agent blocks the entire pipeline; set wall-clock timeouts at every boundary
- Silent failures — agent returns plausible but incorrect output; add validation stages for critical paths
- Ignoring cost — 10 parallel Opus calls is expensive; model selection is a cost decision, not just a quality one
- Stateless retries on stateful operations — ensure idempotency before enabling automatic retries
- Single point of failure in orchestrator — if the orchestrator agent fails, the entire workflow fails
Best Practices
- Start with a single prompt — only add agents when you prove one cannot handle the task
- Type your handoffs — use dataclasses or TypedDicts for inter-agent data, not raw strings
- Budget context upfront — calculate token allocations before running the pipeline
- Use cheap models for routing — Haiku-tier classification costs roughly an order of magnitude less than Sonnet
- Validate DAG structure at build time, not runtime
- Log every agent call with input hash, output hash, tokens, latency, and cost
- Set SLAs per stage — if research takes >30s, timeout and use cached results
- Test with production-scale inputs — a pipeline that works on 100 words may fail on 10,000
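The "validate DAG structure at build time" practice above can be implemented with a topological sort over the subtask IDs (Kahn's algorithm). A standalone sketch; the `{"id": ..., "depends_on": [...]}` shape mirrors the SubTask fields used in the hierarchical pattern:

```python
def validate_dag(subtasks: list[dict]) -> list[str]:
    """Return a valid execution order, or raise ValueError if the plan has a cycle.

    Each subtask is a dict with "id" (str) and "depends_on" (list of ids).
    """
    ids = {t["id"] for t in subtasks}
    indegree = {t["id"]: 0 for t in subtasks}
    dependents: dict[str, list[str]] = {t["id"]: [] for t in subtasks}
    for t in subtasks:
        for dep in t["depends_on"]:
            if dep not in ids:
                raise ValueError(f"Task {t['id']} depends on unknown task {dep}")
            indegree[t["id"]] += 1
            dependents[dep].append(t["id"])

    # Kahn's algorithm: repeatedly emit tasks whose dependencies are all satisfied
    ready = [tid for tid, deg in indegree.items() if deg == 0]
    order: list[str] = []
    while ready:
        tid = ready.pop()
        order.append(tid)
        for child in dependents[tid]:
            indegree[child] -= 1
            if indegree[child] == 0:
                ready.append(child)

    if len(order) != len(subtasks):
        raise ValueError("Circular dependency detected in subtask plan")
    return order
```

Running this immediately after the planner returns its JSON catches cycles and dangling references before any model tokens are spent on execution.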
Troubleshooting
| Problem | Cause | Solution |
|---|---|---|
| Pipeline hangs indefinitely | Missing timeout enforcement on one or more agent stages | Add asyncio.wait_for() with explicit timeout_seconds at every agent boundary; use the Circuit Breaker pattern to fail fast |
| Circular dependency error at runtime | Subtask graph contains a cycle (e.g., task A depends on B which depends on A) | Validate DAG structure at build time with topological sort; the _batch_by_dependencies method catches this but validation should happen earlier |
| Context window exceeded mid-pipeline | Intermediate outputs grow beyond the model's token limit | Use the ContextBudget class to allocate tokens per stage; summarize or truncate outputs before passing to the next stage |
| Fan-out tasks return inconsistent formats | Each parallel agent interprets the output schema differently | Define a shared TypedDict or dataclass for all fan-out results; add a validation step before the merge function |
| Orchestrator plan creates too many subtasks | The planning prompt does not constrain subtask count, leading to over-decomposition | Add explicit constraints in the planner system prompt (e.g., "maximum 5 subtasks"); review and approve plans before execution in high-stakes workflows |
| Consensus never reaches quorum | Validators disagree consistently or confidence scores are too low | Lower the quorum threshold, add a tiebreaker agent, or revise validator prompts to align on evaluation criteria |
| Cost spikes on parallel workflows | Expensive models (Opus) used for all fan-out branches instead of routing by complexity | Apply cost-aware routing: use Haiku for classification and simple tasks, Sonnet for most work, Opus only for final synthesis or high-stakes decisions |
Success Criteria
- Pipeline end-to-end latency stays within the defined SLA (e.g., under 60 seconds for a 5-stage workflow) with no stage exceeding its individual timeout
- Agent routing accuracy exceeds 90% when measured against a labeled test set of at least 100 representative requests
- Fan-out/fan-in workflows complete with fewer than 5% task failures across all parallel branches under normal operating conditions
- Total token cost per workflow run decreases by at least 40% after applying model tiering (Haiku for routing, Sonnet for core work, Opus for synthesis)
- Circuit breakers trigger correctly within 5 consecutive failures and recover automatically after the defined recovery window
- Context window utilization stays below 85% of model limits at every pipeline stage, with no truncation-related quality degradation
- All inter-agent handoffs pass schema validation with zero type errors across 100 consecutive workflow executions
Scope & Limitations
This skill covers:
- Design and implementation of five core multi-agent orchestration patterns (sequential, parallel, hierarchical, event-driven, consensus)
- Agent routing strategies including intent-based, skill-based, and cost-aware routing
- Reliability engineering patterns: circuit breakers, retries, timeouts, and dead letter queues
- Context window budgeting, cost optimization, and framework-specific implementations (LangGraph, CrewAI, AutoGen)
This skill does NOT cover:
- Training or fine-tuning the underlying LLMs used by agents (see engineering/ml-pipeline-architect for ML training workflows)
- Infrastructure provisioning, container orchestration, or deployment pipelines (see engineering/cloud-infrastructure-designer for cloud architecture)
- Human-in-the-loop approval workflows or UI design for agent dashboards (see product-team/ux-researcher for user-facing workflow design)
- Long-term agent memory, vector database setup, or RAG pipeline construction (see engineering/rag-pipeline-architect for retrieval-augmented generation)
Integration Points
| Skill | Integration | Data Flow |
|---|---|---|
| engineering/ml-pipeline-architect | Agent workflows that include ML inference stages use ML Pipeline Architect for model serving and batch prediction design | Workflow DAG exports stage specs to ML pipeline; ML pipeline returns inference endpoints for agent consumption |
| engineering/rag-pipeline-architect | Research and retrieval agents within workflows rely on RAG pipelines for grounded knowledge access | Agent sends queries to RAG pipeline; RAG returns ranked document chunks with citations for agent context |
| engineering/cloud-infrastructure-designer | Production deployment of agent workflows requires infrastructure design for scaling, queuing, and monitoring | Workflow resource requirements feed into infrastructure specs; infra returns endpoint URLs, queue ARNs, and scaling policies |
| engineering/api-design-architect | Inter-agent communication contracts and external API boundaries follow API design standards | Agent handoff schemas are validated against API design specs; API architect provides OpenAPI definitions for external integrations |
| engineering/system-design-architect | Overall system architecture decisions (sync vs async, monolith vs distributed) shape workflow topology choices | System design constraints (latency budgets, availability targets) inform pattern selection; workflow requirements feed back into system capacity planning |
| project-management/technical-project-planning | Complex multi-agent projects require structured planning for phased rollout, risk management, and milestone tracking | Workflow complexity estimates feed into project plans; PM skill provides sprint boundaries and dependency timelines for staged deployment |