environmental-monitoring
SKILL.md
Environmental Monitoring
Overview
Monitor and analyze environmental conditions on construction sites including air quality, noise, vibration, dust, and weather. Support regulatory compliance, worker safety, and community relations through real-time environmental tracking.
Environmental Monitoring System
┌─────────────────────────────────────────────────────────────────┐
│ ENVIRONMENTAL MONITORING │
├─────────────────────────────────────────────────────────────────┤
│ │
│ SENSORS MONITORING COMPLIANCE │
│ ─────── ────────── ────────── │
│ │
│ 💨 Air Quality ───┐ ✅ OSHA limits │
│ 🔊 Noise Level ───┼─────→ Real-time ────────→ ✅ EPA limits │
│ 📊 Vibration ───┤ Dashboard ✅ Local codes │
│ 🌫️ Dust/PM ───┤ Alerts ✅ Permits │
│ 🌡️ Weather ───┘ Reports ✅ Neighbors │
│ │
│ THRESHOLDS: │
│ • Noise: 85 dBA (OSHA action level, 8hr TWA) │
│ • PM2.5: 35 µg/m³ (EPA 24hr) │
│ • Vibration: 25 mm/s (structural) │
│ • CO: 50 ppm (OSHA ceiling) │
│ │
└─────────────────────────────────────────────────────────────────┘
Technical Implementation
from dataclasses import dataclass, field
from typing import List, Dict, Optional, Tuple
from datetime import datetime, timedelta
from enum import Enum
import statistics
import math
class ParameterType(Enum):
    """Environmental parameters that a monitoring station can report.

    Each member's value is the lowercase string key for the parameter, so a
    member can be looked up from text, e.g. ``ParameterType("noise")``.
    """
    NOISE = "noise"
    PM25 = "pm25"  # particulate matter, PM2.5
    PM10 = "pm10"  # particulate matter, PM10
    CO = "co"  # carbon monoxide
    CO2 = "co2"  # carbon dioxide
    VOC = "voc"  # volatile organic compounds
    VIBRATION = "vibration"
    TEMPERATURE = "temperature"
    HUMIDITY = "humidity"
    WIND_SPEED = "wind_speed"
    WIND_DIRECTION = "wind_direction"
    RAINFALL = "rainfall"
class ComplianceStatus(Enum):
    """Outcome of comparing a measured value against a regulatory limit."""
    COMPLIANT = "compliant"  # below the warning band of every checked limit
    WARNING = "warning"  # close to a limit but not at or above it
    EXCEEDANCE = "exceedance"  # at or above a limit
    # NOTE(review): CRITICAL is not assigned by the monitor code visible in
    # this file; confirm its intended use before relying on it.
    CRITICAL = "critical"
class AlertType(Enum):
    """Categories of alert that can be attached to an EnvironmentalAlert."""
    THRESHOLD_WARNING = "threshold_warning"  # value approaching a limit
    THRESHOLD_EXCEEDANCE = "threshold_exceedance"  # value at or above a limit
    # NOTE(review): the three types below are declared but not raised by the
    # monitor code visible in this file.
    EQUIPMENT_MALFUNCTION = "equipment_malfunction"
    WEATHER_ALERT = "weather_alert"
    COMMUNITY_COMPLAINT = "community_complaint"
@dataclass
class RegulatoryLimit:
    """A single threshold that measurements of one parameter are checked against."""
    parameter: ParameterType  # parameter the limit applies to
    limit_value: float  # threshold, expressed in ``unit``
    unit: str  # display unit, e.g. "dBA", "ppm"
    # 0 means the limit is compared against individual readings; any other
    # value means it is compared against an average over that many hours.
    averaging_period_hours: float  # e.g., 8 for 8-hour TWA
    regulation: str  # e.g., "OSHA", "EPA"
    description: str  # free-text label for the limit
@dataclass
class EnvironmentalReading:
    """One measurement of one parameter taken at one station."""
    station_id: str  # id of the station that produced the reading
    parameter: ParameterType  # what was measured
    timestamp: datetime  # when the measurement was taken
    value: float  # measured value, expressed in ``unit``
    unit: str  # unit of ``value``
    # Defaults to "valid"; no code visible in this file reads or changes it.
    quality_flag: str = "valid"
