eino-compose

Installation
SKILL.md

Orchestration Overview

The github.com/cloudwego/eino/compose package provides three orchestration APIs:

API Topology Cycles Type Alignment
Graph Directed graph Yes (Pregel mode) / No (DAG mode) Whole input/output
Chain Linear sequence No Whole input/output
Workflow DAG No Field-level mapping

*Chain is implemented on top of Graph in Pregel mode but enforces linear topology.

All three compile into Runnable[I, O] which exposes Invoke, Stream, Collect, and Transform.

import "github.com/cloudwego/eino/compose"

Choosing an API

  • Chain -- sequential pipeline: prompt -> model -> parser. Simplest API.
  • Graph -- need branching, loops (ReAct agent), or fan-out/fan-in.
  • Workflow -- need field-level mapping between nodes with different struct types; DAG only.

Graph Quick Reference

g := compose.NewGraph[InputType, OutputType]()

// Add nodes
g.AddChatModelNode("model", chatModel)
g.AddChatTemplateNode("tmpl", tmpl)
g.AddToolsNode("tools", toolsNode)
g.AddLambdaNode("fn", compose.InvokableLambda(myFunc))
g.AddPassthroughNode("pass")
g.AddGraphNode("sub", subGraph)

// Connect nodes
g.AddEdge(compose.START, "tmpl")
g.AddEdge("tmpl", "model")
g.AddEdge("model", compose.END)

// Branch (conditional routing)
branch := compose.NewGraphBranch(conditionFn, map[string]bool{"a": true, "b": true})
g.AddBranch("model", branch)

// Compile and run
r, err := g.Compile(ctx)
out, err := r.Invoke(ctx, input)

Chain Quick Reference

chain := compose.NewChain[InputType, OutputType]()
chain.
    AppendChatTemplate(tmpl).
    AppendChatModel(model).
    AppendLambda(compose.InvokableLambda(parseFn))

r, err := chain.Compile(ctx)
out, err := r.Invoke(ctx, input)

Append methods: AppendChatModel, AppendChatTemplate, AppendToolsNode, AppendLambda, AppendGraph, AppendParallel, AppendBranch, AppendPassthrough, AppendRetriever, AppendEmbedding, AppendLoader, AppendIndexer, AppendDocumentTransformer.

Workflow Quick Reference

wf := compose.NewWorkflow[InputStruct, OutputStruct]()

wf.AddLambdaNode("node1", compose.InvokableLambda(fn1)).
    AddInput(compose.START, compose.MapFields("FieldA", "InputField"))

wf.AddLambdaNode("node2", compose.InvokableLambda(fn2)).
    AddInput("node1", compose.ToField("Result"))

wf.End().AddInput("node2")

r, err := wf.Compile(ctx)

Field mapping helpers: MapFields, ToField, FromField, MapFieldPaths, ToFieldPath, FromFieldPath.

Stream Programming

Four interaction modes on Runnable[I, O]:

Mode Input Output Lambda Constructor
Invoke I O compose.InvokableLambda
Stream I *StreamReader[O] compose.StreamableLambda
Collect *StreamReader[I] O compose.CollectableLambda
Transform *StreamReader[I] *StreamReader[O] compose.TransformableLambda

Framework auto-converts between modes:

  • Invoke call: all internal nodes run in Invoke mode.
  • Stream/Collect/Transform call: all internal nodes run in Transform mode; missing modes are auto-filled.

Stream primitives live in github.com/cloudwego/eino/schema:

sr, sw := schema.Pipe[T](capacity)
// sw.Send(chunk, nil); sw.Close()
// chunk, err := sr.Recv(); sr.Close()

Compile & Run

r, err := g.Compile(ctx,
    compose.WithGraphName("my_graph"),
    compose.WithNodeTriggerMode(compose.AllPredecessor), // DAG mode
)

// Non-streaming
out, err := r.Invoke(ctx, input)

// Streaming
stream, err := r.Stream(ctx, input)
defer stream.Close()
for {
    chunk, err := stream.Recv()
    if err == io.EOF { break }
    if err != nil { return err }
    process(chunk)
}

State Graph

Share state across nodes within a single request:

g := compose.NewGraph[string, string](compose.WithGenLocalState(func(ctx context.Context) *MyState {
    return &MyState{}
}))

g.AddLambdaNode("node", lambda,
    compose.WithStatePreHandler(func(ctx context.Context, in string, state *MyState) (string, error) {
        // read/write state before node executes
        return in, nil
    }),
    compose.WithStatePostHandler(func(ctx context.Context, out string, state *MyState) (string, error) {
        // read/write state after node executes
        return out, nil
    }),
)

Instructions to Agent

When helping users build orchestration:

  1. Default to Graph for most use cases. Use Chain only for simple linear pipelines. Use Workflow when field-level mapping between different struct types is needed.
  2. Always show the Compile step -- g.Compile(ctx) returns Runnable[I,O].
  3. Always close StreamReaders -- use defer sr.Close() immediately after obtaining a stream.
  4. Upstream output type must match downstream input type (or use WithInputKey/WithOutputKey for map conversion).
  5. For cyclic graphs (e.g., ReAct agent), use default Pregel mode (AnyPredecessor). For DAGs, set AllPredecessor.
  6. Use compose.WithCallbacks(handler) to inject logging/tracing at runtime.
  7. Use compose.WithCheckPointStore(store) with interrupt nodes for pause/resume workflows.

Reference Files

Read these files on-demand for detailed API, examples, and advanced usage:

Related skills
Installs
87
GitHub Stars
695
First Seen
Mar 17, 2026