beam-dataflow-python
Apache Beam & Dataflow (Python) Best Practices
Implement Way's architectural patterns and modern (2025+) best practices when building Dataflow Python pipelines.
1. Unified Pipeline Architecture
- Mode-driven routing: a `--mode streaming` vs `--mode batch` flag conditionally injects I/O connectors (Pub/Sub vs. BigQuery/GCS) and windowing; transform logic is identical across modes
- Layered files: `pipeline.py` (PTransform wiring) → `transforms.py` (DoFn impls) → `state_machine.py` / domain logic (pure Python, zero Beam imports)
- Event-time first: always develop around event time so backfills produce consistent state
Reference: Way's Pipeline Patterns, Community Best Practices
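The mode-routing idea can be sketched in pure Python. The helper names (`build_source`, `parse_args`) are illustrative, not Beam APIs; in a real pipeline `build_source` would return a PTransform (`ReadFromPubSub` for streaming, `ReadFromBigQuery` for batch) and the downstream transform chain would be shared:

```python
import argparse

def build_source(mode: str) -> str:
    # Streaming reads Pub/Sub; batch reads BigQuery/GCS. Everything
    # after the source is identical across the two modes.
    return "ReadFromPubSub" if mode == "streaming" else "ReadFromBigQuery"

def parse_args(argv: list[str]) -> argparse.Namespace:
    parser = argparse.ArgumentParser()
    parser.add_argument("--mode", choices=["streaming", "batch"], required=True)
    return parser.parse_args(argv)

args = parse_args(["--mode", "streaming"])
print(build_source(args.mode))  # -> ReadFromPubSub
```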
2. Runner v2 + Streaming Engine
Runner v2 is mandatory for Python SDK 2.45.0+; Streaming Engine is required for Runner v2 streaming jobs.
- Always set these flags for streaming: `--experiments=use_runner_v2 --enable_streaming_engine`
- Streaming Engine offloads state/timer storage to a Google-managed backend → reduces worker memory pressure and enables finer-grained autoscaling
- Runner v2 also unlocks vertical autoscaling, C4A (ARM) workers, and cross-language transforms
Reference: runner-v2.md, streaming-engine.md
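A typical streaming launch carrying these flags might look like the following (project, region, and bucket values are placeholders; the entry point name `main.py` is illustrative):

```shell
python main.py \
  --runner=DataflowRunner \
  --project=my-project \
  --region=us-central1 \
  --streaming \
  --experiments=use_runner_v2 \
  --enable_streaming_engine \
  --temp_location=gs://my-bucket/temp
```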
3. Deployment: Docker + Flex Templates
setup.py-based deployment is deprecated as of 2025. Docker is the only supported production deployment pattern.
- Flex Template Dockerfile is two-stage — the launcher base and SDK are separate images; copy the SDK into the launcher base (note the `--from=beam-sdk` on the COPY):

```dockerfile
FROM apache/beam_python3.12_sdk:VERSION AS beam-sdk
FROM gcr.io/dataflow-templates-base/python312-template-launcher-base AS final
COPY --from=beam-sdk /opt/apache/beam /opt/apache/beam
RUN uv pip install --system -r requirements.txt
ENV FLEX_TEMPLATE_PYTHON_PY_FILE="/app/main.py"
```

- Use `uv pip install` in Dockerfiles for faster dependency resolution
- Tag images with the git commit SHA — never `:latest`; enables reproducible rollbacks
- Flex Template = `metadata.json` in GCS pointing to the container image + runtime parameter definitions; launch via `gcloud dataflow flex-template run`
- Pre-baked deps → faster cold-start autoscaling (no pip install on worker boot)
- The `ENTRYPOINT` is set by the launcher base image — do not override it
Reference: build-container-image.md, run-custom-container.md, using-custom-containers.md, 08-docker-custom-containers-flex-templates.md, GCP Flex Template examples
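A launch might look like the following sketch; the job name, GCS path, and parameter names are placeholders for whatever your `metadata.json` defines:

```shell
gcloud dataflow flex-template run "events-pipeline-$(date +%Y%m%d-%H%M%S)" \
  --template-file-gcs-location=gs://my-bucket/templates/events-pipeline.json \
  --region=us-central1 \
  --parameters=mode=streaming
```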
4. Data Serialization (Protobuf-first)
Protobuf is Way's canonical schema across all environments (Pub/Sub, Beam shuffles, BigQuery, cross-language). Maximize leverage from protos in every pipeline stage.
- Register coders explicitly — prevents Pickle fallback, which is slow and fragile: `coders.registry.register_coder(MyMessage, coders.ProtoCoder)`
- Use the upb C backend (protobuf v3.24+): `pip install "protobuf>=3.24.0"`; verify `google.protobuf.runtime_version` is `"upb"` — 3-5x faster than pure Python
- Pub/Sub JSON ↔ proto: use integer enums for a compact wire format:
  - Decode: `json_format.Parse(json_bytes, MyMessage())`
  - Encode: `json_format.MessageToJson(msg, use_integers_for_enums=True)`
- BigQuery mapping: use string enums for queryability:
  - Sink: `MessageToDict(msg, preserving_proto_field_name=True, including_default_value_fields=True, use_integers_for_enums=False)`
  - Source: `ParseDict(row, MyMessage(), ignore_unknown_fields=True)`
- TimestampedValue from proto timestamp: inject event time from a proto `google.protobuf.Timestamp` field using `beam.window.TimestampedValue` + `Timestamp.from_rfc3339(ts.ToJsonString())`
- 2 GB per-element hard limit: never pass large binary blobs as Beam elements; pass GCS URIs and load inside `DoFn.process()`
- Avro for batch temp files: set `temp_file_format='AVRO'` on BigQuery writes to save ~20% CPU vs JSON during shuffle
- Protobuf Editions (2023/2024 syntax): requires `protobuf>=5.27.0` on workers; pin this in your Dockerfile
- Cross-language: keep `.proto` files accessible to both Python and Java runtimes when using cross-language transforms
Reference: 13-protobuf-best-practices.md, 03-bigquery-io-optimization.md
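The integer-vs-string enum tradeoff can be illustrated with a stdlib analogy (no protobuf codegen required here; `Status` is a hypothetical enum standing in for a proto enum): integer values give a compact Pub/Sub wire format, string names give queryable BigQuery columns.

```python
import json
from enum import IntEnum

class Status(IntEnum):
    STATUS_UNSPECIFIED = 0
    ACTIVE = 1

event_status = Status.ACTIVE

# Pub/Sub direction: compact integer enum on the wire.
wire = json.dumps({"id": "abc", "status": int(event_status)})
# BigQuery direction: human-readable string enum for SQL filters.
bq_row = json.dumps({"id": "abc", "status": event_status.name})

print(wire)    # {"id": "abc", "status": 1}
print(bq_row)  # {"id": "abc", "status": "ACTIVE"}
```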
5. BigQuery & I/O
- Write method depends on the pipeline mode:
  - Streaming / low-latency appends: use `method='STORAGE_WRITE_API'` with `num_storage_api_streams=0` (auto-shard); never use legacy streaming inserts
  - Batch / full partition replace: use standard `WriteToBigQuery` with `write_disposition=WRITE_TRUNCATE` and a date-partition suffix `table$YYYYMMDD`; simpler, cheaper, and idempotent for full-partition overwrites
- Managed I/O (SDK 2.61.0+): use `beam.managed.Read` / `beam.managed.Write` for BigQuery, Kafka, and Iceberg — auto-upgrades connector versions without pipeline code changes: `pcoll | beam.managed.Write(beam.managed.BIGQUERY, config={...})`
Reference: managed-io.md, managed-io-bigquery.md, managed-io-kafka.md, managed-io-iceberg.md, 03-bigquery-io-optimization.md
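The `table$YYYYMMDD` partition target can be sketched as a small helper (the dataset and table names are illustrative); writing with `WRITE_TRUNCATE` against this target replaces exactly one partition, which is what makes the batch overwrite idempotent:

```python
from datetime import date

def partition_target(table: str, day: date) -> str:
    # e.g. my_dataset.events$20250115 addresses only that day's partition
    return f"{table}${day.strftime('%Y%m%d')}"

print(partition_target("my_dataset.events", date(2025, 1, 15)))
# -> my_dataset.events$20250115
```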
6. Testing & Logic Decoupling
- Extract domain logic: move business logic out of `DoFn`s into pure Python classes with zero `apache_beam` imports
- Three-tier testing:
  - Pure Python (80–90%): `pytest` on domain logic — instant, no runner overhead
  - Transform logic: `TestPipeline` + `assert_that` for DoFn routing, State/Timer APIs, and side-output correctness
  - Integration: local end-to-end with mock I/O using the Prism Runner (current standard for high-fidelity stateful execution)
Reference: 01-testing-and-ci-cd.md, Community Testing Patterns
7. Advanced Windowing, Triggers & PaneInfo
- Abstract window config: extract into configuration objects (e.g., `StreamingSessionWindowConfig`) to keep pipeline code readable
- Triggers + lateness: pair `AfterWatermark` with explicit `allowed_lateness`; throttle EARLY panes with `Repeatedly(AfterProcessingTime(delay=...))` to avoid pane explosion
- PaneInfo injection: add `pane_info=beam.DoFn.PaneInfoParam` to the `process()` signature:
  - EARLY: speculative aggregate — throttle output rate
  - ON_TIME: watermark has passed window end
  - LATE: correction after close — sinks must be idempotent using window bounds + `pane_info.index` as primary key
Reference: 05-windowing-and-triggers.md
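The idempotent-sink key above can be sketched as a plain function. In a real DoFn the window bounds and pane index would come from `beam.DoFn.WindowParam` and `PaneInfoParam`; here they are ordinary arguments so the sketch runs standalone:

```python
def upsert_key(user_id: str, window_start: int, window_end: int, pane_index: int) -> str:
    # A re-fired pane produces the same key, so an upsert sink
    # overwrites the earlier result instead of duplicating it.
    return f"{user_id}:{window_start}-{window_end}:{pane_index}"

print(upsert_key("u1", 1700000000, 1700000060, 0))
# -> u1:1700000000-1700000060:0
```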
8. Stateful Processing & Thread Safety
- State + Timer APIs: use `ReadModifyWriteState`, `BagState`, and `TimerSpec` for complex per-key session logic that session windows cannot express
- Thread safety: streaming workers run ~12 threads per process; objects initialized in `__init__` are shared — initialize non-thread-safe objects (clients, parsers, connections) in `setup()`, not `__init__`
- Singleton pattern for expensive clients: use `setup()` / `teardown()` lifecycle hooks to manage connection pools and ML model loading
Reference: 09-state-and-timers.md, thread-scaling.md
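A minimal sketch of the `setup()`-vs-`__init__` rule. `EnrichFn` stands in for a `beam.DoFn` (the base class is omitted so the sketch runs standalone), and the dict client is a placeholder for a real, non-thread-safe connection:

```python
class EnrichFn:  # would subclass beam.DoFn in a real pipeline
    def __init__(self, endpoint: str):
        self.endpoint = endpoint  # plain config: safe to pickle and share
        self.client = None        # never build the client here

    def setup(self):
        # Runs on the worker: the right place for clients, parsers,
        # and connection pools that are not thread-safe.
        self.client = {"connected_to": self.endpoint}

    def process(self, element):
        yield element, self.client["connected_to"]

fn = EnrichFn("https://api.example.com")
fn.setup()
print(next(fn.process("e1")))  # -> ('e1', 'https://api.example.com')
```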
9. Resilience & Production Gotchas
- DLQ: wrap transforms with `.with_exception_handling()` to route failed records to a dead-letter sink; never let poison pills crash the pipeline
- Fusion trap: Dataflow fuses adjacent steps to reduce serialization overhead, but fusing a CPU-heavy step with a fast step causes 3–5x throughput loss; break fusion with `beam.Reshuffle()` or a no-op `GroupByKey` between the steps
- Hot key sharding: distribute work across keys by appending a random shard suffix before `GroupByKey`, then strip it after aggregation
- Exactly-once misconception: `DoFn.process()` may execute multiple times for the same element (retries, speculative execution); only sinks get exactly-once delivery guarantees — all API calls and external writes must be idempotent
- ML inference: use the `RunInference` transform — never load models inside `process()`; models must be loaded in `setup()` and shared safely
Reference: 02-dead-letter-queues.md, 11-frontline-lessons-learned.md, machine-learning.md
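The hot-key sharding step can be sketched as a pair of pure functions (`NUM_SHARDS` is an illustrative tuning value): append a random shard suffix before `GroupByKey` so one hot key fans out across workers, then strip it after the per-shard aggregation.

```python
import random

NUM_SHARDS = 8  # illustrative; tune to the observed hot-key skew

def shard(key: str) -> str:
    return f"{key}-{random.randrange(NUM_SHARDS)}"

def unshard(sharded_key: str) -> str:
    # rsplit from the right keeps keys that themselves contain '-' intact
    return sharded_key.rsplit("-", 1)[0]

print(unshard(shard("hot_user")))  # -> hot_user
```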
10. Cost Optimization
- Streaming Engine: reduces per-vCPU cost by offloading state to managed backend
- FlexRS (batch): mix preemptible VMs with on-demand; typically 40% cost reduction for non-latency-sensitive batch
- C4A (ARM) workers: `--worker_machine_type=c4a-standard-8`; 20–30% better price/performance for CPU-bound transforms
- Vertical autoscaling: `--enable_vertical_memory_scaling`; prevents OOM without over-provisioning RAM across the fleet
- Shuffle Service (batch): `--experiments=shuffle_mode=service`; offloads GroupByKey shuffle to a managed backend
- `worker_utilization_hint`: `--experiments=worker_utilization_hint=0.8`; sets the target CPU utilization for autoscaling decisions (0.0–1.0); tune down for latency-sensitive streaming, up for throughput-bound batch
Reference: flexrs.md, use-arm-vms.md, vertical-autoscaling.md, shuffle-for-batch.md, right-fitting.md, optimize-costs.md
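Several of these levers combine in one batch launch. The following is a sketch, not a definitive invocation; project/region values and the `batch_main.py` entry point are placeholders, and `--flexrs_goal=COST_OPTIMIZED` is the option that enables FlexRS:

```shell
python batch_main.py \
  --runner=DataflowRunner \
  --project=my-project \
  --region=us-central1 \
  --flexrs_goal=COST_OPTIMIZED \
  --worker_machine_type=c4a-standard-8 \
  --experiments=shuffle_mode=service
```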
Agent Reference Index
Do not guess syntax or patterns. Load exact procedures from these references.
Architecture & Core Strategy
- Way's Internal Patterns: baseline architectural expectations (unified pipelines, pure-Python state machines, mode routing)
- 2024+ Community Best Practices: industry consensus (logic-first decoupling, DLQs, Managed I/O)
Community Guides (2025+)
- Testing & CI: 01-testing-and-ci-cd.md — `TestStream`, PrismRunner, CI/CD setup
- Error Handling: 02-dead-letter-queues.md — `with_exception_handling`, side-output DLQ patterns
- BigQuery I/O: 03-bigquery-io-optimization.md — Storage Write API syntax, schema mapping
- Autoscaling: 04-autoscaling-resource-management.md — C4A workers, vertical autoscaling, FlexRS config
- Windowing: 05-windowing-and-triggers.md — event-time windows, triggers, `allowed_lateness`
- Cross-language: 06-cross-language-transforms.md — Java connectors (KafkaIO) from Python
- DataFrames: 07-dataframe-api.md — scalar/tabular operations with DataFrame API
- Docker/Flex Templates: 08-docker-custom-containers-flex-templates.md — Dockerfile patterns, metadata.json, launch commands
- Stateful Logic: 09-state-and-timers.md — State + Timer APIs, session management
- Beam YAML: 10-beam-yaml-declarative.md — no-code ingestion routing
- Scale Debugging: 11-frontline-lessons-learned.md — stuck pipelines, fusion breaking, hot keys
- Pythonic Patterns: 12-modern-pythonic-patterns.md — Pydantic validation, structural pattern matching
- Protobuf Deep Dive: 13-protobuf-best-practices.md — schema evolution, upb backend, coder registration, Editions
- Strategic Direction: 14-trends-and-strategic-direction.md — Beam/Dataflow roadmap for 2026
Dataflow Deep-Dives
- Runner v2: runner-v2.md
- Streaming Engine: streaming-engine.md
- Vertical Autoscaling: vertical-autoscaling.md
- Horizontal Autoscaling: horizontal-autoscaling.md
- Build Container Image: build-container-image.md
- Run Custom Container: run-custom-container.md
- Using Custom Containers: using-custom-containers.md
- Managed I/O: managed-io.md
- Managed I/O — BigQuery: managed-io-bigquery.md
- Managed I/O — Kafka: managed-io-kafka.md
- Managed I/O — Iceberg: managed-io-iceberg.md
- Shuffle for Batch: shuffle-for-batch.md
- Thread Scaling: thread-scaling.md
- FlexRS: flexrs.md
- ARM VMs: use-arm-vms.md
- Right-Fitting: right-fitting.md
- ML / RunInference: machine-learning.md
Dataflow Operations & Troubleshooting
- Monitoring: monitoring-overview.md
- Cost Optimization: optimize-costs.md
- Logging: logging.md
- Common Errors: common-errors.md
- Slow Jobs: troubleshoot-slow-jobs.md
- Bottlenecks: troubleshoot-bottlenecks.md
- OOM: troubleshoot-oom.md
- Streaming Stragglers: troubleshoot-streaming-stragglers.md
- Custom Container Issues: troubleshoot-custom-container.md
- Autoscaling Issues: troubleshoot-autoscaling.md
Core SDK Fallback References
- Apache Beam SDK: references/beam/ — programming guides, transform catalogs, runner specifics
- Google Cloud Dataflow: references/dataflow/ — cloud ops, IAM, billing, troubleshooting