skills/databricks-solutions/ai-dev-kit/spark-declarative-pipelines

spark-declarative-pipelines

SKILL.md

Lakeflow Spark Declarative Pipelines (SDP)

IMPORTANT: If this is a new pipeline (one does not already exist), see Quick Start. Be sure to use whatever language user has specified only (Python or SQL). Be sure to use Databricks Asset Bundles for new projects.


Critical Rules (always follow)

  • MUST confirm language as Python or SQL. Stick with that language unless told otherwise.
  • MUST if not modifying an existing pipeline, use Quick Start below.
  • MUST create serverless pipelines by default. ** Only use classic clusters if user explicitly requires R language, Spark RDD APIs, or JAR libraries.

Required Steps

Copy this checklist and verify each item:

- [ ] Language selected: Python or SQL
- [ ] Compute type decided: serverless or classic compute
- [ ] Decide on multiple catalogs or schemas vs. all in one default schema
- [ ] Consider what should be parameterized at the pipeline level to make deployment easy.
- [ ] Consider [Multi-Schema Patterns](#multi-schema-patterns) below, ask if unclear on best choices.
- [ ] Consider [Modern Defaults](#modern-defaults) below, ask if unclear on best choices.


## Quick Start: Initialize New Pipeline Project

**RECOMMENDED**: Use `databricks pipelines init` to create production-ready Asset Bundle projects with multi-environment support.

### When to Use Bundle Initialization

Use bundle initialization for **New pipeline projects** for a professional structure from the start

Use manual workflow for:
- Quick prototyping without multi-environment needs
- Existing manual projects you want to continue
- Learning/experimentation

### Step 1: Initialize Project

I will automatically run this command when you request a new pipeline:

```bash
databricks pipelines init

Interactive Prompts:

  • Project name: e.g., customer_orders_pipeline
  • Initial catalog: Unity Catalog name (e.g., main, prod_catalog)
  • Personal schema per user?: yes for dev (each user gets their own schema), no for prod
  • Language: SQL or Python (auto-detected from your request - see language detection below)

Generated Structure:

my_pipeline/
├── databricks.yml              # Multi-environment config (dev/prod)
├── resources/
│   └── *_etl.pipeline.yml      # Pipeline resource definition
└── src/
    └── *_etl/
        ├── explorations/       # Exploratory code in .ipynb
        └── transformations/    # Your .sql or .py files here

Step 2: Customize Transformations

Replace the example code created by the init process with custom transformation files in src/transformations/ based on provided requirements, using best practice guidance from this skill.

For Python pipelines using cloudFiles: Ask the user where to store Auto Loader schema metadata. Recommend:

/Volumes/{catalog}/{schema}/{pipeline_name}_metadata/schemas

Step 3: Deploy and Run

# Deploy to workspace (dev by default)
databricks bundle deploy

# Run pipeline
databricks bundle run my_pipeline_etl

# Deploy to production
databricks bundle deploy --target prod

Quick Reference

Concept Details
Names SDP = Spark Declarative Pipelines = LDP = Lakeflow Declarative Pipelines = Lakeflow Pipelines (all interchangeable)
Python Import from pyspark import pipelines as dp
Primary Decorators @dp.table(), @dp.materialized_view(), @dp.temporary_view()
Temporary Views @dp.temporary_view() creates in-pipeline temporary views (no catalog/schema, no cluster_by). Useful for intermediate logic before AUTO CDC or when a view needs multiple references without persistence.
Replaces Delta Live Tables (DLT) with import dlt
Based On Apache Spark 4.1+ (Databricks' modern data pipeline framework)
Docs https://docs.databricks.com/aws/en/ldp/developer/python-dev

Detailed guides

Ingestion patterns: Use 1-ingestion-patterns.md when planning how to get new data into your Lakeflow pipeline —- covers file formats, batch/streaming options, and tips for incremental and full loads. (Keywords: Auto Loader, Kafka, Event Hub, Kinesis, file formats)

Streaming pipeline patterns: See 2-streaming-patterns.md for designing pipelines with streaming data sources, change data detection, triggers, and windowing. (Keywords: deduplication, windowing, stateful operations, joins)

SCD query patterns: See 3-scd-query-patterns.md for querying Slowly Changing Dimensions Type 2 history tables, including current state queries, point-in-time analysis, temporal joins, and change tracking. (Keywords: SCD Type 2 history tables, temporal joins, querying historical data)

Performance tuning: Use 4-performance-tuning.md for optimizing pipelines with Liquid Clustering, state management, and best practices for high-performance streaming workloads. (Keywords: Liquid Clustering, optimization, state management)

Python API reference: See 5-python-api.md for the modern pyspark.pipelines (dp) API reference and migration from legacy dlt API patterns. (Keywords: dp API, dlt API comparison)

DLT migration: Use 6-dlt-migration.md when migrating existing Delta Live Tables (DLT) pipelines to Spark Declarative Pipelines (SDP). (Keywords: migrating DLT pipelines to SDP)

Advanced configuration: See 7-advanced-configuration.md for advanced pipeline settings including development mode, continuous execution, notifications, Python dependencies, and custom cluster configurations. (Keywords: extra_settings parameter reference, examples)

Project initialization: Use 8-project-initialization.md for setting up new pipeline projects with databricks pipelines init, Asset Bundles, multi-environment deployments, and language detection logic. (Keywords: databricks pipelines init, Asset Bundles, language detection, migration guides)

AUTO CDC patterns: Use 9-auto_cdc.md for implementing Change Data Capture with AUTO CDC, including Slow Changing Dimensions (SCD Type 1 and Type 2) for tracking changes and deduplication. (Keywords: AUTO CDC, Slow Changing Dimension, SCD, SCD Type 1, SCD Type 2, change data capture, deduplication)


Workflow

  1. Determine the task type:

    Setting up new project? → Read 8-project-initialization.md first Creating new pipeline? → Read 1-ingestion-patterns.md Creating stream table? → Read 2-streaming-patterns.md Querying SCD history tables? → Read 3-scd-query-patterns.md Implementing AUTO CDC or SCD? → Read 9-auto_cdc.md Performance issues? → Read 4-performance-tuning.md Using Python API? → Read 5-python-api.md Migrating from DLT? → Read 6-dlt-migration.md Advanced configuration? → Read 7-advanced-configuration.md Validating? → Read validation-checklist.md

  2. Follow the instructions in the relevant guide

  3. Repeat for next task type


Official Documentation

Medallion Architecture Pattern

Bronze Layer (Raw)

  • Raw data ingested from sources in original format
  • Minimal transformations (append-only, add metadata like _ingested_at, _source_file)
  • Single source of truth preserving data lineage

Silver Layer (Validated)

  • Cleaned and validated data.
  • Might deduplicate here with auto_cdc, but often wait until the final step for auto_cdc if possible.
  • Business logic applied (type casting, quality checks, filtering invalid records)
  • Enterprise view of key business entities
  • Enables self-service analytics and ML

Gold Layer (Business-Ready)

  • Aggregated, denormalized, project-specific tables
  • Optimized for consumption (reporting, dashboards, BI tools)
  • Fewer joins, read-optimized data models
  • Kimball star schema tables - dim_<entity_name>, fact_<entity_name>
  • Deduplication often happens here via Slow Changing Dimensions (SCD), using auto_cdc. Sometimes that will happen upstream in silver instead, such as when joining multiple tables or business users plan to query the table from silver.

Typical Flow (Can vary)
Bronze: read_files() or spark.readStream.format("cloudFiles") → streaming table
Silver: read bronze → filter/clean/validate → streaming table Gold: read silver → aggregate/denormalize → auto_cdc or materialized view

Sources:

For medallion architecture (bronze/silver/gold), two approaches work:

  • Flat with naming (template default): bronze_*.sql, silver_*.sql, gold_*.sql
  • Subdirectories: bronze/orders.sql, silver/cleaned.sql, gold/summary.sql

Both work with the transformations/** glob pattern. Choose based on preference.

See 8-project-initialization.md for complete details on bundle initialization, migration, and troubleshooting.


General SDP development guidance

Step 1: Write Pipeline Files Locally

Create .sql or .py files in a local folder:

my_pipeline/
├── bronze/
│   ├── ingest_orders.sql       # SQL (default for most cases)
│   └── ingest_events.py        # Python (for complex logic)
├── silver/
│   └── clean_orders.sql
└── gold/
    └── daily_summary.sql

SQL Example (bronze/ingest_orders.sql):

CREATE OR REFRESH STREAMING TABLE bronze_orders
CLUSTER BY (order_date)
AS
SELECT
  *,
  current_timestamp() AS _ingested_at,
  _metadata.file_path AS _source_file
FROM read_files(
  '/Volumes/catalog/schema/raw/orders/',
  format => 'json',
  schemaHints => 'order_id STRING, customer_id STRING, amount DECIMAL(10,2), order_date DATE'
);

Python Example (bronze/ingest_events.py):

from pyspark import pipelines as dp
from pyspark.sql.functions import col, current_timestamp

# Get schema location from pipeline configuration
schema_location_base = spark.conf.get("schema_location_base")

@dp.table(name="bronze_events", cluster_by=["event_date"])
def bronze_events():
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.schemaLocation", f"{schema_location_base}/bronze_events")
        .load("/Volumes/catalog/schema/raw/events/")
        .withColumn("_ingested_at", current_timestamp())
        .withColumn("_source_file", col("_metadata.file_path"))
    )

IMPORTANT for Python Pipelines: When using spark.readStream.format("cloudFiles") for cloud storage ingestion, with schema inference (no schema specified), you must specify a schema location.

Always ask the user where to store Auto Loader schema metadata. Recommend:

/Volumes/{catalog}/{schema}/{pipeline_name}_metadata/schemas

Example: /Volumes/my_catalog/pipeline_metadata/orders_pipeline_metadata/schemas

Never use the source data volume - this causes permission conflicts. The schema location should be configured in the pipeline settings and accessed via spark.conf.get("schema_location_base").

Language Selection:

CRITICAL RULE: If the user explicitly mentions "Python" in their request (e.g., "Python Spark Declarative Pipeline", "Python SDP", "use Python"), ALWAYS use Python without asking. The same applies to SQL - if they say "SQL pipeline", use SQL.

  • Explicit language request: User says "Python" → Use Python. User says "SQL" → Use SQL. Do not ask for clarification.
  • Auto-detection (only when no explicit language mentioned):
    • SQL indicators: "sql files", "simple transformations", "aggregations", "materialized view", "CREATE OR REFRESH"
    • Python indicators: ".py files", "UDF", "complex logic", "ML inference", "external API", "@dp.table", "pandas", "decorator"
  • Prompt for clarification only when language intent is truly ambiguous (no explicit mention, mixed signals)
  • Default to SQL only when ambiguous AND no Python indicators present

See 8-project-initialization.md for detailed language detection logic.

Option 1: Pipelines with DABs:

Use asset bundles and pipeline CLI. See Quick Start and 8-project-initialization.md for complete details.

Option 2: Manual Workflow (Advanced)

For rapid prototyping, experimentation, or when you prefer direct control without Asset Bundles, use the manual workflow with MCP tools.

Use MCP tools to create, run, and iterate on serverless SDP pipelines. The primary tool is create_or_update_pipeline which handles the entire lifecycle.

IMPORTANT: Always create serverless pipelines (default). Only use classic clusters if user explicitly ask for classic, pro, advances compute or requires R language, Spark RDD APIs, or JAR libraries.

See 10-mcp-approach.md for detailed guide.

Best Practices (2026)

Project Structure

  • Default to databricks pipelines init for new projects (creates Asset Bundle)
  • Use Asset Bundles for multi-environment deployments (dev/staging/prod)
  • Manual structure only for quick prototypes or legacy migration
  • Medallion architecture: Two approaches work with Asset Bundles:
    • Flat structure (template default): bronze_*.sql, silver_*.sql, gold_*.sql in transformations/
    • Subdirectories: transformations/bronze/, transformations/silver/, transformations/gold/
    • Both work with the transformations/** glob pattern - choose based on team preference
  • See 8-project-initialization.md for project setup details

Minimal pipeline config pointers

  • Define parameters in your pipeline’s configuration and access them in code with spark.conf.get("key").
  • In Databricks Asset Bundles, set these under resources.pipelines..configuration; validate with databricks bundle validate.

Modern Defaults

  • CLUSTER BY (Liquid Clustering), not PARTITION BY - see 4-performance-tuning.md
  • Raw .sql/.py files, not notebooks
  • Serverless compute ONLY - Do not use classic clusters unless explicitly required
  • Unity Catalog (required for serverless)
  • read_files() when using SQL for cloud storage ingestion - see 1-ingestion-patterns.md

Multi-Schema Patterns

Default: Single target schema per pipeline. Each pipeline has one target catalog and schema where all tables are written.

Option 1: Single Pipeline, Single Schema with Prefixes (Recommended)

Use one schema with table name prefixes to distinguish layers:

# All tables write to: catalog.schema.bronze_*, silver_*, gold_*
@dp.table(name="bronze_orders")  # → catalog.schema.bronze_orders
@dp.table(name="silver_orders")  # → catalog.schema.silver_orders
@dp.table(name="gold_summary")   # → catalog.schema.gold_summary

Advantages:

  • Simpler configuration (one pipeline)
  • All tables in one schema for easy discovery

Option 2:

Use varaiables to specific separate catalog and/or schema for different steps.

Below are Python SDP examples that source variables from pipeline configs via spark.conf.get, and use the default catalog/schema for bronze.

Same catalog, separate schemas; bronze uses pipeline defaults
  • Set your pipeline’s default catalog and default schema to the bronze layer (for example, catalog=my_catalog, schema=bronze). When you omit catalog/schema in code, reads/writes go to these defaults.
  • Use pipeline parameters for the other schemas and any source schema/path, retrieved in code with spark.conf.get(...).
from pyspark import pipelines as dp
from pyspark.sql.functions import col

# Pull variables from pipeline configuration parameters
silver_schema = spark.conf.get("silver_schema")  # e.g., "silver"
gold_schema   = spark.conf.get("gold_schema")    # e.g., "gold"
landing_schema = spark.conf.get("landing_schema")  # e.g., "landing"

# Bronze → uses default catalog/schema (set to bronze in pipeline settings)
@dp.table(name="orders_bronze")
def orders_bronze():
    # Read from another schema in the same default catalog
    return spark.readStream.table(f"{landing_schema}.orders_raw")

# Silver → same catalog, schema from parameter
@dp.table(name=f"{silver_schema}.orders_clean")
def orders_clean():
    return (spark.read.table("orders_bronze")  # unqualified = default catalog/schema
            .filter(col("order_id").isNotNull()))

# Gold → same catalog, schema from parameter
@dp.materialized_view(name=f"{gold_schema}.orders_by_date")
def orders_by_date():
    return (spark.read.table(f"{silver_schema}.orders_clean")
            .groupBy("order_date")
            .count().withColumnRenamed("count", "order_count"))
  • Using unqualified names for bronze ensures it lands in the pipeline’s default catalog/schema; silver/gold are explicitly schema-qualified within the same catalog.

Custom catalog/schema per layer; bronze still uses pipeline defaults
  • Keep bronze in the pipeline defaults (default catalog/schema set to your bronze layer). For silver/gold, use fully-qualified names with catalog and schema variables from pipeline configuration.
from pyspark import pipelines as dp
from pyspark.sql.functions import col

# Pull variables from pipeline configuration parameters
silver_catalog = spark.conf.get("silver_catalog")  # e.g., "my_catalog"
silver_schema  = spark.conf.get("silver_schema")   # e.g., "silver"
gold_catalog   = spark.conf.get("gold_catalog")    # e.g., "my_catalog"
gold_schema    = spark.conf.get("gold_schema")     # e.g., "gold"
landing_catalog = spark.conf.get("landing_catalog")  # optional, if source is in another catalog
landing_schema  = spark.conf.get("landing_schema")

# Bronze → uses default catalog/schema (set to bronze)
@dp.table(name="orders_bronze")
def orders_bronze():
    # If source is in a specified catalog/schema:
    return spark.readStream.table(f"{landing_catalog}.{landing_schema}.orders_raw")

# Silver → custom catalog + schema via parameters
@dp.table(name=f"{silver_catalog}.{silver_schema}.orders_clean")
def orders_clean():
    # Read bronze by its unqualified name (defaults), or fully qualify if preferred
    return (spark.read.table("orders_bronze")
            .filter(col("order_id").isNotNull()))

# Gold → custom catalog + schema via parameters
@dp.materialized_view(name=f"{gold_catalog}.{gold_schema}.orders_by_date}")
def orders_by_date():
    return (spark.read.table(f"{silver_catalog}.{silver_schema}.orders_clean")
            .groupBy("order_date")
            .count().withColumnRenamed("count", "order_count"))
  • Multipart names in the decorator’s name argument let you publish to explicit catalog.schema targets within one pipeline.
  • Unqualified reads/writes use the pipeline defaults; use fully-qualified names when crossing catalogs or when you need explicit namespace control.

Note: The @dp.table() decorator does not currently support separate for schema= or catalog= parameters. The table parameter is a string that contains the catalog.schema.table_name, or it can leave off catalog and or schema to use the pipeilnes configured default target schema.

Reading Tables in Python

Modern SDP Best Practice:

  • Use spark.read.table() for batch reads
  • Use spark.readStream.table() for streaming reads
  • Don't use dp.read() or dp.read_stream() (old syntax, no longer documented)
  • Don't use dlt.read() or dlt.read_stream() (legacy DLT API)

Key Point: SDP automatically tracks table dependencies from standard Spark DataFrame operations. No special read APIs are needed.

Three-Tier Identifier Resolution

SDP supports three levels of table name qualification:

Level Syntax When to Use
Unqualified spark.read.table("my_table") Reading tables within the same pipeline's target catalog/schema (recommended)
Partially-qualified spark.read.table("other_schema.my_table") Reading from different schema in same catalog
Fully-qualified spark.read.table("other_catalog.other_schema.my_table") Reading from external catalogs/schemas

Option 1: Unqualified Names (Recommended for Pipeline Tables)

Best practice for tables within the same pipeline. SDP resolves unqualified names to the pipeline's configured target catalog and schema. This makes code portable across environments (dev/prod).

@dp.table(name="silver_clean")
def silver_clean():
    # Reads from pipeline's target catalog/schema (e.g., dev_catalog.dev_schema.bronze_raw)
    return (
        spark.read.table("bronze_raw")
        .filter(F.col("valid") == True)
    )

@dp.table(name="silver_events")
def silver_events():
    # Streaming read from same pipeline's bronze_events table
    return (
        spark.readStream.table("bronze_events")
        .withColumn("processed_at", F.current_timestamp())
    )

Option 2: Pipeline Parameters (For External Sources)

Use spark.conf.get() to parameterize external catalog/schema references. Define parameters in pipeline configuration, then reference them at the module level.

from pyspark import pipelines as dp
from pyspark.sql import functions as F

# Get parameterized values at module level (evaluated once at pipeline start)
source_catalog = spark.conf.get("source_catalog")
source_schema = spark.conf.get("source_schema", "sales")  # with default

@dp.table(name="transaction_summary")
def transaction_summary():
    return (
        spark.read.table(f"{source_catalog}.{source_schema}.transactions")
        .groupBy("account_id")
        .agg(
            F.count("txn_id").alias("txn_count"),
            F.sum("txn_amount").alias("account_revenue")
        )
    )

Configure parameters in pipeline settings:

  • Asset Bundles: Add to pipeline.yml under configuration:
  • Manual/MCP: Pass via extra_settings.configuration dict
# In resources/my_pipeline.pipeline.yml
configuration:
  source_catalog: "shared_catalog"
  source_schema: "sales"

Option 3: Fully-Qualified Names (For Fixed External References)

Use when referencing specific external tables that don't change across environments:

@dp.table(name="enriched_orders")
def enriched_orders():
    # Pipeline-internal table (unqualified)
    orders = spark.read.table("bronze_orders")

    # External reference table (fully-qualified)
    products = spark.read.table("shared_catalog.reference.products")

    return orders.join(products, "product_id")

Choosing the Right Approach

Scenario Recommended Approach
Reading tables created in same pipeline Unqualified names - portable, uses target catalog/schema
Reading from external source that varies by environment Pipeline parameters - configurable per deployment
Reading from shared/reference tables with fixed location Fully-qualified names - explicit and clear
Mixed pipeline (some internal, some external) Combine approaches - unqualified for internal, parameters for external

Common Issues

Issue Solution
Empty output tables Use get_table_details to verify, check upstream sources
Pipeline stuck INITIALIZING Normal for serverless, wait a few minutes
"Column not found" Check schemaHints match actual data
Streaming reads fail For file ingestion in a streaming table, you must use the STREAM keyword with read_files: FROM STREAM read_files(...). For table streams use FROM stream(table). See read_files — Usage in streaming tables.
Timeout during run Increase timeout, or use wait_for_completion=False and poll with get_update
MV doesn't refresh Enable row tracking on source tables
SCD2: query column not found Lakeflow uses __START_AT and __END_AT (double underscore), not START_AT/END_AT. Use WHERE __END_AT IS NULL for current rows. See 3-scd-patterns.md.
AUTO CDC parse error at APPLY/SEQUENCE Put APPLY AS DELETE WHEN before SEQUENCE BY. Only list columns in COLUMNS * EXCEPT (...) that exist in the source (omit _rescued_data unless bronze uses rescue data). Omit TRACK HISTORY ON * if it causes "end of input" errors; default is equivalent. See 2-streaming-patterns.md.
"Cannot create streaming table from batch query" In a streaming table query, use FROM STREAM read_files(...) so read_files leverages Auto Loader; FROM read_files(...) alone is batch. See 1-ingestion-patterns.md and read_files — Usage in streaming tables.

For detailed errors, the result["message"] from create_or_update_pipeline includes suggested next steps. Use get_pipeline_events(pipeline_id=...) for full stack traces.


Advanced Pipeline Configuration

For advanced configuration options (development mode, continuous pipelines, custom clusters, notifications, Python dependencies, etc.), see 7-advanced-configuration.md.


Platform Constraints

Serverless Pipeline Requirements (Default)

Requirement Details
Unity Catalog Required - serverless pipelines always use UC
Workspace Region Must be in serverless-enabled region
Serverless Terms Must accept serverless terms of use
CDC Features Requires serverless (or Pro/Advanced with classic clusters)

Serverless Limitations (When Classic Clusters Required)

Limitation Workaround
R language Not supported - use classic clusters if required
Spark RDD APIs Not supported - use classic clusters if required
JAR libraries Not supported - use classic clusters if required
Maven coordinates Not supported - use classic clusters if required
DBFS root access Limited - must use Unity Catalog external locations
Global temp views Not supported

General Constraints

Constraint Details
Schema Evolution Streaming tables require full refresh for incompatible changes
SQL Limitations PIVOT clause unsupported
Sinks Python only, streaming only, append flows only

Default to serverless unless user explicitly requires R, RDD APIs, or JAR libraries.

Related Skills

Weekly Installs
6
GitHub Stars
900
First Seen
Feb 9, 2026
Installed on
opencode6
github-copilot4
amp3
continue3
kimi-cli3
codex3