Data Pipeline Operations Skill

This skill provides guidance for working with Landbruget.dk's data pipelines following the medallion architecture.

Activation Context

This skill activates when:

  • Running or debugging data pipelines
  • Working with GCS (Google Cloud Storage)
  • Handling data transformations (Bronze → Silver → Gold)
  • Validating Danish identifiers (CVR, CHR, BFE)
  • Working with geospatial data (GeoPandas, PostGIS)

Environment Setup

ALWAYS start with:

cd backend
source venv/bin/activate

Verify environment:

python -c "import geopandas, supabase; print('Environment OK')"

Data Processing Philosophy

PREFER DuckDB over Pandas:

  • DuckDB queries files directly without memory limits
  • Much faster for large datasets
  • Use SQL instead of DataFrame operations
  • Only use Pandas for final small result sets or when GeoPandas is required

CRS Strategy

Process in EPSG:25832, transform to EPSG:4326 only at final Supabase upload.

| EPSG  | Name    | Use                             |
|-------|---------|---------------------------------|
| 25832 | UTM 32N | Processing (Bronze/Silver/Gold) |
| 4326  | WGS84   | Final storage (Supabase only)   |

This eliminates unnecessary transforms:

  • ❌ Old: Source(25832) → Silver(4326) → Gold(25832 for calc) → Supabase(4326) = 2-3 transforms
  • ✅ New: Source(25832) → Process(25832) → Supabase(4326) = 1 transform
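In GeoPandas terms, the new flow means a single `to_crs` at the very end. A minimal sketch (the function and column names are illustrative, not project APIs):

```python
import geopandas as gpd

def finalize_for_supabase(gdf: gpd.GeoDataFrame) -> gpd.GeoDataFrame:
    """Compute metrics while still in EPSG:25832 (native meters),
    then transform once to EPSG:4326 for the Supabase upload."""
    assert gdf.crs is not None and gdf.crs.to_epsg() == 25832
    gdf = gdf.copy()
    gdf["area_ha"] = gdf.geometry.area / 10_000  # m^2 -> hectares
    return gdf.to_crs("EPSG:4326")               # the single transform
```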

Medallion Architecture

Bronze Layer (Raw Data)

  • Purpose: Preserve data exactly as received
  • Location: gs://landbruget-data/bronze/<source>/<date>/
  • CRS: Keep native (usually EPSG:25832 from Danish WFS sources)
  • Rules:
    • Never modify raw data or geometry
    • Add metadata: _fetch_timestamp, _source, _source_crs
    • Use Parquet format
    • Immutable - never overwrite
# Track source CRS in metadata
_source_crs = detect_crs_from_response(wfs_capabilities)  # e.g., "EPSG:25832"
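Attaching the required metadata can be sketched as a small wrapper (the function name and record shape are illustrative; the raw columns pass through untouched):

```python
from datetime import datetime, timezone
import pandas as pd

def to_bronze(records: list, source: str, source_crs: str) -> pd.DataFrame:
    """Wrap raw records with Bronze metadata without modifying them."""
    df = pd.DataFrame(records)
    df["_fetch_timestamp"] = datetime.now(timezone.utc).isoformat()
    df["_source"] = source
    df["_source_crs"] = source_crs
    return df
```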

Silver Layer (Cleaned Data)

  • Purpose: Clean, validate, standardize
  • CRS: Keep EPSG:25832 (no transformation yet!)
  • Transformations:
    • Type coercion (dates, numbers)
    • CVR formatting: 8 digits, zero-padded
    • CHR formatting: 6 digits
    • Transform non-25832 sources (DAGI, H3) to EPSG:25832 here
    • Deduplication
    • Null handling
# Only transform sources that aren't already EPSG:25832
if source_crs != "EPSG:25832":
    geometry_expr = f"ST_Transform(geometry, '{source_crs}', 'EPSG:25832')"

Gold Layer (Analysis-Ready)

  • Purpose: Enriched, joined datasets
  • CRS: Keep EPSG:25832 for processing (area/buffer/distance work natively!)
  • Operations:
    • Join multiple sources on CVR/CHR/BFE
    • Calculate derived metrics (meters work directly!)
    • Aggregate by company/farm
    • Transform to EPSG:4326 only at final Supabase upload
# Area/buffer/distance work directly in EPSG:25832 - no transforms needed!
ST_Area(geometry) / 10000  # hectares (geometry already in meters)
ST_Buffer(geometry, 1000)  # 1km buffer (meters work directly)

# Transform ONCE at final Supabase upload
ST_Transform(geometry, 'EPSG:25832', 'EPSG:4326')

Data Quality Validation

CVR Number (Company ID)

import re

def validate_cvr(cvr: str) -> bool:
    """CVR must be 8 digits (input is zero-padded to 8 before checking)."""
    return bool(re.match(r'^\d{8}$', str(cvr).zfill(8)))

# Format CVR
df['cvr'] = df['cvr'].astype(str).str.zfill(8)

CHR Number (Herd ID)

def validate_chr(chr_num: str) -> bool:
    """CHR must be 6 digits."""
    return bool(re.match(r'^\d{6}$', str(chr_num)))
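At the DataFrame level, the checks above can be combined into a simple pass/fail report (a sketch; the column names `cvr` and `chr` are illustrative):

```python
import pandas as pd

def validation_report(df: pd.DataFrame) -> pd.DataFrame:
    """Flag rows whose identifiers fail the format checks above."""
    out = df.copy()
    # Zero-pad CVR first (formatting rule), then require exactly 8 digits
    out["cvr_ok"] = out["cvr"].astype(str).str.zfill(8).str.fullmatch(r"\d{8}")
    # CHR must be exactly 6 digits
    out["chr_ok"] = out["chr"].astype(str).str.fullmatch(r"\d{6}")
    return out
```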

Geospatial CRS

import geopandas as gpd

# Danish data comes in EPSG:25832 (UTM zone 32N) - keep it there!
# Only convert to EPSG:4326 at final Supabase upload
gdf_for_supabase = gdf.to_crs('EPSG:4326')

Buffer/Distance in DuckDB

With EPSG:25832, buffer/distance work natively in meters!

-- EPSG:25832 data - buffer works directly in meters ✓
ST_Buffer(geometry, 1000)  -- 1km buffer

-- Area calculation works directly in square meters
ST_Area(geometry) / 10000  -- hectares

If working with EPSG:4326 data (avoid when possible):

from common.crs_utils import sql_buffer_meters, sql_intersects_with_buffer_meters

# These helpers transform to UTM internally
buffer_sql = sql_buffer_meters("geometry", 100)  # 100 meters
intersect_sql = sql_intersects_with_buffer_meters("a.geom", "b.geom", 1000)  # 1km
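These helpers are project-internal; one plausible sketch of what `sql_buffer_meters` might generate (an assumption, not the actual `common.crs_utils` implementation) is a round-trip through UTM 32N:

```python
def sql_buffer_meters(geom_col: str, meters: float) -> str:
    """Hypothetical sketch: buffer an EPSG:4326 geometry in meters by
    transforming to UTM 32N, buffering, and transforming back."""
    return (
        f"ST_Transform(ST_Buffer("
        f"ST_Transform({geom_col}, 'EPSG:4326', 'EPSG:25832'), {meters}), "
        f"'EPSG:25832', 'EPSG:4326')"
    )
```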

GCS Operations

Bucket: Set via GCS_BUCKET environment variable (see .env)

Upload to GCS with DuckDB

import os
import tempfile
from google.cloud import storage
import duckdb

def upload_to_gcs_duckdb(query: str, gcs_path: str, bucket_name: str = None):
    """Query with DuckDB, write Parquet to a temp file, and upload to GCS."""
    bucket_name = bucket_name or os.environ.get('GCS_BUCKET')

    # Execute query lazily; nothing is materialized yet
    result = duckdb.query(query)

    # write_parquet takes a file path (not a buffer), so stage the
    # output in a temporary file before uploading
    with tempfile.NamedTemporaryFile(suffix='.parquet', delete=False) as tmp:
        tmp_path = tmp.name
    try:
        result.write_parquet(tmp_path)

        # Upload to GCS
        client = storage.Client()
        bucket = client.bucket(bucket_name)
        blob = bucket.blob(gcs_path)
        blob.upload_from_filename(tmp_path)
    finally:
        os.remove(tmp_path)

# Example usage
upload_to_gcs_duckdb(
    "SELECT * FROM 'input.csv' WHERE cvr_number ~ '^\\d{8}$'",
    "silver/cleaned_data.parquet"
)

Query Files Directly from GCS with DuckDB

import os
import duckdb

# Install and load httpfs extension
duckdb.execute("INSTALL httpfs")
duckdb.execute("LOAD httpfs")

bucket = os.environ.get('GCS_BUCKET')

# Query parquet directly from GCS
result = duckdb.query(f"""
    SELECT cvr_number, SUM(area_ha) as total_area
    FROM 'gs://{bucket}/silver/fields.parquet'
    GROUP BY cvr_number
""").df()

# For authenticated access, register a GCS HMAC key as a DuckDB secret first
# (key_id and secret come from Cloud Storage's Interoperability HMAC keys)
duckdb.execute(f"CREATE SECRET (TYPE gcs, KEY_ID '{key_id}', SECRET '{secret}')")

Running Pipelines

Standard Pipeline Execution

cd backend
source venv/bin/activate
cd pipelines/<pipeline_name>
python main.py

Common Pipelines

| Pipeline               | Purpose                 | Frequency |
|------------------------|-------------------------|-----------|
| unified_pipeline       | 18+ Danish govt sources | Weekly    |
| chr_pipeline           | Livestock tracking      | Weekly    |
| svineflytning_pipeline | Pig movements           | Weekly    |
| drive_data_pipeline    | Regulatory compliance   | On-demand |

DuckDB for Large Files

DuckDB is excellent for querying large files without loading into memory:

import duckdb

# Query CSV directly
result = duckdb.query("""
    SELECT cvr_number, SUM(area_ha) as total_area
    FROM 'large_file.csv'
    WHERE date >= '2024-01-01'
    GROUP BY cvr_number
""").df()

# Query Parquet files
result = duckdb.query("""
    SELECT *
    FROM 'data.parquet'
    WHERE cvr_number = '12345678'
""").df()

# Join multiple files
result = duckdb.query("""
    SELECT a.*, b.name
    FROM 'fields.parquet' a
    JOIN 'companies.csv' b ON a.cvr_number = b.cvr_number
    WHERE a.area_ha > 100
""").df()

# Aggregate on large datasets
result = duckdb.query("""
    SELECT
        cvr_number,
        COUNT(*) as field_count,
        SUM(area_ha) as total_area,
        AVG(area_ha) as avg_area
    FROM 'fields.parquet'
    GROUP BY cvr_number
    HAVING total_area > 1000
""").df()

DuckDB Advantages

  • No memory limits: Queries files directly without loading
  • SQL interface: Use familiar SQL syntax
  • Fast: Highly optimized columnar engine
  • Multiple formats: CSV, Parquet, JSON
  • Joins: Combine multiple files efficiently

Troubleshooting

"Module not found"

cd backend
source venv/bin/activate
pip install -e .

GCS Authentication

export GOOGLE_APPLICATION_CREDENTIALS="/path/to/service-account.json"

Memory Issues

ALWAYS use DuckDB for large files - avoid Pandas:

# ✅ CORRECT: Use DuckDB
import duckdb
result = duckdb.query("""
    SELECT cvr_number, area_ha
    FROM 'large.csv'
    WHERE condition
""").df()

# ❌ AVOID: Pandas chunking (slow, complex)
# for chunk in pd.read_csv('large.csv', chunksize=10000):
#     process(chunk)

# ❌ AVOID: Pandas column selection (still loads into memory)
# df = pd.read_csv('large.csv', usecols=['cvr_number', 'area_ha'])

When to Use Pandas vs DuckDB

Use DuckDB (preferred):

  • Reading CSV/Parquet files
  • Filtering, aggregating, joining data
  • Any operation on data > 1GB
  • Transformations that can be expressed in SQL

Use Pandas only when:

  • Working with GeoPandas (spatial operations)
  • Final result set is small (<100MB)
  • Need very specific Python operations unavailable in SQL

Use GeoPandas only for:

  • Geometry operations (ST_Transform, ST_Within, etc.)
  • Spatial joins
  • CRS transformations

Quality Checklist

Before marking pipeline work complete:

  • Bronze data preserved unchanged (native CRS, usually EPSG:25832)
  • Silver data cleaned and validated (EPSG:25832)
  • Gold data uploaded to Supabase (transformed to EPSG:4326 at upload)
  • CVR/CHR/BFE formats validated
  • No duplicate records
  • Tests pass: pytest tests/