langgraph-streaming
SKILL.md
LangGraph Streaming
Real-time updates and progress tracking for LangGraph workflows.
5 Stream Modes
# Available modes
for mode, chunk in graph.stream(inputs, stream_mode=["values", "updates", "messages", "custom", "debug"]):
print(f"[{mode}] {chunk}")
| Mode | Purpose | Use Case |
|---|---|---|
| values | Full state after each step | Debugging, state inspection |
| updates | State deltas after each step | Efficient UI updates |
| messages | LLM tokens + metadata | Chat interfaces, typing indicators |
| custom | User-defined events | Progress bars, status updates |
| debug | Maximum information | Development, troubleshooting |
Custom Events with StreamWriter
from langgraph.config import get_stream_writer
def node_with_progress(state: State):
"""Emit custom progress events."""
writer = get_stream_writer()
for i, item in enumerate(state["items"]):
writer({
"type": "progress",
"current": i + 1,
"total": len(state["items"]),
"status": f"Processing {item}"
})
result = process(item)
writer({"type": "complete", "message": "All items processed"})
return {"results": results}
# Consume custom events
for mode, chunk in graph.stream(inputs, stream_mode=["updates", "custom"]):
if mode == "custom":
if chunk.get("type") == "progress":
print(f"Progress: {chunk['current']}/{chunk['total']}")
elif mode == "updates":
print(f"State updated: {list(chunk.keys())}")
LLM Token Streaming
# Stream tokens from LLM calls
for message_chunk, metadata in graph.stream(
{"topic": "AI safety"},
stream_mode="messages"
):
if message_chunk.content:
print(message_chunk.content, end="", flush=True)
# Filter by node
for msg, meta in graph.stream(inputs, stream_mode="messages"):
if meta["langgraph_node"] == "writer_agent":
print(msg.content, end="")
# Filter by tags
model = init_chat_model("claude-sonnet-4-20250514", tags=["main_response"])
for msg, meta in graph.stream(inputs, stream_mode="messages"):
if "main_response" in meta.get("tags", []):
print(msg.content, end="")
Subgraph Streaming
# Enable subgraph visibility
for namespace, chunk in graph.stream(
inputs,
subgraphs=True,
stream_mode="updates"
):
# namespace shows graph hierarchy: (), ("child",), ("child", "grandchild")
print(f"[{'/'.join(namespace) or 'root'}] {chunk}")
Multiple Modes Simultaneously
# Combine modes for comprehensive feedback
async for mode, chunk in graph.astream(
inputs,
stream_mode=["updates", "custom", "messages"]
):
match mode:
case "updates":
update_ui_state(chunk)
case "custom":
show_progress(chunk)
case "messages":
append_to_chat(chunk)
Non-LangChain LLM Streaming
def call_custom_llm(state: State):
"""Stream from arbitrary LLM APIs."""
writer = get_stream_writer()
for chunk in your_streaming_client.generate(state["prompt"]):
writer({"type": "llm_token", "content": chunk.text})
return {"response": full_response}
FastAPI SSE Integration
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import json
app = FastAPI()
@app.post("/stream")
async def stream_workflow(request: WorkflowRequest):
async def event_generator():
async for mode, chunk in graph.astream(
request.inputs,
stream_mode=["updates", "custom"]
):
yield f"data: {json.dumps({'mode': mode, 'data': chunk})}\n\n"
return StreamingResponse(
event_generator(),
media_type="text/event-stream"
)
Python < 3.11 Async
# Manual config propagation required
async def call_model(state: State, config: RunnableConfig):
response = await model.ainvoke(state["messages"], config)
return {"messages": [response]}
# Explicit writer injection
async def node_with_custom_stream(state: State, writer: StreamWriter):
writer({"status": "processing"})
result = await process_async(state)
return {"result": result}
Key Decisions
| Decision | Recommendation |
|---|---|
| Mode selection | Use ["updates", "custom"] for most UIs |
| Token streaming | Use messages mode with node filtering |
| Progress tracking | Use custom mode with get_stream_writer() |
| Subgraph visibility | Enable subgraphs=True for complex workflows |
Common Mistakes
- Forgetting
stream_modeparameter (defaults tovaluesonly) - Not handling async properly in Python < 3.11
- Missing
flush=Trueon print for real-time display - Not filtering messages by node/tags (noisy output)
Evaluations
See references/evaluations.md for test cases.
Related Skills
langgraph-subgraphs- Stream updates from nested graphslanggraph-human-in-loop- Stream status while awaiting humanlanggraph-supervisor- Stream agent progress in supervisor workflowslanggraph-parallel- Stream from parallel execution brancheslanggraph-tools- Stream tool execution progressapi-design-framework- SSE endpoint design patterns
Capability Details
stream-modes
Keywords: stream mode, values, updates, messages, custom, debug Solves:
- Configure streaming output format
- Choose appropriate mode for use case
- Combine multiple stream modes
custom-events
Keywords: custom event, progress, status, stream writer, get_stream_writer Solves:
- Emit custom progress events
- Track workflow status
- Implement progress bars
token-streaming
Keywords: token, LLM stream, chat, typing indicator, messages mode Solves:
- Stream LLM tokens in real-time
- Build chat interfaces
- Show typing indicators
subgraph-streaming
Keywords: subgraph, nested, hierarchy, namespace Solves:
- Stream from nested graphs
- Track subgraph progress
- Debug complex workflows
Weekly Installs
6
Repository
yonatangross/orchestkitGitHub Stars
94
First Seen
Feb 6, 2026
Security Audits
Installed on
claude-code5
opencode4
github-copilot4
gemini-cli4
codex3
antigravity3