Lakehouse Monitoring Comprehensive

Overview

Data Profiling monitors provide table-level custom business metrics that track data quality, detect drift, and support business KPI monitoring on Gold layer tables. This is the custom metrics counterpart to Anomaly Detection's automated freshness/completeness checks.

API Status: Public Preview (POST /api/data-quality/v1/monitors)

SDK Module: databricks.sdk.service.dataquality (replaces legacy databricks.sdk.service.catalog)

When to Use This Skill vs. Anomaly Detection

Need | Use This Skill | Use Anomaly Detection
Custom business KPIs (revenue, velocity) | Yes | No
Table freshness checks | No | Yes
Schema-wide row completeness | No | Yes
Period-over-period drift | Yes (DRIFT metrics) | No
Per-table deep profiling | Yes | No (schema-level only)
ML model monitoring | Yes (InferenceLog) | No

Use both together for comprehensive monitoring: data profiling for business KPIs + anomaly detection for baseline reliability.

Quick Start (2-Hour Workflow)

Phase | Duration | Activities
Phase 1: Design | 30 min | Define metrics per table using references/metric-design-guide.md
Phase 2: Setup | 30 min | Run the setup script scripts/create_monitor.py
Phase 3: Wait | 15-20 min | Monitor initialization (async) using scripts/wait_for_initialization.py
Phase 4: Validate | 15 min | Query the _profile_metrics and _drift_metrics tables

Critical Rules

Rule 1: SDK Module — Use dataquality, NOT catalog

# ✅ CORRECT (new API)
from databricks.sdk.service.dataquality import (
    Monitor,
    DataProfilingConfig,
    DataProfilingCustomMetric,
    DataProfilingCustomMetricType,
    TimeSeriesConfig,
    SnapshotConfig,
    AggregationGranularity,
)

# ❌ WRONG (legacy API — deprecated)
from databricks.sdk.service.catalog import MonitorMetric, MonitorMetricType

Rule 2: create_monitor() Takes a Monitor Object

# ✅ CORRECT: Wrap in Monitor object
table_info = w.tables.get(full_name=f"{catalog}.{schema}.{table}")
w.data_quality.create_monitor(
    monitor=Monitor(
        object_type="table",
        object_id=table_info.table_id,
        data_profiling_config=config,
    )
)

# ❌ WRONG: Legacy API keyed by a three-level table name
w.quality_monitors.create(table_name=f"{catalog}.{schema}.{table}", ...)

Rule 3: Use SDK Objects, NOT Dictionaries

# ✅ CORRECT: DataProfilingCustomMetric objects
from pyspark.sql import types as T

DataProfilingCustomMetric(
    type=DataProfilingCustomMetricType.DATA_PROFILING_CUSTOM_METRIC_TYPE_AGGREGATE,
    name="total_revenue",
    input_columns=[":table"],
    definition="SUM(net_revenue)",
    output_data_type=T.StructField("output", T.DoubleType()).json()
)

# ❌ WRONG: Plain dictionaries
{"name": "total_revenue", "type": "AGGREGATE", "definition": "SUM(net_revenue)"}

Rule 4: output_data_type MUST Be StructField JSON

# ✅ CORRECT: StructField JSON format
output_data_type=T.StructField("output", T.DoubleType()).json()

# ❌ WRONG: a plain string causes monitor initialization to fail silently
output_data_type="double"

Rule 5: DERIVED Syntax — Direct Reference, No Templates

# ✅ CORRECT: Direct reference to aggregate metric name
definition="(total_cancellations / NULLIF(total_bookings, 0)) * 100"

# ❌ WRONG: Template syntax causes INVALID_DERIVED_METRIC error
definition="({{total_cancellations}} / NULLIF({{total_bookings}}, 0)) * 100"

Rule 6: DRIFT Syntax — MUST Use Window Templates

# ✅ CORRECT: Compare windows
definition="{{current_df}}.daily_revenue - {{base_df}}.daily_revenue"

# ❌ WRONG: Missing window comparison
definition="{{daily_revenue}}"

Rule 7: Use Typed Granularity Enums

# ✅ CORRECT: AggregationGranularity enum
from databricks.sdk.service.dataquality import AggregationGranularity
TimeSeriesConfig(
    timestamp_column="transaction_date",
    granularities=[AggregationGranularity.AGGREGATION_GRANULARITY_1_DAY]
)

# ❌ WRONG: String granularity (legacy API)
MonitorTimeSeries(timestamp_col="transaction_date", granularities=["1 day"])

Rule 8: Use Output Schema UUID

# ✅ CORRECT: Get monitoring schema UUID
monitoring_schema = w.schemas.get(full_name=f"{catalog}.{schema}_monitoring")
DataProfilingConfig(output_schema_id=monitoring_schema.schema_id, ...)

# ❌ WRONG: output_schema_name is legacy
create_monitor(output_schema_name=f"{catalog}.{schema}_monitoring")

Rule 9: Monitor Initialization Is Async (15-20 Minutes)

After creating a monitor, the first profile computation runs automatically. Use the Refresh tracking pattern in scripts/wait_for_initialization.py to poll for completion.
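
A minimal polling sketch of that pattern is below. The list_refresh parameter names, the result ordering, and the RefreshState value checked are assumptions about the preview SDK; scripts/wait_for_initialization.py contains the tested implementation.

import time

def wait_for_first_refresh(w, table_id, timeout_minutes=30, poll_seconds=60):
    """Poll the monitor's refresh history until a refresh succeeds (sketch, not tested)."""
    deadline = time.time() + timeout_minutes * 60
    while time.time() < deadline:
        # Parameter names and newest-first ordering are assumptions
        refreshes = list(w.data_quality.list_refresh(object_type="table", object_id=table_id))
        if refreshes and refreshes[0].state and "SUCCESS" in refreshes[0].state.name:
            return refreshes[0]
        time.sleep(poll_seconds)
    raise TimeoutError(f"Monitor on table {table_id} did not complete its first refresh in time")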

Rule 10: Delete Cleanup — Drop Output Tables Too

# Delete monitor definition
w.data_quality.delete_monitor(object_type="table", object_id=table_id)

# Also drop the output tables
spark.sql(f"DROP TABLE IF EXISTS {catalog}.{monitoring_schema}.{table}_profile_metrics")
spark.sql(f"DROP TABLE IF EXISTS {catalog}.{monitoring_schema}.{table}_drift_metrics")

SDK Migration Summary

Legacy (catalog module) | New (dataquality module)
MonitorMetric | DataProfilingCustomMetric
MonitorMetricType.CUSTOM_METRIC_TYPE_AGGREGATE | DataProfilingCustomMetricType.DATA_PROFILING_CUSTOM_METRIC_TYPE_AGGREGATE
MonitorMetricType.CUSTOM_METRIC_TYPE_DERIVED | DataProfilingCustomMetricType.DATA_PROFILING_CUSTOM_METRIC_TYPE_DERIVED
MonitorMetricType.CUSTOM_METRIC_TYPE_DRIFT | DataProfilingCustomMetricType.DATA_PROFILING_CUSTOM_METRIC_TYPE_DRIFT
MonitorTimeSeries(timestamp_col=..., granularities=["1 day"]) | TimeSeriesConfig(timestamp_column=..., granularities=[AggregationGranularity.AGGREGATION_GRANULARITY_1_DAY])
MonitorSnapshot() | SnapshotConfig()
MonitorInfoStatus.MONITOR_STATUS_ACTIVE | DataProfilingStatus.DATA_PROFILING_STATUS_ACTIVE
w.quality_monitors.create(table_name=...) | w.data_quality.create_monitor(monitor=Monitor(object_type="table", object_id=uuid, data_profiling_config=...))
w.quality_monitors.get(table_name=...) | w.data_quality.get_monitor(object_type="table", object_id=uuid)
w.quality_monitors.delete(table_name=...) | w.data_quality.delete_monitor(object_type="table", object_id=uuid)
Manual status polling | Refresh object with RefreshState enum
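
Putting the new-API column together end to end, here is a minimal sketch. Catalog, schema, table, and metric names are placeholders, the output schema is assumed to already exist, and the DataProfilingConfig field names for attaching the time-series config and custom metrics (time_series_config, custom_metrics) are assumptions; see references/monitor-configuration.md for the verified construction.

from databricks.sdk import WorkspaceClient
from databricks.sdk.service.dataquality import (
    Monitor,
    DataProfilingConfig,
    DataProfilingCustomMetric,
    DataProfilingCustomMetricType,
    TimeSeriesConfig,
    AggregationGranularity,
)
from pyspark.sql import types as T

w = WorkspaceClient()
catalog, schema, table = "main", "gold", "fact_sales_daily"  # placeholders

# UUID lookups (Rules 2 and 8)
table_id = w.tables.get(full_name=f"{catalog}.{schema}.{table}").table_id
output_schema_id = w.schemas.get(full_name=f"{catalog}.{schema}_monitoring").schema_id

config = DataProfilingConfig(
    output_schema_id=output_schema_id,
    # Field names below are assumptions; verify against the preview SDK
    time_series_config=TimeSeriesConfig(
        timestamp_column="transaction_date",
        granularities=[AggregationGranularity.AGGREGATION_GRANULARITY_1_DAY],
    ),
    custom_metrics=[
        DataProfilingCustomMetric(
            type=DataProfilingCustomMetricType.DATA_PROFILING_CUSTOM_METRIC_TYPE_AGGREGATE,
            name="total_revenue",
            input_columns=[":table"],
            definition="SUM(net_revenue)",
            output_data_type=T.StructField("output", T.DoubleType()).json(),
        ),
    ],
)

w.data_quality.create_monitor(
    monitor=Monitor(object_type="table", object_id=table_id, data_profiling_config=config)
)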

Reference Files

custom-metrics.md

Custom metric type reference including:

  • Required imports (DataProfilingCustomMetric, DataProfilingCustomMetricType)
  • AGGREGATE syntax (SQL on table columns)
  • DERIVED syntax (direct reference, NO {{ }})
  • DRIFT syntax ({{current_df}}.metric - {{base_df}}.metric)
  • Business-focused metric categories
  • output_data_type format (T.StructField().json())

monitor-configuration.md

Monitor setup patterns including:

  • UUID lookup helpers (table and schema)
  • DataProfilingConfig construction
  • TimeSeries vs Snapshot configuration with typed granularities
  • Graceful degradation (try/except for SDK imports)
  • Notification settings
  • Schedule configuration

deployment-guide.md

Operational deployment including:

  • Genie Space documentation patterns
  • Query patterns for _profile_metrics and _drift_metrics tables
  • Ad-hoc ratio calculations (alternative to DERIVED metrics)
  • Asset Bundle job configuration

quick-start-guide.md

Fast-track setup including:

  • Phase-based implementation checklist
  • Fast-track code (corrected patterns)
  • Sample metric queries
  • Critical validation steps

metric-design-guide.md

Metric design and planning including:

  • Fill-in-the-blank requirements template
  • Monitor priority definitions (P1-P3)
  • Custom metric templates by category
  • Alert strategy table

example-monitor-definitions.md

Concrete implementation examples including:

  • create_fact_sales_daily_monitor() — 9 metrics (aggregate + derived + drift)
  • create_dim_store_monitor() — 4 metrics (snapshot)

Scripts

create_monitor.py

Core monitor management functions:

  • get_table_id() / get_schema_id() — UUID lookup helpers
  • create_table_monitor() — Full monitor creation with DataProfilingConfig
  • delete_monitor_if_exists() — Safe cleanup with output table drops
  • wait_with_progress() — Timer-based fallback wait

setup_monitors_template.py

Complete notebook template:

  • argparse parameter handling
  • Monitor tracking with success/failure reporting
  • Error handling with graceful degradation
  • create_monitor_with_custom_metrics() — Full pipeline

wait_for_initialization.py

Async monitoring:

  • wait_for_monitor_refresh() — Refresh-based tracking with RefreshState
  • wait_for_all_monitors() — Multi-table status polling with timeout

Assets

monitoring-requirements-template.md

Fill-in-the-blank markdown template for defining monitoring requirements.

monitoring-job-template.yml

Databricks Asset Bundle job template for deploying monitors.

Troubleshooting

Monitor Shows No Data After 20+ Minutes

  1. Check monitor status via w.data_quality.get_monitor()
  2. Check latest refresh: w.data_quality.list_refresh()
  3. Verify table has data: SELECT COUNT(*) FROM table
  4. Verify output_data_type uses T.StructField().json() format
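
A quick diagnostic sketch for steps 1-3 (catalog, schema, table, and table_id variables as above; the list_refresh parameter names are assumptions to verify against the preview SDK):

# 1. Monitor definition and status
monitor = w.data_quality.get_monitor(object_type="table", object_id=table_id)
print(monitor)

# 2. Latest refresh attempts (parameter names assumed)
for refresh in w.data_quality.list_refresh(object_type="table", object_id=table_id):
    print(refresh)

# 3. Confirm the source table actually has rows
spark.sql(f"SELECT COUNT(*) AS row_count FROM {catalog}.{schema}.{table}").show()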

INVALID_DERIVED_METRIC Error

This error means the definition uses {{metric_name}} template syntax instead of a direct reference. Remove all {{ }} from DERIVED metric definitions.

Profile Metrics Table Empty

The _profile_metrics table is created by the monitor. If empty, the first refresh hasn't completed yet. Wait 15-20 minutes.
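
To check whether the first refresh has landed, a simple existence and row-count check (table naming follows the <table>_profile_metrics convention used in Rule 10):

profile_table = f"{catalog}.{monitoring_schema}.{table}_profile_metrics"
if spark.catalog.tableExists(profile_table):
    spark.sql(f"SELECT COUNT(*) AS metric_rows FROM {profile_table}").show()
else:
    print(f"{profile_table} does not exist yet; the first refresh has not completed")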

"Monitor Already Exists" Error

Delete the existing monitor first using delete_monitor_if_exists() from scripts/create_monitor.py.
