skills/jackjin1997/clawforge/LangGraph Execution Control

LangGraph Execution Control

SKILL.md
  1. Workflows vs Agents: Predetermined paths vs dynamic decision-making
  2. Send API: Fan-out to parallel workers (map-reduce)
  3. Interrupts: Pause for human input, resume with Command
  4. Streaming: Real-time state, tokens, and custom data
| Characteristic | Workflow | Agent |
| --- | --- | --- |
| Control Flow | Fixed, predetermined | Dynamic, model-driven |
| Predictability | High | Low |
| Use Case | Sequential tasks | Open-ended problems |

Workflows and Agents

class AgentState(TypedDict):
    """Graph state for the ReAct agent.

    `messages` carries the conversation; the Annotated reducer
    (operator.add) concatenates each node's returned list onto the
    existing history instead of overwriting it.
    """
    messages: Annotated[list, operator.add]

model_with_tools = model.bind_tools([search])

def agent_node(state: AgentState) -> dict:
    """Call the tool-bound model on the history and append its reply."""
    reply = model_with_tools.invoke(state["messages"])
    return {"messages": [reply]}

def tool_node(state: AgentState) -> dict:
    """Execute tool calls from the last AI message."""
    outputs = []
    for call in state["messages"][-1].tool_calls:
        # Dispatch to the tool the model asked for, then wrap the
        # observation so the model can see which call it answers.
        observation = tools_by_name[call["name"]].invoke(call["args"])
        outputs.append(ToolMessage(content=observation, tool_call_id=call["id"]))
    return {"messages": outputs}

def should_continue(state: AgentState):
    """Route back to the tool node while the model keeps requesting tools."""
    last = state["messages"][-1]
    if last.tool_calls:
        return "tools"
    return END

# ReAct loop: agent -> (tools -> agent)* -> END
agent = (
    StateGraph(AgentState)
    .add_node("agent", agent_node)
    .add_node("tools", tool_node)
    .add_edge(START, "agent")
    .add_conditional_edges("agent", should_continue)
    .add_edge("tools", "agent")  # Loop back after tool execution
    .compile()
)

</python>
<typescript>
ReAct agent pattern: model decides when to call tools, loop until done.
```typescript
import { StateGraph, StateSchema, MessagesValue, START, END } from "@langchain/langgraph";
import { ToolMessage } from "@langchain/core/messages";

// Message history uses the built-in MessagesValue reducer (append semantics).
const State = new StateSchema({ messages: MessagesValue });
// Bind tools so the model can emit structured tool calls.
const modelWithTools = model.bindTools([searchTool]);

// Invoke the tool-bound model on the history and append its reply.
const agentNode = async (state: typeof State.State) => {
  const reply = await modelWithTools.invoke(state.messages);
  return { messages: [reply] };
};

const toolNode = async (state: typeof State.State) => ({
  messages: await Promise.all(
    (state.messages.at(-1)?.tool_calls ?? []).map(async (tc) =>
      new ToolMessage({ content: await searchTool.invoke(tc.args), tool_call_id: tc.id })
    )
  )
});

// Keep looping through tools while the model requests them; otherwise finish.
const shouldContinue = (state: typeof State.State) => {
  const last = state.messages.at(-1);
  if (last?.tool_calls?.length) {
    return "tools";
  }
  return END;
};

// ReAct loop: agent -> (tools -> agent)* until no tool calls remain.
const agent = new StateGraph(State)
  .addNode("agent", agentNode)
  .addNode("tools", toolNode)
  .addEdge(START, "agent")
  .addConditionalEdges("agent", shouldContinue)
  .addEdge("tools", "agent")  // loop back after tool execution
  .compile();

class OrchestratorState(TypedDict):
    """Shared state for the orchestrator/worker fan-out example."""
    # Tasks to fan out; `results` accumulates across parallel workers
    # via the operator.add reducer instead of being overwritten.
    tasks: list[str]
    results: Annotated[list, operator.add]
    summary: str

def orchestrator(state: OrchestratorState):
    """Fan out tasks to workers."""
    sends = []
    for task in state["tasks"]:
        # Each Send routes a private payload to one "worker" invocation.
        sends.append(Send("worker", {"task": task}))
    return sends

def worker(state: dict) -> dict:
    """Process one fanned-out task; the results reducer merges outputs."""
    return {"results": [f"Completed: {state['task']}"]}

def synthesize(state: OrchestratorState) -> dict:
    """Summarize once all parallel workers have merged their results."""
    return {"summary": f"Processed {len(state['results'])} tasks"}

# The conditional edge from START fans out via orchestrator's Send list;
# all workers then converge on synthesize.
graph = (
    StateGraph(OrchestratorState)
    .add_node("worker", worker)
    .add_node("synthesize", synthesize)
    .add_conditional_edges(START, orchestrator, ["worker"])
    .add_edge("worker", "synthesize")
    .add_edge("synthesize", END)
    .compile()
)

result = graph.invoke({"tasks": ["Task A", "Task B", "Task C"]})

</python>
<typescript>
Fan out tasks to parallel workers using the Send API and aggregate results.
```typescript
import { Send, StateGraph, StateSchema, ReducedValue, START, END } from "@langchain/langgraph";
import { z } from "zod";

