spark-python-data-source
Build custom Python data sources for Apache Spark 4.0+ to read from and write to external systems in batch and streaming modes.
Instructions
You are an experienced Spark developer building custom Python data sources using the PySpark DataSource API. Follow these principles and patterns.
Core Architecture
Each data source follows a flat, single-level inheritance structure:
- DataSource class — entry point that returns readers/writers
- Base Reader/Writer classes — shared logic for options and data processing
- Batch classes — inherit from base + DataSourceReader/DataSourceWriter
- Stream classes — inherit from base + DataSourceStreamReader/DataSourceStreamWriter
See implementation-template.md for the full annotated skeleton covering all four modes (batch read/write, stream read/write).
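For orientation, here is a minimal batch-read sketch of that structure (class names, the schema, and the yielded row are placeholders, not the full template):

from pyspark.sql.datasource import DataSource, DataSourceReader

class YourDataSource(DataSource):
    @classmethod
    def name(cls):
        return "your-format"

    def schema(self):
        return "id INT, value STRING"

    def reader(self, schema):
        return YourBatchReader(schema, self.options)

class YourBatchReader(DataSourceReader):
    def __init__(self, schema, options):
        self.schema = schema
        self.options = options

    def read(self, partition):
        # Runs on executors; yield tuples matching the schema.
        yield (1, "example")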
Spark-Specific Design Constraints
These are specific to the PySpark DataSource API and its driver/executor architecture — general Python best practices (clean code, minimal dependencies, no premature abstraction) still apply but aren't repeated here.
Flat single-level inheritance only. PySpark serializes reader/writer instances to ship them to executors. Complex inheritance hierarchies and abstract base classes break serialization and make cross-process debugging painful. Use one shared base class mixed with the PySpark interface (e.g., class YourBatchWriter(YourWriter, DataSourceWriter)).
Import third-party libraries inside executor methods. The read() and write() methods run on remote executor processes that don't share the driver's Python environment. Top-level imports from the driver won't be available on executors — always import libraries like requests or database drivers inside the methods that run on workers.
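A sketch of the executor-side import pattern, with a trivial shared base holding the options (the "url" option and the payload shape are assumptions for this example):

from pyspark.sql.datasource import DataSourceWriter, WriterCommitMessage

class YourWriter:
    # Shared base: holds options common to batch and stream writers.
    def __init__(self, options):
        self.options = options

class YourBatchWriter(YourWriter, DataSourceWriter):
    def write(self, rows):
        # Runs on an executor: import third-party libraries here,
        # not at module top level on the driver.
        import requests

        payload = [list(row) for row in rows]
        requests.post(self.options["url"], json=payload, timeout=30)
        return WriterCommitMessage()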
Minimize dependencies. Every package you add must be installed on all executor nodes in the cluster, not just the driver. Prefer the standard library; when external packages are needed, keep them few and well-known.
No async/await unless the external system's SDK is async-only. The PySpark DataSource API is synchronous, so async adds complexity with no benefit.
Project Setup
Create a Python project using a packaging tool such as uv, poetry, or hatch. Examples use uv (substitute your tool of choice):
uv init your-datasource
cd your-datasource
uv add pyspark pytest pytest-spark
your-datasource/
├── pyproject.toml
├── src/
│   └── your_datasource/
│       ├── __init__.py
│       └── datasource.py
└── tests/
    ├── conftest.py
    └── test_datasource.py
Run all commands through the packaging tool so they execute within the correct virtual environment:
uv run pytest # Run tests
uv run ruff check src/ # Lint
uv run ruff format src/ # Format
uv build # Build wheel
Key Implementation Decisions
Partitioning Strategy — choose based on data source characteristics:
- Time-based: for APIs with temporal data
- Token-range: for distributed databases
- ID-range: for paginated APIs
- See partitioning-patterns.md for implementations of each strategy
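As an illustration, an ID-range split might look like the sketch below (the option names lowerBound, upperBound, and numPartitions are assumptions for this example):

from pyspark.sql.datasource import DataSourceReader, InputPartition

class RangePartition(InputPartition):
    def __init__(self, start, end):
        self.start = start
        self.end = end

class YourBatchReader(DataSourceReader):
    def __init__(self, schema, options):
        self.schema = schema
        self.options = options

    def partitions(self):
        # Split the ID space into non-overlapping [start, end) ranges, one per task.
        lower = int(self.options.get("lowerBound", 0))
        upper = int(self.options.get("upperBound", 1000))
        num = int(self.options.get("numPartitions", 4))
        step = max(1, (upper - lower) // num)
        return [RangePartition(s, min(s + step, upper))
                for s in range(lower, upper, step)]

    def read(self, partition):
        # Each executor fetches only its own slice.
        for i in range(partition.start, partition.end):
            yield (i, f"row-{i}")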
Authentication — support multiple methods in priority order:
- Databricks Unity Catalog credentials
- Cloud default credentials (managed identity)
- Explicit credentials (service principal, API key, username/password)
- See authentication-patterns.md for patterns with fallback chains
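A minimal sketch of the explicit-credential tail of that chain (the option and environment variable names are assumptions; the Unity Catalog and managed-identity steps are covered in authentication-patterns.md):

import os

def resolve_api_key(options):
    # Lowest-priority fallbacks only: an explicit option, then an environment
    # variable. Real chains try Unity Catalog and managed identity first.
    if "api_key" in options:
        return options["api_key"]
    env_key = os.environ.get("YOUR_DATASOURCE_API_KEY")
    if env_key:
        return env_key
    raise ValueError(
        "No credentials found: pass .option('api_key', ...) or set YOUR_DATASOURCE_API_KEY"
    )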
Type Conversion — map between Spark and external types:
- Handle nulls, timestamps, UUIDs, collections
- See type-conversion.md for bidirectional mapping tables and helpers
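For instance, a one-way converter toward Spark-friendly values might look like this rough sketch (the full bidirectional tables live in type-conversion.md):

from datetime import datetime, timezone
from uuid import UUID

def to_spark_value(value):
    # Normalize common external types into values Spark rows can carry.
    if value is None:
        return None
    if isinstance(value, UUID):
        return str(value)                          # Spark has no UUID type; store as STRING
    if isinstance(value, datetime) and value.tzinfo is None:
        return value.replace(tzinfo=timezone.utc)  # assume naive timestamps are UTC
    if isinstance(value, (set, tuple)):
        return [to_spark_value(v) for v in value]  # ArrayType expects a list
    return value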
Streaming Offsets — design for exactly-once semantics:
- JSON-serializable offset class
- Non-overlapping partition boundaries
- See streaming-patterns.md for offset tracking and watermark patterns
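A bare-bones offset sketch under those constraints (the cursor field and the fixed advance are stand-ins for polling the real source):

from pyspark.sql.datasource import DataSourceStreamReader, InputPartition

class YourStreamReader(DataSourceStreamReader):
    def __init__(self, schema, options):
        self.schema = schema
        self.options = options
        self._cursor = 0

    def initialOffset(self):
        # Offsets are plain dicts so Spark can JSON-serialize them into the checkpoint.
        return {"cursor": 0}

    def latestOffset(self):
        # A real source would poll the external system here.
        self._cursor += 10
        return {"cursor": self._cursor}

    def partitions(self, start, end):
        # Each microbatch covers the range [start, end), so batches never overlap.
        return [InputPartition((start["cursor"], end["cursor"]))]

    def read(self, partition):
        lo, hi = partition.value
        for i in range(lo, hi):
            yield (i, f"event-{i}")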
Error Handling — implement retries and resilience:
- Exponential backoff for transient failures (network, rate limits)
- Circuit breakers for cascading failures
- See error-handling.md for retry decorators and failure classification
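A retry decorator in that spirit might look like the following (the exception types and delays are illustrative; error-handling.md covers failure classification and circuit breakers):

import functools
import random
import time

def with_retries(max_attempts=5, base_delay=1.0, retry_on=(ConnectionError, TimeoutError)):
    # Exponential backoff with jitter for transient failures.
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except retry_on:
                    if attempt == max_attempts:
                        raise
                    time.sleep(base_delay * 2 ** (attempt - 1) + random.uniform(0, 0.5))
        return wrapper
    return decorator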
Testing
A minimal local-mode test setup with pytest fixtures:

import pytest
from unittest.mock import patch, Mock

from your_datasource.datasource import YourDataSource

@pytest.fixture
def spark():
    from pyspark.sql import SparkSession
    session = SparkSession.builder.master("local[2]").getOrCreate()
    session.dataSource.register(YourDataSource)
    return session

def test_data_source_name():
    assert YourDataSource.name() == "your-format"

def test_writer_sends_data(spark):
    with patch('requests.post') as mock_post:
        mock_post.return_value = Mock(status_code=200)
        df = spark.createDataFrame([(1, "test")], ["id", "value"])
        df.write.format("your-format").option("url", "http://api").save()
        assert mock_post.called
See testing-patterns.md for unit/integration test patterns, fixtures, and running tests.
Reference Implementations
Study these for real-world patterns:
- cyber-spark-data-connectors — Sentinel, Splunk, REST
- spark-cassandra-data-source — Token-range partitioning
- pyspark-hubspot — REST API pagination
- pyspark-mqtt — Streaming with TLS
Example Prompts
Create a Spark data source for reading from MongoDB with sharding support
Build a streaming connector for RabbitMQ with at-least-once delivery
Implement a batch writer for Snowflake with staged uploads
Write a data source for REST API with OAuth2 authentication and pagination
Related
- databricks-testing: Test data sources on Databricks clusters
- databricks-spark-declarative-pipelines: Use custom sources in DLT pipelines
- python-dev: Python development best practices
References
- implementation-template.md — Full annotated skeleton; read when starting a new data source
- partitioning-patterns.md — Read when the source supports parallel reads and you need to split work across executors
- authentication-patterns.md — Read when the external system requires credentials or tokens
- type-conversion.md — Read when mapping between Spark types and the external system's type system
- streaming-patterns.md — Read when implementing DataSourceStreamReader or DataSourceStreamWriter
- error-handling.md — Read when adding retry logic or handling transient failures
- testing-patterns.md — Read when writing tests; covers unit, integration, and performance testing
- production-patterns.md — Read when hardening for production: observability, security, input validation
- Official Databricks Documentation
- Apache Spark Python DataSource Tutorial
- awesome-python-datasources — Directory of community implementations