mastra-workflows
Mastra Workflows — DAG Orchestration
Build type-safe DAG workflows with @mastra/core/workflows.
Quick Start
import { createStep, createWorkflow } from "@mastra/core/workflows";
import { z } from "zod";
const stepA = createStep({
id: "step-a",
inputSchema: z.object({ value: z.string() }),
outputSchema: z.object({ result: z.string() }),
execute: async ({ inputData }) => ({ result: inputData.value.toUpperCase() }),
});
const workflow = createWorkflow({ id: "my-workflow", inputSchema: z.object({ value: z.string() }) })
.then(stepA)
.commit();
const run = workflow.createRun();
const result = await run.start({ inputData: { value: "hello" } });
// result.results["step-a"].output → { result: "HELLO" }
Imports
Always import from @mastra/core/workflows, NOT from @mastra/core:
import { createStep, createWorkflow } from "@mastra/core/workflows";
Step Anatomy
const step = createStep({
id: "unique-step-id", // Required: unique identifier
inputSchema: z.object({...}), // Required: Zod schema for input
outputSchema: z.object({...}), // Required: Zod schema for output
execute: async (params) => { // Required: execution function
const { inputData, getStepResult, getInitData, suspend, mapiData } = params;
// inputData: typed input from previous step or workflow
// getStepResult: access results from earlier named steps
// getInitData: access original workflow input
// suspend: pause execution (see references/suspend-resume.md)
return { /* matches outputSchema */ };
},
});
Control Flow
Chain steps with fluent API methods. Each returns the workflow for chaining.
| Method | Purpose | Output Shape |
|---|---|---|
.then(step) |
Sequential execution | Direct output |
.parallel([a, b]) |
Run steps concurrently | { "step-a": {...}, "step-b": {...} } keyed by step ID |
.branch([[cond, step], ...]) |
Conditional routing | Output keyed by executed branch step ID (use .optional() in next inputSchema) |
.foreach(step, { concurrency }) |
Iterate over arrays | Array of step outputs |
.dountil(step, condition) |
Loop until condition met | Last iteration output (has iterationCount) |
.dowhile(step, condition) |
Loop while condition true | Last iteration output (has iterationCount) |
.map(fn) |
Transform data between steps | Custom shape |
.commit() |
Required — finalize workflow definition | — |
Critical: Always call .commit() after the last control flow method.
For detailed patterns, examples, and advanced composition: See references/control-flow.md
Error Handling
const step = createStep({
id: "retry-step",
retryConfig: { attempts: 3, delay: 1000 }, // Auto-retry
execute: async ({ inputData, bail }) => {
if (fatalError) bail("Unrecoverable"); // Skip retries, fail immediately
return { result: "ok" };
},
});
Lifecycle callbacks on workflow run:
const result = await run.start({
inputData: {...},
onFinish: (result) => { /* always called */ },
onError: (error) => { /* called on failure */ },
});
For retry strategies, bail patterns, conditional error branching: See references/error-handling.md
Suspend & Resume
Pause workflow execution for human input or external events:
const humanStep = createStep({
id: "approval",
inputSchema: z.object({ request: z.string() }),
outputSchema: z.object({ approved: z.boolean() }),
resumeSchema: z.object({ decision: z.enum(["approve", "reject"]) }),
execute: async ({ inputData, suspend, resumeData }) => {
if (!resumeData) {
await suspend({ request: inputData.request });
return undefined as never;
}
return { approved: resumeData.decision === "approve" };
},
});
// Resume later:
await run.resume({ step: humanStep, resumeData: { decision: "approve" } });
For suspend patterns, multi-step suspend, nested workflows: See references/suspend-resume.md
State & Streaming
Workflow state — shared mutable state across steps:
const workflow = createWorkflow({
id: "stateful",
inputSchema: z.object({...}),
stateSchema: z.object({ count: z.number() }),
});
// In step execute:
execute: async ({ state, setState }) => {
setState({ ...state, count: state.count + 1 });
return { result: state.count };
}
Streaming — real-time step events:
const run = workflow.createRun();
const stream = run.stream({ inputData: {...} });
for await (const chunk of stream) {
// chunk contains step progress, transitions, outputs
}
For state patterns, streaming events, nested state: See references/state-streaming.md
Nested Workflows
Use a workflow as a step inside another workflow:
const subWorkflow = createWorkflow({ id: "sub", inputSchema, outputSchema })
.then(stepX)
.then(stepY)
.commit();
const mainWorkflow = createWorkflow({ id: "main", inputSchema })
.then(subWorkflow)
.commit();
Agent Steps
Create steps from Mastra agents:
import { Agent } from "@mastra/core/agents";
const agent = new Agent({ name: "analyzer", instructions: "...", model });
const agentStep = createStep(agent);
// or with structured output:
const typedAgentStep = createStep(agent, { structuredOutput: { schema: z.object({...}) } });
Key Rules
- Import path:
@mastra/core/workflows— never@mastra/core - Always
.commit()after the last chaining method - Zod schemas required on every step (input + output)
- Parallel output is keyed by step ID — destructure accordingly
- Branch output uses optional fields — next step must handle missing keys
getStepResult<typeof step>("step-id")for accessing earlier results (type-safe)- Foreach returns an array — next step receives
{ results: [...] } - Loop steps receive
iterationCountin their context