environmental-monitoring

SKILL.md

Environmental Monitoring

Overview

Monitor and analyze environmental conditions on construction sites including air quality, noise, vibration, dust, and weather. Support regulatory compliance, worker safety, and community relations through real-time environmental tracking.

Environmental Monitoring System

┌─────────────────────────────────────────────────────────────────┐
│                ENVIRONMENTAL MONITORING                          │
├─────────────────────────────────────────────────────────────────┤
│                                                                  │
│  SENSORS                   MONITORING            COMPLIANCE     │
│  ───────                   ──────────            ──────────     │
│                                                                  │
│  💨 Air Quality  ───┐                            ✅ OSHA limits  │
│  🔊 Noise Level  ───┼─────→ Real-time   ────────→ ✅ EPA limits  │
│  📊 Vibration    ───┤       Dashboard            ✅ Local codes │
│  🌫️ Dust/PM     ───┤       Alerts               ✅ Permits     │
│  🌡️ Weather     ───┘       Reports              ✅ Neighbors   │
│                                                                  │
│  THRESHOLDS:                                                    │
│  • Noise: 85 dBA (OSHA action level, 8hr TWA)                   │
│  • PM2.5: 35 µg/m³ (EPA 24hr)                                  │
│  • Vibration: 25 mm/s (structural)                             │
│  • CO: 50 ppm (OSHA ceiling)                                   │
│                                                                  │
└─────────────────────────────────────────────────────────────────┘

Technical Implementation

import logging
import math
import statistics
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from enum import Enum
from typing import Dict, List, Optional, Tuple

class ParameterType(Enum):
    """Kinds of environmental measurement a station can report.

    The string value is the wire form: record_batch builds a member from
    the 'parameter' key of each incoming dict, and reports print it.
    """

    # Sound level
    NOISE = "noise"
    # Particulate matter
    PM25 = "pm25"
    PM10 = "pm10"
    # Gases
    CO = "co"
    CO2 = "co2"
    VOC = "voc"
    # Ground / structural vibration
    VIBRATION = "vibration"
    # Weather
    TEMPERATURE = "temperature"
    HUMIDITY = "humidity"
    WIND_SPEED = "wind_speed"
    WIND_DIRECTION = "wind_direction"
    RAINFALL = "rainfall"

class ComplianceStatus(Enum):
    """Outcome of comparing a measured value with a regulatory limit."""

    # Below the warning band of every limit checked
    COMPLIANT = "compliant"
    # Close to a limit but not yet at it
    WARNING = "warning"
    # At or above a limit
    EXCEEDANCE = "exceedance"
    # NOTE(review): not assigned by any code visible in this file
    CRITICAL = "critical"

class AlertType(Enum):
    """Category of an EnvironmentalAlert.

    Only the two THRESHOLD_* members are created by EnvironmentalMonitor in
    this file; the others are available for callers that build alerts
    themselves.
    """

    # Value is approaching a limit
    THRESHOLD_WARNING = "threshold_warning"
    # Value is at or above a limit
    THRESHOLD_EXCEEDANCE = "threshold_exceedance"
    EQUIPMENT_MALFUNCTION = "equipment_malfunction"
    WEATHER_ALERT = "weather_alert"
    COMMUNITY_COMPLAINT = "community_complaint"

@dataclass
class RegulatoryLimit:
    """One limit that a parameter is checked against.

    EnvironmentalMonitor treats averaging_period_hours == 0 as an
    instantaneous limit (compared with single readings); any other value
    makes it compare an average of readings instead.
    """

    parameter: ParameterType
    limit_value: float
    unit: str
    averaging_period_hours: float  # e.g., 8 for 8-hour TWA
    regulation: str  # e.g., "OSHA", "EPA"
    description: str

@dataclass
class EnvironmentalReading:
    """A single measurement reported by a monitoring station."""

    station_id: str  # key of the reporting station in EnvironmentalMonitor.stations
    parameter: ParameterType
    timestamp: datetime
    value: float
    unit: str
    # NOTE(review): never read by the code visible in this file; readings are
    # used regardless of this flag.
    quality_flag: str = "valid"

