data-profiler
SKILL.md
Data Profiler for Construction
Overview
Analyze construction data to understand its characteristics, distributions, quality, and patterns. Essential for data quality assessment, ETL planning, and identifying data issues before they impact projects.
Business Case
Before using any construction data, you need to understand:
- What data types are present
- Distribution of values
- Missing data patterns
- Anomalies and outliers
- Referential integrity issues
This skill profiles data to answer these questions and provides actionable insights.
Technical Implementation
from dataclasses import dataclass, field
from typing import List, Dict, Any, Optional, Tuple
import pandas as pd
import numpy as np
from datetime import datetime
import json
@dataclass
class ColumnProfile:
name: str
data_type: str
inferred_type: str # More specific: project_id, cost, date, csi_code, etc.
total_count: int
null_count: int
null_percentage: float
unique_count: int
uniqueness_ratio: float
# For numeric columns
min_value: Optional[float] = None
max_value: Optional[float] = None
mean_value: Optional[float] = None
median_value: Optional[float] = None
std_dev: Optional[float] = None
# For string columns
min_length: Optional[int] = None
max_length: Optional[int] = None
avg_length: Optional[float] = None
# Top values
top_values: List[Tuple[Any, int]] = field(default_factory=list)
# Patterns
common_patterns: List[str] = field(default_factory=list)
# Quality flags
quality_issues: List[str] = field(default_factory=list)
@dataclass
class DataProfile:
source_name: str
row_count: int
column_count: int
columns: List[ColumnProfile]
duplicate_rows: int
memory_usage: str
profiled_at: datetime
quality_score: float
recommendations: List[str]
class ConstructionDataProfiler:
"""Profile construction data for quality and characteristics."""
# Known construction data patterns
CONSTRUCTION_PATTERNS = {
'csi_code': r'^\d{2}\s?\d{2}\s?\d{2}$',
'project_id': r'^[A-Z]{2,4}[-_]?\d{3,6}$',
'cost_code': r'^\d{2}[-.]?\d{2,4}$',
'wbs': r'^[\d.]+$',
'phone': r'^\(?\d{3}\)?[-.\s]?\d{3}[-.\s]?\d{4}$',
'email': r'^[\w.-]+@[\w.-]+\.\w+$',
'date_iso': r'^\d{4}-\d{2}-\d{2}',
'date_us': r'^\d{1,2}/\d{1,2}/\d{2,4}$',
'currency': r'^\$?[\d,]+\.?\d{0,2}$',
'percentage': r'^\d+\.?\d*%?$',
}
# Construction-specific column name patterns
COLUMN_TYPE_HINTS = {
'project': ['project_id', 'project_name', 'proj', 'job'],
'cost': ['cost', 'amount', 'price', 'total', 'budget', 'actual'],
'date': ['date', 'start', 'finish', 'end', 'created', 'modified'],
'quantity': ['qty', 'quantity', 'count', 'units'],
'csi': ['csi', 'division', 'masterformat', 'spec'],
'location': ['location', 'area', 'zone', 'floor', 'level'],
'person': ['owner', 'manager', 'superintendent', 'foreman', 'contact'],
}
def __init__(self):
self.profiles: Dict[str, DataProfile] = {}
def profile_dataframe(self, df: pd.DataFrame, source_name: str) -> DataProfile:
"""Profile a pandas DataFrame."""
columns = []
for col in df.columns:
col_profile = self._profile_column(df[col], col)
columns.append(col_profile)
# Calculate duplicates
duplicate_rows = len(df) - len(df.drop_duplicates())
# Calculate memory usage
memory_bytes = df.memory_usage(deep=True).sum()
if memory_bytes < 1024:
memory_usage = f"{memory_bytes} B"
elif memory_bytes < 1024**2:
memory_usage = f"{memory_bytes/1024:.1f} KB"
else:
memory_usage = f"{memory_bytes/1024**2:.1f} MB"
# Calculate overall quality score
quality_score = self._calculate_quality_score(columns)
# Generate recommendations
recommendations = self._generate_recommendations(columns, df)
profile = DataProfile(
source_name=source_name,
row_count=len(df),
column_count=len(df.columns),
columns=columns,
duplicate_rows=duplicate_rows,
memory_usage=memory_usage,
profiled_at=datetime.now(),
quality_score=quality_score,
recommendations=recommendations
)
self.profiles[source_name] = profile
return profile
def _profile_column(self, series: pd.Series, name: str) -> ColumnProfile:
"""Profile a single column."""
total_count = len(series)
null_count = series.isnull().sum()
null_percentage = (null_count / total_count * 100) if total_count > 0 else 0
# Get non-null values for analysis
non_null = series.dropna()
unique_count = non_null.nunique()
uniqueness_ratio = unique_count / len(non_null) if len(non_null) > 0 else 0
profile = ColumnProfile(
name=name,
data_type=str(series.dtype),
inferred_type=self._infer_construction_type(series, name),
total_count=total_count,
null_count=null_count,
null_percentage=round(null_percentage, 2),
unique_count=unique_count,
uniqueness_ratio=round(uniqueness_ratio, 4)
)
# Numeric analysis
if pd.api.types.is_numeric_dtype(series):
profile.min_value = float(non_null.min()) if len(non_null) > 0 else None
profile.max_value = float(non_null.max()) if len(non_null) > 0 else None
profile.mean_value = float(non_null.mean()) if len(non_null) > 0 else None
profile.median_value = float(non_null.median()) if len(non_null) > 0 else None
profile.std_dev = float(non_null.std()) if len(non_null) > 1 else None
# Check for outliers
if len(non_null) > 10 and profile.std_dev:
outliers = non_null[abs(non_null - profile.mean_value) > 3 * profile.std_dev]
if len(outliers) > 0:
profile.quality_issues.append(f"{len(outliers)} potential outliers detected")
# Check for negative costs
if any(hint in name.lower() for hint in ['cost', 'amount', 'price', 'total']):
negatives = (non_null < 0).sum()
if negatives > 0:
profile.quality_issues.append(f"{negatives} negative values in cost column")
# String analysis
elif pd.api.types.is_object_dtype(series) or pd.api.types.is_string_dtype(series):
str_series = non_null.astype(str)
lengths = str_series.str.len()
profile.min_length = int(lengths.min()) if len(lengths) > 0 else None
profile.max_length = int(lengths.max()) if len(lengths) > 0 else None
profile.avg_length = float(lengths.mean()) if len(lengths) > 0 else None
# Detect patterns
profile.common_patterns = self._detect_patterns(str_series)
# Top values
if len(non_null) > 0:
value_counts = non_null.value_counts().head(5)
profile.top_values = list(zip(value_counts.index.tolist(), value_counts.values.tolist()))
# Quality checks
if null_percentage > 50:
profile.quality_issues.append("High null rate (>50%)")
if uniqueness_ratio == 1.0 and total_count > 100:
profile.quality_issues.append("All unique values - possible ID column")
if uniqueness_ratio < 0.01 and unique_count > 1:
profile.quality_issues.append("Low cardinality - possible category")
return profile
def _infer_construction_type(self, series: pd.Series, name: str) -> str:
"""Infer construction-specific data type."""
name_lower = name.lower()
# Check column name hints
for type_name, hints in self.COLUMN_TYPE_HINTS.items():
if any(hint in name_lower for hint in hints):
return type_name
# Check data patterns
non_null = series.dropna().astype(str)
if len(non_null) == 0:
return "unknown"
sample = non_null.head(100)
for pattern_name, pattern in self.CONSTRUCTION_PATTERNS.items():
matches = sample.str.match(pattern, na=False).sum()
if matches / len(sample) > 0.8:
return pattern_name
# Default to pandas dtype
if pd.api.types.is_numeric_dtype(series):
return "numeric"
elif pd.api.types.is_datetime64_any_dtype(series):
return "datetime"
else:
return "text"
def _detect_patterns(self, str_series: pd.Series) -> List[str]:
"""Detect common patterns in string data."""
patterns_found = []
sample = str_series.head(1000)
for pattern_name, pattern in self.CONSTRUCTION_PATTERNS.items():
matches = sample.str.match(pattern, na=False).sum()
if matches / len(sample) > 0.1:
patterns_found.append(f"{pattern_name} ({matches/len(sample):.0%})")
return patterns_found[:3]
def _calculate_quality_score(self, columns: List[ColumnProfile]) -> float:
"""Calculate overall data quality score (0-100)."""
if not columns:
return 0.0
scores = []
for col in columns:
col_score = 100
# Penalize for nulls
col_score -= min(col.null_percentage, 50)
# Penalize for quality issues
col_score -= len(col.quality_issues) * 10
scores.append(max(col_score, 0))
return round(sum(scores) / len(scores), 1)
def _generate_recommendations(self, columns: List[ColumnProfile], df: pd.DataFrame) -> List[str]:
"""Generate recommendations based on profile."""
recommendations = []
# High null columns
high_null = [c for c in columns if c.null_percentage > 30]
if high_null:
recommendations.append(
f"Review {len(high_null)} columns with >30% null values: "
f"{', '.join(c.name for c in high_null[:3])}"
)
# Potential ID columns without uniqueness
for col in columns:
if 'id' in col.name.lower() and col.uniqueness_ratio < 1.0:
recommendations.append(
f"Column '{col.name}' appears to be an ID but has duplicate values"
)
# Date columns that should be datetime
for col in columns:
if col.inferred_type in ['date_iso', 'date_us'] and col.data_type == 'object':
recommendations.append(
f"Convert '{col.name}' to datetime type for better analysis"
)
# Cost columns that are strings
for col in columns:
if col.inferred_type == 'currency' and col.data_type == 'object':
recommendations.append(
f"Convert '{col.name}' to numeric type (remove $ and commas)"
)
return recommendations
def profile_to_dict(self, profile: DataProfile) -> Dict:
"""Convert profile to dictionary for JSON export."""
return {
'source_name': profile.source_name,
'row_count': profile.row_count,
'column_count': profile.column_count,
'duplicate_rows': profile.duplicate_rows,
'memory_usage': profile.memory_usage,
'profiled_at': profile.profiled_at.isoformat(),
'quality_score': profile.quality_score,
'recommendations': profile.recommendations,
'columns': [
{
'name': c.name,
'data_type': c.data_type,
'inferred_type': c.inferred_type,
'null_percentage': c.null_percentage,
'unique_count': c.unique_count,
'quality_issues': c.quality_issues,
'top_values': c.top_values[:3]
}
for c in profile.columns
]
}
def generate_profile_report(self, profile: DataProfile) -> str:
"""Generate markdown profile report."""
report = [f"# Data Profile: {profile.source_name}", ""]
report.append(f"**Profiled At:** {profile.profiled_at.strftime('%Y-%m-%d %H:%M')}")
report.append(f"**Quality Score:** {profile.quality_score}/100")
report.append("")
# Summary
report.append("## Summary")
report.append(f"- **Rows:** {profile.row_count:,}")
report.append(f"- **Columns:** {profile.column_count}")
report.append(f"- **Duplicate Rows:** {profile.duplicate_rows:,}")
report.append(f"- **Memory Usage:** {profile.memory_usage}")
report.append("")
# Recommendations
if profile.recommendations:
report.append("## Recommendations")
for rec in profile.recommendations:
report.append(f"- {rec}")
report.append("")
# Column Details
report.append("## Column Details")
report.append("")
report.append("| Column | Type | Inferred | Nulls | Unique | Issues |")
report.append("|--------|------|----------|-------|--------|--------|")
for col in profile.columns:
issues = len(col.quality_issues)
report.append(
f"| {col.name} | {col.data_type} | {col.inferred_type} | "
f"{col.null_percentage:.1f}% | {col.unique_count:,} | {issues} |"
)
# Detailed column profiles
report.append("")
report.append("## Detailed Column Profiles")
for col in profile.columns:
report.append(f"\n### {col.name}")
report.append(f"- **Type:** {col.data_type} (inferred: {col.inferred_type})")
report.append(f"- **Nulls:** {col.null_count:,} ({col.null_percentage:.1f}%)")
report.append(f"- **Unique Values:** {col.unique_count:,} ({col.uniqueness_ratio:.1%})")
if col.min_value is not None:
report.append(f"- **Range:** {col.min_value:,.2f} to {col.max_value:,.2f}")
report.append(f"- **Mean:** {col.mean_value:,.2f}, Median: {col.median_value:,.2f}")
if col.min_length is not None:
report.append(f"- **Length:** {col.min_length} to {col.max_length} (avg: {col.avg_length:.1f})")
if col.top_values:
report.append(f"- **Top Values:** {col.top_values[:3]}")
if col.common_patterns:
report.append(f"- **Patterns:** {', '.join(col.common_patterns)}")
if col.quality_issues:
report.append(f"- **Issues:** {', '.join(col.quality_issues)}")
return "\n".join(report)
def compare_profiles(self, profile1: DataProfile, profile2: DataProfile) -> Dict:
"""Compare two profiles to detect schema changes or data drift."""
comparison = {
'profiles': [profile1.source_name, profile2.source_name],
'row_count_change': profile2.row_count - profile1.row_count,
'quality_change': profile2.quality_score - profile1.quality_score,
'new_columns': [],
'removed_columns': [],
'type_changes': [],
'null_rate_changes': []
}
cols1 = {c.name: c for c in profile1.columns}
cols2 = {c.name: c for c in profile2.columns}
# Find new/removed columns
comparison['new_columns'] = [n for n in cols2 if n not in cols1]
comparison['removed_columns'] = [n for n in cols1 if n not in cols2]
# Compare common columns
for name in cols1:
if name in cols2:
c1, c2 = cols1[name], cols2[name]
if c1.data_type != c2.data_type:
comparison['type_changes'].append({
'column': name,
'from': c1.data_type,
'to': c2.data_type
})
null_change = c2.null_percentage - c1.null_percentage
if abs(null_change) > 10:
comparison['null_rate_changes'].append({
'column': name,
'change': null_change
})
return comparison
Quick Start
import pandas as pd
# Load construction data
df = pd.read_excel("project_costs.xlsx")
# Profile the data
profiler = ConstructionDataProfiler()
profile = profiler.profile_dataframe(df, "Project Costs 2025")
# Generate report
report = profiler.generate_profile_report(profile)
print(report)
# Export to JSON
profile_dict = profiler.profile_to_dict(profile)
with open("profile.json", "w") as f:
json.dump(profile_dict, f, indent=2)
# Compare with previous profile
old_profile = profiler.profile_dataframe(old_df, "Project Costs 2024")
comparison = profiler.compare_profiles(old_profile, profile)
print(f"Quality changed by: {comparison['quality_change']}")
Common Use Cases
- Pre-ETL Analysis: Profile source data before building pipelines
- Quality Monitoring: Track data quality over time
- Schema Validation: Detect unexpected changes in data structure
- Anomaly Detection: Find outliers and data quality issues
Dependencies
pip install pandas numpy
Resources
- Data Profiling Best Practices: DAMA DMBOK
- Construction Data Standards: CSI MasterFormat, UniFormat
Weekly Installs
4
Repository
datadrivenconst…tructionGitHub Stars
52
First Seen
11 days ago
Security Audits
Installed on
opencode4
antigravity4
github-copilot4
codex4
kimi-cli4
gemini-cli4