databricks-expert
SKILL.md
Databricks Expert
You are an expert in Databricks with deep knowledge of Apache Spark, Delta Lake, MLflow, notebooks, cluster management, and lakehouse architecture. You design and implement scalable data pipelines and machine learning workflows on the Databricks platform.
Core Expertise
Cluster Configuration and Management
Cluster Types and Configuration:
# Databricks CLI - Create cluster
databricks clusters create --json '{
"cluster_name": "data-engineering-cluster",
"spark_version": "13.3.x-scala2.12",
"node_type_id": "i3.xlarge",
"driver_node_type_id": "i3.2xlarge",
"num_workers": 4,
"autoscale": {
"min_workers": 2,
"max_workers": 8
},
"autotermination_minutes": 120,
"spark_conf": {
"spark.sql.adaptive.enabled": "true",
"spark.sql.adaptive.coalescePartitions.enabled": "true",
"spark.databricks.delta.optimizeWrite.enabled": "true",
"spark.databricks.delta.autoCompact.enabled": "true"
},
"custom_tags": {
"team": "data-engineering",
"environment": "production"
},
"init_scripts": [
{
"dbfs": {
"destination": "dbfs:/databricks/init-scripts/install-libs.sh"
}
}
]
}'
# Job cluster configuration (optimized for cost)
job_cluster_config = {
"spark_version": "13.3.x-scala2.12",
"node_type_id": "i3.xlarge",
"num_workers": 3,
"spark_conf": {
"spark.speculation": "true",
"spark.task.maxFailures": "4"
}
}
# High-concurrency cluster (for SQL Analytics)
high_concurrency_config = {
"cluster_name": "sql-analytics-cluster",
"spark_version": "13.3.x-sql-scala2.12",
"node_type_id": "i3.2xlarge",
"autoscale": {
"min_workers": 1,
"max_workers": 10
},
"enable_elastic_disk": True,
"data_security_mode": "USER_ISOLATION"
}
Instance Pools:
# Create instance pool
instance_pool_config = {
"instance_pool_name": "production-pool",
"min_idle_instances": 2,
"max_capacity": 20,
"node_type_id": "i3.xlarge",
"idle_instance_autotermination_minutes": 15,
"preloaded_spark_versions": [
"13.3.x-scala2.12"
]
}
# Use instance pool in cluster
cluster_with_pool = {
"cluster_name": "pool-cluster",
"spark_version": "13.3.x-scala2.12",
"instance_pool_id": "0101-120000-abc123",
"autoscale": {
"min_workers": 2,
"max_workers": 8
}
}
Delta Lake Architecture
Creating and Managing Delta Tables:
from pyspark.sql import SparkSession
from delta.tables import DeltaTable
from pyspark.sql.functions import col, current_timestamp, expr
spark = SparkSession.builder.getOrCreate()
# Create Delta table
df = spark.read.json("/mnt/raw/events")
df.write.format("delta") \
.mode("overwrite") \
.option("overwriteSchema", "true") \
.partitionBy("date", "event_type") \
.save("/mnt/delta/events")
# Create managed table
df.write.format("delta") \
.mode("overwrite") \
.saveAsTable("production.events")
# Create table with SQL
spark.sql("""
CREATE TABLE IF NOT EXISTS production.orders (
order_id BIGINT,
customer_id BIGINT,
order_date DATE,
total_amount DECIMAL(10,2),
status STRING,
metadata MAP<STRING, STRING>
)
USING DELTA
PARTITIONED BY (order_date)
LOCATION '/mnt/delta/orders'
TBLPROPERTIES (
'delta.autoOptimize.optimizeWrite' = 'true',
'delta.autoOptimize.autoCompact' = 'true'
)
""")
# Add constraints
spark.sql("""
ALTER TABLE production.orders
ADD CONSTRAINT valid_status CHECK (status IN ('pending', 'completed', 'cancelled'))
""")
# Add generated columns
spark.sql("""
ALTER TABLE production.orders
ADD COLUMN month INT GENERATED ALWAYS AS (MONTH(order_date))
""")
MERGE Operations (Upserts):
# Upsert with Delta Lake
from delta.tables import DeltaTable
# Load Delta table
delta_table = DeltaTable.forPath(spark, "/mnt/delta/orders")
# New or updated data
updates_df = spark.read.format("parquet").load("/mnt/staging/order_updates")
# Merge (upsert)
delta_table.alias("target").merge(
updates_df.alias("source"),
"target.order_id = source.order_id"
).whenMatchedUpdate(
condition="source.updated_at > target.updated_at",
set={
"total_amount": "source.total_amount",
"status": "source.status",
"updated_at": "source.updated_at"
}
).whenNotMatchedInsert(
values={
"order_id": "source.order_id",
"customer_id": "source.customer_id",
"order_date": "source.order_date",
"total_amount": "source.total_amount",
"status": "source.status",
"created_at": "source.created_at",
"updated_at": "source.updated_at"
}
).execute()
# Merge with delete
delta_table.alias("target").merge(
updates_df.alias("source"),
"target.order_id = source.order_id"
).whenMatchedUpdate(
condition="source.is_active = true",
set={"status": "source.status"}
).whenMatchedDelete(
condition="source.is_active = false"
).whenNotMatchedInsert(
values={
"order_id": "source.order_id",
"status": "source.status"
}
).execute()
Time Travel and Versioning:
# Query historical versions
df_v0 = spark.read.format("delta").option("versionAsOf", 0).load("/mnt/delta/orders")
df_yesterday = spark.read.format("delta") \
.option("timestampAsOf", "2024-01-15") \
.load("/mnt/delta/orders")
# View history
delta_table = DeltaTable.forPath(spark, "/mnt/delta/orders")
delta_table.history().show()
# Restore to previous version
delta_table.restoreToVersion(5)
delta_table.restoreToTimestamp("2024-01-15")
# Vacuum old files (delete files older than retention period)
delta_table.vacuum(168) # 7 days in hours
# View table details
delta_table.detail().show()
Optimization and Maintenance:
# Optimize table (compaction)
spark.sql("OPTIMIZE production.orders")
# Optimize with Z-Ordering
spark.sql("OPTIMIZE production.orders ZORDER BY (customer_id, status)")
# Analyze table for statistics
spark.sql("ANALYZE TABLE production.orders COMPUTE STATISTICS")
# Clone table (zero-copy)
spark.sql("""
CREATE TABLE production.orders_clone
SHALLOW CLONE production.orders
""")
# Deep clone (independent copy)
spark.sql("""
CREATE TABLE production.orders_backup
DEEP CLONE production.orders
""")
# Change Data Feed (CDC)
spark.sql("""
ALTER TABLE production.orders
SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
""")
# Read changes
changes_df = spark.read.format("delta") \
.option("readChangeFeed", "true") \
.option("startingVersion", 5) \
.table("production.orders")
PySpark Data Processing
DataFrame Operations:
from pyspark.sql import functions as F
from pyspark.sql.window import Window
# Read data
df = spark.read.format("delta").table("production.orders")
# Complex transformations
result = df \
.filter(col("order_date") >= "2024-01-01") \
.withColumn("year_month", F.date_format("order_date", "yyyy-MM")) \
.withColumn("order_rank",
F.row_number().over(
Window.partitionBy("customer_id")
.orderBy(F.desc("total_amount"))
)
) \
.groupBy("year_month", "status") \
.agg(
F.count("*").alias("order_count"),
F.sum("total_amount").alias("total_revenue"),
F.avg("total_amount").alias("avg_order_value"),
F.percentile_approx("total_amount", 0.5).alias("median_amount")
) \
.orderBy("year_month", "status")
# Write result
result.write.format("delta") \
.mode("overwrite") \
.option("replaceWhere", "year_month >= '2024-01'") \
.saveAsTable("production.monthly_summary")
# JSON operations
json_df = df.withColumn("parsed_metadata", F.from_json("metadata", schema))
json_df = json_df.withColumn("tags", F.explode("parsed_metadata.tags"))
# Array and struct operations
df.withColumn("first_item", col("items").getItem(0)) \
.withColumn("item_count", F.size("items")) \
.withColumn("total_price",
F.aggregate("items", F.lit(0),
lambda acc, x: acc + x.price))
Advanced Spark SQL:
# Register temp view
df.createOrReplaceTempView("orders_temp")
# Complex SQL
result = spark.sql("""
WITH customer_metrics AS (
SELECT
customer_id,
COUNT(*) AS order_count,
SUM(total_amount) AS lifetime_value,
DATEDIFF(MAX(order_date), MIN(order_date)) AS customer_age_days,
COLLECT_LIST(
STRUCT(order_id, order_date, total_amount)
) AS order_history
FROM orders_temp
GROUP BY customer_id
),
customer_segments AS (
SELECT
*,
CASE
WHEN lifetime_value >= 10000 THEN 'VIP'
WHEN lifetime_value >= 5000 THEN 'Gold'
WHEN lifetime_value >= 1000 THEN 'Silver'
ELSE 'Bronze'
END AS segment,
NTILE(10) OVER (ORDER BY lifetime_value DESC) AS decile
FROM customer_metrics
)
SELECT * FROM customer_segments
WHERE segment IN ('VIP', 'Gold')
""")
# Window functions
spark.sql("""
SELECT
order_id,
customer_id,
order_date,
total_amount,
SUM(total_amount) OVER (
PARTITION BY customer_id
ORDER BY order_date
ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
) AS running_total,
AVG(total_amount) OVER (
PARTITION BY customer_id
ORDER BY order_date
ROWS BETWEEN 6 PRECEDING AND CURRENT ROW
) AS moving_avg_7_orders
FROM orders_temp
""")
Structured Streaming:
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, TimestampType
# Define schema
schema = StructType([
StructField("event_id", StringType()),
StructField("user_id", StringType()),
StructField("event_type", StringType()),
StructField("timestamp", TimestampType()),
StructField("value", DoubleType())
])
# Read stream from Kafka
stream_df = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "broker1:9092,broker2:9092") \
.option("subscribe", "events") \
.option("startingOffsets", "latest") \
.load() \
.select(F.from_json(F.col("value").cast("string"), schema).alias("data")) \
.select("data.*")
# Process stream
processed_stream = stream_df \
.withWatermark("timestamp", "10 minutes") \
.groupBy(
F.window("timestamp", "5 minutes", "1 minute"),
"event_type"
) \
.agg(
F.count("*").alias("event_count"),
F.sum("value").alias("total_value")
)
# Write to Delta
query = processed_stream.writeStream \
.format("delta") \
.outputMode("append") \
.option("checkpointLocation", "/mnt/checkpoints/events") \
.option("mergeSchema", "true") \
.trigger(processingTime="1 minute") \
.table("production.event_metrics")
# Monitor streaming query
query.status
query.recentProgress
query.lastProgress
MLflow Integration
Experiment Tracking:
import mlflow
import mlflow.sklearn
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, f1_score
# Set experiment
mlflow.set_experiment("/Users/data-science/customer-churn")
# Start run
with mlflow.start_run(run_name="rf_model_v1") as run:
# Parameters
params = {
"n_estimators": 100,
"max_depth": 10,
"min_samples_split": 5
}
mlflow.log_params(params)
# Train model
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)
model = RandomForestClassifier(**params)
model.fit(X_train, y_train)
# Evaluate
y_pred = model.predict(X_test)
accuracy = accuracy_score(y_test, y_pred)
f1 = f1_score(y_test, y_pred)
# Log metrics
mlflow.log_metric("accuracy", accuracy)
mlflow.log_metric("f1_score", f1)
# Log model
mlflow.sklearn.log_model(model, "model")
# Log artifacts
mlflow.log_artifact("feature_importance.png")
# Add tags
mlflow.set_tags({
"team": "data-science",
"model_type": "classification"
})
# Load model from run
run_id = run.info.run_id
model = mlflow.sklearn.load_model(f"runs:/{run_id}/model")
# Register model
model_uri = f"runs:/{run_id}/model"
mlflow.register_model(model_uri, "customer_churn_model")
Model Registry and Deployment:
from mlflow.tracking import MlflowClient
client = MlflowClient()
# Transition model to staging
client.transition_model_version_stage(
name="customer_churn_model",
version=1,
stage="Staging"
)
# Add model description
client.update_model_version(
name="customer_churn_model",
version=1,
description="Random Forest model with hyperparameter tuning"
)
# Load production model
model = mlflow.pyfunc.load_model("models:/customer_churn_model/Production")
# Batch inference with Spark
model_udf = mlflow.pyfunc.spark_udf(
spark,
model_uri="models:/customer_churn_model/Production",
result_type="double"
)
predictions = df.withColumn(
"churn_prediction",
model_udf(*feature_columns)
)
Databricks Jobs and Workflows
Job Configuration:
# Create job via API
job_config = {
"name": "daily_etl_pipeline",
"max_concurrent_runs": 1,
"timeout_seconds": 3600,
"schedule": {
"quartz_cron_expression": "0 0 2 * * ?",
"timezone_id": "America/New_York",
"pause_status": "UNPAUSED"
},
"tasks": [
{
"task_key": "extract_data",
"notebook_task": {
"notebook_path": "/Workflows/Extract",
"base_parameters": {
"date": "{{job.start_time.date}}"
}
},
"job_cluster_key": "etl_cluster"
},
{
"task_key": "transform_data",
"depends_on": [{"task_key": "extract_data"}],
"notebook_task": {
"notebook_path": "/Workflows/Transform"
},
"job_cluster_key": "etl_cluster"
},
{
"task_key": "load_data",
"depends_on": [{"task_key": "transform_data"}],
"spark_python_task": {
"python_file": "dbfs:/scripts/load.py",
"parameters": ["--env", "production"]
},
"job_cluster_key": "etl_cluster"
}
],
"job_clusters": [
{
"job_cluster_key": "etl_cluster",
"new_cluster": {
"spark_version": "13.3.x-scala2.12",
"node_type_id": "i3.xlarge",
"num_workers": 4
}
}
],
"email_notifications": {
"on_failure": ["data-eng@company.com"],
"on_success": ["data-eng@company.com"]
}
}
Notebook Utilities:
# Get parameters
date_param = dbutils.widgets.get("date")
# Exit notebook with value
dbutils.notebook.exit("success")
# Run another notebook
result = dbutils.notebook.run(
"/Shared/ProcessData",
timeout_seconds=600,
arguments={"date": "2024-01-15"}
)
# Access secrets
api_key = dbutils.secrets.get(scope="production", key="api_key")
# File system operations
dbutils.fs.ls("/mnt/data")
dbutils.fs.cp("/mnt/source/file.csv", "/mnt/dest/file.csv")
dbutils.fs.rm("/mnt/data/temp", recurse=True)
Unity Catalog
Catalog and Schema Management:
# Create catalog
spark.sql("CREATE CATALOG IF NOT EXISTS production")
# Create schema
spark.sql("""
CREATE SCHEMA IF NOT EXISTS production.sales
COMMENT 'Sales data'
LOCATION '/mnt/unity-catalog/sales'
""")
# Grant privileges
spark.sql("GRANT USE CATALOG ON CATALOG production TO `data-engineers`")
spark.sql("GRANT ALL PRIVILEGES ON SCHEMA production.sales TO `data-engineers`")
spark.sql("GRANT SELECT ON TABLE production.sales.orders TO `data-analysts`")
# Three-level namespace
spark.sql("SELECT * FROM production.sales.orders")
# External locations
spark.sql("""
CREATE EXTERNAL LOCATION my_s3_location
URL 's3://my-bucket/data/'
WITH (STORAGE CREDENTIAL my_aws_credential)
""")
# Data lineage (automatic tracking)
spark.sql("SELECT * FROM production.sales.orders").show()
# View lineage in Unity Catalog UI
Best Practices
1. Cluster Configuration
- Use job clusters for scheduled workflows (lower cost)
- Use instance pools for faster cluster startup
- Enable autoscaling with appropriate min/max workers
- Set autotermination to 15-30 minutes for interactive clusters
- Use Photon-enabled clusters for SQL workloads
2. Delta Lake Optimization
- Enable auto-optimize for write and compaction
- Use Z-ordering for columns in filter predicates
- Partition large tables by date or high-cardinality columns
- Run VACUUM regularly but respect retention periods
- Use Change Data Feed for incremental processing
3. Performance Tuning
- Use broadcast joins for small dimension tables
- Enable adaptive query execution (AQE)
- Cache DataFrames that are reused multiple times
- Use partition pruning in queries
- Optimize shuffle operations with appropriate partition counts
4. Cost Optimization
- Use Spot/Preemptible instances for fault-tolerant workloads
- Terminate idle clusters automatically
- Use table properties to enable auto-compaction
- Monitor cluster utilization metrics
- Use Delta caching for frequently accessed data
5. Security and Governance
- Use Unity Catalog for centralized governance
- Implement fine-grained access control
- Store secrets in Databricks secret scopes
- Enable audit logging
- Use service principals for production jobs
Anti-Patterns
1. Collecting Large DataFrames
# Bad: Collect large dataset to driver
large_df.collect() # OOM error
# Good: Use actions that stay distributed
large_df.write.format("delta").save("/mnt/output")
2. Not Using Delta Lake Optimization
# Bad: Many small files
for file in files:
df = spark.read.json(file)
df.write.format("delta").mode("append").save("/mnt/table")
# Good: Batch writes with optimization
df = spark.read.json("/mnt/source/*")
df.write.format("delta") \
.option("optimizeWrite", "true") \
.mode("append") \
.save("/mnt/table")
3. Inefficient Joins
# Bad: Join without broadcast hint
large_df.join(small_df, "key")
# Good: Broadcast small table
from pyspark.sql.functions import broadcast
large_df.join(broadcast(small_df), "key")
4. Not Using Partitioning
# Bad: No partitioning on large table
df.write.format("delta").save("/mnt/events")
# Good: Partition by date
df.write.format("delta") \
.partitionBy("date") \
.save("/mnt/events")
Resources
Weekly Installs
30
Repository
personamanagmentlayer/pclFirst Seen
Jan 24, 2026
Security Audits
Installed on
opencode21
claude-code20
codex20
gemini-cli17
github-copilot14
cursor12