# Audit Logging
Implement comprehensive audit logging for compliance, security monitoring, and forensic analysis across infrastructure and applications.
## When to Use
- Setting up centralized logging for compliance frameworks (SOC 2, HIPAA, PCI DSS)
- Implementing security event monitoring and alerting
- Building audit trails for regulatory requirements
- Configuring log retention and tamper-proof storage
- Integrating application logs with SIEM platforms
## Log Categories

```yaml
audit_events:
  authentication:
    - Login attempts (success and failure)
    - MFA enrollment and verification events
    - Session creation, renewal, and termination
    - Password changes and resets
    - API key and token generation
  authorization:
    - Access grants and denials
    - Permission changes and role assignments
    - Privilege escalation events
    - Resource sharing modifications
    - Policy evaluation results
  data_access:
    - Read operations on sensitive data
    - Write and update operations
    - Delete and purge operations
    - Bulk export and download events
    - Data classification changes
  administrative:
    - Configuration changes
    - User and group management
    - System startup and shutdown
    - Backup and restore operations
    - Network and firewall rule changes
  system:
    - Service health state changes
    - Resource provisioning and deprovisioning
    - Certificate and key rotation events
    - Scheduled job execution results
    - Integration and webhook events
```
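For reference, a single authentication event from these categories rendered as one structured entry might look like the following; the field names match the `AuditLogger` shown later in this document, and the values and truncated hashes are illustrative:

```json
{
  "timestamp": "2024-05-01T12:34:56.789012+00:00",
  "service": "billing-api",
  "event_type": "authentication",
  "user": "alice@example.com",
  "resource": "auth-service",
  "action": "login",
  "result": "failure",
  "source_ip": "203.0.113.7",
  "metadata": { "mfa_used": false },
  "prev_hash": "9b2e…",
  "hash": "3f4c…"
}
```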
## Rsyslog Configuration for Centralized Logging

```
# /etc/rsyslog.d/50-audit.conf
# Load imfile module to read application logs
module(load="imfile")

# Forward auth logs
input(type="imfile"
      File="/var/log/auth.log"
      Tag="auth"
      Severity="info"
      Facility="auth"
)

# Forward application audit logs
input(type="imfile"
      File="/var/log/app/audit.log"
      Tag="app-audit"
      Severity="info"
      Facility="local0"
)

# Structured JSON template
template(name="json-audit" type="list") {
    constant(value="{")
    constant(value="\"@timestamp\":\"") property(name="timereported" dateFormat="rfc3339")
    constant(value="\",\"host\":\"") property(name="hostname")
    constant(value="\",\"severity\":\"") property(name="syslogseverity-text")
    constant(value="\",\"facility\":\"") property(name="syslogfacility-text")
    constant(value="\",\"tag\":\"") property(name="syslogtag" format="json")
    constant(value="\",\"message\":\"") property(name="msg" format="json")
    constant(value="\"}\n")
}

# Forward to central syslog server over TLS
# x509/name authentication also needs a trusted CA configured globally, e.g.
# global(DefaultNetstreamDriverCAFile="/etc/ssl/certs/ca.pem")
action(
    type="omfwd"
    target="syslog.internal.example.com"
    port="6514"
    protocol="tcp"
    StreamDriver="gtls"
    StreamDriverMode="1"
    StreamDriverAuthMode="x509/name"
    StreamDriverPermittedPeers="syslog.internal.example.com"
    template="json-audit"
    queue.type="LinkedList"
    queue.size="50000"
    queue.filename="fwd_audit"
    queue.saveonshutdown="on"
    action.resumeRetryCount="-1"
)
```
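To sanity-check the pipeline, validate the configuration and emit a test record on the `local0` facility; since the forwarding action above has no filter, the record should be forwarded to the central server:

```bash
# Validate config syntax, then apply it
sudo rsyslogd -N1 && sudo systemctl restart rsyslog

# Emit a test record on the local0 facility used above
logger -p local0.info -t app-audit "audit pipeline smoke test"
```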
## Journald Configuration for Persistent Logging

```ini
# /etc/systemd/journald.conf
[Journal]
Storage=persistent
Compress=yes
# Seal=yes requires Forward Secure Sealing keys (see journalctl --setup-keys)
Seal=yes
SplitMode=uid
MaxRetentionSec=365d
MaxFileSec=30d
SystemMaxUse=10G
SystemKeepFree=2G
ForwardToSyslog=yes
```

```bash
# Query journald for audit events
journalctl _TRANSPORT=audit --since "24 hours ago" --output json-pretty

# Filter by specific audit types
journalctl _AUDIT_TYPE=1112 --since today   # USER_LOGIN events
journalctl _AUDIT_TYPE=1100 --since today   # USER_AUTH events

# Export for offline analysis
journalctl --since "7 days ago" --output export > /backup/journal-export.bin
```
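`Seal=yes` has no effect until Forward Secure Sealing keys exist. A typical one-time setup and later verification flow looks like this; store the verification key offline, separate from the host:

```bash
# Generate sealing keys once per machine; the verification key is
# displayed once and should be recorded off-host
journalctl --setup-keys

# Verify journal integrity; pass the verification key to also check seals
journalctl --verify --verify-key=<verification-key>
```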
## Application Logging with Structured JSON

```python
import hashlib
import json
import logging
from datetime import datetime, timezone
from functools import wraps


class AuditLogger:
    def __init__(self, service_name, logger_name="audit"):
        self.service = service_name
        self.logger = logging.getLogger(logger_name)
        handler = logging.FileHandler("/var/log/app/audit.log")
        handler.setFormatter(logging.Formatter("%(message)s"))
        self.logger.addHandler(handler)
        self.logger.setLevel(logging.INFO)
        self.logger.propagate = False  # keep audit entries out of app log output
        self._prev_hash = None

    def log_event(self, event_type, user, resource, action, result,
                  metadata=None, source_ip=None):
        log_entry = {
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "service": self.service,
            "event_type": event_type,
            "user": user,
            "resource": resource,
            "action": action,
            "result": result,
            "source_ip": source_ip,
            "metadata": metadata or {},
        }
        # Chain hash for tamper detection: each hash covers the previous
        # entry's hash plus the canonical JSON of this entry, so editing
        # or removing any record breaks every later link
        raw = json.dumps(log_entry, sort_keys=True)
        log_entry["prev_hash"] = self._prev_hash
        log_entry["hash"] = hashlib.sha256(
            f"{self._prev_hash}:{raw}".encode()
        ).hexdigest()
        self._prev_hash = log_entry["hash"]
        self.logger.info(json.dumps(log_entry))

    def log_auth(self, user, action, success, source_ip=None, mfa=False):
        self.log_event(
            event_type="authentication",
            user=user,
            resource="auth-service",
            action=action,
            result="success" if success else "failure",
            metadata={"mfa_used": mfa},
            source_ip=source_ip,
        )

    def log_data_access(self, user, resource, operation, record_count=0,
                        source_ip=None):
        self.log_event(
            event_type="data_access",
            user=user,
            resource=resource,
            action=operation,
            result="success",
            metadata={"record_count": record_count},
            source_ip=source_ip,
        )


def audit_trail(audit_logger, resource_name):
    """Decorator to automatically audit function calls."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            user = kwargs.get("current_user", "system")
            try:
                result = func(*args, **kwargs)
                audit_logger.log_event(
                    event_type="operation",
                    user=user,
                    resource=resource_name,
                    action=func.__name__,
                    result="success",
                )
                return result
            except Exception as e:
                audit_logger.log_event(
                    event_type="operation",
                    user=user,
                    resource=resource_name,
                    action=func.__name__,
                    result="failure",
                    metadata={"error": str(e)},
                )
                raise
        return wrapper
    return decorator
```
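A short usage sketch; the service, user, and resource names are illustrative:

```python
audit = AuditLogger(service_name="billing-api")

# Explicit audit events via the helpers
audit.log_auth(user="alice@example.com", action="login", success=False,
               source_ip="203.0.113.7")
audit.log_data_access(user="alice@example.com", resource="invoices",
                      operation="bulk_export", record_count=1200)

# Automatic auditing via the decorator; pass current_user as a kwarg
@audit_trail(audit, resource_name="invoices")
def delete_invoice(invoice_id, current_user="system"):
    ...  # deletion logic; success or failure is logged either way


delete_invoice("inv-42", current_user="alice@example.com")
```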
## Fluentd / Fluent Bit Log Aggregation

```
# fluent-bit.conf - lightweight agent on each node
[SERVICE]
    Flush            5
    Daemon           Off
    Log_Level        info
    Parsers_File     parsers.conf

[INPUT]
    Name             tail
    Path             /var/log/app/audit.log
    Parser           json
    Tag              audit.app
    Refresh_Interval 5
    Rotate_Wait      30

[INPUT]
    Name             systemd
    Tag              audit.system
    Systemd_Filter   _TRANSPORT=audit

[FILTER]
    Name   modify
    Match  audit.*
    Add    cluster ${CLUSTER_NAME}
    Add    node ${NODE_NAME}

[OUTPUT]
    Name        es
    Match       audit.*
    Host        elasticsearch.internal.example.com
    Port        9200
    Index       audit-logs
    # Type is deprecated on Elasticsearch 7+; on 8.x drop it and set
    # Suppress_Type_Name On instead
    Type        _doc
    tls         On
    tls.verify  On
    Retry_Limit 5

[OUTPUT]
    Name              s3
    Match             audit.*
    region            us-east-1
    bucket            audit-logs-archive
    total_file_size   50M
    upload_timeout    10m
    s3_key_format     /logs/%Y/%m/%d/$TAG/%H_%M_%S.gz
    compression       gzip
```
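The tail input references a `json` parser from parsers.conf. Fluent Bit ships a suitable one, but a minimal definition would look like the sketch below; `Time_Key` and `Time_Format` are assumptions matching the `AuditLogger` timestamps and should be adjusted to your actual format:

```
# parsers.conf
[PARSER]
    Name        json
    Format      json
    Time_Key    timestamp
    Time_Format %Y-%m-%dT%H:%M:%S.%L%z
```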
## Elasticsearch Index Lifecycle for Retention

Note that on recent Elasticsearch releases the cold-phase `freeze` action is deprecated and can be omitted.

```json
{
  "policy": {
    "phases": {
      "hot": {
        "min_age": "0ms",
        "actions": {
          "rollover": {
            "max_size": "50gb",
            "max_age": "1d"
          },
          "set_priority": { "priority": 100 }
        }
      },
      "warm": {
        "min_age": "7d",
        "actions": {
          "shrink": { "number_of_shards": 1 },
          "forcemerge": { "max_num_segments": 1 },
          "set_priority": { "priority": 50 }
        }
      },
      "cold": {
        "min_age": "30d",
        "actions": {
          "freeze": {},
          "set_priority": { "priority": 0 }
        }
      },
      "delete": {
        "min_age": "365d",
        "actions": { "delete": {} }
      }
    }
  }
}
```
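To put the policy into effect, upload it and bind it to the audit indices through an index template. The policy and template names below are illustrative:

```bash
# Upload the lifecycle policy (body saved as audit-ilm-policy.json)
curl -X PUT "https://elasticsearch.internal.example.com:9200/_ilm/policy/audit-logs" \
  -H "Content-Type: application/json" -d @audit-ilm-policy.json

# Attach it to new audit indices via an index template
curl -X PUT "https://elasticsearch.internal.example.com:9200/_index_template/audit-logs" \
  -H "Content-Type: application/json" -d '{
    "index_patterns": ["audit-logs-*"],
    "template": {
      "settings": {
        "index.lifecycle.name": "audit-logs",
        "index.lifecycle.rollover_alias": "audit-logs"
      }
    }
  }'
```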
## Retention Policy by Compliance Framework

```yaml
retention_requirements:
  soc2:
    minimum: 1 year
    recommended: 3 years
    notes: "Based on audit period and report requirements"
  hipaa:
    minimum: 6 years
    notes: "From date of creation or last effective date"
  pci_dss:
    minimum: 1 year
    immediately_available: 3 months
    notes: "Req 10.7 - retain for at least one year, 3 months immediately available"
  gdpr:
    minimum: "As long as necessary for processing purpose"
    notes: "Apply data minimization; delete when no longer needed"
  fedramp:
    minimum: 3 years
    notes: "AU-11 control requirement"
  iso27001:
    minimum: "Defined by organization policy"
    recommended: 3 years
    notes: "A.12.4.1 - retention period must be defined"
```
## Log Integrity Verification Script

```bash
#!/usr/bin/env bash
# verify-log-integrity.sh - Verify log file checksums against stored hashes

LOG_DIR="/var/log/app"
HASH_FILE="/var/log/app/.checksums"
ALERT_WEBHOOK="${ALERT_WEBHOOK_URL}"

verify_logs() {
    local failures=0
    # The checksum file uses sha256sum output format: "<hash>  <filename>"
    while IFS=' ' read -r stored_hash filename; do
        if [ -f "$filename" ]; then
            current_hash=$(sha256sum "$filename" | awk '{print $1}')
            if [ "$stored_hash" != "$current_hash" ]; then
                echo "TAMPER DETECTED: $filename"
                failures=$((failures + 1))
                curl -s -X POST "$ALERT_WEBHOOK" \
                    -H "Content-Type: application/json" \
                    -d "{\"text\":\"ALERT: Audit log tamper detected on $(hostname): $filename\"}"
            fi
        else
            echo "MISSING: $filename"
            failures=$((failures + 1))
        fi
    done < "$HASH_FILE"
    return $failures
}

update_checksums() {
    find "$LOG_DIR" -name "*.log" -type f -exec sha256sum {} \; > "$HASH_FILE"
    chmod 440 "$HASH_FILE"
}

case "${1:-verify}" in
    verify) verify_logs ;;
    update) update_checksums ;;
    *) echo "Usage: $0 {verify|update}" ;;
esac
```
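A scheduling sketch, assuming the script is installed at /usr/local/bin/verify-log-integrity.sh: run verification hourly, and run `update` only from the controlled log-rotation path so checksums never silently absorb tampering:

```bash
# /etc/cron.d/audit-log-integrity
ALERT_WEBHOOK_URL=https://hooks.example.com/alerts
0 * * * * root /usr/local/bin/verify-log-integrity.sh verify >> /var/log/integrity-check.log 2>&1
```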
## SIEM Integration Checklist

```yaml
siem_integration:
  log_sources:
    - [ ] Operating system auth logs (syslog, journald)
    - [ ] Application audit logs (structured JSON)
    - [ ] Cloud provider audit trails (CloudTrail, Activity Log, Audit Logs)
    - [ ] Database query and access logs
    - [ ] Network flow logs and firewall logs
    - [ ] Container and orchestrator logs (Kubernetes audit)
    - [ ] WAF and CDN access logs
    - [ ] VPN and remote access logs
  normalization:
    - [ ] Common Event Format (CEF) or OCSF schema
    - [ ] Consistent timestamp format (ISO 8601 / UTC)
    - [ ] Unified user identity fields
    - [ ] Standardized severity levels
  alerting_rules:
    - [ ] Multiple failed login attempts (brute force; example query below)
    - [ ] Login from unusual location or device
    - [ ] Privilege escalation events
    - [ ] Sensitive data bulk export
    - [ ] Administrative action outside change window
    - [ ] Service account anomalous activity
    - [ ] Log forwarding gap or interruption
  operational:
    - [ ] Log pipeline health monitoring
    - [ ] Storage capacity alerting
    - [ ] Retention policy enforcement verified
    - [ ] Backup of log archives confirmed
    - [ ] Access to log systems restricted and audited
```
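As a concrete starting point for the brute-force rule, failed authentication events can be aggregated per user over a short window. The Elasticsearch query below is an illustrative sketch: field names follow the `AuditLogger` schema (with `.keyword` assuming default dynamic mappings), and the 5-failures-in-15-minutes threshold is an assumption to tune:

```json
{
  "query": {
    "bool": {
      "filter": [
        { "term": { "event_type": "authentication" } },
        { "term": { "result": "failure" } },
        { "range": { "timestamp": { "gte": "now-15m" } } }
      ]
    }
  },
  "size": 0,
  "aggs": {
    "by_user": {
      "terms": { "field": "user.keyword", "min_doc_count": 5 }
    }
  }
}
```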
## Best Practices
- Use structured logging (JSON) with consistent field names across all services
- Ship logs to a centralized platform with write-once storage for tamper protection
- Implement hash chaining or digital signatures for log integrity verification (a chain-verification sketch follows this list)
- Define and enforce retention policies per compliance framework requirements
- Set up real-time alerting for high-severity security events
- Separate audit logs from application debug logs to reduce noise
- Never log sensitive data (passwords, tokens, PII) in audit entries
- Monitor the logging pipeline itself to detect gaps in coverage
- Regularly test log restoration from archives to verify recoverability
- Rotate and compress logs to manage storage while meeting retention windows
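A minimal chain verifier matching the `AuditLogger` above; it assumes one JSON entry per line in the audit log:

```python
import hashlib
import json


def verify_chain(path):
    """Recompute the hash chain written by AuditLogger; report the first break."""
    prev_hash = None
    with open(path) as f:
        for lineno, line in enumerate(f, 1):
            entry = json.loads(line)
            claimed = entry.pop("hash")
            entry.pop("prev_hash")
            # Must mirror log_event: canonical JSON without the hash fields
            raw = json.dumps(entry, sort_keys=True)
            expected = hashlib.sha256(f"{prev_hash}:{raw}".encode()).hexdigest()
            if claimed != expected:
                print(f"Chain break at line {lineno}")
                return False
            prev_hash = claimed
    return True
```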