@dataclass
class MonitoringStation:
    """A physical monitoring point and the parameters it measures."""
    id: str  # unique station identifier, used as the lookup key
    name: str  # human-readable name shown in reports and alert messages
    location: Dict  # {lat, lon, description}
    parameters: List[ParameterType]  # parameters this station reports
    installation_date: datetime
    last_calibration: datetime
    status: str = "active"  # printed as-is in the station table of the report
@dataclass
class ComplianceRecord:
    """Result of checking one station's readings against one regulatory limit."""
    parameter: ParameterType  # parameter that was checked
    regulation: str  # regulation the limit comes from
    limit_value: float  # threshold that was applied
    measured_value: float  # value that was compared against the limit
    averaging_period: str  # label such as "Instantaneous" or "8-hour avg"
    status: ComplianceStatus  # outcome of the comparison
    timestamp: datetime  # end of the period that was checked
    location: str  # name of the station the readings came from
@dataclass
class EnvironmentalAlert:
    """An alert raised for a parameter at a station."""
    id: str  # alert identifier, e.g. "ENV-00001"
    alert_type: AlertType  # category of the alert
    parameter: ParameterType  # parameter that triggered the alert
    station_id: str  # station the alert refers to
    timestamp: datetime  # when the alert was created
    value: float  # value that was compared against ``threshold``
    threshold: float  # limit value the alert refers to
    message: str  # human-readable description
    acknowledged: bool = False
    resolved: bool = False  # unresolved alerts are listed as active in reports
    resolution_notes: str = ""
@dataclass
class DailyReport:
    """Summary of one day of monitoring at a site."""
    date: datetime  # start of the reported day
    site_name: str
    parameters_monitored: int  # number of distinct parameters with readings
    readings_collected: int  # number of readings taken during the day
    exceedances: int  # number of compliance records in EXCEEDANCE status
    alerts_triggered: int  # number of alerts created during the day
    compliance_status: ComplianceStatus  # overall status for the day
    # Keyed by parameter value string; each entry holds
    # "count", "min", "max", "avg" and "exceedances".
    summary: Dict[str, Dict]