@dataclass
class MonitoringStation:
    """A physical monitoring station and the parameters it measures."""

    id: str
    name: str
    # generate_report reads the optional 'description' key for its table.
    location: Dict  # {lat, lon, description}
    parameters: List[ParameterType]
    installation_date: datetime
    last_calibration: datetime
    status: str = "active"

@dataclass
class ComplianceRecord:
    """Result of checking one parameter at one station against one limit."""

    parameter: ParameterType
    regulation: str
    limit_value: float
    measured_value: float  # the value that was compared with limit_value
    averaging_period: str  # display label, e.g. "Instantaneous" or "8-hour avg"
    status: ComplianceStatus
    timestamp: datetime  # check_compliance stores the end of the checked period
    location: str  # check_compliance stores the station name here

@dataclass
class EnvironmentalAlert:
    """An alert raised when a value approaches or reaches a limit."""

    id: str  # EnvironmentalMonitor generates ids of the form "ENV-00001"
    alert_type: AlertType
    parameter: ParameterType
    station_id: str
    timestamp: datetime
    value: float  # the value that triggered the alert
    threshold: float  # the limit value it was compared with
    message: str
    acknowledged: bool = False
    # Unresolved alerts are listed under "Active Alerts" by generate_report.
    resolved: bool = False
    resolution_notes: str = ""

@dataclass
class DailyReport:
    """Aggregated environmental figures for one calendar day."""

    date: datetime  # start of the reported day
    site_name: str
    parameters_monitored: int  # distinct parameters with at least one reading that day
    readings_collected: int
    exceedances: int  # number of compliance records with EXCEEDANCE status
    alerts_triggered: int
    compliance_status: ComplianceStatus  # worst status across the day's records
    # Keyed by parameter value; each entry holds count/min/max/avg/exceedances.
    summary: Dict[str, Dict]

