Python Development in Apache Beam

Project Structure

Key Directories

  • sdks/python/ - Python SDK root
    • apache_beam/ - Main Beam package
      • transforms/ - Core transforms (ParDo, GroupByKey, etc.)
      • io/ - I/O connectors
      • ml/ - Beam ML code (RunInference, etc.)
      • runners/ - Runner implementations and wrappers
      • runners/worker/ - SDK worker harness
    • container/ - Docker container configuration
    • test-suites/ - Test configurations
    • scripts/ - Utility scripts

Configuration Files

  • setup.py - Package configuration
  • pyproject.toml - Build configuration
  • tox.ini - Test automation
  • pytest.ini - Pytest configuration
  • .pylintrc - Linting rules
  • .isort.cfg - Import sorting
  • mypy.ini - Type checking

Environment Setup

Using pyenv (Recommended)

# Install Python
pyenv install 3.X  # Use supported version from gradle.properties

# Create virtual environment (requires the pyenv-virtualenv plugin)
pyenv virtualenv 3.X beam-dev
pyenv activate beam-dev

Install in Editable Mode

cd sdks/python
# Quote the extras so shells such as zsh do not expand the brackets as a glob
pip install -e ".[gcp,test]"

Enable Pre-commit Hooks

pip install pre-commit
pre-commit install

# To disable
pre-commit uninstall

Running Tests

Unit Tests (filename: *_test.py)

# Run all tests in a file
pytest -v apache_beam/io/textio_test.py

# Run tests in a class
pytest -v apache_beam/io/textio_test.py::TextSourceTest

# Run a specific test
pytest -v apache_beam/io/textio_test.py::TextSourceTest::test_progress
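
The three commands above differ only in how much of the pytest node ID is spelled out: the file, then `::Class`, then `::method`. A minimal sketch of that structure follows; the helper name `pytest_node_id` is hypothetical and is not part of Beam or pytest.

```python
from typing import Optional


def pytest_node_id(path: str,
                   cls: Optional[str] = None,
                   test: Optional[str] = None) -> str:
    """Join a test file, class, and test name into a pytest node ID.

    Hypothetical helper for illustration only; pytest just needs the
    resulting string on the command line.
    """
    # Skip the parts that were not supplied, then join with "::".
    return "::".join(part for part in (path, cls, test) if part)


print(pytest_node_id("apache_beam/io/textio_test.py"))
print(pytest_node_id("apache_beam/io/textio_test.py", "TextSourceTest"))
print(pytest_node_id(
    "apache_beam/io/textio_test.py", "TextSourceTest", "test_progress"))
```

Each printed string can be passed to `pytest -v` as shown in the commands above.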

Integration Tests (filename: *_it_test.py)

On Direct Runner

python -m pytest -o log_cli=True -o log_level=Info \
  apache_beam/ml/inference/pytorch_inference_it_test.py::PyTorchInference \
  --test-pipeline-options='--runner=TestDirectRunner'

On Dataflow Runner

# First build the SDK tarball (run from sdks/python)
pip install build && python -m build --sdist

# Run integration test
python -m pytest -o log_cli=True -o log_level=Info \
  apache_beam/ml/inference/pytorch_inference_it_test.py::PyTorchInference \
  --test-pipeline-options='--runner=TestDataflowRunner --project=<project>
                           --temp_location=gs://<bucket>/tmp
                           --sdk_location=dist/apache-beam-2.XX.0.dev0.tar.gz
                           --region=us-central1'
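
The value of `--test-pipeline-options` is a single shell argument that itself contains several pipeline flags. As a sketch of how that string could be assembled and quoted safely, assuming a hypothetical helper `format_pipeline_options` and the same placeholder values used above:

```python
import shlex
from typing import Mapping


def format_pipeline_options(options: Mapping[str, str]) -> str:
    """Render {'runner': 'X'} as '--runner=X', space-separated.

    Hypothetical helper; not part of Beam.
    """
    return " ".join(f"--{name}={value}" for name, value in options.items())


# Placeholder values copied from the command above; replace before use.
options = {
    "runner": "TestDataflowRunner",
    "project": "<project>",
    "temp_location": "gs://<bucket>/tmp",
    "sdk_location": "dist/apache-beam-2.XX.0.dev0.tar.gz",
    "region": "us-central1",
}

flag = "--test-pipeline-options=" + format_pipeline_options(options)

# shlex.quote wraps the whole flag in single quotes so the shell passes
# it to pytest as one argument, spaces included.
print(shlex.quote(flag))
```

Building the value on one line avoids embedding newlines in the quoted argument, which can be easier to copy between shells.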

Building Python SDK

Build Source Distribution

cd sdks/python
pip install build && python -m build --sdist
# Output: sdks/python/dist/apache-beam-X.XX.0.dev0.tar.gz
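
Because the tarball name includes the development version, it changes between releases. One possible way to avoid hard-coding it is to pick the newest archive in `dist/`; the sketch below assumes it is run from `sdks/python`, and `newest_sdist` is a hypothetical helper, not a Beam utility.

```python
from pathlib import Path
from typing import Optional


def newest_sdist(dist_dir: str = "dist") -> Optional[Path]:
    """Return the most recently modified .tar.gz in dist_dir, or None."""
    tarballs = sorted(
        Path(dist_dir).glob("*.tar.gz"),
        key=lambda path: path.stat().st_mtime,
    )
    return tarballs[-1] if tarballs else None


if __name__ == "__main__":
    sdist = newest_sdist()
    if sdist is None:
        print("No sdist found; run the build command first.")
    else:
        # The path can be passed to a pipeline as --sdk_location.
        print(f"--sdk_location={sdist}")
```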

Build Wheel (faster installation)

./gradlew :sdks:python:bdistPy311linux  # For Python 3.11 on Linux

Build and Push SDK Container Image

./gradlew :sdks:python:container:py311:docker \
  -Pdocker-repository-root=gcr.io/your-project/your-name \
  -Pdocker-tag=custom \
  -Ppush-containers

# Container image will be pushed to: gcr.io/your-project/your-name/beam_python3.11_sdk:custom

To use this container image, supply it via --sdk_container_image.
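
The Gradle task name and the pushed image name both follow from the Python version, repository root, and tag. The sketch below reproduces the naming seen in the example above; the pattern is inferred from that single example, so treat it as an assumption, and both function names are hypothetical.

```python
def gradle_container_task(python_version: str) -> str:
    """Map '3.11' to ':sdks:python:container:py311:docker' (inferred pattern)."""
    return f":sdks:python:container:py{python_version.replace('.', '')}:docker"


def sdk_image_name(repository_root: str, python_version: str, tag: str) -> str:
    """Build the image reference in the form shown in the comment above."""
    return f"{repository_root}/beam_python{python_version}_sdk:{tag}"


image = sdk_image_name("gcr.io/your-project/your-name", "3.11", "custom")
print(gradle_container_task("3.11"))
print(f"--sdk_container_image={image}")
```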

Running Pipelines with Modified Code

# Install modified SDK
pip install "/path/to/apache-beam.tar.gz[gcp]"

# Run pipeline
python my_pipeline.py \
  --runner=DataflowRunner \
  --sdk_location=/path/to/apache-beam.tar.gz \
  --project=my_project \
  --region=us-central1 \
  --temp_location=gs://my-bucket/temp
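
A script such as `my_pipeline.py` commonly separates its own flags from the runner flags shown above. The following sketch uses only `argparse`; the `--input` flag and its default are hypothetical, and in a real pipeline the leftover list would typically be handed to Beam's `PipelineOptions`.

```python
import argparse
from typing import List, Optional, Tuple


def split_args(
    argv: Optional[List[str]] = None,
) -> Tuple[argparse.Namespace, List[str]]:
    """Separate script-specific flags from flags meant for the runner."""
    parser = argparse.ArgumentParser()
    # Hypothetical pipeline-specific flag, for illustration only.
    parser.add_argument("--input", default="gs://my-bucket/input.txt")
    # parse_known_args returns unrecognized flags instead of failing on them.
    known_args, pipeline_args = parser.parse_known_args(argv)
    return known_args, pipeline_args


known, beam_args = split_args([
    "--input=data.txt",
    "--runner=DataflowRunner",
    "--sdk_location=/path/to/apache-beam.tar.gz",
])
print(known.input)  # data.txt
print(beam_args)    # the runner and sdk_location flags, unchanged
```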

Common Issues

NameError when running DoFn

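Global imports, functions, and variables defined in the main pipeline module are not pickled and shipped to workers by default, so a DoFn that refers to them can raise a NameError on a remote runner. To save the main session state and load it on the workers, pass: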

--save_main_session
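
Another commonly used workaround is to import what a function needs inside the function itself, so nothing has to be looked up in the main module on the worker. The sketch below is plain Python so that it runs without Beam installed; `ParseJsonFn` is a hypothetical stand-in, and a real version would subclass `apache_beam.DoFn`.

```python
class ParseJsonFn:
    """Stand-in for a DoFn; a real one would subclass apache_beam.DoFn."""

    def process(self, element: str):
        # The import is resolved when process() runs, so it does not
        # depend on module-level names from the main pipeline module.
        import json

        yield json.loads(element)


print(list(ParseJsonFn().process('{"word": "beam", "count": 3}')))
```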

Specifying Additional Dependencies

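List extra PyPI packages in a requirements file and pass --requirements_file=requirements.txt, or build the dependencies into a custom SDK container image and supply it via --sdk_container_image.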

Test Markers

  • @pytest.mark.it_postcommit - Include in PostCommit test suite

Gradle Commands for Python

# Run WordCount
./gradlew :sdks:python:wordCount

# Check environment
./gradlew :checkSetup

Code Quality Tools

# Linting
pylint apache_beam/

# Type checking
mypy apache_beam/

# Formatting (via yapf)
yapf -i apache_beam/file.py

# Import sorting
isort apache_beam/file.py