# Apache Beam Runners

## Overview

Runners execute Beam pipelines on distributed processing backends. Each runner translates the portable Beam model into its native execution engine.

## Available Runners
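One way to picture this translation step is as a dispatch from runner name to backend. The sketch below is purely illustrative (the table and function are hypothetical; Beam's real resolution goes through `PipelineOptions` and runner classes, not a dict):

```python
# Hypothetical illustration of runner dispatch. Beam's actual runner
# selection happens via PipelineOptions, not a lookup table like this.
RUNNER_BACKENDS = {
    "DirectRunner": "local in-process execution",
    "PrismRunner": "portable local runner",
    "DataflowRunner": "Google Cloud Dataflow service",
    "FlinkRunner": "Apache Flink cluster",
    "SparkRunner": "Apache Spark cluster",
}

def resolve_backend(runner_name: str) -> str:
    """Return a description of the backend a runner name targets."""
    try:
        return RUNNER_BACKENDS[runner_name]
    except KeyError:
        raise ValueError(f"Unknown runner: {runner_name}") from None

print(resolve_backend("FlinkRunner"))  # Apache Flink cluster
```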
| Runner | Location | Description |
|---|---|---|
| Direct | runners/direct-java/ | Local execution for testing |
| Prism | runners/prism/ | Portable local runner |
| Dataflow | runners/google-cloud-dataflow-java/ | Google Cloud Dataflow |
| Flink | runners/flink/ | Apache Flink |
| Spark | runners/spark/ | Apache Spark |
| Samza | runners/samza/ | Apache Samza |
| Jet | runners/jet/ | Hazelcast Jet |
| Twister2 | runners/twister2/ | Twister2 |
## Direct Runner

For local development and testing.

### Java

```java
PipelineOptions options = PipelineOptionsFactory.create();
options.setRunner(DirectRunner.class);
Pipeline p = Pipeline.create(options);
```

### Python

```python
options = PipelineOptions()
options.view_as(StandardOptions).runner = 'DirectRunner'
p = beam.Pipeline(options=options)
```

### Command Line

```
--runner=DirectRunner
```
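All three snippets select the same runner; on the command line the choice arrives as a flag that gets parsed into the options object. A rough stdlib sketch of that flag shape (hypothetical, not Beam's actual parser, which also loads the runner class and validates options):

```python
import argparse

# Minimal sketch of parsing a --runner flag from argv. Beam's
# PipelineOptionsFactory does far more; this only shows the flag shape.
parser = argparse.ArgumentParser()
parser.add_argument("--runner", default="DirectRunner",
                    help="Pipeline runner, e.g. DirectRunner, FlinkRunner")
args = parser.parse_args(["--runner=DirectRunner"])
print(args.runner)  # DirectRunner
```

Note that when no flag is given, the runner defaults to `DirectRunner`, matching Beam's own default.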
## Dataflow Runner

### Prerequisites

- GCP project with the Dataflow API enabled
- Service account with the Dataflow Admin role
- GCS bucket for staging

### Java Usage

```java
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
options.setRunner(DataflowRunner.class);
options.setProject("my-project");
options.setRegion("us-central1");
options.setTempLocation("gs://my-bucket/temp");
```

### Python Usage

```python
options = PipelineOptions([
    '--runner=DataflowRunner',
    '--project=my-project',
    '--region=us-central1',
    '--temp_location=gs://my-bucket/temp',
])
```

### Runner v2

```
--experiments=use_runner_v2
```

### Custom SDK Container

```
--sdkContainerImage=gcr.io/project/beam_java11_sdk:custom
```
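Dataflow rejects jobs whose required options are missing or malformed, so it pays to check them before submission. The helper below is a hedged sketch mirroring the flags shown above (`--project`, `--region`, `--temp_location`); it is not Beam's real validation logic:

```python
def validate_dataflow_options(opts: dict) -> list:
    """Hypothetical pre-submission check for the Dataflow flags above.

    Illustrative only: Beam's actual validation lives in DataflowRunner.
    """
    problems = []
    # All three flags are required by the Dataflow runner.
    for key in ("project", "region", "temp_location"):
        if not opts.get(key):
            problems.append(f"missing required option: {key}")
    # temp_location must point at a GCS bucket.
    temp = opts.get("temp_location", "")
    if temp and not temp.startswith("gs://"):
        problems.append("temp_location must be a GCS path (gs://...)")
    return problems

opts = {"project": "my-project", "region": "us-central1",
        "temp_location": "gs://my-bucket/temp"}
print(validate_dataflow_options(opts))  # []
```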
## Flink Runner

### Embedded Mode

```java
FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
options.setRunner(FlinkRunner.class);
options.setFlinkMaster("[local]");
```

### Cluster Mode

```java
options.setFlinkMaster("host:port");
```

### Portable Mode (Python)

```python
options = PipelineOptions([
    '--runner=FlinkRunner',
    '--flink_master=host:port',
    '--environment_type=LOOPBACK',  # or DOCKER, EXTERNAL
])
```
## Spark Runner

### Java

```java
SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);
options.setRunner(SparkRunner.class);
options.setSparkMaster("local[*]");  // or spark://host:port
```

### Python (Portable)

```python
options = PipelineOptions([
    '--runner=SparkRunner',
    '--spark_master_url=local[*]',
])
```
## Testing with Runners

### ValidatesRunner Tests

Tests that validate runner correctness:

```shell
# Direct Runner
./gradlew :runners:direct-java:validatesRunner

# Flink Runner
./gradlew :runners:flink:1.18:validatesRunner

# Spark Runner
./gradlew :runners:spark:3:validatesRunner

# Dataflow Runner
./gradlew :runners:google-cloud-dataflow-java:validatesRunner
```

### TestPipeline with Runners

```java
@Rule public TestPipeline pipeline = TestPipeline.create();
```

Set the runner via a system property:

```
-DbeamTestPipelineOptions='["--runner=TestDataflowRunner"]'
```
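The `beamTestPipelineOptions` property value is a JSON-encoded array of flag strings, so consuming it amounts to JSON decoding plus flag splitting. A small stdlib sketch (the flag-splitting helper is illustrative, not Beam's parser):

```python
import json

# The system property value is a JSON array of pipeline option flags.
raw = '["--runner=TestDataflowRunner", "--project=my-project"]'
flags = json.loads(raw)

# Illustrative only: split each "--key=value" flag into a dict entry.
parsed = dict(flag.lstrip("-").split("=", 1) for flag in flags)
print(parsed["runner"])  # TestDataflowRunner
```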
## Portable Runners

### Concept

- SDK-independent execution via the Fn API
- The SDK harness runs in a container and communicates with the runner over gRPC

### Environment Types

- `DOCKER` - SDK harness in a Docker container
- `LOOPBACK` - SDK harness in the same process (testing)
- `EXTERNAL` - SDK harness at a specified address
- `PROCESS` - SDK harness in a subprocess
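A submission can sanity-check the requested environment type before contacting the job server. The helper below is a hypothetical sketch, not part of the Beam SDK; it encodes the rule that only `EXTERNAL` needs an address for the SDK harness:

```python
# Hypothetical validation of portable environment types.
VALID_ENV_TYPES = {"DOCKER", "LOOPBACK", "EXTERNAL", "PROCESS"}

def check_environment(env_type: str, address: str = "") -> str:
    """Validate an environment_type choice (illustrative sketch only)."""
    if env_type not in VALID_ENV_TYPES:
        raise ValueError(f"unknown environment_type: {env_type}")
    # EXTERNAL is the only type that needs an address to reach the harness.
    if env_type == "EXTERNAL" and not address:
        raise ValueError("EXTERNAL requires an SDK harness address")
    return env_type

print(check_environment("LOOPBACK"))  # LOOPBACK
```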
### Job Server

Start the Flink job server:

```shell
./gradlew :runners:flink:1.18:job-server:runShadow
```

Start the Spark job server:

```shell
./gradlew :runners:spark:3:job-server:runShadow
```
## Runner-Specific Options

### Dataflow

| Option | Description |
|---|---|
| `--project` | GCP project |
| `--region` | GCP region |
| `--tempLocation` | GCS temp location |
| `--stagingLocation` | GCS staging location |
| `--numWorkers` | Initial number of workers |
| `--maxNumWorkers` | Maximum number of workers |
| `--workerMachineType` | Worker VM type |
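The autoscaling flags only make sense together: the initial worker count cannot exceed the maximum. A hypothetical sanity check (not Beam's real validation) makes the constraint explicit:

```python
def check_worker_bounds(num_workers: int, max_num_workers: int) -> None:
    """Hypothetical check mirroring --numWorkers / --maxNumWorkers."""
    if num_workers < 1:
        raise ValueError("numWorkers must be at least 1")
    if num_workers > max_num_workers:
        raise ValueError("numWorkers cannot exceed maxNumWorkers")

check_worker_bounds(2, 10)  # ok: starts at 2, may scale up to 10
```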
### Flink

| Option | Description |
|---|---|
| `--flinkMaster` | Flink master address |
| `--parallelism` | Default parallelism |
| `--checkpointingInterval` | Checkpoint interval |
### Spark

| Option | Description |
|---|---|
| `--sparkMaster` | Spark master URL |
| `--sparkConf` | Additional Spark configuration |
## Building Runner Artifacts

### Dataflow Worker Jar

```shell
./gradlew :runners:google-cloud-dataflow-java:worker:shadowJar
```

### Flink Job Server

```shell
./gradlew :runners:flink:1.18:job-server:shadowJar
```

### Spark Job Server

```shell
./gradlew :runners:spark:3:job-server:shadowJar
```
## Debugging

### Direct Runner

- Enable logging: `-Dorg.slf4j.simpleLogger.defaultLogLevel=debug`
- Use `--targetParallelism=1` for deterministic execution

### Dataflow

- Check the Dataflow UI: console.cloud.google.com/dataflow
- Use `--experiments=upload_graph` for graph debugging
- Worker logs are in Cloud Logging

### Portable Runners

- Enable debug logging on the job server
- Check SDK harness logs in the worker containers