
Data Cleaning Pipeline Generator

Generates comprehensive data cleaning and preprocessing pipelines using pandas, polars, or PySpark with best practices for handling messy data.

When to Use

  • "Clean my dataset"
  • "Generate data cleaning pipeline"
  • "Handle missing values"
  • "Remove duplicates"
  • "Fix data types"
  • "Detect and remove outliers"

Instructions

1. Analyze Dataset

import pandas as pd

# Load data
df = pd.read_csv('data.csv')

# Basic info
print(df.info())
print(df.describe())
print(df.head())

# Check for issues
print("\nMissing values:")
print(df.isnull().sum())

print("\nDuplicates:")
print(f"Total duplicates: {df.duplicated().sum()}")

print("\nData types:")
print(df.dtypes)

2. Generate Pandas Cleaning Pipeline

Complete Pipeline:

import pandas as pd
import numpy as np
from datetime import datetime

class DataCleaningPipeline:
    """Data cleaning pipeline for pandas DataFrames."""

    def __init__(self, df: pd.DataFrame):
        self.df = df.copy()
        self.original_shape = df.shape
        self.cleaning_log = []

    def log(self, message: str):
        """Log cleaning steps."""
        self.cleaning_log.append(f"[{datetime.now()}] {message}")
        print(message)

    def remove_duplicates(self, subset=None, keep='first'):
        """Remove duplicate rows."""
        before = len(self.df)
        self.df = self.df.drop_duplicates(subset=subset, keep=keep)
        removed = before - len(self.df)
        self.log(f"Removed {removed} duplicate rows")
        return self

    def handle_missing_values(self, strategy='auto'):
        """Handle missing values based on strategy."""
        missing = self.df.isnull().sum()
        columns_with_missing = missing[missing > 0]

        for col in columns_with_missing.index:
            missing_pct = (missing[col] / len(self.df)) * 100

            if missing_pct > 50:
                self.log(f"Dropping column '{col}' ({missing_pct:.1f}% missing)")
                self.df = self.df.drop(columns=[col])
                continue

            if pd.api.types.is_numeric_dtype(self.df[col]):
                # Numeric columns: 'auto' falls back to the median
                method = 'mean' if strategy == 'mean' else 'median'
                fill_value = self.df[col].mean() if method == 'mean' else self.df[col].median()
                self.df[col] = self.df[col].fillna(fill_value)
                self.log(f"Filled '{col}' with {method}: {fill_value:.2f}")
            else:
                # Categorical columns
                if strategy == 'mode':
                    fill_value = self.df[col].mode()[0]
                else:  # auto
                    fill_value = 'Unknown'
                self.df[col] = self.df[col].fillna(fill_value)
                self.log(f"Filled '{col}' with: {fill_value}")

        return self

    def fix_data_types(self, type_mapping=None):
        """Convert columns to appropriate data types."""
        if type_mapping is None:
            type_mapping = {}

        for col in self.df.columns:
            if col in type_mapping:
                try:
                    self.df[col] = self.df[col].astype(type_mapping[col])
                    self.log(f"Converted '{col}' to {type_mapping[col]}")
                except Exception as e:
                    self.log(f"Failed to convert '{col}': {e}")
            else:
                # Auto-detect dates
                if 'date' in col.lower() or 'time' in col.lower():
                    try:
                        self.df[col] = pd.to_datetime(self.df[col])
                        self.log(f"Converted '{col}' to datetime")
                    except (ValueError, TypeError):
                        pass  # Leave the column unchanged if it does not parse

        return self

    def remove_outliers(self, columns=None, method='iqr', threshold=1.5):
        """Remove outliers using IQR or Z-score method."""
        if columns is None:
            columns = self.df.select_dtypes(include=[np.number]).columns

        before = len(self.df)

        for col in columns:
            if method == 'iqr':
                Q1 = self.df[col].quantile(0.25)
                Q3 = self.df[col].quantile(0.75)
                IQR = Q3 - Q1
                lower = Q1 - threshold * IQR
                upper = Q3 + threshold * IQR
                mask = (self.df[col] >= lower) & (self.df[col] <= upper)
            else:  # z-score (a threshold near 3 is conventional here, vs 1.5 for IQR)
                z_scores = np.abs((self.df[col] - self.df[col].mean()) / self.df[col].std())
                mask = z_scores < threshold

            self.df = self.df[mask]

        removed = before - len(self.df)
        self.log(f"Removed {removed} outlier rows using {method} method")
        return self

    def normalize_text(self, columns=None):
        """Normalize text columns (lowercase, strip whitespace)."""
        if columns is None:
            columns = self.df.select_dtypes(include=['object']).columns

        for col in columns:
            self.df[col] = self.df[col].str.strip().str.lower()
            self.log(f"Normalized text in '{col}'")

        return self

    def encode_categorical(self, columns=None, method='label'):
        """Encode categorical variables."""
        if columns is None:
            columns = self.df.select_dtypes(include=['object']).columns

        for col in columns:
            if method == 'label':
                self.df[col] = pd.Categorical(self.df[col]).codes
                self.log(f"Label encoded '{col}'")
            elif method == 'onehot':
                dummies = pd.get_dummies(self.df[col], prefix=col)
                self.df = pd.concat([self.df.drop(columns=[col]), dummies], axis=1)
                self.log(f"One-hot encoded '{col}'")

        return self

    def validate_ranges(self, range_checks):
        """Validate numeric columns are within expected ranges."""
        for col, (min_val, max_val) in range_checks.items():
            invalid = ((self.df[col] < min_val) | (self.df[col] > max_val)).sum()
            if invalid > 0:
                self.log(f"WARNING: {invalid} values in '{col}' outside range [{min_val}, {max_val}]")
                # Remove invalid rows
                self.df = self.df[(self.df[col] >= min_val) & (self.df[col] <= max_val)]

        return self

    def generate_report(self):
        """Generate cleaning report."""
        report = f"""
Data Cleaning Report
====================

Original Shape: {self.original_shape}
Final Shape: {self.df.shape}
Rows Removed: {self.original_shape[0] - self.df.shape[0]}
Columns Removed: {self.original_shape[1] - self.df.shape[1]}

Cleaning Steps:
"""
        for step in self.cleaning_log:
            report += f"  - {step}\n"

        return report

    def get_cleaned_data(self):
        """Return cleaned DataFrame."""
        return self.df


# Usage
pipeline = DataCleaningPipeline(df)
cleaned_df = (
    pipeline
    .remove_duplicates()
    .handle_missing_values(strategy='auto')
    .fix_data_types()
    .remove_outliers(method='iqr', threshold=1.5)
    .normalize_text()
    .validate_ranges({'age': (0, 120), 'price': (0, 1000000)})
    .get_cleaned_data()
)

print(pipeline.generate_report())
cleaned_df.to_csv('cleaned_data.csv', index=False)

3. Polars Pipeline (Faster for Large Data)

import polars as pl

# Load data
df = pl.read_csv('data.csv')

# Cleaning pipeline
cleaned_df = (
    df
    # Remove duplicates
    .unique()
    # Handle missing values
    .with_columns([
        pl.col('age').fill_null(pl.col('age').median()),
        pl.col('name').fill_null('Unknown'),
    ])
    # Fix data types
    .with_columns([
        pl.col('date').str.strptime(pl.Date, '%Y-%m-%d'),
        pl.col('amount').cast(pl.Float64),
    ])
    # Remove outliers
    .filter(
        (pl.col('age') >= 0) & (pl.col('age') <= 120)
    )
    # Normalize text
    .with_columns([
        pl.col('name').str.to_lowercase().str.strip_chars(),  # older polars: .str.strip()
        pl.col('email').str.to_lowercase(),
    ])
)

# Save
cleaned_df.write_csv('cleaned_data.csv')
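
For files that are large relative to memory, the same steps can run lazily, which is where much of polars' speed comes from: scan_csv builds a query plan that polars optimizes and only executes at .collect(). A shortened sketch of the pipeline above in lazy form:

import polars as pl

cleaned_df = (
    pl.scan_csv('data.csv')  # Lazy: nothing is read yet
    .unique()
    .with_columns(pl.col('age').fill_null(pl.col('age').median()))
    .filter((pl.col('age') >= 0) & (pl.col('age') <= 120))
    .collect()  # Plan is optimized and executed here
)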

4. PySpark Pipeline (For Big Data)

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, mean, trim, lower
from pyspark.sql.types import IntegerType, DoubleType

spark = SparkSession.builder.appName('DataCleaning').getOrCreate()

# Load data
df = spark.read.csv('data.csv', header=True, inferSchema=True)

# Cleaning pipeline
cleaned_df = (
    df
    # Remove duplicates
    .dropDuplicates()
    # Handle missing values
    .fillna({
        'age': df.select(mean('age')).collect()[0][0],
        'name': 'Unknown',
    })
    # Fix data types
    .withColumn('age', col('age').cast(IntegerType()))
    .withColumn('amount', col('amount').cast(DoubleType()))
    # Remove outliers
    .filter((col('age') >= 0) & (col('age') <= 120))
    # Normalize text
    .withColumn('name', trim(lower(col('name'))))
    .withColumn('email', trim(lower(col('email'))))
)

# Save (Spark writes a directory of part files, not a single CSV)
cleaned_df.write.csv('cleaned_data', header=True, mode='overwrite')

5. Data Quality Checks

def data_quality_checks(df):
    """Run comprehensive data quality checks."""
    report = []

    # Check 1: Missing values
    missing = df.isnull().sum()
    if missing.sum() > 0:
        report.append(f"⚠️  Missing values found:\n{missing[missing > 0]}")

    # Check 2: Duplicates
    duplicates = df.duplicated().sum()
    if duplicates > 0:
        report.append(f"⚠️  {duplicates} duplicate rows found")

    # Check 3: Data types
    expected_types = {
        'age': 'int64',
        'amount': 'float64',
        'date': 'datetime64[ns]',
    }
    for col, expected in expected_types.items():
        if col in df.columns and df[col].dtype != expected:
            report.append(f"⚠️  Column '{col}' has type {df[col].dtype}, expected {expected}")

    # Check 4: Value ranges
    if 'age' in df.columns:
        invalid_age = ((df['age'] < 0) | (df['age'] > 120)).sum()
        if invalid_age > 0:
            report.append(f"⚠️  {invalid_age} invalid age values")

    # Check 5: Unique identifiers
    if 'id' in df.columns:
        if df['id'].duplicated().any():
            report.append("⚠️  Duplicate IDs found")

    # Check 6: Consistency
    if 'email' in df.columns:
        invalid_email = ~df['email'].str.contains('@', na=False)
        if invalid_email.sum() > 0:
            report.append(f"⚠️  {invalid_email.sum()} invalid email addresses")

    if report:
        print("Data Quality Issues:")
        for issue in report:
            print(issue)
    else:
        print("✅ All quality checks passed!")

    return len(report) == 0

# Run checks
data_quality_checks(cleaned_df)

6. Automated Cleaning Function

import numpy as np

def auto_clean_dataframe(df, config=None):
    """Automatically clean a DataFrame with sensible defaults."""
    if config is None:
        config = {
            'remove_duplicates': True,
            'handle_missing': True,
            'remove_outliers': True,
            'normalize_text': True,
        }

    df = df.copy()  # Work on a copy so the caller's DataFrame is untouched
    print(f"Original shape: {df.shape}")

    if config['remove_duplicates']:
        df = df.drop_duplicates()
        print(f"After removing duplicates: {df.shape}")

    if config['handle_missing']:
        # Numeric: fill with median
        numeric_cols = df.select_dtypes(include=[np.number]).columns
        for col in numeric_cols:
            df[col] = df[col].fillna(df[col].median())

        # Categorical: fill with mode or 'Unknown'
        categorical_cols = df.select_dtypes(include=['object']).columns
        for col in categorical_cols:
            if df[col].mode().empty:
                df[col] = df[col].fillna('Unknown')
            else:
                df[col] = df[col].fillna(df[col].mode()[0])
        print(f"After handling missing values: {df.shape}")

    if config['remove_outliers']:
        numeric_cols = df.select_dtypes(include=[np.number]).columns
        for col in numeric_cols:
            Q1 = df[col].quantile(0.25)
            Q3 = df[col].quantile(0.75)
            IQR = Q3 - Q1
            df = df[(df[col] >= Q1 - 1.5 * IQR) & (df[col] <= Q3 + 1.5 * IQR)]
        print(f"After removing outliers: {df.shape}")

    if config['normalize_text']:
        text_cols = df.select_dtypes(include=['object']).columns
        for col in text_cols:
            df[col] = df[col].str.strip().str.lower()

    return df

# Usage
cleaned_df = auto_clean_dataframe(df)

7. Save Pipeline Configuration

# data_cleaning_config.yaml
cleaning_pipeline:
  remove_duplicates:
    enabled: true
    subset: ['id', 'email']
    keep: 'first'

  missing_values:
    strategy: auto
    drop_threshold: 50  # Drop columns with >50% missing
    numeric_fill: median
    categorical_fill: mode

  outliers:
    method: iqr
    threshold: 1.5
    columns: ['age', 'price', 'quantity']

  data_types:
    age: int64
    price: float64
    date: datetime64[ns]
    email: string

  text_normalization:
    lowercase: true
    strip_whitespace: true
    remove_special_chars: false

  validation:
    ranges:
      age: [0, 120]
      price: [0, 1000000]
    required_columns: ['id', 'name', 'email']
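
A minimal sketch of driving the pandas pipeline from this file, assuming PyYAML is installed and df is already loaded; it wires up only the keys that DataCleaningPipeline exposes (drop_threshold, for example, is not consumed):

import yaml

with open('data_cleaning_config.yaml') as f:
    cfg = yaml.safe_load(f)['cleaning_pipeline']

pipeline = DataCleaningPipeline(df)
if cfg['remove_duplicates']['enabled']:
    pipeline.remove_duplicates(
        subset=cfg['remove_duplicates'].get('subset'),
        keep=cfg['remove_duplicates'].get('keep', 'first'),
    )
pipeline.handle_missing_values(strategy=cfg['missing_values']['strategy'])
pipeline.fix_data_types(type_mapping=cfg.get('data_types'))
pipeline.remove_outliers(
    columns=cfg['outliers'].get('columns'),
    method=cfg['outliers']['method'],
    threshold=cfg['outliers']['threshold'],
)
ranges = {c: tuple(b) for c, b in cfg['validation']['ranges'].items()}
cleaned_df = pipeline.validate_ranges(ranges).get_cleaned_data()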

Best Practices

DO:

  • Always keep original data
  • Log all cleaning steps
  • Validate data quality
  • Handle missing values appropriately
  • Remove duplicates early
  • Check for outliers
  • Validate data types
  • Document assumptions

DON'T:

  • Delete original data
  • Fill all missing with zeros
  • Ignore outliers
  • Mix data types
  • Skip validation
  • Leak test-set statistics into cleaning steps (see the sketch after this list)
  • Remove too many rows
  • Forget to save cleaned data
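
To make the leakage point concrete: compute fill values and outlier bounds on the training split only, then apply them unchanged to the test split. A minimal pandas sketch, assuming df from the earlier steps has a numeric 'price' column and using a positional 80/20 split (both illustrative):

# Illustrative split; in practice this comes from your own splitting step
train_df, test_df = df.iloc[:800].copy(), df.iloc[800:].copy()

# Fit statistics on the training split only
median_price = train_df['price'].median()
q1, q3 = train_df['price'].quantile([0.25, 0.75])
lower, upper = q1 - 1.5 * (q3 - q1), q3 + 1.5 * (q3 - q1)

# Apply the same values to both splits
for split in (train_df, test_df):
    split['price'] = split['price'].fillna(median_price).clip(lower, upper)

Clipping is used here instead of row removal so the test split keeps its size.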

Checklist

  • Loaded and inspected data
  • Removed duplicates
  • Handled missing values
  • Fixed data types
  • Removed/handled outliers
  • Normalized text fields
  • Validated data quality
  • Generated cleaning report
  • Saved cleaned data