data-silo-detection

SKILL.md

Data Silo Detection

Overview

Based on DDC methodology (Chapter 1.2), this skill detects and maps data silos in construction organizations, identifying disconnected data sources, duplicate data, and integration opportunities.

Book Reference: "Технологии и системы управления в современном строительстве" / "Technologies and Management Systems in Modern Construction"

Quick Start

from dataclasses import dataclass, field
from enum import Enum
from typing import List, Dict, Optional, Set, Tuple
from datetime import datetime
import json
from collections import defaultdict

class DataDomain(Enum):
    """Construction data domains"""
    DESIGN = "design"
    COST = "cost"
    SCHEDULE = "schedule"
    QUALITY = "quality"
    SAFETY = "safety"
    PROCUREMENT = "procurement"
    SITE = "site"
    DOCUMENT = "document"
    FINANCIAL = "financial"
    HR = "hr"

class SiloSeverity(Enum):
    """Severity level of data silo"""
    CRITICAL = "critical"      # Major business impact
    HIGH = "high"              # Significant inefficiency
    MEDIUM = "medium"          # Noticeable issues
    LOW = "low"                # Minor inconvenience

class DataSourceType(Enum):
    """Types of data sources"""
    DATABASE = "database"
    SPREADSHEET = "spreadsheet"
    FILE_SHARE = "file_share"
    CLOUD_APP = "cloud_app"
    DESKTOP_APP = "desktop_app"
    PAPER = "paper"
    EMAIL = "email"
    PERSONAL = "personal"

@dataclass
class DataSource:
    """Represents a data source in the organization"""
    id: str
    name: str
    type: DataSourceType
    domain: DataDomain
    owner: str
    department: str
    users: List[str]
    data_entities: List[str]
    connections: List[str] = field(default_factory=list)
    update_frequency: str = "unknown"
    access_level: str = "department"  # personal, department, organization
    has_api: bool = False
    last_modified: Optional[datetime] = None

@dataclass
class DataSilo:
    """Detected data silo"""
    id: str
    sources: List[DataSource]
    domain: DataDomain
    severity: SiloSeverity
    issue_type: str
    description: str
    impact: str
    affected_users: int
    affected_processes: List[str]
    recommendations: List[str]
    estimated_cost: Optional[float] = None

@dataclass
class DuplicateData:
    """Detected duplicate data across sources"""
    entity_name: str
    sources: List[str]
    discrepancy_rate: float  # 0-1
    master_source: Optional[str] = None
    issues: List[str] = field(default_factory=list)

@dataclass
class SiloAnalysis:
    """Complete silo analysis results"""
    organization: str
    analysis_date: datetime
    total_sources: int
    silos_detected: List[DataSilo]
    duplicates: List[DuplicateData]
    connectivity_score: float
    data_flow_gaps: List[Dict]
    priority_actions: List[str]
    integration_roadmap: Dict


