# ml-pipeline-setup: MLflow & ML Models Patterns
## Phase 0: Read Plan (5 minutes)
Before starting implementation, check for a planning manifest that defines what to build.
```python
import yaml
from pathlib import Path

manifest_path = Path("plans/manifests/ml-manifest.yaml")
if manifest_path.exists():
    with open(manifest_path) as f:
        manifest = yaml.safe_load(f)
    # Extract implementation checklist from manifest
    feature_tables = manifest.get('feature_tables', [])
    models = manifest.get('models', [])
    experiments = manifest.get('experiments', [])
    print(f"Plan: {len(feature_tables)} feature tables, {len(models)} models, {len(experiments)} experiments")
    # Each model has: name, domain, model_type, algorithm, feature_table,
    # label_column, label_type, business_questions
    # Each feature table has: name, primary_keys, source_gold_tables, features
else:
    # Fallback: self-discovery from Gold tables
    print("No manifest found — falling back to Gold table self-discovery")
    # Discover Gold fact tables, infer feature columns, create one model per domain
```
If manifest exists: Use it as the implementation checklist. Every feature table, model, and experiment is pre-defined with configuration details. Track completion against the manifest's summary counts.
If manifest doesn't exist: Fall back to self-discovery — inventory Gold fact tables, infer feature columns from numeric columns, and create one model per domain. This works but may miss specific label derivations and business context the planning phase would have defined.
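The self-discovery fallback can be sketched as a small helper that infers candidate feature columns from a table schema. This is a minimal sketch, not the skill's actual implementation: the table name, type list, and key-suffix heuristic are assumptions, and in practice the `(name, dtype)` pairs would come from `spark.table(...).dtypes`.

```python
# Hypothetical sketch of Gold-table self-discovery: keep numeric columns,
# skip key-like columns. Thresholds and naming rules are assumptions.
NUMERIC_TYPES = {"int", "bigint", "double", "float", "decimal"}

def infer_feature_columns(dtypes, key_suffixes=("id", "key")):
    """Return numeric columns usable as features, skipping key-like names."""
    features = []
    for name, dtype in dtypes:
        base_type = dtype.split("(")[0]  # 'decimal(18,2)' -> 'decimal'
        if base_type in NUMERIC_TYPES and not name.endswith(tuple(key_suffixes)):
            features.append(name)
    return features

# e.g. dtypes as returned by spark.table("catalog.gold.fact_usage").dtypes
dtypes = [("workspace_id", "bigint"), ("usage_date", "date"),
          ("dbu_count", "double"), ("cost_usd", "decimal(18,2)")]
print(infer_feature_columns(dtypes))  # ['dbu_count', 'cost_usd']
```

Because this heuristic has no knowledge of label derivations or business context, treat its output as a starting point, not a substitute for the manifest.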
## Quick Start (4-6 hours)
Goal: Build production-ready ML pipelines with MLflow 3.1+, Unity Catalog Model Registry, and Databricks Feature Engineering for training-serving consistency.
What You'll Create:

- `features/create_feature_tables.py` - Feature tables in Unity Catalog
- `{domain}/train_{model_name}.py` - Training pipelines with Feature Engineering
- `inference/batch_inference_all_models.py` - Batch scoring with `fe.score_batch`
- Asset Bundle jobs for orchestration
Fast Track:

```bash
# 1. Create Feature Tables
databricks bundle run ml_feature_pipeline_job -t dev

# 2. Train all models (parallel)
databricks bundle run ml_training_pipeline_job -t dev

# 3. Run batch inference
databricks bundle run ml_inference_pipeline_job -t dev
```
## Overview
Production-grade patterns for implementing ML pipelines on Databricks using MLflow, Unity Catalog, and Feature Store. Based on production experience with 25 models across 5 domains, achieving 96% inference success rate and 93% reduction in debugging time.
Pattern Origin: December 2025 (Updated: February 6, 2026 - v5.0)
## When to Use This Skill
Use this skill when:
- Implementing ML pipelines on Databricks with MLflow tracking
- Training models with Feature Store integration
- Deploying batch inference jobs
- Registering models to Unity Catalog
- Troubleshooting MLflow experiment, model registration, or inference errors
- Setting up Databricks Asset Bundle jobs for ML workflows
- Creating feature tables in Unity Catalog with proper primary keys and NaN handling
Critical for:

- Ensuring training and inference consistency via `fe.score_batch`
- Preventing common MLflow signature errors
- Handling data quality issues (NaN, label binarization, single-class data)
- Configuring serverless ML jobs correctly
## Working Memory Management
This orchestrator covers Phase 0 (plan reading) plus multiple implementation sections (feature tables, training, inference, deployment). To maintain coherence without context pollution:
After each major section, persist a brief summary note capturing:
- Phase 0 output: Manifest found (yes/no), model count, feature table count, experiment names from manifest or discovery
- Feature tables output: Feature table names and paths, primary key columns, NaN handling decisions
- Training output: Experiment names, model URIs, MLflow signature details, label binarization strategy
- Inference output: Batch inference notebook paths, `fe.score_batch` config, output table names
- Jobs output: Job YAML file paths, environment config, `databricks.yml` sync status
What to keep in working memory: Only the current section's reference skill, the model/feature inventory (from Phase 0), and the previous section's summary note. Discard intermediate outputs (full DataFrames, training logs, model artifacts) — they are in MLflow and reproducible.
## Architecture Overview

```
┌──────────────────────────────────────────────────────────────┐
│                          Gold Layer                          │
│  (fact_tables, dim_tables - source for feature engineering)  │
└──────────────────────────────┬───────────────────────────────┘
                               │
                               ▼
┌──────────────────────────────────────────────────────────────┐
│                Feature Tables (Unity Catalog)                │
│  cost_features         (PK: workspace_id, usage_date)        │
│  security_features     (PK: user_id, event_date)             │
│  performance_features  (PK: warehouse_id, query_date)        │
└──────────────────────────────┬───────────────────────────────┘
                               │
                               ▼
┌──────────────────────────────────────────────────────────────┐
│                      Training Pipelines                      │
│  FeatureLookup → create_training_set → train → fe.log_model  │
│  (Embeds feature metadata for inference consistency)         │
└──────────────────────────────┬───────────────────────────────┘
                               │
                               ▼
┌──────────────────────────────────────────────────────────────┐
│          Unity Catalog Model Registry (MLflow 3.1+)          │
│            catalog.{feature_schema}.{model_name}             │
│          (Model + Feature Lookup Metadata embedded)          │
└──────────────────────────────┬───────────────────────────────┘
                               │
                               ▼
┌──────────────────────────────────────────────────────────────┐
│                       Inference Layer                        │
│  fe.score_batch(model_uri, df_with_lookup_keys_only)         │
│    → Automatically retrieves features from Feature Tables    │
│    → Guarantees training-serving consistency                 │
└──────────────────────────────────────────────────────────────┘
```
## Directory Structure

```
src/{project}_ml/
├── features/
│   └── create_feature_tables.py        # Feature table creation
├── cost/
│   ├── train_budget_forecaster.py
│   ├── train_cost_anomaly_detector.py
│   └── train_chargeback_attribution.py
├── security/
│   └── train_security_threat_detector.py
├── performance/
│   └── train_query_performance_forecaster.py
├── reliability/
│   └── train_job_failure_predictor.py
├── quality/
│   └── train_data_drift_detector.py
├── inference/
│   └── batch_inference_all_models.py   # Uses fe.score_batch
└── README.md

resources/ml/
├── ml_feature_pipeline_job.yml         # Feature table creation
├── ml_training_pipeline_job.yml        # Training orchestrator
└── ml_inference_pipeline_job.yml       # Batch inference
```
## Critical Rules (Quick Reference)

| # | Rule | Pattern | Why It Fails Otherwise |
|---|------|---------|------------------------|
| 0 | Pin Package Versions | `mlflow==3.7.0` (exact) in training AND inference | Version mismatch warnings, deserialization failures |
| 1 | Experiment Path | `/Shared/{project}_ml_{model_name}` | `/Users/...` fails silently if subfolder doesn't exist |
| 2 | Dataset Logging | Inside `mlflow.start_run()` context | Won't associate with run, invisible in UI |
| 3 | Exit Signal | `dbutils.notebook.exit("SUCCESS")` | Job status unclear, may show SUCCESS on failure |
| 4 | UC Model Logging | Use `fe.log_model()` with `output_schema` (PRIMARY) or `infer_signature` (alternative) | Unity Catalog rejects models without output spec |
| 5 | Feature Engineering Workflow | `FeatureLookup` + `create_training_set` + `fe.log_model` | Feature skew between training and inference |
| 6 | NaN Handling at Source | Clean NaN/Inf at feature table creation with `clean_numeric()` | sklearn GradientBoosting fails at inference; XGBoost handles NaN but sklearn doesn't |
| 7 | Label Binarization | Convert 0-1 rates to binary for classifiers | XGBoostError: base_score must be in (0,1) |
| 8 | Single-Class Check | Verify label distribution before training | Classifier can't train on all-same labels |
| 9 | Exclude Labels | Use `exclude_columns=[LABEL_COLUMN]` in `create_training_set` | Label included as feature causes inference failure |
| 10 | Label Type Casting | Cast to INT (classification) or DOUBLE (regression) before training | Type mismatch in model output |
| 11 | Double Type Casting | Cast ALL numeric features to DOUBLE in feature tables | MLflow signatures reject DecimalType |
| 12 | Lookup Keys Match PKs | `lookup_key` MUST match Feature Table primary keys EXACTLY | "Unable to find feature" errors |
| 13 | Use fe.score_batch | Use `fe.score_batch`, NOT manual feature joins, for inference | Manual joins break training-serving consistency |
| 14 | Feature Registry | Query feature table schemas dynamically | Hardcoded feature lists drift out of sync |
| 15 | Custom Inference | Separate task for TF-IDF/NLP models that need runtime features | `fe.score_batch()` can't compute runtime features |
| 16 | Helper Functions Inline | ALWAYS inline helper functions (don't import modules) | ModuleNotFoundError in serverless Asset Bundle notebooks |
| 17 | Bundle Path Setup | Use `sys.path.insert(0, _bundle_root)` pattern | Module imports fail in serverless |
| 18 | Standardized Templates | Copy-and-customize from skill templates (don't roll custom) | Custom implementations miss edge cases |
## Core Patterns (Quick Examples)
### Experiment Setup

```python
# ✅ CORRECT: Always use /Shared/ path
experiment_name = f"/Shared/{project}_ml_{model_name}"
mlflow.set_experiment(experiment_name)
```

See: Experiment Patterns for full details.
### Model Registration with Feature Store

```python
import mlflow
from databricks.feature_engineering import FeatureEngineeringClient
from mlflow.types import ColSpec, DataType, Schema

fe = FeatureEngineeringClient()
mlflow.set_registry_uri("databricks-uc")

# Regression: output_schema = Schema([ColSpec(DataType.double)])
# Classification: output_schema = Schema([ColSpec(DataType.long)])
fe.log_model(
    model=model,
    artifact_path="model",
    flavor=mlflow.sklearn,  # REQUIRED
    training_set=training_set,
    registered_model_name=f"{catalog}.{schema}.{model_name}",
    infer_input_example=True,
    output_schema=output_schema,  # REQUIRED for UC
)
```

See: Model Registry for full patterns by model type.
### Feature Table Creation with NaN Cleaning

```python
# ✅ CORRECT: Clean NaN/Inf at source for sklearn compatibility
from pyspark.sql import functions as F
from pyspark.sql.functions import isnan
from pyspark.sql.types import DoubleType

def clean_numeric(col_name):
    return F.when(
        F.col(col_name).isNull() | isnan(F.col(col_name)) |
        (F.col(col_name) == float('inf')) | (F.col(col_name) == float('-inf')),
        F.lit(0.0)
    ).otherwise(F.col(col_name))

# Apply to ALL DoubleType columns
for field in df.schema.fields:
    if isinstance(field.dataType, DoubleType):
        df = df.withColumn(field.name, clean_numeric(field.name))
```

See: Data Quality Patterns for full patterns.
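Rules 7 and 8 (label binarization and the single-class check) can be sketched as two plain-Python helpers. This is a minimal illustration, not the template's code: the 0.5 threshold is an assumption, and the manifest's label derivation may specify a different cutoff. In the pipeline, the same logic is applied as a column operation on the training DataFrame.

```python
# Hypothetical helpers for Rules 7-8; threshold 0.5 is an assumed cutoff.
def binarize_label(rates, threshold=0.5):
    """Convert continuous 0-1 rates into binary INT labels for a classifier."""
    return [1 if r >= threshold else 0 for r in rates]

def has_multiple_classes(labels):
    """A classifier cannot train on a single class; fail fast instead."""
    return len(set(labels)) >= 2

failure_rates = [0.02, 0.85, 0.40, 0.91]
labels = binarize_label(failure_rates)
print(labels)                      # [0, 1, 0, 1]
print(has_multiple_classes(labels))  # True
```

Running the single-class check before `model.fit` turns a cryptic XGBoost `base_score` error into an explicit, actionable failure.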
### Batch Inference with fe.score_batch

```python
from databricks.feature_engineering import FeatureEngineeringClient

fe = FeatureEngineeringClient()

# ✅ CORRECT: scoring_df has ONLY lookup keys (features auto-retrieved)
scoring_df = spark.table(feature_table).select(*lookup_keys).distinct()

predictions_df = fe.score_batch(
    model_uri=model_uri,
    df=scoring_df  # Contains only lookup keys
)
```

See: Feature Engineering Workflow for full inference patterns.
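Before the Delta save, the inference template attaches audit metadata to every prediction row. A minimal sketch of those columns, assuming the column names described in the batch inference template (`model_name`, `model_uri`, `scored_at`); in the notebook each value would be applied with `withColumn` and `F.lit`:

```python
# Hypothetical sketch: audit metadata attached to scored rows (Rule names
# and columns follow the batch_inference_template.py description above).
from datetime import datetime, timezone

def scoring_metadata(model_name, model_uri):
    """Audit columns added to every prediction row before the Delta save."""
    return {
        "model_name": model_name,
        "model_uri": model_uri,
        "scored_at": datetime.now(timezone.utc).isoformat(),
    }

# In the notebook:
# for col, value in scoring_metadata(model_name, model_uri).items():
#     predictions_df = predictions_df.withColumn(col, F.lit(value))
```

These columns make it possible to trace any saved prediction back to the exact model version that produced it.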
### Asset Bundle Job Configuration

```yaml
resources:
  jobs:
    ml_training_job:
      environments:
        - environment_key: default
          spec:
            environment_version: "4"
            dependencies:
              - "mlflow==3.7.0"   # Pin exact version
              - "xgboost==2.0.3"
      tasks:
        - task_key: train_model
          notebook_task:
            notebook_path: ../../src/ml/models/train.py
            base_parameters:      # NOT parameters with --flags!
              catalog: ${var.catalog}
              model_name: my_model
# ❌ DO NOT define experiments in Asset Bundle
```

See: DAB Integration for full patterns.
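Rule 17's bundle-path setup can be sketched as a walk-up search for the bundle root. This is an assumed layout, not the skill's canonical helper: the `databricks.yml` marker file and the search-upward strategy are assumptions about how the project is organized.

```python
# Hypothetical sketch of Rule 17: locate the bundle root so serverless
# notebooks can import project modules. Marker file name is an assumption.
import sys
from pathlib import Path

def find_bundle_root(start: Path, marker: str = "databricks.yml") -> Path:
    """Walk up from `start` until a directory containing `marker` is found."""
    for candidate in [start, *start.parents]:
        if (candidate / marker).exists():
            return candidate
    raise FileNotFoundError(f"No {marker} found above {start}")

# In the notebook:
# _bundle_root = find_bundle_root(Path.cwd())
# sys.path.insert(0, str(_bundle_root))
```

Note that even with this pattern, the skill's Rule 16 still applies: helper functions used by training notebooks should be inlined, with `sys.path` setup reserved for cases where a shared module import is unavoidable.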
## Reference Files
Detailed documentation is organized in the references/ directory:
### Experiment Patterns

Complete experiment setup, tracking, metric logging, dataset logging, hyperparameter tuning. Covers `/Shared/` vs `/Users/` paths, run context requirements, helper function inlining, exit signals, and common errors.

### Model Registry

Model registration, versioning, aliases, deployment patterns, serving endpoints. Covers Unity Catalog integration with `fe.log_model()`, output schema patterns by model type (regression, classification, anomaly detection), signature-driven preprocessing, and model loading from UC. Documents both `output_schema` (primary) and `infer_signature` (alternative) approaches.

### DAB Integration

Asset Bundle integration, notebook patterns, inline helpers, parameter passing. Covers training and inference job configuration, package version pinning, `base_parameters` vs argparse, serverless environment setup, and common deployment errors.

### Feature Store Patterns

Feature table creation, feature lookup configuration, column conflict resolution, Feature Registry pattern for dynamic schema querying. Covers training set creation, feature lookups, and inference patterns.

### Data Quality Patterns

NaN/Inf handling at feature table source, label binarization for XGBoost classifiers, single-class data detection, feature column exclusion. Covers sklearn vs XGBoost compatibility, preprocessing requirements, and training/inference checklists.

### Troubleshooting

Comprehensive error reference table, schema verification patterns, SCD2 vs regular dimension table handling, pre-development checklist. Covers common MLflow errors, signature issues, and debugging workflows.

### Requirements Template

Fill-in-the-blank requirements template for ML projects. Includes project context (catalog, schemas), feature table inventory (primary keys, features, source tables), model inventory (type, algorithm, label column, label type), and a label type reference (regression vs classification vs anomaly detection casting).

### Feature Engineering Workflow

End-to-end Feature Engineering workflow with `FeatureLookup`, `create_training_set`, `fe.log_model`, and `fe.score_batch`. Covers feature table creation with NaN cleaning, training set creation with proper `base_df` (ONLY keys + label), model logging with embedded feature metadata, and batch inference with automatic feature retrieval.
## Scripts
### setup_experiment.py

Utility functions for MLflow experiment setup. CRITICAL: These functions should be INLINED in each training notebook, not imported.

Functions:

- `setup_mlflow_experiment(model_name)` - Set up experiment with `/Shared/` path
- `log_training_dataset(spark, catalog, schema, table_name)` - Log dataset inside run context
- `get_run_name(model_name, algorithm, version)` - Generate descriptive run names
- `get_standard_tags(...)` - Get standard MLflow tags
- `get_parameters()` - Get job parameters from dbutils widgets (returns 4: catalog, gold_schema, feature_schema, model_name)

Usage:

```python
# Copy these functions into your notebook (don't import)
# See scripts/setup_experiment.py for full implementation
```
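The shape of the inlined experiment helper can be sketched as follows. This is a simplified illustration, not the script's full implementation (which also handles tags and widgets); the two-argument signature with an explicit `project` is an assumption for clarity:

```python
# Hypothetical sketch of the inlined helper (Rules 1 and 16). Copy into the
# notebook; do not import from a module.
def experiment_path(project: str, model_name: str) -> str:
    # Rule 1: /Shared/ avoids the silent-failure mode of missing /Users/ folders
    return f"/Shared/{project}_ml_{model_name}"

def setup_mlflow_experiment(project: str, model_name: str) -> str:
    import mlflow  # imported where used so the helper stays copy-paste friendly
    name = experiment_path(project, model_name)
    mlflow.set_experiment(name)  # creates the experiment if it doesn't exist
    return name
```

Keeping the path construction in a separate pure function makes the naming convention trivially checkable before any MLflow call is made.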
### create_feature_tables_template.py

Complete feature table creation template with NaN/Inf cleaning at source, DOUBLE type casting, PK NULL filtering, rolling window aggregations, and proper `fe.create_table()` calls. Uses 3-parameter `get_parameters()` (catalog, gold_schema, feature_schema).

Key functions:

- `get_parameters()` - Returns (catalog, gold_schema, feature_schema) — no model_name needed
- `create_feature_table(spark, fe, features_df, ...)` - Create with NaN cleaning + PK filtering
- `compute_{domain}_features(spark, catalog, gold_schema)` - Domain-specific feature engineering
- `main()` - Orchestrates schema creation and all feature tables
### train_model_template.py

Complete training pipeline template using Feature Engineering with `FeatureLookup`, `create_training_set`, and `fe.log_model` with `output_schema`. Includes inline helpers, label type casting, label binarization, single-class detection, and proper exit signals.

Key functions:

- `setup_mlflow_experiment(model_name)` - Inlined `/Shared/` path setup
- `get_parameters()` - Returns (catalog, gold_schema, feature_schema, model_name)
- `create_training_set_with_features(...)` - FeatureLookup + create_training_set
- `prepare_and_train(training_df, ...)` - Data prep with DECIMAL→DOUBLE + train/eval
- `log_model_with_feature_engineering(...)` - `fe.log_model()` with `output_schema`
- `main()` - Full pipeline with error handling + exit signal
### batch_inference_template.py

Complete batch inference template using `fe.score_batch` for automatic feature retrieval. Supports a multi-model scoring loop with per-model error isolation and a PARTIAL_FAILURE exit signal.

Key functions:

- `get_parameters()` - Returns (catalog, gold_schema, feature_schema)
- `load_model_uri(catalog, feature_schema, model_name)` - Get latest model version URI
- `score_with_feature_engineering(...)` - `fe.score_batch()` + metadata columns + Delta save
- `run_inference_for_model(...)` - Single-model inference with error isolation
- `main()` - Multi-model loop with summary + PARTIAL_FAILURE handling
## Assets
### ml-feature-pipeline-job.yaml

Asset Bundle job template for feature table creation. Includes the `databricks-feature-engineering` dependency, a `notebook_task` with `base_parameters` (catalog, gold_schema, feature_schema), and proper tags.

Usage:

```
# Copy and customize for your project
# Replace {project} placeholder in notebook_path
```
### ml-training-pipeline-job.yaml

Asset Bundle job template for parallel model training. Includes pinned package versions (`mlflow==3.7.0`, `scikit-learn==1.3.2`, `xgboost==2.0.3`), multiple parallel tasks (one per model), a weekly schedule (paused by default), and a 4-hour timeout. Does NOT define experiments (experiments are created in notebook code).

Usage:

```
# Copy and customize: add one task_key per model
# Pin package versions to match your training environment
```
### ml-inference-pipeline-job.yaml

Asset Bundle job template for batch inference with `fe.score_batch`. Includes pinned package versions (MUST match training), a daily schedule (paused by default), a 2-hour timeout, and proper tags.

Usage:

```
# Copy and customize for your project
# Package versions MUST match training pipeline exactly
```
## Quick Validation Checklists
### Pre-Development

- Verify ALL column names against Gold layer YAML schemas
- Check if dimension tables are SCD2 (has `is_current`?)
- Confirm data types (DECIMAL, STRING, BOOLEAN, etc.)
- Identify categorical columns needing encoding
- Review Feature Store tables if using feature lookups
- Fill in Requirements Template
### MLflow Setup

- Use `/Shared/{project}_ml_{model_name}` experiment path
- Do NOT define experiments in Asset Bundle
- Inline ALL helper functions (no module imports)
- Set `mlflow.set_registry_uri("databricks-uc")` at module level
- Add `mlflow.autolog(disable=True)` before custom logging
### Feature Table Creation

- Cast ALL numeric columns to DOUBLE
- Clean NaN/Inf at source with `clean_numeric()` (sklearn compatibility)
- Filter NULL primary key rows before writing
- All columns verified against Gold layer schema
- Feature table has a descriptive description
- Rolling window aggregations use proper Window specs
### Feature Engineering (Training Set)

- Uses `FeatureLookup` for feature retrieval
- `base_df` has ONLY lookup keys + label column
- `lookup_key` matches feature table primary keys EXACTLY
- `training_set` passed to `fe.log_model` (embeds feature metadata)
- Exclude Labels: Use `exclude_columns=[LABEL_COLUMN]` — label MUST NOT be a feature
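The third checklist item (Rule 12) is worth enforcing in code rather than by inspection, since a key mismatch only surfaces later as an "Unable to find feature" error. A minimal guard, assumed to run just before `FeatureLookup` construction (the function name and error text are illustrative, not the template's):

```python
# Hypothetical Rule 12 guard: lookup_key must equal the feature table's
# primary keys exactly, including order.
def validate_lookup_keys(lookup_key, primary_keys):
    """Raise early if lookup keys diverge from the feature table PKs."""
    if list(lookup_key) != list(primary_keys):
        raise ValueError(
            f"lookup_key {list(lookup_key)} != feature table PKs "
            f"{list(primary_keys)} (order matters)")
    return True

# e.g. before building FeatureLookup(table_name=..., lookup_key=lookup_key)
validate_lookup_keys(["workspace_id", "usage_date"],
                     ["workspace_id", "usage_date"])  # passes
```

Failing at training-set construction time is far cheaper to debug than an inference-time feature lookup failure.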
### Training Pipeline

- Log dataset INSIDE `mlflow.start_run()` context
- Use `fe.log_model()` with `flavor=mlflow.sklearn` and `output_schema` (PRIMARY)
- `input_example` provided (or use `infer_input_example=True`)
- Register model with 3-level name: `catalog.schema.model_name`
- Label column CAST to correct type (INT for classification, DOUBLE for regression)
- All features CAST to float64 before training
- Label Binarization: Binarize 0-1 rates for XGBClassifier
- Single-Class Check: Verify label has multiple classes before training classifier
- Add `dbutils.notebook.exit("SUCCESS")` at end
- All helper functions inlined (not imported)
### Batch Inference

- Uses `fe.score_batch` for automatic feature retrieval
- Scoring DataFrame has ONLY lookup keys
- Model URI points to latest version
- Predictions saved with metadata columns (model_name, model_uri, scored_at)
- Verify feature table has no NaN before inference (sklearn compatibility)
- Error handling with proper exit signals (SUCCESS / PARTIAL_FAILURE / FAILED)
- Test with small batch before full inference
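The three-way exit signal in the checklist above can be sketched as a small summarizer over per-model outcomes. This is an illustrative sketch under the assumption that the multi-model loop records a boolean success flag per model (the function name is hypothetical):

```python
# Hypothetical exit-signal logic for the multi-model inference loop.
def inference_exit_signal(results):
    """results maps model_name -> True if scored, False if it failed."""
    outcomes = list(results.values())
    if all(outcomes):
        return "SUCCESS"
    return "PARTIAL_FAILURE" if any(outcomes) else "FAILED"

# At the end of the notebook:
# dbutils.notebook.exit(inference_exit_signal(results))
```

Emitting PARTIAL_FAILURE instead of a blanket SUCCESS lets the orchestrating job distinguish "every model scored" from "some models need attention" (Rule 3).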
### Job Configuration

- Use `notebook_task` with `base_parameters` (not argparse)
- Pin exact package versions (training and inference must match)
- DO NOT define experiments in Asset Bundle

See: Troubleshooting for detailed checklists.
## Time Estimates
| Task | Duration |
|---|---|
| Feature Tables Setup | 2-3 hours |
| First Model (with FE) | 3-4 hours |
| Additional Models (each) | 1-2 hours |
| Batch Inference Pipeline | 2-3 hours |
| Asset Bundle Configuration | 1 hour |
| Total (5 models) | 10-16 hours |
## Version History
### v5.0 (February 2026)

- Merged comprehensive implementation guide (12-ml-models-prompt.md)
- Added Quick Start, Architecture, Directory Structure, Time Estimates
- Expanded to 19 non-negotiable rules (merged from 10+16)
- Created 3 complete script templates (feature tables, training, inference)
- Split asset templates into 3 separate job YAMLs
- Added requirements-template.md and feature-engineering-workflow.md
- Replaced hardcoded project names with `{project}` placeholders
- Resolved model registration conflict (`output_schema` as primary)
- Resolved NaN handling (clean at source, not training time)
### Future Enhancements (v6.0)

- Model aliases (`@champion`/`@challenger`) for lifecycle management
- Hyperparameter tuning (Hyperopt/Optuna + MLflow)
- Feature importance logging and visualization
- Cross-validation patterns (TimeSeriesSplit)
- Prediction monitoring integration with Lakehouse Monitoring
- AutoML baseline patterns
### v4.0 (January 14, 2026)

- Restructured to comply with AgentSkills.io specification
- Split into reference files for better organization
- Extracted scripts and templates

### v3.0 (January 4, 2026)

- NaN handling at feature table source
- Label binarization patterns
- Single-class data detection
- Feature column exclusion

### v2.0 (January 2026)

- `fe.log_model()` and `output_schema` patterns
- Model type to DataType mapping

### v1.0 (December 2025)

- Initial patterns from 5-model implementation
## Pipeline Progression

Previous stage: `monitoring/00-observability-setup` → Monitoring, dashboards, and alerts should be configured.

Next stage: After completing ML setup, proceed to:

- `genai-agents/00-genai-agents-setup` — Implement GenAI agents with ResponsesAgent, Genie Spaces, and evaluation
## Post-Completion: Skill Usage Summary (MANDATORY)
After completing all sections of this orchestrator, output a Skill Usage Summary reflecting what you ACTUALLY did — not a pre-written summary.
### What to Include

- Every skill `SKILL.md` or `references/` file you read (via the Read tool), in the order you read them
- Which section or step you were in when you read it (e.g., "Feature Tables", "Training", "Inference", "Jobs")
- Whether it was a Common, Reference, or Template file
- A one-line description of what you specifically used it for in this session
### Format

| # | Section | Skill / Reference Read | Type | What It Was Used For |
|---|---------|------------------------|------|----------------------|
| 1 | Section Name | `path/to/SKILL.md` | Common / Reference / Template | One-line description |
### Summary Footer
End with:
- Totals: X common skills, Y reference files, Z templates read across N sections
- Models trained: List each model name, type (classification/regression/anomaly), and algorithm
- Skipped: List any skills from the dependency table above that you did NOT need to read, and why (e.g., "section not applicable", "user skipped", "no issues encountered")
- Unplanned: List any skills you read that were NOT listed in the dependency table (e.g., for troubleshooting, edge cases, or user-requested detours)
## References

### Official Documentation
- FeatureEngineeringClient.log_model API
- MLflow Experiments - Databricks
- MLflow 3.1 LoggedModel
- Unity Catalog Model Registry
- Databricks Feature Store
### Feature Engineering
### Related Skills

- `databricks-python-imports` - sys.path setup for Asset Bundles
- `databricks-asset-bundles` - Infrastructure-as-code patterns
- `databricks-autonomous-operations` - Troubleshooting: read when jobs fail — provides the Deploy → Poll → Diagnose → Fix → Redeploy autonomous loop, error-solution matrix, and self-healing patterns