langgraph-fundamentals-py

  • StateGraph: Main class for building stateful graphs
  • Nodes: Functions that perform work and update state
  • Edges: Define execution order (static or conditional)
  • START/END: Special nodes marking entry and exit points
  • State with Reducers: Control how state updates are merged

Graphs must be compile()d before execution.

## Designing a LangGraph application

Follow these 5 steps when building a new graph:

  1. Map out discrete steps — sketch a flowchart of your workflow. Each step becomes a node.
  2. Identify what each step does — categorize nodes: LLM step, data step, action step, or user input step. For each, determine static context (prompt), dynamic context (from state), retry strategy, and desired outcome.
  3. Design your state — state is shared memory for all nodes. Store raw data, format prompts on-demand inside nodes.
  4. Build your nodes — implement each step as a function that takes state and returns partial updates.
  5. Wire it together — connect nodes with edges, add conditional routing, compile with a checkpointer if needed.
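
Steps 3 and 4 can be sketched without any LangGraph imports, since a node is just a function from state to a partial update. The block below is a minimal sketch under stated assumptions: the `TicketState` fields, the `build_prompt` helper, and the `classify_ticket` node are hypothetical names, and the LLM call is replaced by a keyword check so the block runs on its own.

```python
from typing import TypedDict


class TicketState(TypedDict, total=False):
    # Raw data only; no pre-formatted prompt strings live in state.
    customer_name: str
    ticket_text: str
    category: str


def build_prompt(state: TicketState) -> str:
    """Format the prompt on demand from raw state (hypothetical helper)."""
    return (
        f"Classify the support ticket from {state['customer_name']}:\n"
        f"{state['ticket_text']}"
    )


def classify_ticket(state: TicketState) -> dict:
    """Node: read state, do the work, return only the fields that changed."""
    prompt = build_prompt(state)
    # Stand-in for an LLM call so the sketch runs without a model.
    category = "billing" if "invoice" in prompt.lower() else "general"
    return {"category": category}


state: TicketState = {"customer_name": "Ada", "ticket_text": "My invoice is wrong."}
update = classify_ticket(state)
print(update)  # {'category': 'billing'}
```

Because the prompt is rebuilt inside the node, changing its wording later does not require migrating any stored state.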

| Use LangGraph When | Use Alternatives When |
|---|---|
| Need fine-grained control over agent orchestration | Quick prototyping → LangChain agents |
| Building complex workflows with branching/loops | Simple stateless workflows → LangChain direct |
| Require human-in-the-loop, persistence | Batteries-included features → Deep Agents |

## State Management

| Need | Solution | Example |
|---|---|---|
| Overwrite value | No reducer (default) | Simple fields like counters |
| Append to list | Reducer (`operator.add` / concat) | Message history, logs |
| Custom logic | Custom reducer function | Complex merging |

<ex-state-with-reducer>
```python
from typing import Annotated, TypedDict
import operator

class State(TypedDict):
    name: str                                # Default: overwrites on update
    messages: Annotated[list, operator.add]  # Appends to list
    total: Annotated[int, operator.add]      # Sums integers
```

</ex-state-with-reducer>
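
For the custom-logic case, a reducer is a two-argument function that receives the existing value and the incoming update and returns the merged value. The block below is a minimal sketch, assuming a hypothetical `merge_unique` reducer and `ResearchState` schema. The reducer is called directly so the block runs without LangGraph installed.

```python
from typing import Annotated, TypedDict


def merge_unique(existing: list[str], new: list[str]) -> list[str]:
    """Hypothetical custom reducer: append new items, skipping duplicates."""
    merged = list(existing)
    for item in new:
        if item not in merged:
            merged.append(item)
    return merged


class ResearchState(TypedDict):
    topic: str                                   # No reducer: overwritten
    sources: Annotated[list[str], merge_unique]  # Custom reducer: deduplicated


# Calling the reducer directly mirrors how two node updates would be merged.
after_first = merge_unique([], ["docs", "blog"])
after_second = merge_unique(after_first, ["blog", "paper"])
print(after_second)  # ['docs', 'blog', 'paper']
```

Keeping the reducer a pure function that returns a new list, rather than mutating `existing`, makes it easy to unit-test in isolation like this.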

<fix-forgot-reducer-for-list>
Without a reducer, returning a list overwrites previous values.
```python
# WRONG: List will be OVERWRITTEN
class State(TypedDict):
    messages: list  # No reducer!

# Node 1 returns: {"messages": ["A"]}
# Node 2 returns: {"messages": ["B"]}
# Final: {"messages": ["B"]}  # "A" is LOST!

# CORRECT: Use Annotated with operator.add
from typing import Annotated
import operator

class State(TypedDict):
    messages: Annotated[list, operator.add]
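# Final: {"messages": ["A", "B"]}
```

</fix-forgot-reducer-for-list>

<fix-state-must-return-dict>
Nodes must return a dict containing only the fields they update.
```python
# CORRECT: Return dict with only the updates
def my_node(state: State) -> dict:
    return {"field": "updated"}
```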

</fix-state-must-return-dict>
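
To see why a missing reducer loses data while partial updates stay safe, it can help to model the merge step by hand. The block below is a plain-Python model, not LangGraph's implementation: `apply_update` is a hypothetical helper that reads the reducer from the `Annotated` metadata and falls back to overwriting when none is present.

```python
import operator
from typing import Annotated, TypedDict, get_type_hints


class State(TypedDict):
    name: str                                # No reducer: last write wins
    messages: Annotated[list, operator.add]  # Reducer: lists are concatenated


def apply_update(state: dict, update: dict) -> dict:
    """Merge a node's partial update into state (hypothetical model)."""
    hints = get_type_hints(State, include_extras=True)
    merged = dict(state)
    for key, value in update.items():
        # Annotated types carry the reducer in __metadata__; plain types do not.
        metadata = getattr(hints[key], "__metadata__", ())
        if metadata and key in merged:
            merged[key] = metadata[0](merged[key], value)
        else:
            merged[key] = value
    return merged


state = {"name": "start", "messages": []}
state = apply_update(state, {"messages": ["A"]})                  # Node 1
state = apply_update(state, {"name": "done", "messages": ["B"]})  # Node 2
print(state)  # {'name': 'done', 'messages': ['A', 'B']}
```

Fields a node leaves out of its return value are untouched, which is why nodes only need to return what they changed.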

---

## Nodes

<node-function-signatures>

Node functions accept these arguments:

| Signature | When to Use |
|-----------|-------------|
| `def node(state: State)` | Simple nodes that only need state |
| `def node(state: State, config: RunnableConfig)` | Need thread_id, tags, or configurable values |
| `def node(state: State, runtime: Runtime[Context])` | Need runtime context, store, or stream_writer |

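```python
from dataclasses import dataclass

from langchain_core.runnables import RunnableConfig
from langgraph.runtime import Runtime

# Context schema read by node_with_runtime; register it with
# StateGraph(State, context_schema=Context). State is defined as in the examples above.
@dataclass
class Context:
    user_id: str

def plain_node(state: State):
    return {"results": "done"}

def node_with_config(state: State, config: RunnableConfig):
    thread_id = config["configurable"]["thread_id"]
    return {"results": f"Thread: {thread_id}"}

def node_with_runtime(state: State, runtime: Runtime[Context]):
    user_id = runtime.context.user_id
    return {"results": f"User: {user_id}"}
```

</node-function-signatures>

---

## Edges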

| Need | Edge Type | When to Use |
|---|---|---|
| Always go to same node | `add_edge()` | Fixed, deterministic flow |
| Route based on state | `add_conditional_edges()` | Dynamic branching |
| Update state AND route | `Command` | Combine logic in single node |
| Fan-out to multiple nodes | `Send` | Parallel processing with dynamic inputs |

<ex-basic-graph>
```python
from typing import TypedDict
from langgraph.graph import StateGraph, START, END

class State(TypedDict):
    input: str
    output: str

def process_input(state: State) -> dict:
    return {"output": f"Processed: {state['input']}"}

def finalize(state: State) -> dict:
    return {"output": state["output"].upper()}

graph = (
    StateGraph(State)
    .add_node("process", process_input)
    .add_node("finalize", finalize)
    .add_edge(START, "process")
    .add_edge("process", "finalize")
    .add_edge("finalize", END)
    .compile()
)

result = graph.invoke({"input": "hello"})
print(result["output"])  # "PROCESSED: HELLO"
```

</ex-basic-graph>

<ex-conditional-edges>
Route to different nodes based on state with conditional edges.
```python
from typing import Literal, TypedDict
from langgraph.graph import StateGraph, START, END

class State(TypedDict):
    query: str
    route: str
    result: str

def classify(state: State) -> dict:
    if "weather" in state["query"].lower():
        return {"route": "weather"}
    return {"route": "general"}

def route_query(state: State) -> Literal["weather", "general"]:
    return state["route"]

graph = (
    StateGraph(State)
    .add_node("classify", classify)
    .add_node("weather", lambda s: {"result": "Sunny, 72F"})
    .add_node("general", lambda s: {"result": "General response"})
    .add_edge(START, "classify")
    .add_conditional_edges("classify", route_query, ["weather", "general"])
    .add_edge("weather", END)
    .add_edge("general", END)
    .compile()
)
```

</ex-conditional-edges>

---

## Command

Command combines state updates and routing in a single return value. Fields:

  • update: State updates to apply (like returning a dict from a node)
  • goto: Node name(s) to navigate to next
  • resume: Value to resume after interrupt() — see human-in-the-loop skill

<ex-command-state-and-routing>
```python
from typing import Literal, TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.types import Command

class State(TypedDict):
    count: int
    result: str

def node_a(state: State) -> Command[Literal["node_b", "node_c"]]:
    """Update state AND decide next node in one return."""
    new_count = state["count"] + 1
    if new_count > 5:
        return Command(update={"count": new_count}, goto="node_c")
    return Command(update={"count": new_count}, goto="node_b")

graph = (
    StateGraph(State)
    .add_node("node_a", node_a)
    .add_node("node_b", lambda s: {"result": "B"})
    .add_node("node_c", lambda s: {"result": "C"})
    .add_edge(START, "node_a")
    .add_edge("node_b", END)
    .add_edge("node_c", END)
    .compile()
)
```

</ex-command-state-and-routing>

<command-return-type-annotations>

**Python**: Use `Command[Literal["node_a", "node_b"]]` as the return type annotation to declare valid goto destinations.

**TypeScript**: Pass `{ ends: ["node_a", "node_b"] }` as the third argument to `addNode` to declare valid goto destinations.

</command-return-type-annotations>

<warning-command-static-edges>

**Warning**: `Command` only adds **dynamic** edges — static edges defined with `add_edge` / `addEdge` still execute. If `node_a` returns `Command(goto="node_c")` and you also have `graph.add_edge("node_a", "node_b")`, **both** `node_b` and `node_c` will run.

</warning-command-static-edges>

---

## Send API

Fan-out with `Send`: return `[Send("worker", {...})]` from a conditional edge to spawn parallel workers. Requires a reducer on the results field.

<ex-orchestrator-worker>
Fan out tasks to parallel workers using the Send API and aggregate results.
```python
from typing import Annotated, TypedDict
import operator

from langgraph.graph import StateGraph, START, END
from langgraph.types import Send

class OrchestratorState(TypedDict):
    tasks: list[str]
    results: Annotated[list, operator.add]
    summary: str

def orchestrator(state: OrchestratorState):
    """Fan out tasks to workers."""
    return [Send("worker", {"task": task}) for task in state["tasks"]]

def worker(state: dict) -> dict:
    return {"results": [f"Completed: {state['task']}"]}

def synthesize(state: OrchestratorState) -> dict:
    return {"summary": f"Processed {len(state['results'])} tasks"}

graph = (
    StateGraph(OrchestratorState)
    .add_node("worker", worker)
    .add_node("synthesize", synthesize)
    .add_conditional_edges(START, orchestrator, ["worker"])
    .add_edge("worker", "synthesize")
    .add_edge("synthesize", END)
    .compile()
)

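result = graph.invoke({"tasks": ["Task A", "Task B", "Task C"]})
```

</ex-orchestrator-worker>

<fix-send-accumulator>
Worker results need a reducer on the shared field so they accumulate instead of overwriting each other.
```python
# CORRECT
class State(TypedDict):
    results: Annotated[list, operator.add]  # Accumulates
```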

</fix-send-accumulator>

---

## Running Graphs: Invoke and Stream

<invoke-basics>

Call `graph.invoke(input, config)` to run a graph to completion and return the final state.

```python
result = graph.invoke({"input": "hello"})
# With config (for persistence, tags, etc.)
result = graph.invoke({"input": "hello"}, {"configurable": {"thread_id": "1"}})
```

</invoke-basics>

Call `graph.stream(input, stream_mode=...)` to receive output while the graph runs. Pick the mode by what you need to observe:

| Mode | What it Streams | Use Case |
|---|---|---|
| `values` | Full state after each step | Monitor complete state |
| `updates` | State deltas | Track incremental updates |
| `messages` | LLM tokens + metadata | Chat UIs |
| `custom` | User-defined data | Progress indicators |

<ex-stream-custom-data>
```python
from langgraph.config import get_stream_writer

def my_node(state):
    writer = get_stream_writer()
    writer("Processing step 1...")
    # Do work
    writer("Complete!")
    return {"result": "done"}

for chunk in graph.stream({"data": "test"}, stream_mode="custom"):
    print(chunk)
```

</ex-stream-custom-data>
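
The difference between `values` and `updates` is easiest to see side by side. The block below is a plain-Python model, not LangGraph's streaming implementation: `run_steps` and the two step functions are hypothetical, merging is reduced to a dict overwrite, and details such as event ordering are simplified.

```python
from typing import Callable, Iterator


def run_steps(
    initial: dict,
    steps: list[tuple[str, Callable[[dict], dict]]],
    stream_mode: str,
) -> Iterator[dict]:
    """Model of step-by-step streaming (hypothetical helper)."""
    state = dict(initial)
    for name, step in steps:
        update = step(state)
        state = {**state, **update}  # Overwrite merge; no reducers in this model
        if stream_mode == "values":
            yield dict(state)        # Full state after the step
        else:
            yield {name: update}     # Only what this step changed, keyed by node


steps = [
    ("load", lambda s: {"doc": s["path"].upper()}),
    ("count", lambda s: {"length": len(s["doc"])}),
]

values_chunks = list(run_steps({"path": "a.txt"}, steps, "values"))
updates_chunks = list(run_steps({"path": "a.txt"}, steps, "updates"))
print(values_chunks[-1])   # {'path': 'a.txt', 'doc': 'A.TXT', 'length': 5}
print(updates_chunks[-1])  # {'count': {'length': 5}}
```

In this model, `values` chunks grow with the state while `updates` chunks stay as small as each node's return value, which is the trade-off the table describes.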

---

## Error Handling

Match the error type to the right handler:

<error-handling-table>

| Error Type | Who Fixes | Strategy | Example |
|---|---|---|---|
| Transient (network, rate limits) | System | `RetryPolicy(max_attempts=3)` | `add_node(..., retry_policy=...)` |
| LLM-recoverable (tool failures) | LLM | `ToolNode(tools, handle_tool_errors=True)` | Error returned as ToolMessage |
| User-fixable (missing info) | Human | `interrupt({"message": ...})` | Collect missing data (see HITL skill) |
| Unexpected | Developer | Let bubble up | `raise` |

</error-handling-table>
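
For the transient row, the retry strategy amounts to re-running a node a bounded number of times with a growing delay. The helper below is a plain-Python illustration of that idea, not LangGraph's `RetryPolicy` implementation. `call_with_retry`, `flaky_search`, and the backoff factor of 2 are assumptions chosen for the sketch.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


def call_with_retry(
    fn: Callable[[], T],
    max_attempts: int = 3,
    initial_interval: float = 0.01,
    backoff_factor: float = 2.0,
) -> T:
    """Retry fn on ConnectionError, sleeping longer after each failure."""
    delay = initial_interval
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except ConnectionError:
            if attempt == max_attempts:
                raise  # Out of attempts: let the error bubble up
            time.sleep(delay)
            delay *= backoff_factor
    raise ValueError("max_attempts must be at least 1")


attempts = {"count": 0}


def flaky_search() -> str:
    """Hypothetical node body that fails twice, then succeeds."""
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise ConnectionError("temporary network failure")
    return "results"


result = call_with_retry(flaky_search)
print(result, attempts["count"])  # results 3
```

Only `ConnectionError` is retried here. Any other exception propagates immediately, matching the "Unexpected: let bubble up" row.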

<ex-retry-policy>
Use RetryPolicy for transient errors (network issues, rate limits).
```python
from langgraph.types import RetryPolicy

workflow.add_node(
    "search_documentation",
    search_documentation,
    retry_policy=RetryPolicy(max_attempts=3, initial_interval=1.0)
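)
```

</ex-retry-policy>

<ex-tool-node-error-handling>
Use ToolNode with handle_tool_errors=True so tool failures are returned to the LLM as a ToolMessage.
```python
from langgraph.prebuilt import ToolNode

tool_node = ToolNode(tools, handle_tool_errors=True)

workflow.add_node("tools", tool_node)
```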

</ex-tool-node-error-handling>

---

## Common Fixes

<fix-compile-before-execution>
Must compile() to get executable graph.
```python
# WRONG
builder.invoke({"input": "test"})  # AttributeError!

# CORRECT
graph = builder.compile()
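graph.invoke({"input": "test"})
```

</fix-compile-before-execution>

<fix-infinite-loop-needs-exit>
Give every loop a conditional edge that can route to END.
```python
# CORRECT
def should_continue(state):
    return END if state["count"] > 10 else "node_b"

builder.add_conditional_edges("node_a", should_continue)
```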

</fix-infinite-loop-needs-exit>

<fix-common-mistakes>
Other common mistakes:
```python
# Router must return names of nodes that exist in the graph
builder.add_node("my_node", func)  # Add node BEFORE referencing in edges
builder.add_conditional_edges("node_a", router, ["my_node"])

# Command return type needs Literal for routing destinations (Python)
def node_a(state) -> Command[Literal["node_b", "node_c"]]:
    return Command(goto="node_b")

# START is entry-only - cannot route back to it
builder.add_edge("node_a", START)  # WRONG!
builder.add_edge("node_a", "entry")  # Use a named entry node instead

# Reducer expects matching types
return {"items": ["item"]}  # List for list reducer, not a string
```

```typescript
// Always await graph.invoke() - it returns a Promise
const result = await graph.invoke({ input: "test" });

// TS Command nodes need { ends } to declare routing destinations
builder.addNode("router", routerFn, { ends: ["node_b", "node_c"] });
```

</fix-common-mistakes>

What not to do:

  • Mutate state directly: always return partial update dicts from nodes
  • Route back to START: it's entry-only; use a named node instead
  • Forget reducers on list fields: without one, last write wins
  • Mix static edges with Command goto without understanding both will execute