# Orchestrating Agents
This skill enables programmatic Anthropic API invocations for advanced workflows, including parallel processing, task delegation, and multi-agent analysis.
## When to Use This Skill

**Primary use cases:**
- Parallel sub-tasks: Break complex analysis into simultaneous independent streams
- Multi-perspective analysis: Get 3-5 different expert viewpoints concurrently
- Delegation: Offload specific subtasks to specialized API instances
- Recursive workflows: Orchestrator coordinating multiple API instances
- High-volume processing: Batch process multiple items concurrently
**Trigger patterns:**
- "Parallel analysis", "multi-perspective review", "concurrent processing"
- "Delegate subtasks", "coordinate multiple agents"
- "Run analyses from different perspectives"
- "Get expert opinions from multiple angles"
## Quick Start

### Single Invocation
```python
import sys
sys.path.append('/home/user/claude-skills/orchestrating-agents/scripts')

from claude_client import invoke_claude

response = invoke_claude(
    prompt="Analyze this code for security vulnerabilities: ...",
    model="claude-sonnet-4-6"
)
print(response)
```
### Parallel Multi-Perspective Analysis
```python
from claude_client import invoke_parallel

prompts = [
    {
        "prompt": "Analyze from security perspective: ...",
        "system": "You are a security expert"
    },
    {
        "prompt": "Analyze from performance perspective: ...",
        "system": "You are a performance optimization expert"
    },
    {
        "prompt": "Analyze from maintainability perspective: ...",
        "system": "You are a software architecture expert"
    }
]

results = invoke_parallel(prompts, model="claude-sonnet-4-6")

for i, result in enumerate(results):
    print(f"\n=== Perspective {i+1} ===")
    print(result)
```
### Parallel with Shared Cached Context (Recommended)
For parallel operations with shared base context, use caching to reduce costs by up to 90%:
```python
from claude_client import invoke_parallel

# Large context shared across all sub-agents (e.g., codebase, documentation)
base_context = """
<codebase>
...large codebase or documentation (1000+ tokens)...
</codebase>
"""

prompts = [
    {"prompt": "Find security vulnerabilities in the authentication module"},
    {"prompt": "Identify performance bottlenecks in the API layer"},
    {"prompt": "Suggest refactoring opportunities in the database layer"}
]

# First sub-agent creates the cache, subsequent ones reuse it
results = invoke_parallel(
    prompts,
    shared_system=base_context,
    cache_shared_system=True  # 90% cost reduction for cached content
)
```
### Multi-Turn Conversation with Auto-Caching
For sub-agents that need multiple rounds of conversation:
```python
from claude_client import ConversationThread

# Create a conversation thread (auto-caches history)
agent = ConversationThread(
    system="You are a code refactoring expert with access to the codebase",
    cache_system=True
)

# Turn 1: Initial analysis
response1 = agent.send("Analyze the UserAuth class for issues")
print(response1)

# Turn 2: Follow-up (reuses cached system + turn 1)
response2 = agent.send("How would you refactor the login method?")
print(response2)

# Turn 3: Implementation (reuses all previous context)
response3 = agent.send("Show me the refactored code")
print(response3)
```
### Streaming Responses
For real-time feedback from sub-agents:
```python
from claude_client import invoke_claude_streaming

def show_progress(chunk):
    print(chunk, end='', flush=True)

response = invoke_claude_streaming(
    "Write a comprehensive security analysis...",
    callback=show_progress
)
```
### Parallel Streaming
Monitor multiple sub-agents simultaneously:
```python
from claude_client import invoke_parallel_streaming

def agent1_callback(chunk):
    print(f"[Security] {chunk}", end='', flush=True)

def agent2_callback(chunk):
    print(f"[Performance] {chunk}", end='', flush=True)

results = invoke_parallel_streaming(
    [
        {"prompt": "Security review: ..."},
        {"prompt": "Performance review: ..."}
    ],
    callbacks=[agent1_callback, agent2_callback]
)
```
### Interruptible Operations
Cancel long-running parallel operations:
```python
from claude_client import invoke_parallel_interruptible, InterruptToken
import threading
import time

token = InterruptToken()

# Run in background
def run_analysis():
    results = invoke_parallel_interruptible(
        prompts=[...],
        interrupt_token=token
    )
    return results

thread = threading.Thread(target=run_analysis)
thread.start()

# Interrupt after 5 seconds
time.sleep(5)
token.interrupt()
```
## Core Functions

### `invoke_claude()`
Single synchronous invocation with full control:
```python
invoke_claude(
    prompt: str | list[dict],
    model: str = "claude-sonnet-4-6",
    system: str | list[dict] | None = None,
    max_tokens: int = 4096,
    temperature: float = 1.0,
    streaming: bool = False,
    cache_system: bool = False,
    cache_prompt: bool = False,
    messages: list[dict] | None = None,
    **kwargs
) -> str
```
**Parameters:**

- `prompt`: The user message (string or list of content blocks)
- `model`: Claude model to use (default: `claude-sonnet-4-6`)
- `system`: Optional system prompt (string or list of content blocks)
- `max_tokens`: Maximum tokens in the response (default: 4096)
- `temperature`: Randomness 0-1 (default: 1.0)
- `streaming`: Enable streaming response (default: False)
- `cache_system`: Add `cache_control` to the system prompt (requires 1024+ tokens, default: False)
- `cache_prompt`: Add `cache_control` to the user prompt (requires 1024+ tokens, default: False)
- `messages`: Pre-built messages list for multi-turn (overrides `prompt`)
- `**kwargs`: Additional API parameters (`top_p`, `top_k`, etc.)

**Returns:** Response text as a string

**Note:** Caching requires a minimum of 1,024 tokens per cache breakpoint. Cache lifetime is 5 minutes (refreshed on use).
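As a minimal sketch of the caching parameters above (the `style_guide.md` file and prompt content are illustrative assumptions, not part of the module):

```python
from claude_client import invoke_claude

# Hypothetical file for illustration; a system prompt under 1,024 tokens
# will simply not be cached.
style_guide = open("style_guide.md").read()

response = invoke_claude(
    prompt="Review this diff against the style guide: ...",
    system=f"You are a code reviewer.\n\n{style_guide}",
    cache_system=True,   # adds cache_control to the system block
    temperature=0.2,     # lower randomness suits review tasks
)
```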
### `invoke_parallel()`

Concurrent invocations using a lightweight workflow pattern:
```python
invoke_parallel(
    prompts: list[dict],
    model: str = "claude-sonnet-4-6",
    max_tokens: int = 4096,
    max_workers: int = 5,
    shared_system: str | list[dict] | None = None,
    cache_shared_system: bool = False
) -> list[str]
```
**Parameters:**

- `prompts`: List of dicts with `'prompt'` (required) and optional `'system'`, `'temperature'`, `'cache_system'`, `'cache_prompt'`, etc.
- `model`: Claude model for all invocations
- `max_tokens`: Max tokens per response
- `max_workers`: Max concurrent API calls (default: 5, max: 10)
- `shared_system`: System context shared across ALL invocations (for cache efficiency)
- `cache_shared_system`: Add `cache_control` to `shared_system` (default: False)

**Returns:** List of response strings in the same order as `prompts`

**Note:** For optimal cost savings, put large common context (1024+ tokens) in `shared_system` with `cache_shared_system=True`. The first invocation creates the cache, subsequent ones reuse it (90% cost reduction).
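A short sketch of the per-prompt overrides (the option keys come from the parameter list above; the prompt text is illustrative):

```python
from claude_client import invoke_parallel

# Each dict can override options for just that sub-agent.
prompts = [
    {"prompt": "Brainstorm edge cases for the CSV parser", "temperature": 1.0},
    {"prompt": "Write exact repro steps for the reported crash",
     "temperature": 0.0, "system": "You are a QA engineer"},
]

results = invoke_parallel(prompts, max_workers=2)
```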
### `invoke_claude_streaming()`
Stream responses in real-time with optional callbacks:
```python
invoke_claude_streaming(
    prompt: str | list[dict],
    callback: callable = None,
    model: str = "claude-sonnet-4-6",
    system: str | list[dict] | None = None,
    max_tokens: int = 4096,
    temperature: float = 1.0,
    cache_system: bool = False,
    cache_prompt: bool = False,
    **kwargs
) -> str
```
**Parameters:**

- `callback`: Optional function called with each text chunk (str) as it arrives
- (other parameters same as `invoke_claude`)

**Returns:** Complete accumulated response text
### `invoke_parallel_streaming()`
Parallel invocations with per-agent streaming callbacks:
```python
invoke_parallel_streaming(
    prompts: list[dict],
    callbacks: list[callable] = None,
    model: str = "claude-sonnet-4-6",
    max_tokens: int = 4096,
    max_workers: int = 5,
    shared_system: str | list[dict] | None = None,
    cache_shared_system: bool = False
) -> list[str]
```
**Parameters:**

- `callbacks`: Optional list of callback functions, one per prompt
- (other parameters same as `invoke_parallel`)
### `invoke_parallel_interruptible()`
Parallel invocations with cancellation support:
```python
invoke_parallel_interruptible(
    prompts: list[dict],
    interrupt_token: InterruptToken = None,
    # ... same other parameters as invoke_parallel
) -> list[str]
```
**Parameters:**

- `interrupt_token`: Optional `InterruptToken` to signal cancellation
- (other parameters same as `invoke_parallel`)

**Returns:** List of response strings (`None` for interrupted tasks)
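Because interrupted slots come back as `None`, callers should separate them before use; a minimal sketch under that assumption (the `results` values are illustrative):

```python
# e.g. results = invoke_parallel_interruptible(prompts, interrupt_token=token)
results = ["analysis A", None, "analysis C"]  # None marks an interrupted task

# Keep indices so surviving results still match their originating prompts.
completed = {i: r for i, r in enumerate(results) if r is not None}
interrupted = [i for i, r in enumerate(results) if r is None]
print(f"{len(completed)} completed, interrupted indices: {interrupted}")
```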
### `ConversationThread`
Manages multi-turn conversations with automatic caching:
```python
thread = ConversationThread(
    system: str | list[dict] | None = None,
    model: str = "claude-sonnet-4-6",
    max_tokens: int = 4096,
    temperature: float = 1.0,
    cache_system: bool = True
)

response = thread.send(
    user_message: str | list[dict],
    cache_history: bool = True
) -> str
```
**Methods:**

- `send(message, cache_history=True)`: Send a message and get the response
- `get_messages()`: Get conversation history
- `clear()`: Clear conversation history
- `__len__()`: Get the number of turns

**New in 0.3.0:**

- `turn_count` property: Number of completed turn pairs
- `send_continuation(guidance, cache_history)`: Lightweight continuation turn (requires a prior `send()`)
- `max_turns` constructor parameter: Optional turn limit
- `continuation_prompt` constructor parameter: Default continuation guidance
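A brief sketch of the 0.3.0 additions (names from the list above; the conversation content is illustrative):

```python
from claude_client import ConversationThread

agent = ConversationThread(
    system="You are a code refactoring expert",
    max_turns=5,                                    # optional turn limit
    continuation_prompt="Continue with the next file.",
)

agent.send("Refactor utils.py for readability")
agent.send_continuation("Keep the public API unchanged")  # lightweight follow-up turn
print(agent.turn_count)                             # completed turn pairs so far
```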
### `StallDetector`
Monitors activity timestamps and detects unresponsive operations:
```python
from claude_client import StallDetector

def handle_stall(task_id, idle_seconds):
    print(f"Task {task_id} stalled for {idle_seconds:.1f}s")

detector = StallDetector(timeout=60.0, on_stall=handle_stall)
detector.register("task-1")
detector.start_monitoring(poll_interval=5.0)

# Call heartbeat() during streaming/progress
detector.heartbeat("task-1")

# When done
detector.unregister("task-1")
detector.stop_monitoring()
```
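One way to wire this into streaming is to treat each streamed chunk as a heartbeat; a sketch combining the two functions documented here (the prompt text is illustrative):

```python
from claude_client import invoke_claude_streaming, StallDetector

detector = StallDetector(timeout=60.0,
                         on_stall=lambda tid, idle: print(f"\n[{tid} stalled {idle:.0f}s]"))
detector.register("report")
detector.start_monitoring(poll_interval=5.0)

# Every streamed chunk refreshes the heartbeat, so the stall callback only
# fires if the stream goes quiet for the full timeout.
response = invoke_claude_streaming(
    "Write a comprehensive security report...",
    callback=lambda chunk: detector.heartbeat("report"),
)

detector.unregister("report")
detector.stop_monitoring()
```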
### `TaskTracker` (`task_state` module)
Formal task lifecycle state machine with enforced transitions:
```python
from task_state import TaskTracker, TaskState

tracker = TaskTracker(max_retries=3)
tracker.add("task-1", category="security")
tracker.claim("task-1")     # UNCLAIMED → CLAIMED
tracker.start("task-1")     # CLAIMED → RUNNING (increments attempt)
tracker.complete("task-1")  # RUNNING → COMPLETED

# On failure with retry:
tracker.fail("task-2", error="timeout")
tracker.retry("task-2")     # FAILED → RETRY_QUEUED (if under max_retries)
tracker.claim("task-2")     # RETRY_QUEUED → CLAIMED

# Query state
tracker.active_count(category="security")
tracker.get_by_state(TaskState.RUNNING)
tracker.summary()           # {"completed": 1, "running": 1, ...}
```
### `invoke_with_retry()` (`orchestration` module)
Single invocation with exponential backoff:
```python
from orchestration import invoke_with_retry

response = invoke_with_retry(
    "Analyze this code...",
    max_retries=3,
    base_delay_ms=1000,  # 1s, 2s, 4s backoff
    max_delay_ms=10000,  # capped at 10s
)
```
### `invoke_parallel_managed()` (`orchestration` module)
Full-featured parallel invocations with all Symphony patterns:
```python
from orchestration import invoke_parallel_managed, ConcurrencyLimiter

limiter = ConcurrencyLimiter(
    global_limit=10,
    category_limits={"security": 3, "perf": 3}
)

def reconcile(prompts, tracker):
    # Filter out invalid/duplicate work before dispatch.
    # should_run() is a user-defined predicate, not part of the module.
    return [p for p in prompts if should_run(p)]

results = invoke_parallel_managed(
    prompts=[
        {"prompt": "Security review...", "task_id": "sec-1", "category": "security"},
        {"prompt": "Perf review...", "task_id": "perf-1", "category": "perf"},
    ],
    reconcile=reconcile,
    concurrency_limiter=limiter,
    max_retries=3,
    stall_timeout=60.0,
    on_stall=lambda tid, idle: print(f"{tid} stalled"),
)
```
## Example Workflows

See `references/workflows.md` for detailed examples including:
- Multi-expert code review
- Parallel document analysis
- Recursive task delegation
- Advanced Agent SDK delegation patterns
- Prompt caching workflows
## Execute Mode (Default Sub-Agent Prompt)
For autonomous sub-agents that should execute without asking questions:
```python
from claude_client import invoke_claude, EXECUTE_MODE

response = invoke_claude(
    prompt="Review auth.py for SQL injection vulnerabilities",
    system=f"You are a security expert.\n\n{EXECUTE_MODE}"
)
```
`EXECUTE_MODE` encodes these principles (adapted from OpenAI Codex):
- Make assumptions instead of asking questions; state them briefly
- Think ahead: what else might be needed?
- Report failures with what you tried and what you'll do next
- Summarize deliverables and how to validate them
## Agent Pool (Named Agents with Messaging)
For workflows where multiple agents need to communicate:
```python
from agent_pool import AgentPool

pool = AgentPool(
    shared_system="You are reviewing the auth module of a web app.",
    max_depth=3,   # prevent recursive spawn explosion
    max_agents=10,
)

# Spawn named agents with roles
pool.spawn("security", system=f"Focus on vulnerabilities.\n\n{pool.EXECUTE_MODE}")
pool.spawn("perf", system=f"Focus on performance.\n\n{pool.EXECUTE_MODE}")

# Run turns (pending inter-agent messages auto-injected)
sec_result = pool.run("security", "Review the login flow")

# Agent-to-agent messaging
pool.send("security", to="perf",
          content="Auth does N+1 queries in the session check loop",
          trigger_turn=True)  # auto-runs perf with this context

# Broadcast to all agents
pool.broadcast("security", "Auth uses bcrypt cost=12, 200ms per hash")

# Query pool state
pool.agents()            # ["security", "perf"]
pool.agent_info("perf")  # {name, depth, children, pending_messages, turns}
```
### Spawn Reservation (Atomic Agent Creation)
For complex workflows where agent creation might fail:
```python
from agent_pool import AgentPool

pool = AgentPool(shared_system="Code review team")

# Reservation pattern: the name is reserved, then rolled back on exception
with pool.reserve("analyst", parent="lead") as res:
    res.configure(system="You analyze code complexity.", model="claude-opus-4-6")
    # If configure or any other work raises, the name is released
# Agent "analyst" is now live

# Depth limits prevent unbounded recursion
pool.spawn("sub-analyst", parent="analyst")  # depth=2, OK
pool.spawn("sub-sub", parent="sub-analyst")  # depth=3, raises ValueError
```
## When to Use `AgentPool` vs. `invoke_parallel`

| Pattern | Use When |
|---|---|
| `invoke_parallel()` | Independent tasks, no inter-agent communication needed |
| `AgentPool` | Agents need to share findings, build on each other's work, or have parent/child relationships |
| `invoke_parallel_managed()` | Independent tasks with retry, stall detection, concurrency limits |
## Setup

**Prerequisites:**

1. Install the `anthropic` library:

   ```bash
   uv pip install anthropic
   ```

2. Configure your API key via a project knowledge file:

   **Option 1 (recommended): individual file**
   - Create document: `ANTHROPIC_API_KEY.txt`
   - Content: your API key (e.g., `sk-ant-api03-...`)

   **Option 2: combined file**
   - Create document: `API_CREDENTIALS.json`
   - Content: `{ "anthropic_api_key": "sk-ant-api03-..." }`

   Get your API key: https://console.anthropic.com/settings/keys
**Installation check:**

```bash
python3 -c "import anthropic; print(f'✓ anthropic {anthropic.__version__}')"
```
## Error Handling
The module provides comprehensive error handling:
```python
from claude_client import invoke_claude, ClaudeInvocationError

try:
    response = invoke_claude("Your prompt here")
except ClaudeInvocationError as e:
    print(f"API Error: {e}")
    print(f"Status: {e.status_code}")
    print(f"Details: {e.details}")
except ValueError as e:
    print(f"Configuration Error: {e}")
```
**Common errors:**

- **API key missing:** Add `ANTHROPIC_API_KEY.txt` to project knowledge (see Setup above)
- **Rate limits:** Reduce `max_workers` or add delays
- **Token limits:** Reduce prompt size or `max_tokens`
- **Network errors:** Automatic retry with exponential backoff
## Prompt Caching

For detailed caching workflows and best practices, see `references/workflows.md`.
## Performance Considerations

**Token efficiency:**
- Parallel calls use more tokens but save wall-clock time
- Use prompt caching for shared context (90% cost reduction)
- Use concise system prompts to reduce overhead
- Consider token budgets when setting max_tokens
**Rate limits:**
- Anthropic API has per-minute rate limits
- Default max_workers=5 is safe for most tiers
- Adjust based on your API tier and rate limits
**Cost management:**
- Each invocation consumes API credits
- Monitor usage in Anthropic Console
- Use smaller models (haiku) for simple tasks
- Use prompt caching for repeated context (90% savings)
- Cache lifetime: 5 minutes, refreshed on each use
## Best Practices

1. **Use parallel invocations for independent tasks only**
   - Don't parallelize sequential dependencies
   - Each parallel task should be self-contained

2. **Set appropriate system prompts**
   - Define clear roles/expertise for each instance
   - Keeps responses focused and relevant

3. **Handle errors gracefully**
   - Always wrap invocations in try-except
   - Provide fallback behavior for failures (see the sketch after this list)

4. **Test with small batches first**
   - Verify prompts work before scaling
   - Check token usage and costs

5. **Consider alternatives**
   - Not all tasks benefit from multiple instances
   - Sometimes sequential with context is better
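A minimal fallback sketch for practice 3 (the degrade-to-a-smaller-model policy and the model ID are illustrative assumptions, not part of the module):

```python
from claude_client import invoke_claude, ClaudeInvocationError

def invoke_with_fallback(prompt: str) -> str:
    """Try the default model; on API failure, degrade instead of crashing."""
    try:
        return invoke_claude(prompt)
    except ClaudeInvocationError:
        # Placeholder model ID: substitute whichever smaller/cheaper model
        # (e.g., a Haiku-class model) your account exposes.
        return invoke_claude(prompt, model="claude-haiku-...", max_tokens=1024)
```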
## Token Efficiency
This skill uses ~800 tokens when loaded but enables powerful multi-agent patterns that can dramatically improve complex analysis quality and speed.
## See Also

- `references/api-reference.md` - Detailed API documentation
- Anthropic API Docs - Official documentation