# 03-deduplication: Gold Layer Delta MERGE Deduplication Pattern

## Overview
Gold layer merge operations read from Silver and write to Gold using Delta MERGE. This skill standardizes the deduplication pattern required to prevent `[DELTA_MULTIPLE_SOURCE_ROW_MATCHING_TARGET_ROW_IN_MERGE]` errors.
## When to Use This Skill

Use this skill when:

- Merging from Silver to Gold tables
- Silver may contain duplicate business keys due to:
  - Incremental DLT streaming ingestion
  - Change Data Capture (CDC) patterns
  - Historical SCD Type 2 tracking
  - Test data generation creating duplicates
  - Multiple batch loads
- Encountering `[DELTA_MULTIPLE_SOURCE_ROW_MATCHING_TARGET_ROW_IN_MERGE]` errors
## Critical Rule: Always Deduplicate Before MERGE

Delta MERGE operations REQUIRE unique source keys. Multiple source rows matching the same target row cause ambiguity and fail the merge.

Without deduplication, the MERGE fails with:

```text
[DELTA_MULTIPLE_SOURCE_ROW_MATCHING_TARGET_ROW_IN_MERGE] Cannot perform Merge as multiple source rows matched and attempted to modify the same target row in the Delta table in possibly conflicting ways.
```
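You can check whether a source actually violates this rule before running the MERGE. A minimal sketch, assuming the `store_number` business key used in the examples below (the table name is illustrative):

```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()
silver_table = "main.silver.silver_store_dim"  # illustrative name

# Any business key appearing more than once would trigger the MERGE error above.
dupes = (
    spark.table(silver_table)
    .groupBy("store_number")
    .count()
    .filter(F.col("count") > 1)
)

if dupes.count() > 0:
    print(f"⚠ {dupes.count()} business keys have duplicate rows; deduplicate before MERGE")
    dupes.show(10, truncate=False)
```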
## Standard Deduplication Pattern

### ✅ CORRECT: Deduplicate Before MERGE
```python
from delta.tables import DeltaTable
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, concat_ws, md5


def merge_dim_store(spark: SparkSession, catalog: str, silver_schema: str, gold_schema: str):
    """Merge dim_store from Silver to Gold (SCD Type 2)."""
    print("Merging dim_store...")
    silver_table = f"{catalog}.{silver_schema}.silver_store_dim"
    gold_table = f"{catalog}.{gold_schema}.dim_store"

    # CRITICAL: Deduplicate Silver source before merging
    silver_raw = spark.table(silver_table)
    original_count = silver_raw.count()

    silver_df = (
        silver_raw
        .orderBy(col("processed_timestamp").desc())  # Order by timestamp first (latest first)
        .dropDuplicates(["store_number"])            # Keep only first occurrence per business key
    )
    dedupe_count = silver_df.count()
    print(f"  Deduplicated: {original_count} → {dedupe_count} records ({original_count - dedupe_count} duplicates removed)")

    # Prepare Gold updates
    updates_df = (
        silver_df
        .withColumn("store_key", md5(concat_ws("||", col("store_id"), col("processed_timestamp"))))
        # ... other transformations
    )

    # MERGE into Gold
    delta_gold = DeltaTable.forName(spark, gold_table)
    delta_gold.alias("target").merge(
        updates_df.alias("source"),
        "target.store_number = source.store_number AND target.is_current = true"
    ).whenMatchedUpdate(set={
        "record_updated_timestamp": "source.record_updated_timestamp"
    }).whenNotMatchedInsertAll().execute()

    record_count = updates_df.count()
    print(f"✓ Merged {record_count} records into dim_store")
```
### ❌ WRONG: No Deduplication

```python
# ❌ BAD: No deduplication - reads ALL Silver records including duplicates
silver_df = spark.table(silver_table)

# ❌ MERGE will fail if multiple rows have the same store_number
delta_gold.alias("target").merge(
    silver_df.alias("source"),
    "target.store_number = source.store_number"
).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()
```
## Key Requirements

### 1. Order by Timestamp Descending

Always order by `processed_timestamp` DESC before deduplication to keep the LATEST record:

```python
.orderBy(col("processed_timestamp").desc())  # Latest first
```

Why this matters (see the worked example after this list):

- Ensures the most recent data is kept
- Older versions are dropped
- Consistent with SCD Type 2 patterns
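A tiny self-contained demo of which row survives under this skill's pattern (column names match the examples here; the data is made up):

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.getOrCreate()

rows = [
    ("S001", "2025-01-01 10:00:00", "old name"),
    ("S001", "2025-01-02 10:00:00", "new name"),  # latest row for S001
    ("S002", "2025-01-01 10:00:00", "only row"),
]
df = spark.createDataFrame(rows, ["store_number", "processed_timestamp", "store_name"])

deduped = df.orderBy(col("processed_timestamp").desc()).dropDuplicates(["store_number"])
deduped.show()  # one row per store_number; per this pattern, S001 keeps "new name"
```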
### 2. Match Deduplication Key to Merge Key

CRITICAL: The column used in `.dropDuplicates()` MUST match the column in the MERGE condition.

#### ✅ CORRECT: Keys Match
```python
# Deduplicate on product_code
silver_df = (
    silver_raw
    .orderBy(col("processed_timestamp").desc())
    .dropDuplicates(["product_code"])
)

# Merge on product_code (SAME KEY)
delta_gold.alias("target").merge(
    updates_df.alias("source"),
    "target.product_code = source.product_code"
).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()
```
#### ❌ WRONG: Keys Don't Match

```python
# Deduplicate on upc_code
.dropDuplicates(["upc_code"])  # ❌ WRONG KEY

# Merge on product_code (DIFFERENT KEY!)
"target.product_code = source.product_code"  # Still has duplicates!
```

Result: the MERGE still fails because multiple `upc_code` records can share the same `product_code`.
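One way to catch a key mismatch early is to assert source uniqueness on the exact merge key just before executing. A hedged helper sketch (the function name is ours, not part of this skill):

```python
from pyspark.sql import DataFrame


def assert_unique_on(df: DataFrame, merge_keys: list) -> None:
    """Fail fast if any merge-key combination appears more than once in the source."""
    dupe_count = df.groupBy(merge_keys).count().filter("count > 1").count()
    if dupe_count > 0:
        raise ValueError(
            f"{dupe_count} merge-key values are duplicated in the source; "
            f"deduplicate on {merge_keys} before MERGE"
        )


# Usage: validate on the SAME key the MERGE condition uses
assert_unique_on(updates_df, ["product_code"])
```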
### 3. Use dropDuplicates(), Not Window Functions

Preferred approach: `.dropDuplicates()` with ordering.

```python
# ✅ SIMPLE and RELIABLE
silver_df = (
    spark.table(silver_table)
    .orderBy(col("processed_timestamp").desc())
    .dropDuplicates(["store_number"])
)
```

Avoid window functions with `row_number()`: more complex with no benefit. The equivalent is shown below for reference only.
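For contrast, the window-function route this section advises against would look roughly like this:

```python
from pyspark.sql import Window
from pyspark.sql.functions import col, row_number

# More verbose path to the same result: rank rows per business key
# by recency, then keep only rank 1.
w = Window.partitionBy("store_number").orderBy(col("processed_timestamp").desc())

silver_df = (
    spark.table(silver_table)
    .withColumn("rn", row_number().over(w))
    .filter(col("rn") == 1)
    .drop("rn")
)
```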
### 4. Add Debug Logging

Always log deduplication metrics for visibility:

```python
original_count = silver_raw.count()
dedupe_count = silver_df.count()
print(f"  Deduplicated: {original_count} → {dedupe_count} records ({original_count - dedupe_count} duplicates removed)")
```

Benefits:

- Proves deduplication is working
- Shows the magnitude of the duplicate problem
- Helps troubleshoot merge failures
- Provides data quality insights
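If several merge functions print the same metrics, a small helper keeps the format consistent. This is a sketch of ours, not part of the skill:

```python
from pyspark.sql import DataFrame
from pyspark.sql.functions import col


def dedupe_with_logging(df: DataFrame, business_keys: list,
                        order_col: str = "processed_timestamp") -> DataFrame:
    """Deduplicate on business_keys, keeping the latest row by order_col, and log counts."""
    original_count = df.count()
    deduped = df.orderBy(col(order_col).desc()).dropDuplicates(business_keys)
    dedupe_count = deduped.count()
    print(f"  Deduplicated: {original_count} → {dedupe_count} records "
          f"({original_count - dedupe_count} duplicates removed)")
    return deduped
```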
## Quick Reference

### Standard Pattern Template

```python
business_key = "store_number"  # e.g., from YAML meta["business_key"]

silver_raw = spark.table(silver_table)
original_count = silver_raw.count()

silver_df = (
    silver_raw
    .orderBy(col("processed_timestamp").desc())  # Latest first
    .dropDuplicates([business_key])              # Keep first (latest) per business key
)
dedupe_count = silver_df.count()
print(f"  Deduplicated: {original_count} → {dedupe_count} records")

# Then proceed with merge
updates_df = silver_df  # plus any transformations
delta_gold.alias("target").merge(
    updates_df.alias("source"),
    f"target.{business_key} = source.{business_key}"  # MUST MATCH deduplication key
).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()
```
## Core Patterns

### Pattern 1: Simple SCD Type 1 (Overwrite)

```python
# Deduplicate on product_code (business key)
silver_df = (
    silver_raw
    .orderBy(col("processed_timestamp").desc())
    .dropDuplicates(["product_code"])
)
updates_df = silver_df  # plus any transformations

delta_gold.alias("target").merge(
    updates_df.alias("source"),
    "target.product_code = source.product_code"
).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()
```
### Pattern 2: SCD Type 2 (Historical Tracking)

```python
# Deduplicate on store_number (business key)
silver_df = (
    silver_raw
    .orderBy(col("processed_timestamp").desc())
    .dropDuplicates(["store_number"])
)
updates_df = silver_df  # plus surrogate keys and other transformations

delta_gold.alias("target").merge(
    updates_df.alias("source"),
    "target.store_number = source.store_number AND target.is_current = true"
).whenMatchedUpdate(set={
    "record_updated_timestamp": "source.record_updated_timestamp"
}).whenNotMatchedInsertAll().execute()
```
### Pattern 3: Fact Table with Aggregation

```python
from pyspark.sql.functions import sum as spark_sum

# Aggregation handles deduplication, but the MERGE key must be composite
daily_sales = (
    transactions
    .groupBy("store_number", "upc_code", "transaction_date")
    .agg(
        spark_sum(col("final_sales_price")).alias("net_revenue"),
        spark_sum(col("quantity_sold")).alias("net_units")
    )
)

delta_gold.alias("target").merge(
    daily_sales.alias("source"),
    """target.store_number = source.store_number
       AND target.upc_code = source.upc_code
       AND target.transaction_date = source.transaction_date"""
).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()
```

See `references/dedup-patterns.md` for complete pattern examples.
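As a bridge to those reference patterns, a hedged sketch of how a generic helper like `dedupe_with_logging` (defined above) might be reused across dimension merges. The per-table mapping below is hypothetical; real values would come from YAML config:

```python
catalog, silver_schema = "main", "silver"  # illustrative

# Hypothetical table-to-key mapping; real values come from YAML.
DIMENSION_KEYS = {
    "silver_store_dim": ["store_number"],
    "silver_product_dim": ["product_code"],
}

for table_name, business_keys in DIMENSION_KEYS.items():
    silver_raw = spark.table(f"{catalog}.{silver_schema}.{table_name}")
    silver_df = dedupe_with_logging(silver_raw, business_keys)
    # ... continue with the MERGE patterns shown above
```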
## Validation Checklist

Before deploying Gold MERGE operations:

- [ ] Source data is deduplicated with `.dropDuplicates()`
- [ ] Deduplication key matches MERGE condition key
- [ ] Ordered by `processed_timestamp` DESC
- [ ] Debug logging shows deduplication metrics
- [ ] Error handling with try/except (see the sketch after this checklist)
- [ ] Success/failure messages printed
- [ ] Tested with actual Silver data containing duplicates
- [ ] MERGE condition uses the correct business key
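A minimal error-handling wrapper consistent with the checklist; the structure is a sketch and the message format is ours:

```python
def safe_merge_dim_store(spark, catalog, silver_schema, gold_schema) -> bool:
    """Run the dim_store merge, printing success or failure instead of raising."""
    try:
        merge_dim_store(spark, catalog, silver_schema, gold_schema)
        print("✓ dim_store merge succeeded")
        return True
    except Exception as e:  # surface any merge failure to the pipeline log
        print(f"✗ dim_store merge failed: {e}")
        return False
```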
## Common Mistakes to Avoid

### ❌ Mistake 1: No Deduplication

```python
# BAD
silver_df = spark.table(silver_table)
```

### ❌ Mistake 2: Wrong Deduplication Key

```python
# BAD - Deduplicate on upc_code but merge on product_code
.dropDuplicates(["upc_code"])
# ... later ...
"target.product_code = source.product_code"
```

### ❌ Mistake 3: No Ordering

```python
# BAD - Which duplicate to keep is non-deterministic
.dropDuplicates(["store_number"])  # No orderBy!
```

### ✅ CORRECT: Simple and Reliable

```python
# GOOD
silver_df = (
    spark.table(silver_table)
    .orderBy(col("processed_timestamp").desc())
    .dropDuplicates(["store_number"])
)
```
## Performance Considerations

### 1. Deduplicate Before Transformations

Deduplicate BEFORE applying transformations to reduce compute:

```python
# ✅ GOOD - Deduplicate first, then transform
silver_df = (
    spark.table(silver_table)
    .orderBy(col("processed_timestamp").desc())
    .dropDuplicates(["store_number"])  # Reduces data size
)
updates_df = apply_transformations(silver_df)  # Works on a smaller dataset
```
### 2. Partition Pruning

If Silver tables are partitioned, leverage partition pruning to limit the scan:

```python
silver_df = (
    spark.table(silver_table)
    .filter(col("transaction_date") >= "2025-01-01")  # Partition pruning
    .orderBy(col("processed_timestamp").desc())
    .dropDuplicates(["store_number"])
)
```
## Reference Files

- `references/dedup-patterns.md` - Detailed deduplication SQL patterns, generic reusable functions, complete scenario examples, and troubleshooting patterns

## Related Patterns

- Merge Schema Validation - See the `gold/pipeline-workers/05-schema-validation` skill
- Gold Layer Merge Patterns - See the `gold/pipeline-workers/02-merge-patterns` skill
## References

- Delta Lake MERGE - Official documentation
- PySpark `dropDuplicates`
## Key Takeaway

Delta MERGE operations require unique source keys. ALWAYS deduplicate Silver data before merging to Gold using `.orderBy(col("processed_timestamp").desc()).dropDuplicates([business_key])` to prevent `[DELTA_MULTIPLE_SOURCE_ROW_MATCHING_TARGET_ROW_IN_MERGE]` errors.

This pattern is mandatory for all Gold layer MERGE operations.
## Inputs

- Silver DataFrame with potential duplicates (from `spark.table(silver_table)`)
- Business key columns from YAML (`meta["business_key"]`)
- Ordering column for recency (typically `processed_timestamp`)
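A hedged sketch of wiring those inputs together. The YAML file name, its layout, and the shape of the `meta` dict are assumptions, not a documented schema:

```python
import yaml
from pyspark.sql.functions import col

# Assumed YAML shape:
#   business_key: [store_number]
#   order_column: processed_timestamp
with open("dim_store.yaml") as f:  # hypothetical config file
    meta = yaml.safe_load(f)

business_keys = meta["business_key"]  # list of key columns
order_col = meta.get("order_column", "processed_timestamp")

silver_df = (
    spark.table(silver_table)
    .orderBy(col(order_col).desc())
    .dropDuplicates(business_keys)
)
```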
## Outputs

- Deduplicated DataFrame ready for MERGE (one row per business key)
- Deduplication statistics: original count, deduplicated count, duplicates removed
## Pipeline Notes to Carry Forward

- Deduplication key MUST match MERGE condition key (both from YAML `business_key`)
- Always order by `processed_timestamp` descending to keep the latest record
- Run deduplication BEFORE grain validation and schema validation
- If deduplication removes 0 rows, the Silver source is already clean (still run for safety; see the check after this list)
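A small guard that records the clean-source case explicitly (message wording is ours):

```python
removed = original_count - dedupe_count
if removed == 0:
    print("  Silver source contained no duplicates; deduplication kept for safety")
else:
    print(f"  Removed {removed} duplicate rows before MERGE")
```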
## Next Step

Pass the deduplicated DataFrame to `pipeline-workers/04-grain-validation` (for fact tables) or directly to MERGE (for dimensions).