webhook-integration-patterns
Webhook Integration Patterns
This skill provides guidance for designing and implementing robust webhook systems, both as a provider (sending webhooks) and as a consumer (receiving webhooks).
Core Competencies
- Delivery Guarantees: At-least-once, exactly-once semantics
- Security: Signature verification, secret rotation
- Reliability: Retry strategies, dead letter handling
- Scalability: Queue-based processing, rate limiting
Webhook Fundamentals
What Webhooks Solve
Polling (inefficient):          Webhooks (efficient):
┌────────┐     ┌────────┐       ┌────────┐     ┌────────┐
│ Client │     │ Server │       │ Client │     │ Server │
└───┬────┘     └───┬────┘       └───┬────┘     └───┬────┘
    │  Any news?   │                │              │
    │─────────────▶│                │              │
    │      No      │                │    Event!    │
    │◀─────────────│                │◀─────────────│
    │  Any news?   │                │  POST /hook  │
    │─────────────▶│                │◀─────────────│
    │      No      │                │    200 OK    │
    │◀─────────────│                │─────────────▶│
    │  Any news?   │
    │─────────────▶│                Push-based notification
    │     YES!     │                instead of polling
    │◀─────────────│
Webhook Anatomy
POST /webhooks/payment HTTP/1.1
Host: your-app.com
Content-Type: application/json
X-Webhook-Signature: sha256=abc123...
X-Webhook-ID: evt_12345
X-Webhook-Timestamp: 1706616000

{
  "id": "evt_12345",
  "type": "payment.completed",
  "created_at": "2024-01-30T12:00:00Z",
  "data": {
    "payment_id": "pay_abc",
    "amount": 1000,
    "currency": "USD"
  }
}
Key headers:
- Signature: Cryptographic proof of authenticity
- ID: Unique event identifier for deduplication
- Timestamp: When the request was signed (Unix seconds); lets the consumer reject stale or replayed deliveries
Webhook Provider Design
Event Schema Design
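import uuid
from datetime import datetime, timezone

class WebhookEvent:
    """Standard webhook event structure"""

    def __init__(self, event_type, data, idempotency_key=None):
        self.id = self._generate_id()
        self.type = event_type
        self.created_at = datetime.now(timezone.utc).isoformat()
        self.api_version = "2024-01-30"
        self.data = data
        self.idempotency_key = idempotency_key or self.id

    @staticmethod
    def _generate_id():
        # Assumed ID scheme: "evt_" prefix plus a random hex string
        return f"evt_{uuid.uuid4().hex}"

    def to_payload(self):
        return {
            "id": self.id,
            "type": self.type,
            "created_at": self.created_at,
            "api_version": self.api_version,
            "data": self.data
        }

    @classmethod
    def from_payload(cls, payload):
        """Rebuild an event from a stored payload (e.g. for a manual retry)"""
        event = cls(payload["type"], payload["data"], idempotency_key=payload["id"])
        event.id = payload["id"]
        event.created_at = payload["created_at"]
        event.api_version = payload["api_version"]
        return event

# Event types follow resource.action pattern
EVENT_TYPES = [
    "payment.created",
    "payment.completed",
    "payment.failed",
    "subscription.created",
    "subscription.updated",
    "subscription.cancelled",
    "customer.created",
    "customer.deleted"
]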
Signature Generation
import hashlib
import hmac
import json
import time

class WebhookSigner:
    """Sign webhook payloads for verification"""

    def __init__(self, secret):  # allow-secret
        self.secret = secret.encode()  # allow-secret

    def sign(self, payload, timestamp=None):
        """Generate HMAC signature"""
        if timestamp is None:
            timestamp = int(time.time())
        # Compact serialization; the request body must be these exact bytes,
        # because the consumer verifies against the raw body it receives
        payload_str = json.dumps(payload, separators=(',', ':'))

        # Include timestamp to prevent replay attacks
        signed_payload = f"{timestamp}.{payload_str}"
        signature = hmac.new(
            self.secret,
            signed_payload.encode(),
            hashlib.sha256
        ).hexdigest()

        return {
            'signature': f"sha256={signature}",
            'timestamp': timestamp
        }

    def create_headers(self, payload):
        """Generate all webhook headers"""
        sign_data = self.sign(payload)
        return {
            'Content-Type': 'application/json',
            'X-Webhook-Signature': sign_data['signature'],
            'X-Webhook-Timestamp': str(sign_data['timestamp']),
            'X-Webhook-ID': payload['id']
        }
Delivery System
import asyncio
import json
from datetime import datetime, timedelta

class WebhookDeliverySystem:
    """Reliable webhook delivery with retries"""

    RETRY_SCHEDULE = [
        timedelta(seconds=10),
        timedelta(minutes=1),
        timedelta(minutes=5),
        timedelta(minutes=30),
        timedelta(hours=1),
        timedelta(hours=6),
        timedelta(hours=24)
    ]

    def __init__(self, signer, http_client):
        self.signer = signer
        self.http = http_client  # Assumed: an httpx.AsyncClient-style client
        self.delivery_log = []

    async def deliver(self, endpoint, event, attempt=0):
        """Attempt webhook delivery"""
        payload = event.to_payload()
        headers = self.signer.create_headers(payload)
        # Send the same compact serialization the signer hashed; letting the
        # HTTP client re-serialize the dict can change the bytes and break
        # signature verification on the consumer side
        body = json.dumps(payload, separators=(',', ':'))

        try:
            response = await self.http.post(
                endpoint.url,
                content=body,
                headers=headers,
                timeout=30
            )
            self._log_attempt(endpoint, event, attempt, response=response)

            if response.status_code in (200, 201, 202, 204):
                return {'status': 'delivered', 'attempts': attempt + 1}

            # Non-success status - schedule retry
            return await self._schedule_retry(endpoint, event, attempt)

        except Exception as e:
            self._log_attempt(endpoint, event, attempt, error=e)
            return await self._schedule_retry(endpoint, event, attempt)

    async def _schedule_retry(self, endpoint, event, attempt):
        """Schedule next retry or give up"""
        if attempt >= len(self.RETRY_SCHEDULE):
            self._move_to_dead_letter(endpoint, event)
            return {'status': 'failed', 'attempts': attempt + 1}

        delay = self.RETRY_SCHEDULE[attempt]
        # In production: use job queue with delayed execution
        await asyncio.sleep(delay.total_seconds())
        return await self.deliver(endpoint, event, attempt + 1)

    def _log_attempt(self, endpoint, event, attempt, response=None, error=None):
        """Record the outcome of one delivery attempt"""
        self.delivery_log.append({
            'endpoint_url': endpoint.url,
            'event_id': event.id,
            'attempt': attempt,
            'status_code': getattr(response, 'status_code', None),
            'error': str(error) if error else None
        })

    def _move_to_dead_letter(self, endpoint, event):
        """Store failed webhook for manual review"""
        # Store in dead letter queue/table
        pass
Endpoint Management
import secrets
import uuid
from datetime import datetime, timezone

def generate_id():
    # Assumed helper: random hex identifier
    return uuid.uuid4().hex

def generate_secret():
    # Assumed helper: 32 random bytes, hex-encoded
    return secrets.token_hex(32)

class WebhookEndpoint:
    """Subscriber endpoint configuration"""

    def __init__(self, url, events, secret=None):  # allow-secret
        self.id = generate_id()
        self.url = url
        self.events = events  # List of subscribed event types
        self.secret = secret or generate_secret()  # allow-secret
        self.status = 'active'
        self.created_at = datetime.now(timezone.utc)

        # Health tracking
        self.consecutive_failures = 0
        self.last_success = None
        self.last_failure = None

    def should_receive(self, event_type):
        """Check if endpoint subscribes to this event"""
        if '*' in self.events:
            return True
        return event_type in self.events

    def record_success(self):
        self.consecutive_failures = 0
        self.last_success = datetime.now(timezone.utc)
        if self.status == 'disabled':
            self.status = 'active'

    def record_failure(self):
        self.consecutive_failures += 1
        self.last_failure = datetime.now(timezone.utc)

        # Auto-disable after too many failures
        if self.consecutive_failures >= 10:
            self.status = 'disabled'
Webhook Consumer Design
Signature Verification
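import hashlib
import hmac
import time

from flask import Flask, request

app = Flask(__name__)

class WebhookVerificationError(Exception):
    """Raised when a webhook fails the signature or timestamp check"""

class WebhookVerifier:
    """Verify incoming webhook signatures"""

    TIMESTAMP_TOLERANCE = 300  # 5 minutes

    def __init__(self, secret):  # allow-secret
        self.secret = secret.encode()  # allow-secret

    def verify(self, payload, signature, timestamp):
        """Verify webhook authenticity"""
        # Missing or malformed headers fail verification instead of crashing
        if not signature:
            raise WebhookVerificationError("Missing signature")
        try:
            sent_at = int(timestamp)
        except (TypeError, ValueError):
            raise WebhookVerificationError("Missing or malformed timestamp")

        # Check timestamp freshness (rejects both stale and far-future values)
        current_time = int(time.time())
        if abs(current_time - sent_at) > self.TIMESTAMP_TOLERANCE:
            raise WebhookVerificationError("Timestamp outside tolerance")

        # Compute expected signature
        signed_payload = f"{timestamp}.{payload}"
        expected = hmac.new(
            self.secret,
            signed_payload.encode(),
            hashlib.sha256
        ).hexdigest()
        expected_sig = f"sha256={expected}"

        # Constant-time comparison to prevent timing attacks
        if not hmac.compare_digest(signature.encode(), expected_sig.encode()):
            raise WebhookVerificationError("Invalid signature")

        return True

# Flask endpoint example
# WEBHOOK_SECRET and process_webhook are assumed to be defined elsewhere
@app.route('/webhooks/provider', methods=['POST'])
def handle_webhook():
    verifier = WebhookVerifier(WEBHOOK_SECRET)

    try:
        verifier.verify(
            request.data.decode(),  # raw body, exactly as it was signed
            request.headers.get('X-Webhook-Signature'),
            request.headers.get('X-Webhook-Timestamp')
        )
    except WebhookVerificationError:
        return 'Invalid signature', 401

    event = request.get_json()
    process_webhook(event)
    return 'OK', 200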
Idempotent Processing
import json

class IdempotentWebhookProcessor:
    """Deduplicate webhooks by event ID so each one is processed once"""

    def __init__(self, storage):
        self.storage = storage  # Redis, database, etc.
        self.lock_ttl = 300  # 5 minute lock

    async def process(self, event):
        """Process webhook idempotently"""
        event_id = event['id']
        processed_key = f"webhook:processed:{event_id}"

        # Check if already processed
        if await self.storage.exists(processed_key):
            return {'status': 'duplicate', 'event_id': event_id}

        # Acquire lock to prevent concurrent processing
        lock_key = f"webhook:lock:{event_id}"
        if not await self.storage.set_nx(lock_key, "1", ex=self.lock_ttl):
            return {'status': 'processing', 'event_id': event_id}

        try:
            # Re-check under the lock: another worker may have finished
            # between the first check and the lock acquisition
            if await self.storage.exists(processed_key):
                return {'status': 'duplicate', 'event_id': event_id}

            # Process the event
            result = await self._handle_event(event)

            # Mark as processed (with long TTL for deduplication)
            await self.storage.set(
                processed_key,
                json.dumps(result),
                ex=86400 * 7  # Keep for 7 days
            )
            return {'status': 'processed', 'event_id': event_id}
        finally:
            await self.storage.delete(lock_key)

    async def _handle_event(self, event):
        """Route event to appropriate handler"""
        # The _handle_* methods are assumed to be defined on this class
        handlers = {
            'payment.completed': self._handle_payment_completed,
            'subscription.cancelled': self._handle_subscription_cancelled,
            # ... more handlers
        }

        handler = handlers.get(event['type'])
        if handler:
            return await handler(event['data'])
        return {'skipped': True, 'reason': 'unknown_event_type'}
Queue-Based Processing
import asyncio
import logging

class QueuedWebhookHandler:
    """Decouple receipt from processing"""

    # _validate_event_structure and _process_event are assumed to be
    # defined elsewhere on this class

    def __init__(self, queue):
        self.queue = queue  # Redis, SQS, RabbitMQ, etc.

    async def receive(self, event):
        """Acknowledge receipt quickly, process async"""
        # Validate immediately
        self._validate_event_structure(event)

        # Queue for processing
        await self.queue.enqueue(
            'webhook_processing',
            event,
            deduplication_id=event['id']
        )

        # Return 200 immediately (don't make sender wait)
        return {'status': 'accepted'}

    async def process_queue(self):
        """Worker that processes queued webhooks"""
        while True:
            event = await self.queue.dequeue('webhook_processing')
            if not event:
                # Avoid a busy loop if dequeue() returns at once on an empty queue
                await asyncio.sleep(1)
                continue
            try:
                await self._process_event(event)
                await self.queue.ack(event)
            except Exception:
                # Log with traceback, then requeue; in production, cap
                # redeliveries so a poison message cannot loop forever
                logging.exception("Webhook processing failed")
                await self.queue.nack(event, requeue=True)
Security Patterns
Secret Rotation
from datetime import datetime, timedelta, timezone

class SecretRotation:
    """Support rolling secret updates"""

    def __init__(self):
        self.current_secret = None
        self.previous_secret = None
        self.rotation_timestamp = None

    def rotate(self, new_secret):
        """Rotate to new secret while supporting old"""
        self.previous_secret = self.current_secret
        self.current_secret = new_secret
        self.rotation_timestamp = datetime.now(timezone.utc)

    def get_verification_secrets(self):
        """Return secrets to try for verification"""
        secrets = [self.current_secret]

        # Accept previous secret for grace period
        if self.previous_secret and self.rotation_timestamp:
            grace_period = timedelta(hours=24)
            if datetime.now(timezone.utc) - self.rotation_timestamp < grace_period:
                secrets.append(self.previous_secret)

        return secrets
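One way a consumer might use the list returned by get_verification_secrets is to check the incoming signature against each candidate in turn. The following is a minimal sketch, assuming the timestamp.body HMAC-SHA256 scheme used elsewhere in this document; compute_signature and verify_with_any_secret are hypothetical helper names, not part of any library.

```python
import hashlib
import hmac


def compute_signature(secret, timestamp, body):
    """HMAC-SHA256 over "<timestamp>.<body>", formatted as sha256=<hex>."""
    signed_payload = f"{timestamp}.{body}".encode()
    digest = hmac.new(secret.encode(), signed_payload, hashlib.sha256).hexdigest()
    return f"sha256={digest}"


def verify_with_any_secret(candidate_secrets, body, signature, timestamp):
    """Return True if the signature matches under any candidate secret."""
    matched = False
    for secret in candidate_secrets:
        expected = compute_signature(secret, timestamp, body)
        # Constant-time comparison; every candidate is checked rather than
        # returning early on the first match
        if hmac.compare_digest(signature.encode(), expected.encode()):
            matched = True
    return matched


# Usage: a payload signed with the previous secret still verifies while
# that secret remains in the candidate list (the grace period)
body = '{"id":"evt_12345"}'
old_signature = compute_signature("old-secret", 1706616000, body)
print(verify_with_any_secret(["new-secret", "old-secret"], body, old_signature, 1706616000))
```

Once the grace period ends and only the current secret is returned, the same call rejects signatures made with the old secret.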
IP Allowlisting
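import ipaddress

# Example values only; use each provider's currently published ranges
ALLOWED_IPS = {
    'stripe': ['3.18.12.63', '3.130.192.231'],  # ... (list truncated)
    'github': ['192.30.252.0/22', '185.199.108.0/22'],  # ... (list truncated)
    'twilio': ['54.172.60.0/23', '54.244.51.0/24']  # ... (list truncated)
}

def ip_in_range(ip, allowed_range):
    """True if ip equals a single address or falls inside a CIDR range"""
    return ipaddress.ip_address(ip) in ipaddress.ip_network(allowed_range, strict=False)

def verify_source_ip(request, provider):
    """Verify webhook comes from expected IP"""
    # Behind a proxy or load balancer, remote_addr is the proxy's address
    client_ip = request.remote_addr
    allowed = ALLOWED_IPS.get(provider, [])

    for allowed_range in allowed:
        if ip_in_range(client_ip, allowed_range):
            return True
    return False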
Error Handling
Response Codes
| Code | Meaning | Provider Action |
|---|---|---|
| 200-204 | Success | Mark delivered |
| 400 | Bad request | Don't retry (your bug) |
| 401/403 | Auth failed | Disable endpoint |
| 404 | Not found | Disable endpoint |
| 429 | Rate limited | Retry with backoff |
| 500+ | Server error | Retry |
| Timeout | No response | Retry |
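The table can be turned into a small decision function on the provider side. This is a sketch only: ProviderAction and classify_response are hypothetical names, a timeout is represented as None, and the handling of status codes the table does not list (treated here as non-retryable) is an assumption.

```python
from enum import Enum
from typing import Optional


class ProviderAction(Enum):
    MARK_DELIVERED = "mark_delivered"
    DO_NOT_RETRY = "do_not_retry"
    DISABLE_ENDPOINT = "disable_endpoint"
    RETRY_WITH_BACKOFF = "retry_with_backoff"
    RETRY = "retry"


def classify_response(status_code: Optional[int]) -> ProviderAction:
    """Map a delivery outcome to the provider action from the table.

    status_code is None when the request timed out.
    """
    if status_code is None:
        return ProviderAction.RETRY
    if 200 <= status_code <= 204:
        return ProviderAction.MARK_DELIVERED
    if status_code == 400:
        return ProviderAction.DO_NOT_RETRY
    if status_code in (401, 403, 404):
        return ProviderAction.DISABLE_ENDPOINT
    if status_code == 429:
        return ProviderAction.RETRY_WITH_BACKOFF
    if status_code >= 500:
        return ProviderAction.RETRY
    # Codes not listed in the table (assumption: do not retry)
    return ProviderAction.DO_NOT_RETRY


for code in (200, 400, 404, 429, 503, None):
    print(code, classify_response(code).value)
```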
Dead Letter Queue
import uuid
from datetime import datetime, timedelta, timezone

class DeadLetterQueue:
    """Store and manage failed webhooks"""

    def __init__(self, storage, delivery_system, get_endpoint):
        # Assumed collaborators: a storage backend, a WebhookDeliverySystem,
        # and an async callable that loads an endpoint by ID
        self.storage = storage
        self.delivery_system = delivery_system
        self.get_endpoint = get_endpoint

    async def add(self, endpoint, event, failure_reason, attempts):
        """Add failed webhook to DLQ"""
        await self.storage.add({
            'id': uuid.uuid4().hex,  # Assumed ID scheme
            'endpoint_id': endpoint.id,
            'endpoint_url': endpoint.url,
            'event': event.to_payload(),
            'failure_reason': str(failure_reason),
            'attempts': attempts,
            'added_at': datetime.now(timezone.utc).isoformat()
        })

    async def retry(self, dlq_item_id):
        """Manually retry a DLQ item"""
        item = await self.storage.get(dlq_item_id)
        endpoint = await self.get_endpoint(item['endpoint_id'])
        # Assumes WebhookEvent provides a from_payload() constructor
        event = WebhookEvent.from_payload(item['event'])

        result = await self.delivery_system.deliver(endpoint, event)

        if result['status'] == 'delivered':
            await self.storage.remove(dlq_item_id)

        return result

    async def purge_old(self, days=30):
        """Clean up old DLQ items"""
        cutoff = datetime.now(timezone.utc) - timedelta(days=days)
        await self.storage.delete_before(cutoff)
Best Practices
For Providers
- Include event ID for consumer deduplication
- Sign payloads with HMAC-SHA256 or stronger
- Include timestamps to prevent replay
- Retry with exponential backoff (at least 5 attempts)
- Provide webhook logs in your dashboard
- Support event filtering by type
- Version your payloads for backward compatibility
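The backoff recommendation above can be sketched as a delay calculation with random jitter, which spreads retries out so that many failed deliveries do not hit a recovering endpoint at the same moment. The function names and the defaults (10 second base, 24 hour cap) are assumptions chosen for illustration, not values prescribed by this document.

```python
import random


def backoff_ceiling(attempt, base=10.0, cap=86400.0):
    """Upper bound in seconds for retry number `attempt` (0-based): base * 2^attempt, capped."""
    return min(cap, base * (2 ** attempt))


def backoff_delay(attempt, base=10.0, cap=86400.0):
    """Pick a random delay between 0 and the ceiling ("full jitter")."""
    return random.uniform(0, backoff_ceiling(attempt, base, cap))


# Ceilings for the first five retries
ceilings = [backoff_ceiling(n) for n in range(5)]
print(ceilings)  # [10.0, 20.0, 40.0, 80.0, 160.0]

# Each actual delay falls somewhere between 0 and its ceiling
for n in range(5):
    print(n, round(backoff_delay(n), 1))
```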
For Consumers
- Verify signatures before processing
- Respond quickly (< 5 seconds)
- Process asynchronously via queue
- Implement idempotency using event ID
- Log everything for debugging
- Monitor for failures proactively
References
- references/webhook-security.md - Detailed security implementation
- references/provider-examples.md - Stripe, GitHub, Twilio patterns
- references/testing-webhooks.md - Local development and testing