class DataSiloDetector:
    """
    Detect and analyze data silos in construction organizations.
    Based on DDC methodology Chapter 1.2.
    """

    def __init__(self):
        self.domain_relationships = self._define_domain_relationships()
        self.critical_entities = self._define_critical_entities()

    def _define_domain_relationships(self) -> Dict[DataDomain, List[DataDomain]]:
        """Define expected relationships between domains"""
        return {
            DataDomain.DESIGN: [
                DataDomain.COST, DataDomain.SCHEDULE,
                DataDomain.PROCUREMENT, DataDomain.QUALITY
            ],
            DataDomain.COST: [
                DataDomain.DESIGN, DataDomain.SCHEDULE,
                DataDomain.FINANCIAL, DataDomain.PROCUREMENT
            ],
            DataDomain.SCHEDULE: [
                DataDomain.DESIGN, DataDomain.COST,
                DataDomain.SITE, DataDomain.HR
            ],
            DataDomain.PROCUREMENT: [
                DataDomain.COST, DataDomain.DESIGN,
                DataDomain.SITE, DataDomain.FINANCIAL
            ],
            DataDomain.SITE: [
                DataDomain.SCHEDULE, DataDomain.SAFETY,
                DataDomain.QUALITY, DataDomain.HR
            ],
            DataDomain.QUALITY: [
                DataDomain.DESIGN, DataDomain.SITE,
                DataDomain.DOCUMENT
            ],
            DataDomain.SAFETY: [
                DataDomain.SITE, DataDomain.HR,
                DataDomain.DOCUMENT
            ],
            DataDomain.FINANCIAL: [
                DataDomain.COST, DataDomain.PROCUREMENT,
                DataDomain.HR
            ]
        }

    def _define_critical_entities(self) -> Dict[str, List[DataDomain]]:
        """Define entities that should be shared across domains"""
        return {
            "project": [DataDomain.DESIGN, DataDomain.COST, DataDomain.SCHEDULE],
            "budget": [DataDomain.COST, DataDomain.FINANCIAL, DataDomain.PROCUREMENT],
            "schedule": [DataDomain.SCHEDULE, DataDomain.SITE, DataDomain.PROCUREMENT],
            "material": [DataDomain.DESIGN, DataDomain.COST, DataDomain.PROCUREMENT],
            "labor": [DataDomain.HR, DataDomain.COST, DataDomain.SCHEDULE],
            "subcontractor": [DataDomain.PROCUREMENT, DataDomain.COST, DataDomain.SCHEDULE],
            "rfi": [DataDomain.DESIGN, DataDomain.DOCUMENT, DataDomain.SITE],
            "change_order": [DataDomain.COST, DataDomain.DESIGN, DataDomain.SCHEDULE]
        }

    def detect_silos(
        self,
        organization: str,
        data_sources: List[DataSource],
        process_flows: Optional[List[Dict]] = None
    ) -> SiloAnalysis:
        """
        Detect data silos in the organization.

        Args:
            organization: Organization name
            data_sources: List of data sources to analyze
            process_flows: Optional business process flows

        Returns:
            Complete silo analysis
        """
        # Build connectivity graph
        connectivity = self._build_connectivity_graph(data_sources)

        # Detect isolated sources
        isolated_silos = self._detect_isolated_sources(
            data_sources, connectivity
        )

        # Detect domain silos
        domain_silos = self._detect_domain_silos(data_sources)

        # Detect duplicate data
        duplicates = self._detect_duplicates(data_sources)

        # Detect data flow gaps
        flow_gaps = self._detect_flow_gaps(
            data_sources, process_flows
        )

        # Calculate connectivity score
        connectivity_score = self._calculate_connectivity_score(
            data_sources, connectivity
        )

        # Combine all silos
        all_silos = isolated_silos + domain_silos

        # Prioritize silos
        prioritized_silos = self._prioritize_silos(all_silos)

        # Generate priority actions
        priority_actions = self._generate_priority_actions(
            prioritized_silos, duplicates
        )

        # Create integration roadmap
        roadmap = self._create_integration_roadmap(
            prioritized_silos, flow_gaps
        )

        return SiloAnalysis(
            organization=organization,
            analysis_date=datetime.now(),
            total_sources=len(data_sources),
            silos_detected=prioritized_silos,
            duplicates=duplicates,
            connectivity_score=connectivity_score,
            data_flow_gaps=flow_gaps,
            priority_actions=priority_actions,
            integration_roadmap=roadmap
        )

    def _build_connectivity_graph(
        self,
        sources: List[DataSource]
    ) -> Dict[str, Set[str]]:
        """Build graph of source connections"""
        graph = defaultdict(set)

        for source in sources:
            for connection in source.connections:
                graph[source.id].add(connection)
                graph[connection].add(source.id)

        return graph

    def _detect_isolated_sources(
        self,
        sources: List[DataSource],
        connectivity: Dict[str, Set[str]]
    ) -> List[DataSilo]:
        """Detect sources with no connections"""
        silos = []

        for source in sources:
            connections = len(connectivity.get(source.id, set()))

            if connections == 0:
                severity = SiloSeverity.CRITICAL if source.domain in [
                    DataDomain.COST, DataDomain.SCHEDULE
                ] else SiloSeverity.HIGH

                silos.append(DataSilo(
                    id=f"isolated_{source.id}",
                    sources=[source],
                    domain=source.domain,
                    severity=severity,
                    issue_type="isolated_source",
                    description=f"{source.name} has no connections to other systems",
                    impact="Data must be manually transferred, risking errors and delays",
                    affected_users=len(source.users),
                    affected_processes=self._get_affected_processes(source.domain),
                    recommendations=[
                        f"Connect {source.name} via API or ETL to related systems",
                        "Establish data synchronization schedule",
                        "Define master data source for shared entities"
                    ]
                ))
            elif connections == 1 and source.access_level == "personal":
                silos.append(DataSilo(
                    id=f"personal_{source.id}",
                    sources=[source],
                    domain=source.domain,
                    severity=SiloSeverity.MEDIUM,
                    issue_type="personal_silo",
                    description=f"{source.name} is a personal data store with limited access",
                    impact="Data not accessible to team, knowledge loss risk",
                    affected_users=1,
                    affected_processes=self._get_affected_processes(source.domain),
                    recommendations=[
                        "Move data to shared organizational repository",
                        "Implement access controls instead of isolation",
                        "Document data structure and usage"
                    ]
                ))

        return silos

    def _detect_domain_silos(
        self,
        sources: List[DataSource]
    ) -> List[DataSilo]:
        """Detect silos between domains that should be connected"""
        silos = []

        # Group sources by domain
        domain_sources = defaultdict(list)
        for source in sources:
            domain_sources[source.domain].append(source)

        # Check for missing domain connections
        for domain, related_domains in self.domain_relationships.items():
            domain_srcs = domain_sources.get(domain, [])

            for related in related_domains:
                related_srcs = domain_sources.get(related, [])

                if domain_srcs and related_srcs:
                    # Check if any connections exist between domains
                    has_connection = False
                    for src in domain_srcs:
                        for rel_src in related_srcs:
                            if rel_src.id in src.connections:
                                has_connection = True
                                break

                    if not has_connection:
                        silos.append(DataSilo(
                            id=f"domain_gap_{domain.value}_{related.value}",
                            sources=domain_srcs + related_srcs,
                            domain=domain,
                            severity=SiloSeverity.HIGH,
                            issue_type="domain_disconnect",
                            description=f"No data flow between {domain.value} and {related.value}",
                            impact="Related information not synchronized, decision delays",
                            affected_users=sum(len(s.users) for s in domain_srcs + related_srcs),
                            affected_processes=self._get_affected_processes(domain) +
                                              self._get_affected_processes(related),
                            recommendations=[
                                f"Establish integration between {domain.value} and {related.value} systems",
                                "Define shared data entities and master sources",
                                "Implement automated data synchronization"
                            ]
                        ))

        return silos

    def _detect_duplicates(
        self,
        sources: List[DataSource]
    ) -> List[DuplicateData]:
        """Detect duplicate data across sources"""
        duplicates = []

        # Map entities to sources
        entity_sources = defaultdict(list)
        for source in sources:
            for entity in source.data_entities:
                entity_sources[entity].append(source.id)

        # Find duplicates
        for entity, source_ids in entity_sources.items():
            if len(source_ids) > 1:
                # Check if it's a critical entity
                is_critical = entity.lower() in self.critical_entities

                duplicate = DuplicateData(
                    entity_name=entity,
                    sources=source_ids,
                    discrepancy_rate=0.0,  # Would need actual data to calculate
                    issues=[]
                )

                if is_critical and len(source_ids) > 2:
                    duplicate.issues.append(
                        "Critical entity duplicated in multiple systems"
                    )

                if not any(s for s in sources if s.id in source_ids and "master" in s.name.lower()):
                    duplicate.issues.append("No clear master source defined")

                duplicates.append(duplicate)

        return duplicates

    def _detect_flow_gaps(
        self,
        sources: List[DataSource],
        process_flows: Optional[List[Dict]]
    ) -> List[Dict]:
        """Detect gaps in expected data flows"""
        gaps = []

        # Check critical entity coverage
        for entity, required_domains in self.critical_entities.items():
            entity_domains = set()
            for source in sources:
                if entity in [e.lower() for e in source.data_entities]:
                    entity_domains.add(source.domain)

            missing = set(required_domains) - entity_domains
            if missing:
                gaps.append({
                    "entity": entity,
                    "missing_domains": [d.value for d in missing],
                    "impact": f"{entity} data not available in {len(missing)} domains"
                })

        return gaps

    def _calculate_connectivity_score(
        self,
        sources: List[DataSource],
        connectivity: Dict[str, Set[str]]
    ) -> float:
        """Calculate overall connectivity score"""
        if not sources:
            return 0.0

        # Calculate average connections per source
        total_connections = sum(len(conns) for conns in connectivity.values())
        avg_connections = total_connections / len(sources)

        # Ideal connections per source
        ideal_connections = 3

        # Score based on average connections
        connection_score = min(1.0, avg_connections / ideal_connections)

        # Penalize for isolated sources
        isolated = sum(1 for s in sources if s.id not in connectivity or not connectivity[s.id])
        isolation_penalty = isolated / len(sources)

        # API availability bonus
        api_count = sum(1 for s in sources if s.has_api)
        api_bonus = (api_count / len(sources)) * 0.2

        return max(0, min(1.0, connection_score - isolation_penalty + api_bonus))

    def _get_affected_processes(self, domain: DataDomain) -> List[str]:
        """Get business processes affected by domain"""
        process_map = {
            DataDomain.DESIGN: ["Design Review", "RFI Processing", "Drawing Distribution"],
            DataDomain.COST: ["Budgeting", "Cost Tracking", "Invoice Processing"],
            DataDomain.SCHEDULE: ["Planning", "Progress Tracking", "Resource Allocation"],
            DataDomain.PROCUREMENT: ["Vendor Selection", "Purchase Orders", "Material Tracking"],
            DataDomain.SITE: ["Daily Reports", "Progress Photos", "Issue Management"],
            DataDomain.QUALITY: ["Inspections", "Defect Tracking", "Compliance"],
            DataDomain.SAFETY: ["Incident Reporting", "Safety Inspections", "Training"],
            DataDomain.FINANCIAL: ["Billing", "Payments", "Financial Reporting"],
            DataDomain.HR: ["Timekeeping", "Resource Management", "Certifications"]
        }
        return process_map.get(domain, [])

    def _prioritize_silos(
        self,
        silos: List[DataSilo]
    ) -> List[DataSilo]:
        """Prioritize silos by severity and impact"""
        severity_order = {
            SiloSeverity.CRITICAL: 0,
            SiloSeverity.HIGH: 1,
            SiloSeverity.MEDIUM: 2,
            SiloSeverity.LOW: 3
        }

        return sorted(
            silos,
            key=lambda s: (severity_order[s.severity], -s.affected_users)
        )

    def _generate_priority_actions(
        self,
        silos: List[DataSilo],
        duplicates: List[DuplicateData]
    ) -> List[str]:
        """Generate prioritized action items"""
        actions = []

        # Critical silos first
        critical_silos = [s for s in silos if s.severity == SiloSeverity.CRITICAL]
        for silo in critical_silos[:3]:
            actions.append(f"URGENT: {silo.recommendations[0]}")

        # Duplicate data issues
        critical_dups = [d for d in duplicates if d.issues]
        for dup in critical_dups[:2]:
            actions.append(
                f"Define master source for '{dup.entity_name}' "
                f"(currently in {len(dup.sources)} sources)"
            )

        # High priority silos
        high_silos = [s for s in silos if s.severity == SiloSeverity.HIGH]
        for silo in high_silos[:3]:
            if silo.recommendations:
                actions.append(silo.recommendations[0])

        return actions[:10]

    def _create_integration_roadmap(
        self,
        silos: List[DataSilo],
        gaps: List[Dict]
    ) -> Dict:
        """Create phased integration roadmap"""
        roadmap = {
            "Phase 1 - Quick Wins (0-3 months)": [],
            "Phase 2 - Core Integration (3-6 months)": [],
            "Phase 3 - Advanced Integration (6-12 months)": [],
            "Phase 4 - Optimization (12+ months)": []
        }

        # Phase 1: Address personal silos and easy integrations
        for silo in silos:
            if silo.issue_type == "personal_silo":
                roadmap["Phase 1 - Quick Wins (0-3 months)"].append(
                    f"Migrate {silo.sources[0].name} to shared repository"
                )

        # Phase 2: Core domain integrations
        domain_gaps = [s for s in silos if s.issue_type == "domain_disconnect"]
        for silo in domain_gaps[:3]:
            roadmap["Phase 2 - Core Integration (3-6 months)"].append(
                silo.recommendations[0] if silo.recommendations else silo.description
            )

        # Phase 3: Critical entity master data
        roadmap["Phase 3 - Advanced Integration (6-12 months)"].extend([
            "Implement master data management for shared entities",
            "Deploy integration middleware/ESB",
            "Establish data governance policies"
        ])

        # Phase 4: Optimization
        roadmap["Phase 4 - Optimization (12+ months)"].extend([
            "Implement real-time data synchronization",
            "Deploy integration monitoring and alerting",
            "Continuous improvement based on metrics"
        ])

        return roadmap

    def generate_report(self, analysis: SiloAnalysis) -> str:
        """Generate silo analysis report"""
        report = f"""
# Data Silo Analysis Report
## {analysis.organization}

**Analysis Date:** {analysis.analysis_date.strftime('%Y-%m-%d')}
**Data Sources Analyzed:** {analysis.total_sources}
**Connectivity Score:** {analysis.connectivity_score:.0%}

## Executive Summary

Detected **{len(analysis.silos_detected)}** data silos and **{len(analysis.duplicates)}** duplicate data issues.

### Silos by Severity
"""
        severity_counts = defaultdict(int)
        for silo in analysis.silos_detected:
            severity_counts[silo.severity.value] += 1

        for severity in ["critical", "high", "medium", "low"]:
            count = severity_counts.get(severity, 0)
            if count > 0:
                report += f"- **{severity.title()}**: {count}\n"

        report += "\n## Priority Actions\n\n"
        for i, action in enumerate(analysis.priority_actions, 1):
            report += f"{i}. {action}\n"

        report += "\n## Detected Silos\n\n"
        for silo in analysis.silos_detected[:5]:
            report += f"""
### {silo.id}
- **Type:** {silo.issue_type}
- **Severity:** {silo.severity.value}
- **Impact:** {silo.impact}
- **Affected Users:** {silo.affected_users}
"""

        report += "\n## Integration Roadmap\n"
        for phase, items in analysis.integration_roadmap.items():
            report += f"\n### {phase}\n"
            for item in items:
                report += f"- {item}\n"

        return report

