anthropic-streaming-patterns
Anthropic Claude API Streaming Patterns
Overview
Claude API integration with streaming responses, tool execution, and cost tracking, using the Anthropic SDK.
Core principle: Stream (don't buffer). Track costs. Handle tools correctly.
Announce at start: "I'm using the anthropic-streaming-patterns skill for Claude API integration."
When to Use
- Implementing Claude API service (Task 3.4)
- Implementing streaming responses
- Implementing tool execution within streams
- Tracking API costs
- Debugging streaming issues
Quick Reference
| Pattern | SDK Method | Purpose |
|---|---|---|
| Initialize | messages.stream() | Start streaming |
| Text deltas | stream.on('text') | Receive text chunks |
| Tool start | stream.on('streamEvent') → content_block_start | Tool use begins |
| Tool input | stream.on('streamEvent') → content_block_delta | Accumulate params |
| Tool complete | stream.on('streamEvent') → content_block_stop | Execute tool |
| Stream end | stream.on('message') | Calculate costs |
| Errors | stream.on('error') | Handle failures |
Streaming Pattern (Complete)
import Anthropic from '@anthropic-ai/sdk';
// messageHistory, toolDefinitions, sendToClient, executeTool, saveSessionCost,
// sessionId, and logger are supplied by the surrounding service.
const client = new Anthropic(); // reads ANTHROPIC_API_KEY from the environment
const stream = client.messages.stream({
  model: 'claude-sonnet-4-20250514',
  max_tokens: 8192,
  messages: messageHistory,
  tools: toolDefinitions,
});
let currentToolUse = null;
let accumulatedInput = '';
// Text deltas → forward to client
stream.on('text', (text) => {
  sendToClient({type: 'content_delta', delta: text});
});
// Raw stream events carry the tool-use lifecycle
stream.on('streamEvent', async (event) => {
  if (event.type === 'content_block_start' && event.content_block.type === 'tool_use') {
    // Tool use started
    currentToolUse = {name: event.content_block.name, id: event.content_block.id};
    accumulatedInput = '';
    sendToClient({type: 'tool_execution', tool: currentToolUse.name});
  } else if (event.type === 'content_block_delta' && event.delta.type === 'input_json_delta' && currentToolUse) {
    // Tool input accumulation: append each fragment rather than overwriting
    accumulatedInput += event.delta.partial_json;
  } else if (event.type === 'content_block_stop' && currentToolUse) {
    // Tool execution
    const toolUse = currentToolUse;
    currentToolUse = null; // reset before awaiting so later blocks are not mistaken for this tool
    try {
      const input = JSON.parse(accumulatedInput || '{}');
      const result = await executeTool(toolUse.name, input);
      sendToClient({type: 'tool_result', result});
    } catch (error) {
      logger.error('Tool execution error:', error);
      sendToClient({type: 'error', error: error.message});
    }
  }
});
// Stream complete with usage
stream.on('message', (message) => {
  if (message.usage) {
    const inputCost = (message.usage.input_tokens / 1000) * 0.003;
    const outputCost = (message.usage.output_tokens / 1000) * 0.015;
    saveSessionCost(sessionId, {
      inputTokens: message.usage.input_tokens,
      outputTokens: message.usage.output_tokens,
      cost: inputCost + outputCost
    });
  }
});
stream.on('error', (error) => {
  logger.error('Streaming error:', error);
  sendToClient({type: 'error', error: error.message});
});
await stream.finalMessage(); // Wait for completion
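The tool-input accumulation step above can be exercised without a network call. The following is a minimal sketch, assuming raw events shaped like the API's `content_block_start`, `content_block_delta`, and `content_block_stop` events. The `RawEvent` type, the `ToolCall` interface, and the `collectToolCalls` function are hypothetical names introduced for illustration; they are not part of the SDK.

```typescript
// Hypothetical, trimmed-down subset of the raw streaming event shapes.
type RawEvent =
  | { type: 'content_block_start'; index: number; content_block: { type: string; id?: string; name?: string } }
  | { type: 'content_block_delta'; index: number; delta: { type: string; partial_json?: string; text?: string } }
  | { type: 'content_block_stop'; index: number };

interface ToolCall {
  id: string;
  name: string;
  input: unknown;
}

// Fold raw events into completed tool calls, keyed by content block index.
function collectToolCalls(events: RawEvent[]): ToolCall[] {
  const pending = new Map<number, { id: string; name: string; json: string }>();
  const done: ToolCall[] = [];
  for (const event of events) {
    if (event.type === 'content_block_start') {
      const block = event.content_block;
      if (block.type === 'tool_use' && block.id && block.name) {
        pending.set(event.index, { id: block.id, name: block.name, json: '' });
      }
    } else if (event.type === 'content_block_delta') {
      const tool = pending.get(event.index);
      if (tool && event.delta.type === 'input_json_delta') {
        tool.json += event.delta.partial_json ?? ''; // append, never overwrite
      }
    } else {
      // content_block_stop: parse the accumulated JSON once the block is closed
      const tool = pending.get(event.index);
      if (tool) {
        // Fall back to an empty object if no JSON fragments arrived.
        done.push({ id: tool.id, name: tool.name, input: JSON.parse(tool.json || '{}') });
        pending.delete(event.index);
      }
    }
  }
  return done;
}
```

Keying by block index, rather than holding a single "current tool" variable, is a design choice in this sketch: it keeps text blocks and tool blocks from interfering with each other if a response contains several of them.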
Cost Tracking (MANDATORY)
// USD per 1k tokens. These rates are hard-coded, so verify them against current pricing for your model.
const PRICING = {
  input: 0.003, // $0.003 per 1k input tokens
  output: 0.015, // $0.015 per 1k output tokens
};
// Calculate per message
const inputCost = (inputTokens / 1000) * PRICING.input;
const outputCost = (outputTokens / 1000) * PRICING.output;
const cost = {
  input: inputCost,
  output: outputCost,
  total: inputCost + outputCost,
};
// Aggregate per session
sessionCosts.push(cost);
const sessionTotal = sessionCosts.reduce((sum, c) => sum + c.total, 0);
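As a worked example of the arithmetic above: a message with 12,000 input tokens and 2,000 output tokens costs roughly (12,000 / 1000) × $0.003 + (2,000 / 1000) × $0.015 = $0.036 + $0.030 = $0.066. The sketch below wraps the same calculation in two small functions. `RATES`, `MessageCost`, `calculateCost`, and `sumSessionCost` are hypothetical names chosen for illustration, and the rates are simply the ones quoted in this document.

```typescript
// USD per 1,000 tokens, using the rates quoted in this document.
const RATES = { input: 0.003, output: 0.015 } as const;

interface MessageCost {
  inputTokens: number;
  outputTokens: number;
  input: number;  // USD spent on input tokens
  output: number; // USD spent on output tokens
  total: number;  // input + output, in USD
}

// Cost of a single message, from its token usage.
function calculateCost(inputTokens: number, outputTokens: number): MessageCost {
  const input = (inputTokens / 1000) * RATES.input;
  const output = (outputTokens / 1000) * RATES.output;
  return { inputTokens, outputTokens, input, output, total: input + output };
}

// Total cost of a session, summed over its per-message costs.
function sumSessionCost(costs: MessageCost[]): number {
  return costs.reduce((sum, c) => sum + c.total, 0);
}
```

Because these are floating-point sums, it is safer to round only when displaying a value (for example with `toFixed(4)`) than to round the stored totals.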
Error Handling
try {
  const stream = client.messages.stream({...});
  await stream.finalMessage(); // request errors surface here, not when the stream is created
} catch (error) {
  if (error.status === 429) {
    // Rate limit - wait and retry
    await delay(60000);
    return retry();
  } else if (error.status === 401) {
    // Auth error
    throw new Error('Invalid API key');
  } else {
    logger.error(error);
    throw error;
  }
}
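The fixed 60-second wait above can be swapped for capped exponential backoff with a bounded number of attempts. That is a different technique from the one shown, sketched here under stated assumptions: errors carry a numeric `status` field as in the snippet above, only 429 is treated as retryable, and the 1-second base and 60-second cap are illustrative defaults. `classifyStatus`, `backoffDelayMs`, and `withRetry` are hypothetical helpers, not SDK functions.

```typescript
type ErrorAction = 'retry' | 'auth' | 'fatal';

// Map an HTTP status to the handling used in this document.
function classifyStatus(status: number | undefined): ErrorAction {
  if (status === 429) return 'retry';
  if (status === 401) return 'auth';
  return 'fatal';
}

// Capped exponential backoff: base, 2x base, 4x base, ... up to capMs.
function backoffDelayMs(attempt: number, baseMs = 1000, capMs = 60000): number {
  return Math.min(capMs, baseMs * Math.pow(2, attempt));
}

// Run fn, retrying rate-limit failures up to maxAttempts total attempts.
async function withRetry<T>(
  fn: () => Promise<T>,
  maxAttempts = 3,
  sleep: (ms: number) => Promise<void> = (ms) => new Promise<void>((resolve) => setTimeout(resolve, ms)),
): Promise<T> {
  for (let attempt = 0; ; attempt++) {
    try {
      return await fn();
    } catch (error) {
      const action = classifyStatus((error as { status?: number }).status);
      if (action === 'auth') throw new Error('Invalid API key');
      if (action === 'fatal' || attempt + 1 >= maxAttempts) throw error;
      await sleep(backoffDelayMs(attempt));
    }
  }
}
```

A call site might look like `await withRetry(() => client.messages.stream(params).finalMessage())`, where `params` stands in for the request options. The SDK also retries some failures on its own (see the `maxRetries` client option), so an outer wrapper like this may only be needed when you want different limits or delays.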
Common Mistakes
| Mistake | Reality |
|---|---|
| "Buffering is simpler" | WRONG. Streaming provides real-time UX. Required. |
| "Cost tracking is optional" | WRONG. Users need visibility. Prevents surprise bills. |
| "I can figure out SDK" | WRONG. Event handling is subtle. Use proven patterns. |
| "Error handling later" | WRONG. Streams fail. Handle from start. |
❌ WRONG: Buffering
const response = await client.messages.create({...}); // Buffering
const fullText = response.content[0].text;
sendToClient(fullText);
✅ CORRECT: Streaming
const stream = client.messages.stream({...}); // returns the stream immediately; no await needed
stream.on('text', (delta) => sendToClient({type: 'content_delta', delta}));
Red Flags
- "Buffering is easier" → WRONG. Stream for real-time.
- "Cost tracking is overhead" → WRONG. Mandatory feature.
- "Skip error handling" → WRONG. Streams fail often.
Integration
- Use FOR: Task 3.4 (claude.service.ts)
- Use WITH: @claude-mobile-cost-tracking
- Integrate: Task 3.11 (cost.service.ts)