class EnvironmentalMonitor:
    """Monitor environmental conditions on construction sites.

    Stations, readings and alerts are held in memory on the instance. Every
    recorded reading is checked against the built-in ``REGULATORY_LIMITS``
    plus any limits added with :meth:`add_custom_limit`, and alerts are
    appended to ``self.alerts`` when a limit is approached or exceeded.
    """

    # Fraction of a limit at which real-time checks start raising warnings.
    ALERT_WARNING_FRACTION = 0.8
    # Fraction of a limit at which status and compliance reports flag a warning.
    STATUS_WARNING_FRACTION = 0.9
    # Window (seconds) in which a repeated alert for the same station and
    # parameter is suppressed.
    ALERT_DEDUP_SECONDS = 3600

    # Default regulatory limits. These lists are shared by every instance and
    # must never be mutated; use _get_limits() to combine them with custom ones.
    # NOTE(review): values are illustrative defaults - verify them against the
    # regulations in force for the project before relying on them.
    REGULATORY_LIMITS = {
        ParameterType.NOISE: [
            # OSHA 29 CFR 1910.95: 85 dBA is the action level, 90 dBA the PEL.
            RegulatoryLimit(ParameterType.NOISE, 85, "dBA", 8.0, "OSHA", "Action level (8-hour TWA)"),
            RegulatoryLimit(ParameterType.NOISE, 90, "dBA", 8.0, "OSHA", "PEL (8-hour TWA)"),
            RegulatoryLimit(ParameterType.NOISE, 115, "dBA", 0.25, "OSHA", "15-min max"),
        ],
        ParameterType.PM25: [
            RegulatoryLimit(ParameterType.PM25, 35, "µg/m³", 24.0, "EPA", "24-hour standard"),
            RegulatoryLimit(ParameterType.PM25, 12, "µg/m³", 8760.0, "EPA", "Annual standard"),
        ],
        ParameterType.PM10: [
            RegulatoryLimit(ParameterType.PM10, 150, "µg/m³", 24.0, "EPA", "24-hour standard"),
        ],
        ParameterType.CO: [
            # NOTE(review): OSHA Table Z-1 lists 50 ppm for CO as an 8-hour
            # TWA rather than a ceiling - confirm the intended sources for
            # both entries. Values are left unchanged here.
            RegulatoryLimit(ParameterType.CO, 50, "ppm", 0.0, "OSHA", "Ceiling limit"),
            RegulatoryLimit(ParameterType.CO, 35, "ppm", 8.0, "OSHA", "8-hour TWA"),
        ],
        ParameterType.VIBRATION: [
            RegulatoryLimit(ParameterType.VIBRATION, 25, "mm/s", 0.0, "ISO 4866", "Structural damage threshold"),
            RegulatoryLimit(ParameterType.VIBRATION, 5, "mm/s", 0.0, "DIN 4150", "Sensitive structures"),
        ],
    }

    def __init__(self, site_name: str):
        """Create an empty monitor for the site called *site_name*."""
        self.site_name = site_name
        self.stations: Dict[str, MonitoringStation] = {}
        self.readings: List[EnvironmentalReading] = []
        self.alerts: List[EnvironmentalAlert] = []
        self.custom_limits: Dict[ParameterType, List[RegulatoryLimit]] = {}

    def add_station(self, id: str, name: str, location: Dict,
                    parameters: List[ParameterType]) -> MonitoringStation:
        """Register a monitoring station and return it.

        The installation and last-calibration dates are both set to the
        current time. A station registered under an existing id replaces it.
        """
        station = MonitoringStation(
            id=id,
            name=name,
            location=location,
            parameters=parameters,
            installation_date=datetime.now(),
            last_calibration=datetime.now()
        )
        self.stations[id] = station
        return station

    def add_custom_limit(self, parameter: ParameterType, limit_value: float,
                         unit: str, averaging_hours: float, regulation: str,
                         description: str):
        """Add a site-specific limit that is checked alongside the defaults.

        Args:
            parameter: Parameter the limit applies to.
            limit_value: Threshold value, expressed in *unit*.
            unit: Display unit of the threshold.
            averaging_hours: Averaging window in hours; 0 makes the limit
                apply to individual readings.
            regulation: Name of the rule the limit comes from.
            description: Free-text label for the limit.
        """
        limit = RegulatoryLimit(
            parameter=parameter,
            limit_value=limit_value,
            unit=unit,
            averaging_period_hours=averaging_hours,
            regulation=regulation,
            description=description
        )
        self.custom_limits.setdefault(parameter, []).append(limit)

    def _get_limits(self, parameter: ParameterType) -> List[RegulatoryLimit]:
        """Return default plus custom limits for *parameter* as a new list.

        A fresh list is built on every call so that the shared class-level
        ``REGULATORY_LIMITS`` lists are never modified.
        """
        return [
            *self.REGULATORY_LIMITS.get(parameter, []),
            *self.custom_limits.get(parameter, []),
        ]

    def record_reading(self, station_id: str, parameter: ParameterType,
                       value: float, unit: str,
                       timestamp: Optional[datetime] = None) -> EnvironmentalReading:
        """Store one reading and check it against the applicable limits.

        Args:
            station_id: Id of a station previously added with add_station().
            parameter: Parameter that was measured.
            value: Measured value.
            unit: Unit of *value*.
            timestamp: Measurement time; defaults to the current time.

        Returns:
            The stored EnvironmentalReading.

        Raises:
            ValueError: If *station_id* is not a registered station.
        """
        if station_id not in self.stations:
            raise ValueError(f"Unknown station: {station_id}")
        reading = EnvironmentalReading(
            station_id=station_id,
            parameter=parameter,
            timestamp=timestamp or datetime.now(),
            value=value,
            unit=unit
        )
        self.readings.append(reading)
        # Anchor averaged checks at the reading's own time so that backfilled
        # data is compared against the window it actually belongs to.
        self._check_limits(station_id, parameter, value, reading.timestamp)
        return reading

    def record_batch(self, readings: List[Dict]) -> int:
        """Record multiple readings, skipping malformed records.

        Each record needs the keys ``station_id``, ``parameter`` (a
        ParameterType value string), ``value`` and ``unit``; ``timestamp``
        is optional. Records that cannot be stored are logged and skipped
        rather than aborting the batch.

        Returns:
            The number of records that were recorded without error.
        """
        import logging  # local import: keeps this block self-contained

        logger = logging.getLogger(__name__)
        count = 0
        for r in readings:
            try:
                self.record_reading(
                    station_id=r['station_id'],
                    parameter=ParameterType(r['parameter']),
                    value=r['value'],
                    unit=r['unit'],
                    timestamp=r.get('timestamp')
                )
            except (KeyError, ValueError, TypeError, AttributeError) as exc:
                # Best-effort ingestion: report the bad record and carry on.
                logger.warning("Skipping invalid reading %r: %s", r, exc)
            else:
                count += 1
        return count

    def _check_limits(self, station_id: str, parameter: ParameterType,
                      value: float, timestamp: Optional[datetime] = None):
        """Compare a new value with every applicable limit and raise alerts.

        Limits with a zero averaging period are compared with *value*
        directly; all others are compared with the average of the station's
        readings over the limit's averaging window ending at *timestamp*
        (or at the current time when *timestamp* is None).
        """
        for limit in self._get_limits(parameter):
            if limit.averaging_period_hours == 0:
                # Instantaneous limit
                check_value = value
            else:
                check_value = self._calculate_twa(
                    station_id, parameter, limit.averaging_period_hours,
                    as_of=timestamp
                )
                if check_value is None:
                    continue
            if check_value >= limit.limit_value:
                self._create_alert(station_id, parameter, check_value, limit)
            elif check_value >= limit.limit_value * self.ALERT_WARNING_FRACTION:
                self._create_alert(
                    station_id, parameter, check_value, limit,
                    is_warning=True
                )

    def _calculate_twa(self, station_id: str, parameter: ParameterType,
                       hours: float,
                       as_of: Optional[datetime] = None) -> Optional[float]:
        """Average a station's readings over the trailing *hours* window.

        The result is the arithmetic mean of the readings in the window,
        which equals a true time-weighted average only when readings are
        evenly spaced.

        Args:
            station_id: Station whose readings are averaged.
            parameter: Parameter to average.
            hours: Length of the window in hours.
            as_of: End of the window; defaults to the current time.

        Returns:
            The mean value, or None when the window contains no readings.
        """
        window_end = as_of if as_of is not None else datetime.now()
        cutoff = window_end - timedelta(hours=hours)
        values = [r.value for r in self.readings
                  if r.station_id == station_id
                  and r.parameter == parameter
                  and cutoff < r.timestamp <= window_end]
        if not values:
            return None
        return statistics.mean(values)

    def _create_alert(self, station_id: str, parameter: ParameterType,
                      value: float, limit: RegulatoryLimit,
                      is_warning: bool = False):
        """Append a threshold alert unless an equivalent one is still recent.

        A warning is suppressed by any unresolved alert for the same station
        and parameter created within ``ALERT_DEDUP_SECONDS``. An exceedance
        is suppressed only by a recent unresolved exceedance, so a condition
        that escalates from warning to exceedance is still reported.
        """
        now = datetime.now()
        for existing in self.alerts:
            if (existing.station_id != station_id
                    or existing.parameter != parameter
                    or existing.resolved):
                continue
            if (now - existing.timestamp).total_seconds() >= self.ALERT_DEDUP_SECONDS:
                continue
            if is_warning or existing.alert_type == AlertType.THRESHOLD_EXCEEDANCE:
                return

        alert_type = (AlertType.THRESHOLD_WARNING if is_warning
                      else AlertType.THRESHOLD_EXCEEDANCE)
        station = self.stations.get(station_id)
        alert = EnvironmentalAlert(
            id=f"ENV-{len(self.alerts)+1:05d}",
            alert_type=alert_type,
            parameter=parameter,
            station_id=station_id,
            timestamp=now,
            value=value,
            threshold=limit.limit_value,
            message=f"{parameter.value} {'approaching' if is_warning else 'exceeds'} "
                    f"{limit.regulation} limit ({limit.limit_value} {limit.unit}) "
                    f"at {station.name if station else station_id}"
        )
        self.alerts.append(alert)

    def get_current_conditions(self, station_id: Optional[str] = None) -> Dict:
        """Return the latest reading per parameter for one or all stations.

        Args:
            station_id: Restrict the result to this station; when omitted,
                all stations are included.

        Returns:
            A dict keyed by station id. Each entry holds the station's
            ``name``, ``location`` and ``parameters``; the latter maps each
            parameter value string to its latest ``value``, ``unit``,
            ``timestamp`` and ``status``. Parameters without readings are
            omitted.

        Raises:
            KeyError: If *station_id* is given but not registered.
        """
        conditions = {}
        stations = ([self.stations[station_id]] if station_id
                    else self.stations.values())
        for station in stations:
            station_conditions = {}
            for param in station.parameters:
                readings = [r for r in self.readings
                            if r.station_id == station.id
                            and r.parameter == param]
                if not readings:
                    continue
                latest = max(readings, key=lambda r: r.timestamp)
                station_conditions[param.value] = {
                    "value": latest.value,
                    "unit": latest.unit,
                    "timestamp": latest.timestamp,
                    "status": self._get_compliance_status(param, latest.value)
                }
            conditions[station.id] = {
                "name": station.name,
                "location": station.location,
                "parameters": station_conditions
            }
        return conditions

    def _get_compliance_status(self, parameter: ParameterType,
                               value: float) -> ComplianceStatus:
        """Return the worst status of *value* across all instantaneous limits.

        Only limits with a zero averaging period are considered. Every such
        limit is checked, so a value inside the warning band of one limit
        but above another is reported as an exceedance.
        """
        status = ComplianceStatus.COMPLIANT
        for limit in self._get_limits(parameter):
            if limit.averaging_period_hours != 0:
                continue
            if value >= limit.limit_value:
                return ComplianceStatus.EXCEEDANCE
            if value >= limit.limit_value * self.STATUS_WARNING_FRACTION:
                status = ComplianceStatus.WARNING
        return status

    def check_compliance(self, start_date: datetime,
                         end_date: datetime) -> List[ComplianceRecord]:
        """Check every station/parameter/limit combination for a period.

        Instantaneous limits are compared with the maximum reading in the
        period. Averaged limits are compared with the mean of all readings
        in the period (both bounds inclusive), regardless of the limit's own
        averaging window. Combinations with no readings yield no record.

        Returns:
            One ComplianceRecord per station, parameter and limit that had
            readings, timestamped with *end_date*.
        """
        records = []
        for station in self.stations.values():
            for param in station.parameters:
                # The readings do not depend on the limit, so select them once.
                values = [r.value for r in self.readings
                          if r.station_id == station.id
                          and r.parameter == param
                          and start_date <= r.timestamp <= end_date]
                if not values:
                    continue
                avg_value = statistics.mean(values)
                max_value = max(values)
                for limit in self._get_limits(param):
                    if limit.averaging_period_hours == 0:
                        check_value = max_value
                        period_str = "Instantaneous"
                    else:
                        check_value = avg_value
                        period_str = f"{limit.averaging_period_hours:.0f}-hour avg"
                    if check_value >= limit.limit_value:
                        status = ComplianceStatus.EXCEEDANCE
                    elif check_value >= limit.limit_value * self.STATUS_WARNING_FRACTION:
                        status = ComplianceStatus.WARNING
                    else:
                        status = ComplianceStatus.COMPLIANT
                    records.append(ComplianceRecord(
                        parameter=param,
                        regulation=limit.regulation,
                        limit_value=limit.limit_value,
                        measured_value=check_value,
                        averaging_period=period_str,
                        status=status,
                        timestamp=end_date,
                        location=station.name
                    ))
        return records

    def get_exceedance_summary(self, days: int = 30) -> Dict:
        """Summarise exceedance alerts raised in the last *days* days.

        Returns:
            A dict with ``period_days``, ``total_exceedances``, counts
            ``by_parameter`` and ``by_station``, and ``recent_events`` (the
            ten most recent exceedance alerts, newest first).
        """
        cutoff = datetime.now() - timedelta(days=days)
        recent_alerts = [a for a in self.alerts
                         if a.timestamp > cutoff
                         and a.alert_type == AlertType.THRESHOLD_EXCEEDANCE]
        summary = {
            "period_days": days,
            "total_exceedances": len(recent_alerts),
            "by_parameter": {},
            "by_station": {},
            "recent_events": []
        }
        for alert in recent_alerts:
            param = alert.parameter.value
            summary["by_parameter"][param] = summary["by_parameter"].get(param, 0) + 1
            station = alert.station_id
            summary["by_station"][station] = summary["by_station"].get(station, 0) + 1
        summary["recent_events"] = sorted(
            recent_alerts, key=lambda a: a.timestamp, reverse=True
        )[:10]
        return summary

    def generate_daily_report(self, date: Optional[datetime] = None) -> DailyReport:
        """Build the DailyReport for the 24 hours starting at *date*.

        Args:
            date: Start of the day to report; defaults to midnight today.
        """
        if date is None:
            date = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
        next_day = date + timedelta(days=1)

        day_readings = [r for r in self.readings
                        if date <= r.timestamp < next_day]
        day_alerts = [a for a in self.alerts
                      if date <= a.timestamp < next_day]

        compliance_records = self.check_compliance(date, next_day)
        exceedances = [r for r in compliance_records
                       if r.status == ComplianceStatus.EXCEEDANCE]

        # The overall status is the worst status of any record.
        if exceedances:
            overall_status = ComplianceStatus.EXCEEDANCE
        elif any(r.status == ComplianceStatus.WARNING for r in compliance_records):
            overall_status = ComplianceStatus.WARNING
        else:
            overall_status = ComplianceStatus.COMPLIANT

        param_summary = {}
        for param in ParameterType:
            values = [r.value for r in day_readings if r.parameter == param]
            if not values:
                continue
            param_summary[param.value] = {
                "count": len(values),
                "min": min(values),
                "max": max(values),
                "avg": statistics.mean(values),
                "exceedances": len([r for r in exceedances
                                    if r.parameter == param])
            }

        return DailyReport(
            date=date,
            site_name=self.site_name,
            parameters_monitored=len(set(r.parameter for r in day_readings)),
            readings_collected=len(day_readings),
            exceedances=len(exceedances),
            alerts_triggered=len(day_alerts),
            compliance_status=overall_status,
            summary=param_summary
        )

    def generate_report(self) -> str:
        """Render a Markdown report of stations, conditions and alerts.

        The report lists all stations, the current conditions per station,
        a 30-day exceedance summary and up to ten unresolved alerts.
        """
        lines = [
            "# Environmental Monitoring Report",
            "",
            f"**Site:** {self.site_name}",
            f"**Report Date:** {datetime.now().strftime('%Y-%m-%d %H:%M')}",
            "",
            "## Monitoring Stations",
            "",
            "| Station | Location | Parameters | Status |",
            "|---------|----------|------------|--------|"
        ]
        for station in self.stations.values():
            params = ", ".join([p.value for p in station.parameters])
            lines.append(
                f"| {station.name} | {station.location.get('description', '-')} | "
                f"{params} | {station.status} |"
            )

        # Current conditions
        conditions = self.get_current_conditions()
        lines.extend([
            "",
            "## Current Conditions",
            ""
        ])
        for data in conditions.values():
            lines.append(f"### {data['name']}")
            lines.append("")
            lines.append("| Parameter | Value | Status |")
            lines.append("|-----------|-------|--------|")
            for param, values in data['parameters'].items():
                status_icon = ("✅" if values['status'] == ComplianceStatus.COMPLIANT
                               else "⚠️" if values['status'] == ComplianceStatus.WARNING
                               else "🔴")
                lines.append(
                    f"| {param} | {values['value']:.1f} {values['unit']} | "
                    f"{status_icon} {values['status'].value} |"
                )
            lines.append("")

        # Exceedance summary
        exceedance_summary = self.get_exceedance_summary(30)
        lines.extend([
            "## 30-Day Exceedance Summary",
            "",
            f"**Total Exceedances:** {exceedance_summary['total_exceedances']}",
            ""
        ])
        if exceedance_summary['by_parameter']:
            lines.append("By Parameter:")
            for param, count in exceedance_summary['by_parameter'].items():
                lines.append(f"- {param}: {count}")

        # Active alerts
        active_alerts = [a for a in self.alerts if not a.resolved]
        if active_alerts:
            lines.extend([
                "",
                f"## Active Alerts ({len(active_alerts)})",
                "",
                "| Time | Parameter | Station | Value | Threshold |",
                "|------|-----------|---------|-------|-----------|"
            ])
            for alert in sorted(active_alerts, key=lambda a: a.timestamp, reverse=True)[:10]:
                lines.append(
                    f"| {alert.timestamp.strftime('%Y-%m-%d %H:%M')} | "
                    f"{alert.parameter.value} | {alert.station_id} | "
                    f"{alert.value:.1f} | {alert.threshold} |"
                )
        return "\n".join(lines)