Common Use Cases

Detect Data Silos

detector = DataSiloDetector()

# Define data sources
sources = [
    DataSource(
        id="revit",
        name="Revit Models",
        type=DataSourceType.DESKTOP_APP,
        domain=DataDomain.DESIGN,
        owner="Design Team",
        department="Engineering",
        users=["architect1", "engineer1", "engineer2"],
        data_entities=["building_model", "drawings", "schedules"],
        connections=["navisworks"],
        has_api=True
    ),
    DataSource(
        id="excel_estimates",
        name="Excel Cost Estimates",
        type=DataSourceType.SPREADSHEET,
        domain=DataDomain.COST,
        owner="Estimator",
        department="Pre-construction",
        users=["estimator1"],
        data_entities=["costs", "quantities", "labor_rates"],
        connections=[],  # No connections - silo!
        access_level="personal"
    ),
    DataSource(
        id="procore",
        name="Procore",
        type=DataSourceType.CLOUD_APP,
        domain=DataDomain.SITE,
        owner="Project Manager",
        department="Operations",
        users=["pm1", "pm2", "super1"],
        data_entities=["daily_reports", "photos", "punch_list"],
        connections=["primavera"],
        has_api=True
    )
]

analysis = detector.detect_silos(
    organization="ABC Construction",
    data_sources=sources
)

print(f"Silos detected: {len(analysis.silos_detected)}")
print(f"Connectivity score: {analysis.connectivity_score:.0%}")

Generate Silo Report

report = detector.generate_report(analysis)
print(report)

# Save to file
with open("silo_report.md", "w") as f:
    f.write(report)

View Priority Actions

print("Priority Actions:")
for i, action in enumerate(analysis.priority_actions, 1):
    print(f"{i}. {action}")

print("\nIntegration Roadmap:")
for phase, items in analysis.integration_roadmap.items():
    print(f"\n{phase}:")
    for item in items:
        print(f"  - {item}")

Quick Reference

Component Purpose
DataSiloDetector Main detection engine
DataSource Data source definition
DataSilo Detected silo with details
DuplicateData Duplicate data detection
SiloAnalysis Complete analysis results
SiloSeverity Severity classification

Resources

Next Steps

Weekly Installs
3
GitHub Stars
52
First Seen
10 days ago
Installed on
opencode3
gemini-cli3
antigravity3
claude-code3
github-copilot3
codex3