class EnvironmentalMonitor:
    """Monitor environmental conditions on construction sites.

    The monitor keeps stations, readings and alerts in memory. Every
    recorded reading is checked against the default limits in
    REGULATORY_LIMITS plus any limits added with add_custom_limit, and an
    alert is stored when a limit is approached or reached.
    """

    # Fraction of a limit at which _check_limits raises a warning alert.
    ALERT_WARNING_FRACTION = 0.8
    # Fraction of a limit at which a compliance status becomes WARNING.
    STATUS_WARNING_FRACTION = 0.9
    # An identical unresolved alert younger than this is not raised again.
    ALERT_DEDUP_SECONDS = 3600

    # Default regulatory limits, shared by every instance. This mapping and
    # its lists must never be mutated; use _limits_for() to combine them with
    # per-instance custom limits.
    # NOTE(review): values are illustrative defaults; confirm them against
    # the regulations that actually apply to the site before relying on them.
    REGULATORY_LIMITS = {
        ParameterType.NOISE: [
            # Under OSHA 29 CFR 1910.95, 85 dBA is the action level and
            # 90 dBA the permissible exposure limit (both 8-hour TWA).
            RegulatoryLimit(ParameterType.NOISE, 85, "dBA", 8.0, "OSHA", "Action level (8-hour TWA)"),
            RegulatoryLimit(ParameterType.NOISE, 90, "dBA", 8.0, "OSHA", "PEL (8-hour TWA)"),
            RegulatoryLimit(ParameterType.NOISE, 115, "dBA", 0.25, "OSHA", "15-min max"),
        ],
        ParameterType.PM25: [
            RegulatoryLimit(ParameterType.PM25, 35, "µg/m³", 24.0, "EPA", "24-hour standard"),
            RegulatoryLimit(ParameterType.PM25, 12, "µg/m³", 8760.0, "EPA", "Annual standard"),
        ],
        ParameterType.PM10: [
            RegulatoryLimit(ParameterType.PM10, 150, "µg/m³", 24.0, "EPA", "24-hour standard"),
        ],
        ParameterType.CO: [
            RegulatoryLimit(ParameterType.CO, 50, "ppm", 0.0, "OSHA", "Ceiling limit"),
            RegulatoryLimit(ParameterType.CO, 35, "ppm", 8.0, "OSHA", "8-hour TWA"),
        ],
        ParameterType.VIBRATION: [
            RegulatoryLimit(ParameterType.VIBRATION, 25, "mm/s", 0.0, "ISO 4866", "Structural damage threshold"),
            RegulatoryLimit(ParameterType.VIBRATION, 5, "mm/s", 0.0, "DIN 4150", "Sensitive structures"),
        ],
    }

    def __init__(self, site_name: str):
        """Create an empty monitor for the named site."""
        self.site_name = site_name
        self.stations: Dict[str, MonitoringStation] = {}
        self.readings: List[EnvironmentalReading] = []
        self.alerts: List[EnvironmentalAlert] = []
        self.custom_limits: Dict[ParameterType, List[RegulatoryLimit]] = {}

    def _limits_for(self, parameter: ParameterType) -> List[RegulatoryLimit]:
        """Return default plus custom limits for a parameter as a new list.

        A fresh list is built on every call so callers can never modify the
        class-level defaults or this instance's custom limits by accident.
        """
        defaults = self.REGULATORY_LIMITS.get(parameter, ())
        custom = self.custom_limits.get(parameter, ())
        return [*defaults, *custom]

    def add_station(self, id: str, name: str, location: Dict,
                   parameters: List[ParameterType]) -> MonitoringStation:
        """Register a monitoring station and return it.

        Installation and last-calibration dates are both set to the current
        time. A station registered under an existing id replaces the old one.
        """
        registered_at = datetime.now()
        station = MonitoringStation(
            id=id,
            name=name,
            location=location,
            parameters=parameters,
            installation_date=registered_at,
            last_calibration=datetime.now()
        )
        self.stations[id] = station
        return station

    def add_custom_limit(self, parameter: ParameterType, limit_value: float,
                        unit: str, averaging_hours: float, regulation: str,
                        description: str):
        """Add a site-specific limit that applies to this monitor only.

        Pass averaging_hours=0 for a limit that single readings are compared
        with; any other value makes the limit apply to an average.
        """
        limit = RegulatoryLimit(
            parameter=parameter,
            limit_value=limit_value,
            unit=unit,
            averaging_period_hours=averaging_hours,
            regulation=regulation,
            description=description
        )
        self.custom_limits.setdefault(parameter, []).append(limit)

    def record_reading(self, station_id: str, parameter: ParameterType,
                      value: float, unit: str,
                      timestamp: Optional[datetime] = None) -> EnvironmentalReading:
        """Store one reading and check it against the applicable limits.

        Args:
            station_id: id of a station previously added with add_station.
            parameter: what was measured.
            value: measured value.
            unit: unit label stored with the reading.
            timestamp: time of measurement; defaults to the current time.

        Returns:
            The stored EnvironmentalReading.

        Raises:
            ValueError: if station_id is not a registered station.
        """
        if station_id not in self.stations:
            raise ValueError(f"Unknown station: {station_id}")

        reading = EnvironmentalReading(
            station_id=station_id,
            parameter=parameter,
            timestamp=timestamp or datetime.now(),
            value=value,
            unit=unit
        )

        # The reading must be stored first so averaged limits include it.
        self.readings.append(reading)

        try:
            self._check_limits(station_id, parameter, value)
        except Exception:
            # A value that cannot be compared or averaged (e.g. a string)
            # must not stay behind: it would break every later average for
            # this station and parameter.
            self.readings.pop()
            raise

        return reading

    def record_batch(self, readings: List[Dict]) -> int:
        """Record several readings, skipping the ones that cannot be stored.

        Each dict needs 'station_id', 'parameter' (a ParameterType value),
        'value' and 'unit'; 'timestamp' is optional. Rejected entries are
        logged as warnings instead of being dropped silently.

        Returns:
            The number of readings that were recorded.
        """
        log = logging.getLogger(__name__)
        count = 0
        for index, r in enumerate(readings):
            try:
                self.record_reading(
                    station_id=r['station_id'],
                    parameter=ParameterType(r['parameter']),
                    value=r['value'],
                    unit=r['unit'],
                    timestamp=r.get('timestamp')
                )
            except Exception as exc:
                # Best effort by design: one bad entry must not abort the batch.
                log.warning("Skipping batch reading %d: %s", index, exc)
            else:
                count += 1
        return count

    def _check_limits(self, station_id: str, parameter: ParameterType, value: float):
        """Compare a new value with every applicable limit and raise alerts.

        Instantaneous limits are compared with the value itself; averaged
        limits with the mean from _calculate_twa. Reaching a limit raises an
        exceedance alert, reaching ALERT_WARNING_FRACTION of it a warning.
        """
        for limit in self._limits_for(parameter):
            if limit.averaging_period_hours == 0:
                # Instantaneous limit
                check_value = value
            else:
                check_value = self._calculate_twa(
                    station_id, parameter, limit.averaging_period_hours
                )
                if check_value is None:
                    # No readings inside the averaging window.
                    continue

            if check_value >= limit.limit_value:
                self._create_alert(
                    station_id, parameter, check_value, limit
                )
            elif check_value >= limit.limit_value * self.ALERT_WARNING_FRACTION:
                self._create_alert(
                    station_id, parameter, check_value, limit,
                    is_warning=True
                )

    def _calculate_twa(self, station_id: str, parameter: ParameterType,
                      hours: float) -> Optional[float]:
        """Return the mean of readings from the last `hours` hours.

        The window ends at the current time, not at the newest reading's
        timestamp, and the result is a plain arithmetic mean of the stored
        values (readings are not weighted by duration).

        Returns:
            The mean value, or None when the window contains no readings.
        """
        cutoff = datetime.now() - timedelta(hours=hours)
        values = [r.value for r in self.readings
                  if r.station_id == station_id
                  and r.parameter == parameter
                  and r.timestamp > cutoff]

        if not values:
            return None

        return statistics.mean(values)

    def _create_alert(self, station_id: str, parameter: ParameterType,
                     value: float, limit: RegulatoryLimit,
                     is_warning: bool = False):
        """Store an alert unless an identical one is already open.

        An alert counts as a duplicate only when station, parameter, alert
        type and threshold all match an unresolved alert raised within
        ALERT_DEDUP_SECONDS. Matching on type and threshold ensures an
        earlier warning can never hide a later exceedance, and an alert for
        one limit cannot hide an alert for a different limit.
        """
        alert_type = (AlertType.THRESHOLD_WARNING if is_warning
                     else AlertType.THRESHOLD_EXCEEDANCE)
        now = datetime.now()

        already_open = any(
            a.station_id == station_id
            and a.parameter == parameter
            and a.alert_type == alert_type
            and a.threshold == limit.limit_value
            and not a.resolved
            and (now - a.timestamp).total_seconds() < self.ALERT_DEDUP_SECONDS
            for a in self.alerts
        )
        if already_open:
            return

        station = self.stations.get(station_id)

        alert = EnvironmentalAlert(
            id=f"ENV-{len(self.alerts)+1:05d}",
            alert_type=alert_type,
            parameter=parameter,
            station_id=station_id,
            timestamp=now,
            value=value,
            threshold=limit.limit_value,
            message=f"{parameter.value} {'approaching' if is_warning else 'exceeds'} "
                    f"{limit.regulation} limit ({limit.limit_value} {limit.unit}) "
                    f"at {station.name if station else station_id}"
        )

        self.alerts.append(alert)

    def get_current_conditions(self, station_id: Optional[str] = None) -> Dict:
        """Return the latest reading of each parameter, grouped by station.

        Args:
            station_id: restrict the result to one station; when omitted
                (or empty) all stations are included.

        Returns:
            A dict keyed by station id with 'name', 'location' and
            'parameters'; 'parameters' maps each parameter value to its
            latest 'value', 'unit', 'timestamp' and 'status'.

        Raises:
            KeyError: if station_id is given but not registered.
        """
        conditions = {}

        stations = ([self.stations[station_id]] if station_id
                   else self.stations.values())

        for station in stations:
            station_conditions = {}

            for param in station.parameters:
                latest = max(
                    (r for r in self.readings
                     if r.station_id == station.id and r.parameter == param),
                    key=lambda r: r.timestamp,
                    default=None,
                )
                if latest is None:
                    continue

                station_conditions[param.value] = {
                    "value": latest.value,
                    "unit": latest.unit,
                    "timestamp": latest.timestamp,
                    "status": self._get_compliance_status(param, latest.value)
                }

            conditions[station.id] = {
                "name": station.name,
                "location": station.location,
                "parameters": station_conditions
            }

        return conditions

    def _get_compliance_status(self, parameter: ParameterType,
                              value: float) -> ComplianceStatus:
        """Return the worst status of a value across instantaneous limits.

        Only limits with a zero averaging period are considered, so a
        parameter that has averaged limits only is always COMPLIANT here.
        Every limit is inspected before WARNING is returned, because a
        stricter limit later in the list may already be exceeded.
        """
        status = ComplianceStatus.COMPLIANT
        for limit in self._limits_for(parameter):
            if limit.averaging_period_hours != 0:
                continue
            if value >= limit.limit_value:
                return ComplianceStatus.EXCEEDANCE
            if value >= limit.limit_value * self.STATUS_WARNING_FRACTION:
                status = ComplianceStatus.WARNING

        return status

    def check_compliance(self, start_date: datetime,
                        end_date: datetime) -> List[ComplianceRecord]:
        """Check every station parameter against its limits for a period.

        Readings with start_date <= timestamp <= end_date are used.
        Instantaneous limits are compared with the period maximum; averaged
        limits with the mean over the whole period (not over each limit's
        own averaging window). Parameters without readings produce no record.

        Returns:
            One ComplianceRecord per station, parameter and limit.
        """
        records = []

        for station in self.stations.values():
            for param in station.parameters:
                # The same readings serve every limit of this parameter.
                values = [r.value for r in self.readings
                          if r.station_id == station.id
                          and r.parameter == param
                          and start_date <= r.timestamp <= end_date]

                if not values:
                    continue

                avg_value = statistics.mean(values)
                max_value = max(values)

                for limit in self._limits_for(param):
                    if limit.averaging_period_hours == 0:
                        check_value = max_value
                        period_str = "Instantaneous"
                    else:
                        check_value = avg_value
                        period_str = f"{limit.averaging_period_hours:.0f}-hour avg"

                    if check_value >= limit.limit_value:
                        status = ComplianceStatus.EXCEEDANCE
                    elif check_value >= limit.limit_value * self.STATUS_WARNING_FRACTION:
                        status = ComplianceStatus.WARNING
                    else:
                        status = ComplianceStatus.COMPLIANT

                    records.append(ComplianceRecord(
                        parameter=param,
                        regulation=limit.regulation,
                        limit_value=limit.limit_value,
                        measured_value=check_value,
                        averaging_period=period_str,
                        status=status,
                        timestamp=end_date,
                        location=station.name
                    ))

        return records

    def get_exceedance_summary(self, days: int = 30) -> Dict:
        """Summarise exceedance alerts raised during the last `days` days.

        Returns:
            A dict with 'period_days', 'total_exceedances', counts
            'by_parameter' and 'by_station', and 'recent_events' (up to ten
            alert objects, newest first).
        """
        cutoff = datetime.now() - timedelta(days=days)
        recent_alerts = [a for a in self.alerts
                       if a.timestamp > cutoff
                       and a.alert_type == AlertType.THRESHOLD_EXCEEDANCE]

        by_parameter: Dict[str, int] = {}
        by_station: Dict[str, int] = {}
        for alert in recent_alerts:
            param = alert.parameter.value
            by_parameter[param] = by_parameter.get(param, 0) + 1
            by_station[alert.station_id] = by_station.get(alert.station_id, 0) + 1

        return {
            "period_days": days,
            "total_exceedances": len(recent_alerts),
            "by_parameter": by_parameter,
            "by_station": by_station,
            "recent_events": sorted(
                recent_alerts, key=lambda a: a.timestamp, reverse=True
            )[:10]
        }

    def generate_daily_report(self, date: Optional[datetime] = None) -> DailyReport:
        """Build the report for the day starting at `date`.

        Args:
            date: start of the day to report; defaults to today at midnight.

        Returns:
            A DailyReport with counts, the overall compliance status and
            per-parameter statistics.
        """
        if date is None:
            date = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)

        next_day = date + timedelta(days=1)

        day_readings = [r for r in self.readings
                       if date <= r.timestamp < next_day]
        day_alerts = [a for a in self.alerts
                     if date <= a.timestamp < next_day]

        compliance_records = self.check_compliance(date, next_day)
        exceedances = [r for r in compliance_records
                      if r.status == ComplianceStatus.EXCEEDANCE]

        # Overall status is the worst status found among the records.
        if exceedances:
            overall_status = ComplianceStatus.EXCEEDANCE
        elif any(r.status == ComplianceStatus.WARNING for r in compliance_records):
            overall_status = ComplianceStatus.WARNING
        else:
            overall_status = ComplianceStatus.COMPLIANT

        param_summary = {}
        for param in ParameterType:
            values = [r.value for r in day_readings if r.parameter == param]
            if not values:
                continue
            param_summary[param.value] = {
                "count": len(values),
                "min": min(values),
                "max": max(values),
                "avg": statistics.mean(values),
                "exceedances": sum(1 for r in exceedances
                                   if r.parameter == param)
            }

        return DailyReport(
            date=date,
            site_name=self.site_name,
            parameters_monitored=len({r.parameter for r in day_readings}),
            readings_collected=len(day_readings),
            exceedances=len(exceedances),
            alerts_triggered=len(day_alerts),
            compliance_status=overall_status,
            summary=param_summary
        )

    def generate_report(self) -> str:
        """Render stations, current conditions, exceedances and open alerts.

        Returns:
            The report as a single Markdown string.
        """
        lines = [
            "# Environmental Monitoring Report",
            "",
            f"**Site:** {self.site_name}",
            f"**Report Date:** {datetime.now().strftime('%Y-%m-%d %H:%M')}",
            "",
            "## Monitoring Stations",
            "",
            "| Station | Location | Parameters | Status |",
            "|---------|----------|------------|--------|"
        ]

        for station in self.stations.values():
            params = ", ".join(p.value for p in station.parameters)
            lines.append(
                f"| {station.name} | {station.location.get('description', '-')} | "
                f"{params} | {station.status} |"
            )

        # Current conditions
        conditions = self.get_current_conditions()
        lines.extend([
            "",
            "## Current Conditions",
            ""
        ])

        for data in conditions.values():
            lines.append(f"### {data['name']}")
            lines.append("")
            lines.append("| Parameter | Value | Status |")
            lines.append("|-----------|-------|--------|")

            for param, values in data['parameters'].items():
                status_icon = ("✅" if values['status'] == ComplianceStatus.COMPLIANT
                              else "⚠️" if values['status'] == ComplianceStatus.WARNING
                              else "🔴")
                lines.append(
                    f"| {param} | {values['value']:.1f} {values['unit']} | "
                    f"{status_icon} {values['status'].value} |"
                )

            lines.append("")

        # Exceedance summary
        exceedance_summary = self.get_exceedance_summary(30)
        lines.extend([
            "## 30-Day Exceedance Summary",
            "",
            f"**Total Exceedances:** {exceedance_summary['total_exceedances']}",
            ""
        ])

        if exceedance_summary['by_parameter']:
            lines.append("By Parameter:")
            for param, count in exceedance_summary['by_parameter'].items():
                lines.append(f"- {param}: {count}")

        # Active alerts: everything not yet marked resolved, newest first.
        active_alerts = [a for a in self.alerts if not a.resolved]
        if active_alerts:
            lines.extend([
                "",
                f"## Active Alerts ({len(active_alerts)})",
                "",
                "| Time | Parameter | Station | Value | Threshold |",
                "|------|-----------|---------|-------|-----------|"
            ])

            for alert in sorted(active_alerts, key=lambda a: a.timestamp, reverse=True)[:10]:
                lines.append(
                    f"| {alert.timestamp.strftime('%Y-%m-%d %H:%M')} | "
                    f"{alert.parameter.value} | {alert.station_id} | "
                    f"{alert.value:.1f} | {alert.threshold} |"
                )

        return "\n".join(lines)

