Apache Beam Runners

Overview

Runners execute Beam pipelines on distributed processing backends. Each runner translates the portable Beam model to its native execution engine.

Available Runners

Runner     Location                               Description
Direct     runners/direct-java/                   Local execution for testing
Prism      runners/prism/                         Portable local runner
Dataflow   runners/google-cloud-dataflow-java/    Google Cloud Dataflow
Flink      runners/flink/                         Apache Flink
Spark      runners/spark/                         Apache Spark
Samza      runners/samza/                         Apache Samza
Jet        runners/jet/                           Hazelcast Jet
Twister2   runners/twister2/                      Twister2

Direct Runner

For local development and testing.

Java

PipelineOptions options = PipelineOptionsFactory.create();
options.setRunner(DirectRunner.class);
Pipeline p = Pipeline.create(options);

Python

options = PipelineOptions()
options.view_as(StandardOptions).runner = 'DirectRunner'
p = beam.Pipeline(options=options)

Command Line

--runner=DirectRunner

Dataflow Runner

Prerequisites

  • GCP project with Dataflow API enabled
  • Service account with Dataflow Admin role
  • GCS bucket for staging

Java Usage

DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
options.setRunner(DataflowRunner.class);
options.setProject("my-project");
options.setRegion("us-central1");
options.setTempLocation("gs://my-bucket/temp");

Python Usage

options = PipelineOptions([
    '--runner=DataflowRunner',
    '--project=my-project',
    '--region=us-central1',
    '--temp_location=gs://my-bucket/temp'
])
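The flag list above lends itself to a small helper. The sketch below (a hypothetical `dataflow_args` function, not part of the Beam API) assembles the same flags from plain values, so one Dataflow configuration can be shared across pipelines:

```python
def dataflow_args(project, region, temp_location, extra=()):
    """Build the argv-style flag list expected by PipelineOptions."""
    args = [
        '--runner=DataflowRunner',
        f'--project={project}',
        f'--region={region}',
        f'--temp_location={temp_location}',
    ]
    # Optional flags (e.g. experiments) are appended unchanged.
    args.extend(extra)
    return args

print(dataflow_args('my-project', 'us-central1', 'gs://my-bucket/temp'))
```

The resulting list can be passed straight to `PipelineOptions(...)` as in the example above.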

Runner v2

--experiments=use_runner_v2

Custom SDK Container

--sdkContainerImage=gcr.io/project/beam_java11_sdk:custom

Flink Runner

Embedded Mode

FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
options.setRunner(FlinkRunner.class);
options.setFlinkMaster("[local]");

Cluster Mode

options.setFlinkMaster("host:port");

Portable Mode (Python)

options = PipelineOptions([
    '--runner=FlinkRunner',
    '--flink_master=host:port',
    '--environment_type=LOOPBACK'  # or DOCKER, EXTERNAL
])

Spark Runner

Java

SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
options.setRunner(SparkRunner.class);
options.setSparkMaster("local[*]");  // or spark://host:port

Python (Portable)

options = PipelineOptions([
    '--runner=SparkRunner',
    '--spark_master_url=local[*]'
])
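The master URL takes two forms here: local[N] (or local[*]) for embedded execution, and spark://host:port for a standalone cluster. A small validator sketch for just these two forms (a hypothetical helper, not Beam API; Spark also accepts other schemes such as yarn, which this deliberately ignores):

```python
import re

# Matches 'local', 'local[4]', 'local[*]', or 'spark://host:port'.
MASTER_RE = re.compile(r'^(local(\[(\d+|\*)\])?|spark://[^:]+:\d+)$')

def is_valid_spark_master(url):
    """Check a Spark master URL against the two forms used above."""
    return bool(MASTER_RE.match(url))
```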

Testing with Runners

ValidatesRunner Tests

Tests that validate runner correctness:

# Direct Runner
./gradlew :runners:direct-java:validatesRunner

# Flink Runner
./gradlew :runners:flink:1.18:validatesRunner

# Spark Runner
./gradlew :runners:spark:3:validatesRunner

# Dataflow Runner
./gradlew :runners:google-cloud-dataflow-java:validatesRunner

TestPipeline with Runners

@Rule public TestPipeline pipeline = TestPipeline.create();

Select the runner at test time via a JVM system property:

-DbeamTestPipelineOptions='["--runner=TestDataflowRunner"]'
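The property value is a JSON array of flag strings, so generating it with a JSON encoder avoids shell-quoting mistakes when the option list grows. A minimal sketch (the `--project` flag is just an illustrative extra option):

```python
import json

# Encode the option list as the JSON array the property expects.
opts = json.dumps(["--runner=TestDataflowRunner",
                   "--project=my-project"])
print(f"-DbeamTestPipelineOptions='{opts}'")
```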

Portable Runners

Concept

  • SDK-independent execution via Fn API
  • SDK runs in container, communicates via gRPC

Environment Types

  • DOCKER - SDK in Docker container
  • LOOPBACK - SDK in same process (testing)
  • EXTERNAL - SDK at specified address
  • PROCESS - SDK in subprocess
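Each type pairs with an --environment_config value: a container image for DOCKER, an address for EXTERNAL, a process spec for PROCESS. A sketch of that pairing (a hypothetical helper, not Beam API; it assumes DOCKER can fall back to the default SDK container image, while EXTERNAL and PROCESS have no sensible default):

```python
def environment_flags(env_type, config=None):
    """Build the portable environment flags for a given environment type."""
    flags = [f'--environment_type={env_type}']
    if config is not None:
        flags.append(f'--environment_config={config}')
    elif env_type in ('EXTERNAL', 'PROCESS'):
        # These types point at something outside the runner, so a
        # config value is mandatory.
        raise ValueError(f'{env_type} requires --environment_config')
    return flags
```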

Job Server

Start Flink job server:

./gradlew :runners:flink:1.18:job-server:runShadow

Start Spark job server:

./gradlew :runners:spark:3:job-server:runShadow
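Once a job server is up (it listens on localhost:8099 by default), a Python pipeline can target it through the generic PortableRunner. A minimal option list, assuming the default endpoint:

```python
# Options for submitting to a locally running job server. LOOPBACK
# keeps the SDK harness in the submitting process, which suits
# local testing; use DOCKER or EXTERNAL for real deployments.
portable_args = [
    '--runner=PortableRunner',
    '--job_endpoint=localhost:8099',
    '--environment_type=LOOPBACK',
]
print(' '.join(portable_args))
```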

Runner-Specific Options

Dataflow

Option                 Description
--project              GCP project ID
--region               GCP region
--tempLocation         GCS temp location
--stagingLocation      GCS staging location
--numWorkers           Initial number of workers
--maxNumWorkers        Maximum number of workers
--workerMachineType    Worker VM machine type

Flink

Option                     Description
--flinkMaster              Flink master address
--parallelism              Default parallelism
--checkpointingInterval    Checkpoint interval (ms)

Spark

Option           Description
--sparkMaster    Spark master URL
--sparkConf      Additional Spark configuration

Building Runner Artifacts

Dataflow Worker Jar

./gradlew :runners:google-cloud-dataflow-java:worker:shadowJar

Flink Job Server

./gradlew :runners:flink:1.18:job-server:shadowJar

Spark Job Server

./gradlew :runners:spark:3:job-server:shadowJar

Debugging

Direct Runner

  • Enable logging: -Dorg.slf4j.simpleLogger.defaultLogLevel=debug
  • Use --targetParallelism=1 for deterministic execution

Dataflow

  • Check Dataflow UI: console.cloud.google.com/dataflow
  • Use --experiments=upload_graph for graph debugging
  • Worker logs in Cloud Logging

Portable Runners

  • Enable debug logging on job server
  • Check SDK harness logs in worker containers