Quick Start
from datetime import datetime, timedelta
# Create the monitor for the site
monitor = EnvironmentalMonitor("Downtown Construction Site")

# Register the two monitoring stations
north_location = {"lat": 40.7128, "lon": -74.0060, "description": "North fence line"}
monitor.add_station(
    "STA-001",
    "North Perimeter",
    location=north_location,
    parameters=[ParameterType.NOISE, ParameterType.PM25, ParameterType.PM10],
)
equipment_location = {"lat": 40.7125, "lon": -74.0055, "description": "Near excavation"}
monitor.add_station(
    "STA-002",
    "Equipment Area",
    location=equipment_location,
    parameters=[ParameterType.NOISE, ParameterType.VIBRATION, ParameterType.CO],
)

# Local ordinance: instantaneous noise limit at the residential boundary
monitor.add_custom_limit(
    ParameterType.NOISE,
    65,
    "dBA",
    0,
    "Local Ordinance",
    "Residential boundary limit",
)

# Record individual readings one at a time
monitor.record_reading("STA-001", ParameterType.NOISE, 78.5, "dBA")
monitor.record_reading("STA-001", ParameterType.PM25, 28.3, "µg/m³")
monitor.record_reading("STA-002", ParameterType.VIBRATION, 8.2, "mm/s")