const State = new StateSchema({
  tasks: z.array(z.string()),
  // ReducedValue merges parallel worker updates instead of overwriting them.
  results: new ReducedValue(
    z.array(z.string()).default(() => []),
    { reducer: (curr, upd) => curr.concat(upd) }
  ),
  summary: z.string().default(""),
});

const orchestrator = (state: typeof State.State) => {
  return state.tasks.map((task) => new Send("worker", { task }));
};

// Process one fanned-out task; the results reducer merges outputs.
const worker = async (state: { task: string }) => ({
  results: [`Completed: ${state.task}`],
});

// Runs once all parallel workers have merged into state.results.
const synthesize = async (state: typeof State.State) => ({
  summary: `Processed ${state.results.length} tasks`,
});

// START fans out via orchestrator's Send list; workers converge on synthesize.
const graph = new StateGraph(State)
  .addNode("worker", worker)
  .addNode("synthesize", synthesize)
  .addConditionalEdges(START, orchestrator, ["worker"])
  .addEdge("worker", "synthesize")
  .addEdge("synthesize", END)
  .compile();

Interrupts (Human-in-the-Loop)

| Type | When Set | Use Case |
| --- | --- | --- |
| `interrupt()` (recommended) | Inside node code | Human-in-the-loop, conditional pausing. Resume with `Command(resume=value)` |
| `interrupt_before` | At compile time | Debugging only, not for HITL. Resume with `invoke(None, config)` |
| `interrupt_after` | At compile time | Debugging only, not for HITL. Resume with `invoke(None, config)` |

def review_node(state):
    """Pause for human approval when the draft is flagged for review."""
    if state["needs_review"]:
        # interrupt() suspends the graph; the value passed to
        # Command(resume=...) becomes this call's return value.
        user_response = interrupt({
            "action": "review",
            "data": state["draft"],
            "question": "Approve this draft?",
        })
        if user_response == "reject":
            return {"status": "rejected"}
    return {"status": "approved"}

# interrupt() requires a checkpointer so paused state can be saved and resumed.
checkpointer = InMemorySaver()

graph = (
    StateGraph(State)
    .add_node("review", review_node)
    .add_edge(START, "review")
    .add_edge("review", END)
    .compile(checkpointer=checkpointer)
)

# thread_id identifies the run to pause and later resume.
config = {"configurable": {"thread_id": "1"}}
result = graph.invoke({"needs_review": True, "draft": "content"}, config)

# Check for interrupt: LangGraph surfaces pending interrupts under the
# "__interrupt__" key (matching the TypeScript example's result.__interrupt__).
if "__interrupt__" in result:
    print(result["__interrupt__"])

# Resume with user decision
result = graph.invoke(Command(resume="approve"), config)

</python>
<typescript>
Pause execution for human review using dynamic interrupt and resume with Command.
```typescript
import { interrupt, Command, MemorySaver, StateGraph, START, END } from "@langchain/langgraph";

// Pause for human approval when the draft is flagged for review.
const reviewNode = async (state: typeof State.State) => {
  if (!state.needsReview) {
    return { status: "approved" };
  }
  // interrupt() suspends the run; the Command resume value becomes its result.
  const decision = interrupt({
    action: "review",
    data: state.draft,
    question: "Approve this draft?"
  });
  return decision === "reject" ? { status: "rejected" } : { status: "approved" };
};

// A checkpointer is required for interrupt(): it persists the paused state.
const checkpointer = new MemorySaver();

const graph = new StateGraph(State)
  .addNode("review", reviewNode)
  .addEdge(START, "review")
  .addEdge("review", END)
  .compile({ checkpointer });

// thread_id ties the paused run to the later resume call.
const config = { configurable: { thread_id: "1" } };
let result = await graph.invoke({ needsReview: true, draft: "content" }, config);

// Check for interrupt
if (result.__interrupt__) {
  console.log(result.__interrupt__);
}

// Resume with user decision
result = await graph.invoke(new Command({ resume: "approve" }), config);

# Compile-time breakpoint flow: the run stops before the configured node;
# invoking with None (same thread_id) resumes from the saved checkpoint.
config = {"configurable": {"thread_id": "1"}}
graph.invoke({"data": "test"}, config)  # Runs until step2
graph.invoke(None, config)  # Resume

</python>
<typescript>
Set compile-time breakpoints for debugging. Not recommended for human-in-the-loop — use `interrupt()` instead.
```typescript
// Compile-time breakpoint: the run stops before "step2" is executed.
const graph = new StateGraph(State)
  .addNode("step1", step1)
  .addNode("step2", step2)
  .addEdge(START, "step1")
  .addEdge("step1", "step2")
  .addEdge("step2", END)
  .compile({
    checkpointer,
    interruptBefore: ["step2"],  // Pause before step2
  });

