webhook-integration-patterns
SKILL.md
Webhook Integration Patterns
This skill provides guidance for designing and implementing robust webhook systems—both as providers (sending webhooks) and consumers (receiving webhooks).
Core Competencies
- Delivery Guarantees: At-least-once delivery, made effectively-once by idempotent (deduplicating) consumers
- Security: Signature verification, secret rotation
- Reliability: Retry strategies, dead letter handling
- Scalability: Queue-based processing, rate limiting
Webhook Fundamentals
What Webhooks Solve
```text
Polling (inefficient):        Webhooks (efficient):
┌────────┐     ┌────────┐     ┌────────┐     ┌────────┐
│ Client │     │ Server │     │ Client │     │ Server │
└───┬────┘     └───┬────┘     └───┬────┘     └───┬────┘
    │  Any news?   │              │              │
    │─────────────▶│              │              │
    │      No      │              │    Event!    │
    │◀─────────────│              │◀─────────────│
    │  Any news?   │              │  POST /hook  │
    │─────────────▶│              │◀─────────────│
    │      No      │              │    200 OK    │
    │◀─────────────│              │─────────────▶│
    │  Any news?   │
    │─────────────▶│              Push-based notification
    │     YES!     │              instead of polling
    │◀─────────────│
```
Webhook Anatomy
```http
POST /webhooks/payment HTTP/1.1
Host: your-app.com
Content-Type: application/json
X-Webhook-Signature: sha256=abc123...
X-Webhook-ID: evt_12345
X-Webhook-Timestamp: 1706616000

{
  "id": "evt_12345",
  "type": "payment.completed",
  "created_at": "2024-01-30T12:00:00Z",
  "data": {
    "payment_id": "pay_abc",
    "amount": 1000,
    "currency": "USD"
  }
}
```
Key headers:
- Signature: Cryptographic proof of authenticity
- ID: Unique event identifier for deduplication
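- Timestamp: When the request was signed (Unix seconds); it is covered by the signature so receivers can reject replays. The event's own creation time is `created_at` in the body.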
Webhook Provider Design
Event Schema Design
import secrets
from datetime import datetime, timezone


class WebhookEvent:
    """Standard webhook event structure.

    Every event carries a unique ``id`` (consumers deduplicate on it), a
    ``resource.action`` style ``type``, an ISO-8601 UTC ``created_at`` and the
    ``api_version`` the payload was rendered with.

    Args:
        event_type: Event name, e.g. ``"payment.completed"`` (see EVENT_TYPES).
        data: JSON-serializable event body.
        idempotency_key: Optional caller-supplied key; defaults to the event id.
    """

    API_VERSION = "2024-01-30"

    def __init__(self, event_type, data, idempotency_key=None):
        self.id = self._generate_id()
        self.type = event_type
        # Timezone-aware UTC rendered with a trailing "Z", matching the wire
        # format shown to consumers (e.g. "2024-01-30T12:00:00Z").
        self.created_at = (
            datetime.now(timezone.utc)
            .isoformat(timespec="seconds")
            .replace("+00:00", "Z")
        )
        self.api_version = self.API_VERSION
        self.data = data
        self.idempotency_key = idempotency_key or self.id

    @staticmethod
    def _generate_id():
        """Return a new unguessable event id of the form ``evt_<24 hex chars>``."""
        return f"evt_{secrets.token_hex(12)}"

    @classmethod
    def from_payload(cls, payload):
        """Rebuild an event from ``to_payload()`` output, keeping its id.

        Used when re-delivering a stored event (e.g. from a dead-letter queue):
        the original id and timestamp must be preserved so consumers can still
        deduplicate it.
        """
        event = cls.__new__(cls)
        event.id = payload["id"]
        event.type = payload["type"]
        event.created_at = payload["created_at"]
        event.api_version = payload.get("api_version", cls.API_VERSION)
        event.data = payload["data"]
        event.idempotency_key = payload["id"]
        return event

    def to_payload(self):
        """Return the JSON-serializable body sent to subscribers."""
        return {
            "id": self.id,
            "type": self.type,
            "created_at": self.created_at,
            "api_version": self.api_version,
            "data": self.data,
        }


# Event types follow resource.action pattern
EVENT_TYPES = [
    "payment.created",
    "payment.completed",
    "payment.failed",
    "subscription.created",
    "subscription.updated",
    "subscription.cancelled",
    "customer.created",
    "customer.deleted",
]
Signature Generation
import hashlib
import hmac
import json
import time


class WebhookSigner:
    """Sign webhook payloads so receivers can verify them.

    The signature is ``HMAC-SHA256(secret, f"{timestamp}.{body}")`` where
    ``body`` is the compact JSON produced by :meth:`serialize`. The sender must
    transmit exactly that string as the request body; any re-serialization
    (different separators, key order or escaping) breaks verification.
    """

    def __init__(self, secret):  # allow-secret
        self.secret = secret.encode()  # allow-secret

    @staticmethod
    def serialize(payload):
        """Return the canonical body string that is signed and must be sent."""
        return json.dumps(payload, separators=(',', ':'))

    def sign(self, payload, timestamp=None):
        """Generate an HMAC signature for ``payload``.

        Args:
            payload: JSON-serializable event body.
            timestamp: Unix seconds to sign with. Only ``None`` means "use the
                current time"; an explicit ``0`` is honoured.

        Returns:
            dict with ``signature`` (``"sha256=<hex>"``) and ``timestamp``.
        """
        if timestamp is None:
            timestamp = int(time.time())
        # Binding the timestamp into the MAC lets receivers reject replays.
        signed_payload = f"{timestamp}.{self.serialize(payload)}"
        digest = hmac.new(
            self.secret,
            signed_payload.encode(),
            hashlib.sha256,
        ).hexdigest()
        return {
            'signature': f"sha256={digest}",
            'timestamp': timestamp,
        }

    def create_headers(self, payload):
        """Generate all webhook headers for ``payload``.

        Signature and timestamp come from a single ``sign`` call, so the
        timestamp header always matches the one that was signed.
        """
        sign_data = self.sign(payload)
        return {
            'Content-Type': 'application/json',
            'X-Webhook-Signature': sign_data['signature'],
            'X-Webhook-Timestamp': str(sign_data['timestamp']),
            'X-Webhook-ID': payload['id'],
        }
Delivery System
import asyncio
import json
from datetime import datetime, timedelta, timezone


class WebhookDeliverySystem:
    """Reliable webhook delivery with retries.

    Attempt 0 is the initial delivery, and each entry of ``RETRY_SCHEDULE`` adds
    one retry. An event is therefore tried at most ``len(RETRY_SCHEDULE) + 1``
    times before it is dead-lettered.

    Status handling follows the response-code table:
    - 2xx is success.
    - 408, 429, 5xx and network errors/timeouts are retried.
    - Any other status is a permanent failure.
    - 401, 403 and 404 additionally disable the endpoint.
    """

    RETRY_SCHEDULE = [
        timedelta(seconds=10),
        timedelta(minutes=1),
        timedelta(minutes=5),
        timedelta(minutes=30),
        timedelta(hours=1),
        timedelta(hours=6),
        timedelta(hours=24),
    ]

    # Transient statuses worth retrying in addition to 5xx.
    RETRYABLE_STATUS_CODES = frozenset({408, 429})
    # The endpoint is misconfigured or gone: stop sending to it.
    DISABLING_STATUS_CODES = frozenset({401, 403, 404})

    def __init__(self, signer, http_client):
        self.signer = signer
        # Assumes an httpx.AsyncClient-like API: post(url, content=..., ...)
        # returning a response with ``.status_code``.
        self.http = http_client
        self.delivery_log = []
        # In-memory stand-in; in production hand these to a DeadLetterQueue.
        self.dead_letters = []

    async def deliver(self, endpoint, event, attempt=0):
        """Attempt webhook delivery, retrying per ``RETRY_SCHEDULE``.

        Returns:
            ``{'status': 'delivered' | 'failed', 'attempts': int}``.
        """
        payload = event.to_payload()
        headers = self.signer.create_headers(payload)
        # Send exactly the compact JSON the signer hashed (same separators as
        # WebhookSigner.sign). ``json=payload`` would let the HTTP client
        # re-serialize it (e.g. with ", " separators), and the receiver's
        # signature check over the raw body would then fail.
        body = json.dumps(payload, separators=(',', ':')).encode()
        try:
            response = await self.http.post(
                endpoint.url,
                content=body,
                headers=headers,
                timeout=30,
            )
        except Exception as exc:  # network error / timeout: transient
            self._log_attempt(endpoint, event, attempt, error=exc)
            return await self._schedule_retry(endpoint, event, attempt)

        self._log_attempt(endpoint, event, attempt, response)
        status = response.status_code
        if 200 <= status < 300:
            return {'status': 'delivered', 'attempts': attempt + 1}
        if status >= 500 or status in self.RETRYABLE_STATUS_CODES:
            return await self._schedule_retry(endpoint, event, attempt)

        # Remaining non-2xx statuses are permanent: retrying won't change them.
        if status in self.DISABLING_STATUS_CODES:
            endpoint.status = 'disabled'
        self._move_to_dead_letter(endpoint, event)
        return {'status': 'failed', 'attempts': attempt + 1}

    def _log_attempt(self, endpoint, event, attempt, response=None, error=None):
        """Record one delivery attempt (for dashboards and debugging)."""
        self.delivery_log.append({
            'event_id': event.id,
            'endpoint_url': endpoint.url,
            'attempt': attempt,
            'status_code': getattr(response, 'status_code', None),
            'error': repr(error) if error is not None else None,
            'at': datetime.now(timezone.utc).isoformat(),
        })

    async def _schedule_retry(self, endpoint, event, attempt):
        """Schedule the next retry, or dead-letter once the schedule is used up."""
        if attempt >= len(self.RETRY_SCHEDULE):
            self._move_to_dead_letter(endpoint, event)
            return {'status': 'failed', 'attempts': attempt + 1}
        delay = self.RETRY_SCHEDULE[attempt]
        # In production: use a job queue with delayed execution instead of
        # keeping a coroutine alive for up to a day.
        await asyncio.sleep(delay.total_seconds())
        return await self.deliver(endpoint, event, attempt + 1)

    def _move_to_dead_letter(self, endpoint, event):
        """Keep a failed webhook for manual review."""
        self.dead_letters.append((endpoint, event))
Endpoint Management
import secrets
from datetime import datetime, timezone


class WebhookEndpoint:
    """Subscriber endpoint configuration plus delivery-health tracking.

    Args:
        url: URL that events are POSTed to.
        events: Subscribed event types; ``'*'`` subscribes to everything.
        secret: Signing secret; a random one is generated when omitted.
    """

    # Consecutive failed deliveries after which the endpoint is auto-disabled.
    MAX_CONSECUTIVE_FAILURES = 10

    def __init__(self, url, events, secret=None):  # allow-secret
        self.id = f"we_{secrets.token_hex(8)}"
        self.url = url
        self.events = events  # List of subscribed event types
        self.secret = secret or f"whsec_{secrets.token_urlsafe(32)}"  # allow-secret
        self.status = 'active'
        self.created_at = datetime.now(timezone.utc)
        # Health tracking
        self.consecutive_failures = 0
        self.last_success = None
        self.last_failure = None

    def should_receive(self, event_type):
        """Return True if this endpoint subscribes to ``event_type``."""
        return '*' in self.events or event_type in self.events

    def record_success(self):
        """Reset the failure streak and re-enable an auto-disabled endpoint."""
        self.consecutive_failures = 0
        self.last_success = datetime.now(timezone.utc)
        if self.status == 'disabled':
            self.status = 'active'

    def record_failure(self):
        """Count a failed delivery; disable after MAX_CONSECUTIVE_FAILURES in a row."""
        self.consecutive_failures += 1
        self.last_failure = datetime.now(timezone.utc)
        if self.consecutive_failures >= self.MAX_CONSECUTIVE_FAILURES:
            self.status = 'disabled'
Webhook Consumer Design
Signature Verification
import hashlib
import hmac
import time


class WebhookVerificationError(Exception):
    """Raised when an incoming webhook fails authentication."""


class WebhookVerifier:
    """Verify incoming webhook signatures.

    Expects ``X-Webhook-Signature: sha256=<hex>``, where ``<hex>`` is
    HMAC-SHA256 over ``f"{timestamp}.{raw_body}"``. Timestamps more than
    ``TIMESTAMP_TOLERANCE`` seconds from the local clock are rejected.

    Every failure raises ``WebhookVerificationError``, including missing or
    malformed headers, so the caller can answer 401 instead of crashing with 500.
    """

    TIMESTAMP_TOLERANCE = 300  # 5 minutes

    def __init__(self, secret):  # allow-secret
        self.secret = secret.encode()  # allow-secret

    def verify(self, payload, signature, timestamp):
        """Verify webhook authenticity.

        Args:
            payload: Raw request body as ``str`` or ``bytes``, exactly as
                received (never re-serialized JSON).
            signature: ``X-Webhook-Signature`` header value; may be ``None``.
            timestamp: ``X-Webhook-Timestamp`` header value; may be ``None``.

        Returns:
            True when the signature is valid and the timestamp is fresh.

        Raises:
            WebhookVerificationError: on missing, malformed, stale or invalid input.
        """
        # Header values are attacker-controlled: validate before using them.
        if not signature or timestamp is None:
            raise WebhookVerificationError("Missing signature headers")
        try:
            sent_at = int(timestamp)
        except (TypeError, ValueError):
            raise WebhookVerificationError("Malformed timestamp") from None

        # Check timestamp freshness: bounds how long a captured request can be replayed.
        current_time = int(time.time())
        if abs(current_time - sent_at) > self.TIMESTAMP_TOLERANCE:
            raise WebhookVerificationError("Timestamp too old")

        # Compute the expected signature over the raw body bytes.
        if isinstance(payload, (bytes, bytearray)):
            body = bytes(payload)
        else:
            body = payload.encode()
        signed_payload = f"{timestamp}.".encode() + body
        expected = hmac.new(self.secret, signed_payload, hashlib.sha256).hexdigest()
        expected_sig = f"sha256={expected}".encode()

        # Compare as bytes: compare_digest raises TypeError for non-ASCII str,
        # which a forged header could trigger.
        provided = signature.encode() if isinstance(signature, str) else signature
        # Constant-time comparison to prevent timing attacks
        if not hmac.compare_digest(provided, expected_sig):
            raise WebhookVerificationError("Invalid signature")
        return True
# Flask endpoint example
import json


@app.route('/webhooks/provider', methods=['POST'])
def handle_webhook():
    """Verify, parse and process one provider webhook.

    Returns 401 for any authentication failure, 400 for an authenticated body
    that is not valid JSON, and 200 once the event has been handled.
    """
    # Read the raw bytes exactly as sent. The signature covers them, and
    # ``request.data`` is empty for content types Werkzeug parses as forms.
    raw_body = request.get_data()
    signature = request.headers.get('X-Webhook-Signature')
    timestamp = request.headers.get('X-Webhook-Timestamp')
    if not signature or not timestamp:
        return 'Invalid signature', 401

    verifier = WebhookVerifier(WEBHOOK_SECRET)
    try:
        verifier.verify(raw_body.decode('utf-8'), signature, timestamp)
    except (WebhookVerificationError, ValueError, TypeError):
        # ValueError covers malformed timestamps and non-UTF-8 bodies;
        # TypeError covers odd signature values. All are untrusted input.
        return 'Invalid signature', 401

    # Parse the bytes that were verified, independent of the Content-Type header.
    try:
        event = json.loads(raw_body)
    except ValueError:
        return 'Invalid payload', 400

    process_webhook(event)
    return 'OK', 200
Idempotent Processing
import json


class IdempotentWebhookProcessor:
    """Process each webhook event at most once within the dedup window.

    Combines a "processed" marker (kept ``dedup_ttl`` seconds) with a short
    per-event lock, so concurrent or repeated deliveries of the same event id
    do not run the handler twice. Subclasses implement the handler methods
    named in ``HANDLERS``.
    """

    # Event type -> name of the handler method. Resolved lazily so a missing
    # handler means "skip" rather than an AttributeError for every event.
    HANDLERS = {
        'payment.completed': '_handle_payment_completed',
        'subscription.cancelled': '_handle_subscription_cancelled',
        # ... more handlers
    }

    def __init__(self, storage):
        self.storage = storage  # Redis, database, etc.
        self.lock_ttl = 300  # 5 minute lock
        self.dedup_ttl = 86400 * 7  # Keep processed markers for 7 days

    async def process(self, event):
        """Process webhook idempotently.

        Returns:
            ``{'status': 'processed' | 'duplicate' | 'processing', 'event_id': id}``.
            ``'processing'`` means another worker currently holds the lock.
        """
        event_id = event['id']
        processed_key = f"webhook:processed:{event_id}"

        # Fast path: already processed
        if await self.storage.exists(processed_key):
            return {'status': 'duplicate', 'event_id': event_id}

        # Acquire lock to prevent concurrent processing
        lock_key = f"webhook:lock:{event_id}"
        if not await self.storage.set_nx(lock_key, "1", ex=self.lock_ttl):
            return {'status': 'processing', 'event_id': event_id}

        try:
            # Re-check under the lock: another worker may have finished and
            # released the lock between our first check and set_nx.
            if await self.storage.exists(processed_key):
                return {'status': 'duplicate', 'event_id': event_id}

            result = await self._handle_event(event)

            # Mark as processed (with long TTL for deduplication)
            await self.storage.set(
                processed_key,
                json.dumps(result),
                ex=self.dedup_ttl,
            )
            return {'status': 'processed', 'event_id': event_id}
        finally:
            # NOTE(review): if handling outlives lock_ttl the lock may already
            # belong to another worker; a token-checked delete would avoid that.
            await self.storage.delete(lock_key)

    async def _handle_event(self, event):
        """Route the event to its handler; skip types without one."""
        handler_name = self.HANDLERS.get(event['type'])
        handler = getattr(self, handler_name, None) if handler_name else None
        if handler is None:
            return {'skipped': True, 'reason': 'unknown_event_type'}
        return await handler(event['data'])
Queue-Based Processing
import asyncio
import logging


class QueuedWebhookHandler:
    """Decouple receipt from processing: acknowledge fast, work in the background.

    Subclasses provide ``_validate_event_structure(event)`` and
    ``async _process_event(event)``.
    """

    def __init__(self, queue):
        self.queue = queue  # Redis, SQS, RabbitMQ, etc.

    async def receive(self, event):
        """Acknowledge receipt quickly, process async"""
        # Validate immediately so malformed events are rejected synchronously.
        self._validate_event_structure(event)
        # The event id doubles as the queue deduplication id, so a provider
        # redelivery of the same event is not enqueued twice (where supported).
        await self.queue.enqueue(
            'webhook_processing',
            event,
            deduplication_id=event['id'],
        )
        # Return 200 immediately (don't make sender wait)
        return {'status': 'accepted'}

    async def process_queue(self, idle_sleep=0):
        """Worker loop that processes queued webhooks forever.

        Args:
            idle_sleep: Seconds to wait when ``dequeue`` returns nothing. The
                default 0 still yields to the event loop, so a non-blocking
                dequeue cannot spin and starve other tasks.
        """
        log = logging.getLogger(__name__)
        while True:
            event = await self.queue.dequeue('webhook_processing')
            if not event:
                await asyncio.sleep(idle_sleep)
                continue
            try:
                await self._process_event(event)
            except Exception as exc:
                # NOTE(review): requeue=True retries forever; poison messages
                # need a max-attempts / dead-letter policy on the queue side.
                log.exception("Webhook processing failed: %s", exc)
                await self.queue.nack(event, requeue=True)
            else:
                # Ack only after success. Keeping it out of the try above means
                # an ack failure is not mistaken for a processing failure and
                # requeued (which would process the event twice).
                try:
                    await self.queue.ack(event)
                except Exception:
                    log.exception("Failed to ack processed webhook")
Security Patterns
Secret Rotation
from datetime import datetime, timedelta, timezone


class SecretRotation:
    """Support rolling secret updates.

    After :meth:`rotate`, the previous secret keeps verifying for
    ``grace_period``. This prevents rejecting webhooks that were signed (or are
    being retried) with the old secret.

    Args:
        current_secret: Optional initial secret.
        grace_period: How long the previous secret stays valid after a rotation.
    """

    def __init__(self, current_secret=None, grace_period=timedelta(hours=24)):
        self.current_secret = current_secret
        self.previous_secret = None
        self.rotation_timestamp = None
        self.grace_period = grace_period

    def rotate(self, new_secret):
        """Rotate to new secret while supporting old"""
        self.previous_secret = self.current_secret
        self.current_secret = new_secret
        self.rotation_timestamp = datetime.now(timezone.utc)

    def get_verification_secrets(self):
        """Return secrets to try for verification, newest first.

        Never contains ``None``; the list is empty until a secret is set.
        """
        candidates = []
        if self.current_secret:
            candidates.append(self.current_secret)
        # Accept previous secret for grace period
        if self.previous_secret and self.rotation_timestamp:
            age = datetime.now(timezone.utc) - self.rotation_timestamp
            if age < self.grace_period:
                candidates.append(self.previous_secret)
        return candidates
IP Allowlisting
import ipaddress

# Abbreviated examples only: providers publish these lists and change them
# (e.g. GitHub's /meta API, Stripe's published webhook IPs). Load the full,
# current lists at deploy time.
ALLOWED_IPS = {
    'stripe': ['3.18.12.63', '3.130.192.231'],
    'github': ['192.30.252.0/22', '185.199.108.0/22'],
    'twilio': ['54.172.60.0/23', '54.244.51.0/24'],
}


def verify_source_ip(request, provider):
    """Return True if the request's peer address is in ``provider``'s allowlist.

    ``request.remote_addr`` is the direct TCP peer. Behind a reverse proxy it is
    the proxy's address, so derive the client IP from a trusted proxy header
    instead. IP allowlisting complements signature verification; it never
    replaces it.

    Unknown providers and missing or unparseable addresses are rejected.
    """
    try:
        client_ip = ipaddress.ip_address(request.remote_addr)
    except ValueError:
        return False
    for allowed_range in ALLOWED_IPS.get(provider, []):
        # Bare addresses become /32 (or /128) networks; strict=False tolerates
        # entries written with host bits set. IPv4/IPv6 mismatches never match.
        if client_ip in ipaddress.ip_network(allowed_range, strict=False):
            return True
    return False
Error Handling
Response Codes
| Code | Meaning | Provider Action |
|---|---|---|
| 200-204 | Success | Mark delivered |
| 400 | Bad request | Don't retry (the receiver rejected the payload; resending it unchanged won't help) |
| 401/403 | Auth failed | Disable endpoint |
| 404 | Not found | Disable endpoint |
| 429 | Rate limited | Retry with backoff (honor `Retry-After` if present) |
| 500+ | Server error | Retry |
| Timeout | No response | Retry |
Dead Letter Queue
import secrets
from datetime import datetime, timedelta, timezone


class DeadLetterQueue:
    """Store and manage webhooks that exhausted their delivery attempts.

    Args:
        storage: Async store exposing ``add``, ``get``, ``remove`` and
            ``delete_before``.
        delivery_system: Object with ``async deliver(endpoint, event)``.
        endpoint_lookup: ``async (endpoint_id) -> endpoint``, used by retry.
    """

    def __init__(self, storage=None, delivery_system=None, endpoint_lookup=None):
        self.storage = storage
        self.delivery_system = delivery_system
        self._endpoint_lookup = endpoint_lookup

    async def get_endpoint(self, endpoint_id):
        """Resolve an endpoint id to the endpoint object to deliver to."""
        if self._endpoint_lookup is None:
            raise NotImplementedError("DeadLetterQueue needs an endpoint_lookup to retry")
        return await self._endpoint_lookup(endpoint_id)

    async def add(self, endpoint, event, failure_reason, attempts):
        """Add a failed webhook to the DLQ and return the new item's id."""
        item_id = f"dlq_{secrets.token_hex(12)}"
        await self.storage.add({
            'id': item_id,
            'endpoint_id': endpoint.id,
            'endpoint_url': endpoint.url,
            'event': event.to_payload(),
            'failure_reason': str(failure_reason),
            'attempts': attempts,
            'added_at': datetime.now(timezone.utc).isoformat(),
        })
        return item_id

    async def retry(self, dlq_item_id):
        """Manually retry a DLQ item; remove it once delivered.

        Raises:
            KeyError: if no item with ``dlq_item_id`` exists.
        """
        item = await self.storage.get(dlq_item_id)
        if item is None:
            raise KeyError(dlq_item_id)
        endpoint = await self.get_endpoint(item['endpoint_id'])
        # Rebuild with the original id so consumers can still deduplicate.
        event = WebhookEvent.from_payload(item['event'])
        # NOTE(review): deliver() may run its whole retry schedule before
        # returning, and on failure may dead-letter the event again while this
        # item remains. Consider a single-attempt delivery path for manual retries.
        result = await self.delivery_system.deliver(endpoint, event)
        if result['status'] == 'delivered':
            await self.storage.remove(dlq_item_id)
        return result

    async def purge_old(self, days=30):
        """Delete DLQ items added more than ``days`` days ago."""
        cutoff = datetime.now(timezone.utc) - timedelta(days=days)
        await self.storage.delete_before(cutoff)
Best Practices
For Providers
- Include event ID for consumer deduplication
- Sign payloads with HMAC-SHA256 minimum
- Include timestamps to prevent replay
- Retry with exponential backoff (5 attempts minimum)
- Provide webhook logs in your dashboard
- Support event filtering by type
- Version your payloads for backward compatibility
For Consumers
- Verify signatures before processing
- Respond quickly (< 5 seconds)
- Process asynchronously via queue
- Implement idempotency using event ID
- Log everything for debugging
- Monitor for failures proactively
References
- `references/webhook-security.md` - Detailed security implementation
- `references/provider-examples.md` - Stripe, GitHub, Twilio patterns
- `references/testing-webhooks.md` - Local development and testing
Weekly Installs
2
Repository
4444j99/a-i--skills
GitHub Stars
2
First Seen
4 days ago
Security Audits
Installed on
amp2
cline2
openclaw2
opencode2
cursor2
kimi-cli2