# Data Cleaning Pipeline Generator
Generates comprehensive data cleaning and preprocessing pipelines using pandas, Polars, or PySpark, with best practices for handling messy data.
## When to Use
- "Clean my dataset"
- "Generate data cleaning pipeline"
- "Handle missing values"
- "Remove duplicates"
- "Fix data types"
- "Detect and remove outliers"
## Instructions

### 1. Analyze Dataset

```python
import pandas as pd

# Load data
df = pd.read_csv('data.csv')

# Basic info
print(df.info())
print(df.describe())
print(df.head())

# Check for issues
print("\nMissing values:")
print(df.isnull().sum())

print("\nDuplicates:")
print(f"Total duplicates: {df.duplicated().sum()}")

print("\nData types:")
print(df.dtypes)
```
### 2. Generate Pandas Cleaning Pipeline

**Complete Pipeline:**
```python
import pandas as pd
import numpy as np
from datetime import datetime


class DataCleaningPipeline:
    """Data cleaning pipeline for pandas DataFrames."""

    def __init__(self, df: pd.DataFrame):
        self.df = df.copy()
        self.original_shape = df.shape
        self.cleaning_log = []

    def log(self, message: str):
        """Log cleaning steps."""
        self.cleaning_log.append(f"[{datetime.now()}] {message}")
        print(message)

    def remove_duplicates(self, subset=None, keep='first'):
        """Remove duplicate rows."""
        before = len(self.df)
        self.df = self.df.drop_duplicates(subset=subset, keep=keep)
        removed = before - len(self.df)
        self.log(f"Removed {removed} duplicate rows")
        return self

    def handle_missing_values(self, strategy='auto'):
        """Handle missing values based on strategy."""
        missing = self.df.isnull().sum()
        columns_with_missing = missing[missing > 0]

        for col in columns_with_missing.index:
            missing_pct = (missing[col] / len(self.df)) * 100

            if missing_pct > 50:
                self.log(f"Dropping column '{col}' ({missing_pct:.1f}% missing)")
                self.df = self.df.drop(columns=[col])
                continue

            if self.df[col].dtype in ['int64', 'float64']:
                # Numeric columns
                if strategy == 'mean':
                    fill_value = self.df[col].mean()
                elif strategy == 'median':
                    fill_value = self.df[col].median()
                else:  # auto
                    fill_value = self.df[col].median()
                self.df[col] = self.df[col].fillna(fill_value)
                self.log(f"Filled '{col}' with {strategy}: {fill_value:.2f}")
            else:
                # Categorical columns
                if strategy == 'mode':
                    fill_value = self.df[col].mode()[0]
                else:  # auto
                    fill_value = 'Unknown'
                self.df[col] = self.df[col].fillna(fill_value)
                self.log(f"Filled '{col}' with: {fill_value}")
        return self

    def fix_data_types(self, type_mapping=None):
        """Convert columns to appropriate data types."""
        if type_mapping is None:
            type_mapping = {}

        for col in self.df.columns:
            if col in type_mapping:
                try:
                    self.df[col] = self.df[col].astype(type_mapping[col])
                    self.log(f"Converted '{col}' to {type_mapping[col]}")
                except Exception as e:
                    self.log(f"Failed to convert '{col}': {e}")
            else:
                # Auto-detect dates
                if 'date' in col.lower() or 'time' in col.lower():
                    try:
                        self.df[col] = pd.to_datetime(self.df[col])
                        self.log(f"Converted '{col}' to datetime")
                    except Exception:
                        pass
        return self

    def remove_outliers(self, columns=None, method='iqr', threshold=1.5):
        """Remove outliers using IQR or Z-score method."""
        if columns is None:
            columns = self.df.select_dtypes(include=[np.number]).columns

        before = len(self.df)
        for col in columns:
            if method == 'iqr':
                Q1 = self.df[col].quantile(0.25)
                Q3 = self.df[col].quantile(0.75)
                IQR = Q3 - Q1
                lower = Q1 - threshold * IQR
                upper = Q3 + threshold * IQR
                mask = (self.df[col] >= lower) & (self.df[col] <= upper)
            else:  # z-score
                z_scores = np.abs((self.df[col] - self.df[col].mean()) / self.df[col].std())
                mask = z_scores < threshold
            self.df = self.df[mask]

        removed = before - len(self.df)
        self.log(f"Removed {removed} outlier rows using {method} method")
        return self

    def normalize_text(self, columns=None):
        """Normalize text columns (lowercase, strip whitespace)."""
        if columns is None:
            columns = self.df.select_dtypes(include=['object']).columns

        for col in columns:
            self.df[col] = self.df[col].str.strip().str.lower()
            self.log(f"Normalized text in '{col}'")
        return self

    def encode_categorical(self, columns=None, method='label'):
        """Encode categorical variables."""
        if columns is None:
            columns = self.df.select_dtypes(include=['object']).columns

        for col in columns:
            if method == 'label':
                self.df[col] = pd.Categorical(self.df[col]).codes
                self.log(f"Label encoded '{col}'")
            elif method == 'onehot':
                dummies = pd.get_dummies(self.df[col], prefix=col)
                self.df = pd.concat([self.df.drop(columns=[col]), dummies], axis=1)
                self.log(f"One-hot encoded '{col}'")
        return self

    def validate_ranges(self, range_checks):
        """Validate numeric columns are within expected ranges."""
        for col, (min_val, max_val) in range_checks.items():
            invalid = ((self.df[col] < min_val) | (self.df[col] > max_val)).sum()
            if invalid > 0:
                self.log(f"WARNING: {invalid} values in '{col}' outside range [{min_val}, {max_val}]")
                # Remove invalid rows
                self.df = self.df[(self.df[col] >= min_val) & (self.df[col] <= max_val)]
        return self

    def generate_report(self):
        """Generate cleaning report."""
        report = f"""
Data Cleaning Report
====================
Original Shape: {self.original_shape}
Final Shape: {self.df.shape}
Rows Removed: {self.original_shape[0] - self.df.shape[0]}
Columns Removed: {self.original_shape[1] - self.df.shape[1]}
Cleaning Steps:
"""
        for step in self.cleaning_log:
            report += f"  - {step}\n"
        return report

    def get_cleaned_data(self):
        """Return cleaned DataFrame."""
        return self.df


# Usage
pipeline = DataCleaningPipeline(df)
cleaned_df = (
    pipeline
    .remove_duplicates()
    .handle_missing_values(strategy='auto')
    .fix_data_types()
    .remove_outliers(method='iqr', threshold=1.5)
    .normalize_text()
    .validate_ranges({'age': (0, 120), 'price': (0, 1000000)})
    .get_cleaned_data()
)

print(pipeline.generate_report())
cleaned_df.to_csv('cleaned_data.csv', index=False)
```
### 3. Polars Pipeline (Faster for Large Data)
```python
import polars as pl

# Load data
df = pl.read_csv('data.csv')

# Cleaning pipeline
cleaned_df = (
    df
    # Remove duplicates
    .unique()
    # Handle missing values
    .with_columns([
        pl.col('age').fill_null(pl.col('age').median()),
        pl.col('name').fill_null('Unknown'),
    ])
    # Fix data types
    .with_columns([
        pl.col('date').str.strptime(pl.Date, '%Y-%m-%d'),
        pl.col('amount').cast(pl.Float64),
    ])
    # Remove outliers
    .filter(
        (pl.col('age') >= 0) & (pl.col('age') <= 120)
    )
    # Normalize text (str.strip_chars replaces the older str.strip in recent Polars)
    .with_columns([
        pl.col('name').str.to_lowercase().str.strip_chars(),
        pl.col('email').str.to_lowercase(),
    ])
)

# Save
cleaned_df.write_csv('cleaned_data.csv')
```
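The speed advantage on genuinely large files mostly comes from Polars' lazy engine rather than the eager `read_csv` call above. A minimal sketch of the same idea with `scan_csv`, assuming the same column names as the example; keep whichever cleaning steps apply to your data:

```python
import polars as pl

# Lazy variant: scan_csv builds a query plan; nothing runs until .collect(),
# which lets Polars optimize and stream the whole pipeline over large files.
cleaned_df = (
    pl.scan_csv('data.csv')
    .unique()
    .with_columns([
        pl.col('age').fill_null(pl.col('age').median()),
        pl.col('name').fill_null('Unknown'),
    ])
    .filter((pl.col('age') >= 0) & (pl.col('age') <= 120))
    .collect()
)

cleaned_df.write_csv('cleaned_data.csv')
```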
### 4. PySpark Pipeline (For Big Data)

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, mean, trim, lower
from pyspark.sql.types import IntegerType, DoubleType

spark = SparkSession.builder.appName('DataCleaning').getOrCreate()

# Load data
df = spark.read.csv('data.csv', header=True, inferSchema=True)

# Cleaning pipeline
cleaned_df = (
    df
    # Remove duplicates
    .dropDuplicates()
    # Handle missing values
    .fillna({
        'age': df.select(mean('age')).collect()[0][0],
        'name': 'Unknown',
    })
    # Fix data types
    .withColumn('age', col('age').cast(IntegerType()))
    .withColumn('amount', col('amount').cast(DoubleType()))
    # Remove outliers
    .filter((col('age') >= 0) & (col('age') <= 120))
    # Normalize text
    .withColumn('name', trim(lower(col('name'))))
    .withColumn('email', trim(lower(col('email'))))
)

# Save
cleaned_df.write.csv('cleaned_data', header=True, mode='overwrite')
```
### 5. Data Quality Checks

```python
def data_quality_checks(df):
    """Run comprehensive data quality checks."""
    report = []

    # Check 1: Missing values
    missing = df.isnull().sum()
    if missing.sum() > 0:
        report.append(f"⚠️ Missing values found:\n{missing[missing > 0]}")

    # Check 2: Duplicates
    duplicates = df.duplicated().sum()
    if duplicates > 0:
        report.append(f"⚠️ {duplicates} duplicate rows found")

    # Check 3: Data types
    expected_types = {
        'age': 'int64',
        'amount': 'float64',
        'date': 'datetime64[ns]',
    }
    for col, expected in expected_types.items():
        if col in df.columns and df[col].dtype != expected:
            report.append(f"⚠️ Column '{col}' has type {df[col].dtype}, expected {expected}")

    # Check 4: Value ranges
    if 'age' in df.columns:
        invalid_age = ((df['age'] < 0) | (df['age'] > 120)).sum()
        if invalid_age > 0:
            report.append(f"⚠️ {invalid_age} invalid age values")

    # Check 5: Unique identifiers
    if 'id' in df.columns:
        if df['id'].duplicated().any():
            report.append("⚠️ Duplicate IDs found")

    # Check 6: Consistency
    if 'email' in df.columns:
        invalid_email = ~df['email'].str.contains('@', na=False)
        if invalid_email.sum() > 0:
            report.append(f"⚠️ {invalid_email.sum()} invalid email addresses")

    if report:
        print("Data Quality Issues:")
        for issue in report:
            print(issue)
    else:
        print("✅ All quality checks passed!")

    return len(report) == 0


# Run checks
data_quality_checks(cleaned_df)
```
### 6. Automated Cleaning Function

```python
import numpy as np


def auto_clean_dataframe(df, config=None):
    """Automatically clean DataFrame with sensible defaults."""
    if config is None:
        config = {
            'remove_duplicates': True,
            'handle_missing': True,
            'remove_outliers': True,
            'fix_types': True,
            'normalize_text': True,
        }

    df = df.copy()  # work on a copy so the caller's DataFrame is untouched
    print(f"Original shape: {df.shape}")

    if config['remove_duplicates']:
        df = df.drop_duplicates()
        print(f"After removing duplicates: {df.shape}")

    if config['handle_missing']:
        # Numeric: fill with median
        numeric_cols = df.select_dtypes(include=[np.number]).columns
        for col in numeric_cols:
            df[col] = df[col].fillna(df[col].median())

        # Categorical: fill with mode or 'Unknown'
        categorical_cols = df.select_dtypes(include=['object']).columns
        for col in categorical_cols:
            if df[col].mode().empty:
                df[col] = df[col].fillna('Unknown')
            else:
                df[col] = df[col].fillna(df[col].mode()[0])
        print(f"After handling missing values: {df.shape}")

    if config['remove_outliers']:
        numeric_cols = df.select_dtypes(include=[np.number]).columns
        for col in numeric_cols:
            Q1 = df[col].quantile(0.25)
            Q3 = df[col].quantile(0.75)
            IQR = Q3 - Q1
            df = df[(df[col] >= Q1 - 1.5 * IQR) & (df[col] <= Q3 + 1.5 * IQR)]
        print(f"After removing outliers: {df.shape}")

    if config['normalize_text']:
        text_cols = df.select_dtypes(include=['object']).columns
        for col in text_cols:
            df[col] = df[col].str.strip().str.lower()

    return df


# Usage
cleaned_df = auto_clean_dataframe(df)
```
### 7. Save Pipeline Configuration

```yaml
# data_cleaning_config.yaml
cleaning_pipeline:
  remove_duplicates:
    enabled: true
    subset: ['id', 'email']
    keep: 'first'

  missing_values:
    strategy: auto
    drop_threshold: 50  # Drop columns with >50% missing
    numeric_fill: median
    categorical_fill: mode

  outliers:
    method: iqr
    threshold: 1.5
    columns: ['age', 'price', 'quantity']

  data_types:
    age: int64
    price: float64
    date: datetime64
    email: string

  text_normalization:
    lowercase: true
    strip_whitespace: true
    remove_special_chars: false

  validation:
    ranges:
      age: [0, 120]
      price: [0, 1000000]
    required_columns: ['id', 'name', 'email']
```
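The YAML above is only declarative; something still has to read it and call the pipeline. A minimal sketch of one way to wire it to the `DataCleaningPipeline` class from step 2, assuming PyYAML is installed and the class is in scope; the key-to-method mapping here is illustrative, not a fixed schema:

```python
import yaml
import pandas as pd

# Load the config and pull out the pipeline section
with open('data_cleaning_config.yaml') as f:
    cfg = yaml.safe_load(f)['cleaning_pipeline']

df = pd.read_csv('data.csv')
pipeline = DataCleaningPipeline(df)

# Map config sections onto pipeline methods
if cfg['remove_duplicates'].get('enabled', True):
    pipeline.remove_duplicates(
        subset=cfg['remove_duplicates'].get('subset'),
        keep=cfg['remove_duplicates'].get('keep', 'first'),
    )

pipeline.handle_missing_values(strategy=cfg['missing_values'].get('strategy', 'auto'))

pipeline.remove_outliers(
    columns=cfg['outliers'].get('columns'),
    method=cfg['outliers'].get('method', 'iqr'),
    threshold=cfg['outliers'].get('threshold', 1.5),
)

pipeline.validate_ranges(
    {col: tuple(bounds) for col, bounds in cfg['validation']['ranges'].items()}
)

cleaned_df = pipeline.get_cleaned_data()
```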
## Best Practices

**DO:**
- Always keep original data
- Log all cleaning steps (see the sketch after these lists)
- Validate data quality
- Handle missing values appropriately
- Remove duplicates early
- Check for outliers
- Validate data types
- Document assumptions
**DON'T:**
- Delete original data
- Fill all missing with zeros
- Ignore outliers
- Mix data types
- Skip validation
- Overfit to training data
- Remove too many rows
- Forget to save cleaned data
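A minimal sketch of the first two DO items (keep the original data, log every step), reusing the `DataCleaningPipeline` from step 2; the file paths are just examples:

```python
import pandas as pd

raw_path = 'data.csv'                # original file: read, never overwritten
df = pd.read_csv(raw_path)

pipeline = DataCleaningPipeline(df)  # the class works on an internal copy
cleaned_df = (
    pipeline
    .remove_duplicates()
    .handle_missing_values()
    .get_cleaned_data()
)

# Write results alongside (not over) the raw data, plus the step-by-step log
cleaned_df.to_csv('cleaned_data.csv', index=False)
with open('cleaning_report.txt', 'w') as f:
    f.write(pipeline.generate_report())
```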
## Checklist
- Loaded and inspected data
- Removed duplicates
- Handled missing values
- Fixed data types
- Removed/handled outliers
- Normalized text fields
- Validated data quality
- Generated cleaning report
- Saved cleaned data