Quick Start

from datetime import datetime, timedelta

# Create the monitor for the site
monitor = EnvironmentalMonitor("Downtown Construction Site")

# Register the monitoring stations: (id, name, location, parameters)
station_specs = [
    (
        "STA-001", "North Perimeter",
        {"lat": 40.7128, "lon": -74.0060, "description": "North fence line"},
        [ParameterType.NOISE, ParameterType.PM25, ParameterType.PM10],
    ),
    (
        "STA-002", "Equipment Area",
        {"lat": 40.7125, "lon": -74.0055, "description": "Near excavation"},
        [ParameterType.NOISE, ParameterType.VIBRATION, ParameterType.CO],
    ),
]
for sta_id, sta_name, sta_location, sta_parameters in station_specs:
    monitor.add_station(
        sta_id, sta_name,
        location=sta_location,
        parameters=sta_parameters
    )

# Local ordinance: instantaneous noise limit at the residential boundary
monitor.add_custom_limit(
    ParameterType.NOISE, 65, "dBA", 0,
    "Local Ordinance", "Residential boundary limit"
)

# Record individual readings
single_readings = [
    ("STA-001", ParameterType.NOISE, 78.5, "dBA"),
    ("STA-001", ParameterType.PM25, 28.3, "µg/m³"),
    ("STA-002", ParameterType.VIBRATION, 8.2, "mm/s"),
]
for sta_id, parameter, measured, unit in single_readings:
    monitor.record_reading(sta_id, parameter, measured, unit)