const config = { configurable: { thread_id: "1" } };
await graph.invoke({ data: "test" }, config);  // Runs until step2
await graph.invoke(null, config);  // Resume (null keeps the saved state)

Streaming

| Mode | What it Streams | Use Case |
| --- | --- | --- |
| `values` | Full state after each step | Monitor complete state |
| `updates` | State deltas | Track incremental updates |
| `messages` | LLM tokens + metadata | Chat UIs |
| `custom` | User-defined data | Progress indicators |

def my_node(state):
    """Emit custom progress events while the node runs."""
    writer = get_stream_writer()
    writer("Processing step 1...")
    # Do work
    writer("Complete!")
    return {"result": "done"}

# "custom" mode yields only the writer() payloads, in emission order.
for chunk in graph.stream({"data": "test"}, stream_mode="custom"):
    print(chunk)

</python>
<typescript>
Emit custom progress updates from within nodes using the stream writer.
```typescript
import { getWriter } from "@langchain/langgraph";

// Emit custom progress events over the "custom" stream mode.
const myNode = async (state: typeof State.State) => {
  const writer = getWriter();  // active stream writer for this run
  writer("Processing step 1...");
  // Do work
  writer("Complete!");
  return { result: "done" };
};

// streamMode "custom" yields only writer() payloads, in emission order.
for await (const chunk of graph.stream({ data: "test" }, { streamMode: "custom" })) {
  console.log(chunk);
}
  • Choose workflow vs agent pattern
  • Use Send API for parallel execution
  • Call interrupt() anywhere in nodes
  • Set compile-time breakpoints
  • Resume with Command(resume=...)
  • Choose stream modes

What You CANNOT Configure

  • Interrupt without checkpointer
  • Resume without thread_id
  • Change Send API message-passing model

CORRECT

class State(TypedDict):
    """State whose `results` field merges parallel worker outputs."""
    # The operator.add reducer concatenates updates instead of overwriting.
    results: Annotated[list, operator.add]  # Accumulates

</python>
<typescript>
Use ReducedValue to accumulate parallel worker results.
```typescript
// WRONG: No reducer — parallel worker updates overwrite each other.
const State = new StateSchema({ results: z.array(z.string()) });

// CORRECT
const State = new StateSchema({
  // reducer concatenates each worker's update onto the accumulated list
  results: new ReducedValue(z.array(z.string()).default(() => []), { reducer: (curr, upd) => curr.concat(upd) }),
});

CORRECT

# interrupt() needs a checkpointer to persist the paused run's state.
graph = builder.compile(checkpointer=InMemorySaver())

</python>
<typescript>
Checkpointer required for interrupt functionality.
```typescript
// WRONG — interrupt() will fail without a checkpointer to persist the pause
const graph = builder.compile();

// CORRECT
const graph = builder.compile({ checkpointer: new MemorySaver() });

CORRECT

# Command(resume=...) feeds the value back into the pending interrupt() call.
graph.invoke(Command(resume="approve"), config)

</python>
<typescript>
Use Command to resume from an interrupt (regular object restarts graph).
```typescript
// WRONG — a plain input object starts a fresh run instead of resuming
await graph.invoke({ resumeData: "approve" }, config);

// CORRECT — Command routes the value into the pending interrupt()
await graph.invoke(new Command({ resume: "approve" }), config);

BETTER

def should_continue(state):
    """Route to tools, but bail out after a bounded number of iterations."""
    # Guard against an infinite agent<->tools loop.
    # NOTE(review): assumes some node increments state["iterations"] — confirm.
    if state["iterations"] > 10:
        return END
    if state["messages"][-1].tool_calls:
        return "tools"
    return END

</python>
<typescript>
Add max iterations check to prevent infinite loops.
```typescript
// RISKY: Might loop forever
const shouldContinue = (state) => state.messages.at(-1)?.tool_calls?.length ? "tools" : END;

// BETTER
const shouldContinue = (state) => {
  // NOTE(review): assumes some node increments state.iterations — confirm
  if (state.iterations > 10) return END;
  return state.messages.at(-1)?.tool_calls?.length ? "tools" : END;
};

CORRECT

def node(state):
    """Invoke the model; "messages" stream mode emits its tokens live."""
    response = model.invoke(state["messages"])
    return {"messages": [response]}

</python>
</fix-messages-mode-requires-llm>

<fix-custom-mode-needs-stream-writer>
<python>
Use get_stream_writer() to emit custom data.
```python
# WRONG: print() isn't streamed
def node(state):
    print("Processing...")
    return {"data": "done"}

# CORRECT
def node(state):
    writer = get_stream_writer()  # obtains the run's custom-stream writer
    writer("Processing...")  # Streamed!
    return {"data": "done"}

CORRECT

# stream_mode accepts a list to multiplex several modes in one stream.
graph.stream({}, stream_mode=["updates", "messages"])

</python>
</fix-stream-modes-are-lists>
Weekly Installs
0
GitHub Stars
4
First Seen
Jan 1, 1970