# Record several readings in a single call
readings = [
    {"station_id": "STA-001", "parameter": "noise", "value": 82.0, "unit": "dBA"},
    {"station_id": "STA-001", "parameter": "pm25", "value": 31.5, "unit": "µg/m³"},
    {"station_id": "STA-002", "parameter": "noise", "value": 88.0, "unit": "dBA"},
]
monitor.record_batch(readings)

# Print the latest value of every parameter at every station
conditions = monitor.get_current_conditions()
for data in conditions.values():
    print(f"\n{data['name']}:")
    for param, values in data['parameters'].items():
        print(f" {param}: {values['value']} {values['unit']} - {values['status'].value}")

# Check compliance over the last 24 hours and print anything not compliant
period_end = datetime.now()
period_start = datetime.now() - timedelta(days=1)
compliance = monitor.check_compliance(period_start, period_end)
for record in compliance:
    if record.status == ComplianceStatus.COMPLIANT:
        continue
    print(f"⚠️ {record.parameter.value}: {record.measured_value} vs limit {record.limit_value}")

# Summarise today
report = monitor.generate_daily_report()
print(f"\nDaily Status: {report.compliance_status.value}")
print(f"Exceedances: {report.exceedances}")

# Print the full Markdown report
print(monitor.generate_report())
Requirements
No installation required: the implementation uses only the Python standard library.
Weekly Installs
4
Repository
datadrivenconst…truction
GitHub Stars
52
First Seen
11 days ago
Security Audits
Installed on
opencode4
gemini-cli4
antigravity4
github-copilot4
codex4
kimi-cli4