# Record a batch of readings given as plain dicts
readings = [
    {"station_id": "STA-001", "parameter": "noise", "value": 82.0, "unit": "dBA"},
    {"station_id": "STA-001", "parameter": "pm25", "value": 31.5, "unit": "µg/m³"},
    {"station_id": "STA-002", "parameter": "noise", "value": 88.0, "unit": "dBA"}
]
monitor.record_batch(readings)

# Print the latest value of every parameter, station by station
conditions = monitor.get_current_conditions()
for data in conditions.values():
    print(f"\n{data['name']}:")
    for param, values in data['parameters'].items():
        print(f"  {param}: {values['value']} {values['unit']} - {values['status'].value}")

# Check compliance over the last 24 hours and print anything not compliant
window_start = datetime.now() - timedelta(days=1)
window_end = datetime.now()
compliance = monitor.check_compliance(window_start, window_end)
non_compliant = [rec for rec in compliance
                 if rec.status != ComplianceStatus.COMPLIANT]
for record in non_compliant:
    print(f"⚠️ {record.parameter.value}: {record.measured_value} vs limit {record.limit_value}")

# Daily report for today
report = monitor.generate_daily_report()
print(f"\nDaily Status: {report.compliance_status.value}")
print(f"Exceedances: {report.exceedances}")

# Full Markdown report
print(monitor.generate_report())

Requirements

No installation required: the code uses only the Python standard library (no external dependencies).
Weekly Installs
4
GitHub Stars
52
First Seen
11 days ago
Installed on
opencode: 4
gemini-cli: 4
antigravity: 4
github-copilot: 4
codex: 4
kimi-cli: 4