skills/4444j99/a-i--skills/webhook-integration-patterns

Webhook Integration Patterns

This skill provides guidance for designing and implementing robust webhook systems—both as providers (sending webhooks) and consumers (receiving webhooks).

Core Competencies

  • Delivery Guarantees: At-least-once delivery, with effectively-once processing through idempotency
  • Security: Signature verification, secret rotation
  • Reliability: Retry strategies, dead letter handling
  • Scalability: Queue-based processing, rate limiting

Webhook Fundamentals

What Webhooks Solve

Polling (inefficient):           Webhooks (efficient):
┌────────┐     ┌────────┐       ┌────────┐     ┌────────┐
│ Client │     │ Server │       │ Client │     │ Server │
└───┬────┘     └───┬────┘       └───┬────┘     └───┬────┘
    │ Any news?    │                │              │
    │─────────────▶│                │              │
    │    No        │                │   Event!     │
    │◀─────────────│                │◀─────────────│
    │ Any news?    │                │  POST /hook  │
    │─────────────▶│                │◀─────────────│
    │    No        │                │   200 OK     │
    │◀─────────────│                │─────────────▶│
    │ Any news?    │
    │─────────────▶│       Push-based notification
    │    YES!      │       instead of polling
    │◀─────────────│

Webhook Anatomy

POST /webhooks/payment HTTP/1.1
Host: your-app.com
Content-Type: application/json
X-Webhook-Signature: sha256=abc123...
X-Webhook-ID: evt_12345
X-Webhook-Timestamp: 1706616000

{
  "id": "evt_12345",
  "type": "payment.completed",
  "created_at": "2024-01-30T12:00:00Z",
  "data": {
    "payment_id": "pay_abc",
    "amount": 1000,
    "currency": "USD"
  }
}

Key headers:

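  • Signature: HMAC of the timestamp and raw body, proving the sender holds the shared secret
  • ID: Unique event identifier for deduplication
  • Timestamp: When the request was signed (Unix seconds), checked by consumers to reject replays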
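Before comparing anything, a consumer has to split the signature header into its algorithm prefix and hex digest. Below is a minimal sketch that assumes the `sha256=<hex>` format shown above. `parse_signature_header` is a hypothetical helper. Rejecting every algorithm except `sha256` is a policy choice, not part of any standard.

```python
import re

# Hypothetical helper for headers shaped like "sha256=<64 hex chars>"
_SIGNATURE_RE = re.compile(r"(?P<algo>[a-z0-9]+)=(?P<digest>[0-9a-f]+)")
SUPPORTED_ALGORITHMS = {"sha256"}  # assumption: only HMAC-SHA256 is accepted


def parse_signature_header(value):
    """Return (algorithm, hex_digest) or raise ValueError."""
    if not value:
        raise ValueError("missing signature header")
    match = _SIGNATURE_RE.fullmatch(value.strip())
    if match is None:
        raise ValueError("malformed signature header")
    algo, digest = match.group("algo"), match.group("digest")
    if algo not in SUPPORTED_ALGORITHMS:
        raise ValueError(f"unsupported signature algorithm: {algo}")
    if len(digest) != 64:  # a SHA-256 hex digest is 64 characters
        raise ValueError("unexpected digest length")
    return algo, digest


algo, digest = parse_signature_header("sha256=" + "ab" * 32)
```

The parser fails closed on unknown prefixes. That way a forged header cannot steer verification toward a weaker algorithm if the format ever grows.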

Webhook Provider Design

Event Schema Design

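import uuid
from datetime import datetime, timezone

class WebhookEvent:
    """Standard webhook event structure"""

    API_VERSION = "2024-01-30"

    def __init__(self, event_type, data, idempotency_key=None):
        self.id = self._generate_id()
        self.type = event_type
        # UTC, in the same "2024-01-30T12:00:00Z" form as the example payload
        self.created_at = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
        self.api_version = self.API_VERSION
        self.data = data
        self.idempotency_key = idempotency_key or self.id

    @staticmethod
    def _generate_id():
        # "evt_" prefix plus a random UUID; any collision-resistant scheme works
        return f"evt_{uuid.uuid4().hex}"

    def to_payload(self):
        return {
            "id": self.id,
            "type": self.type,
            "created_at": self.created_at,
            "api_version": self.api_version,
            "data": self.data
        }

    @classmethod
    def from_payload(cls, payload):
        """Rebuild an event from a stored payload (e.g. for DLQ replays)"""
        event = cls(payload["type"], payload["data"])
        event.id = payload["id"]  # keep the original ID so consumers can deduplicate
        event.created_at = payload["created_at"]
        event.api_version = payload.get("api_version", cls.API_VERSION)
        event.idempotency_key = event.id
        return event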

# Event types follow resource.action pattern
EVENT_TYPES = [
    "payment.created",
    "payment.completed",
    "payment.failed",
    "subscription.created",
    "subscription.updated",
    "subscription.cancelled",
    "customer.created",
    "customer.deleted"
]
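The `resource.action` convention is easy to enforce at the point where events are emitted. Doing so keeps a typo from silently creating a type that no one subscribes to. Here is a small sketch. The regex and the `registry` set (for example `set(EVENT_TYPES)`) are assumptions.

```python
import re

# Assumed convention: lowercase "<resource>.<action>", underscores allowed
EVENT_TYPE_RE = re.compile(r"[a-z][a-z_]*\.[a-z][a-z_]*")


def validate_event_type(event_type, registry):
    """Return event_type if it is well-formed and registered, else raise ValueError."""
    if not EVENT_TYPE_RE.fullmatch(event_type):
        raise ValueError(f"event type must look like resource.action: {event_type!r}")
    if event_type not in registry:
        raise ValueError(f"unregistered event type: {event_type!r}")
    return event_type


registry = {"payment.completed", "subscription.cancelled"}
validate_event_type("payment.completed", registry)
```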

Signature Generation

import hashlib
import hmac
import json
import time

class WebhookSigner:
    """Sign webhook payloads for verification"""

    def __init__(self, secret):  # allow-secret
        self.secret = secret.encode()  # allow-secret

    def sign(self, payload, timestamp=None):
        """Generate an HMAC-SHA256 signature over '<timestamp>.<body>'"""
        timestamp = timestamp or int(time.time())
        # Compact JSON; the HTTP body must be sent byte-for-byte identical
        # to this string or the consumer's signature check will fail
        payload_str = json.dumps(payload, separators=(',', ':'))

        # Include timestamp to prevent replay attacks
        signed_payload = f"{timestamp}.{payload_str}"

        signature = hmac.new(
            self.secret,
            signed_payload.encode(),
            hashlib.sha256
        ).hexdigest()

        return {
            'signature': f"sha256={signature}",
            'timestamp': timestamp
        }

    def create_headers(self, payload):
        """Generate all webhook headers"""
        sign_data = self.sign(payload)
        return {
            'Content-Type': 'application/json',
            'X-Webhook-Signature': sign_data['signature'],
            'X-Webhook-Timestamp': str(sign_data['timestamp']),
            'X-Webhook-ID': payload['id']
        }
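The signature covers the exact serialized string, so the provider must send those same bytes. The self-contained sketch below uses only the standard library, with a made-up key and payload. It shows that re-serializing the same dict with different separators produces a different signature.

```python
import hashlib
import hmac
import json

EXAMPLE_KEY = b"example-signing-key"  # made-up key for illustration


def sign(body, timestamp):
    """HMAC-SHA256 over "<timestamp>.<body>", the scheme WebhookSigner.sign uses."""
    message = f"{timestamp}.{body}".encode()
    return hmac.new(EXAMPLE_KEY, message, hashlib.sha256).hexdigest()


payload = {"id": "evt_12345", "type": "payment.completed", "data": {"amount": 1000}}
timestamp = 1706616000

compact = json.dumps(payload, separators=(",", ":"))  # what the signer signs
spaced = json.dumps(payload)  # default separators, as a client might re-serialize

signature = sign(compact, timestamp)
print(sign(compact, timestamp) == signature)  # True: identical bytes
print(sign(spaced, timestamp) == signature)   # False: ", " and ": " changed the bytes
```

In practice, post the compact string as the raw request body. Do not pass the dict to an HTTP client's JSON option, which may choose its own separators.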

Delivery System

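import asyncio
import json
from datetime import datetime, timedelta, timezone

class WebhookDeliverySystem:
    """Reliable webhook delivery with retries"""

    # Delay before each retry: 1 initial attempt + 7 retries over ~31.6 hours
    RETRY_SCHEDULE = [
        timedelta(seconds=10),
        timedelta(minutes=1),
        timedelta(minutes=5),
        timedelta(minutes=30),
        timedelta(hours=1),
        timedelta(hours=6),
        timedelta(hours=24)
    ]

    def __init__(self, signer, http_client):
        self.signer = signer
        # Assumed httpx.AsyncClient-style client: post(url, content=, headers=, timeout=)
        self.http = http_client
        self.delivery_log = []

    async def deliver(self, endpoint, event, attempt=0):
        """Attempt webhook delivery"""
        payload = event.to_payload()
        headers = self.signer.create_headers(payload)  # fresh timestamp per attempt
        # Send the exact string the signer signed (same compact separators);
        # letting the client re-serialize the dict could change the bytes
        body = json.dumps(payload, separators=(',', ':'))

        try:
            response = await self.http.post(
                endpoint.url,
                content=body,
                headers=headers,
                timeout=30
            )
        except Exception as e:
            # Network error or timeout: no response, so retry
            self._log_attempt(endpoint, event, attempt, error=e)
            return await self._schedule_retry(endpoint, event, attempt)

        self._log_attempt(endpoint, event, attempt, response=response)
        status = response.status_code

        if 200 <= status < 300:
            return {'status': 'delivered', 'attempts': attempt + 1}

        if status == 429 or status >= 500:
            # Rate limited or server error - schedule retry
            return await self._schedule_retry(endpoint, event, attempt)

        # Other 4xx (and 3xx): retrying the same request won't help;
        # 401/403/404 may also warrant disabling the endpoint
        self._move_to_dead_letter(endpoint, event)
        return {'status': 'failed', 'attempts': attempt + 1}

    async def _schedule_retry(self, endpoint, event, attempt):
        """Schedule next retry or give up"""
        if attempt >= len(self.RETRY_SCHEDULE):
            self._move_to_dead_letter(endpoint, event)
            return {'status': 'failed', 'attempts': attempt + 1}

        delay = self.RETRY_SCHEDULE[attempt]
        # In production: use a job queue with delayed execution instead of
        # sleeping in-process (a restart would lose pending retries)
        await asyncio.sleep(delay.total_seconds())
        return await self.deliver(endpoint, event, attempt + 1)

    def _log_attempt(self, endpoint, event, attempt, response=None, error=None):
        """Record each attempt for dashboards and debugging"""
        self.delivery_log.append({
            'endpoint_id': endpoint.id,
            'event_id': event.id,
            'attempt': attempt + 1,
            'status_code': response.status_code if response is not None else None,
            'error': repr(error) if error else None,
            'at': datetime.now(timezone.utc).isoformat()
        })

    def _move_to_dead_letter(self, endpoint, event):
        """Store failed webhook for manual review"""
        # Store in dead letter queue/table
        pass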
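A fixed schedule like `RETRY_SCHEDULE` works, but it has a drawback. When an endpoint recovers after an outage, every queued delivery retries at the same moments. A common alternative is capped exponential backoff with "full jitter", which picks a random delay between zero and `base * 2**attempt`. The sketch below illustrates it. `base_seconds`, `cap_seconds`, and the number of retries are illustrative assumptions.

```python
import random
from datetime import timedelta


def backoff_delay(attempt, base_seconds=10, cap_seconds=6 * 3600, rng=random):
    """Full-jitter delay before retry number `attempt` (0-based)."""
    # Exponential upper bound, capped so late retries stop growing
    bound = min(cap_seconds, base_seconds * 2 ** attempt)
    # Uniform in [0, bound] so simultaneous failures spread out
    return timedelta(seconds=rng.uniform(0, bound))


def retry_schedule(max_retries=8, **kwargs):
    """One jittered delay per retry; regenerate per event, not once globally."""
    return [backoff_delay(n, **kwargs) for n in range(max_retries)]


rng = random.Random(42)  # seeded only so the example is reproducible
for delay in retry_schedule(5, rng=rng):
    print(f"{delay.total_seconds():.1f}s")
```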

Endpoint Management

import secrets
import uuid
from datetime import datetime, timezone

class WebhookEndpoint:
    """Subscriber endpoint configuration"""

    MAX_CONSECUTIVE_FAILURES = 10  # auto-disable threshold

    def __init__(self, url, events, secret=None):  # allow-secret
        self.id = uuid.uuid4().hex
        self.url = url
        self.events = events  # List of subscribed event types ('*' = all)
        # Random 256-bit signing secret unless one is supplied
        self.secret = secret or secrets.token_hex(32)  # allow-secret
        self.status = 'active'
        self.created_at = datetime.now(timezone.utc)

        # Health tracking
        self.consecutive_failures = 0
        self.last_success = None
        self.last_failure = None

    def should_receive(self, event_type):
        """Check if endpoint subscribes to this event"""
        if '*' in self.events:
            return True
        return event_type in self.events

    def record_success(self):
        self.consecutive_failures = 0
        self.last_success = datetime.now(timezone.utc)
        if self.status == 'disabled':
            self.status = 'active'

    def record_failure(self):
        self.consecutive_failures += 1
        self.last_failure = datetime.now(timezone.utc)

        # Auto-disable after too many failures
        if self.consecutive_failures >= self.MAX_CONSECUTIVE_FAILURES:
            self.status = 'disabled'
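When an event is created, the provider fans it out to every active endpoint whose subscription matches. Endpoints auto-disabled by `record_failure` should drop out of that set. Below is a minimal sketch. A simplified, hypothetical `Endpoint` dataclass stands in for `WebhookEndpoint` so the example runs on its own, and `select_endpoints` is likewise illustrative.

```python
from dataclasses import dataclass, field


@dataclass
class Endpoint:
    """Simplified stand-in for WebhookEndpoint."""
    url: str
    events: list = field(default_factory=list)
    status: str = "active"

    def should_receive(self, event_type):
        return "*" in self.events or event_type in self.events


def select_endpoints(endpoints, event_type):
    """Active endpoints subscribed to event_type; disabled endpoints are skipped."""
    return [e for e in endpoints if e.status == "active" and e.should_receive(event_type)]


endpoints = [
    Endpoint("https://a.example/hooks", ["payment.completed"]),
    Endpoint("https://b.example/hooks", ["*"]),
    Endpoint("https://c.example/hooks", ["*"], status="disabled"),
]
print([e.url for e in select_endpoints(endpoints, "payment.completed")])
```

Each selected endpoint then gets its own delivery, and its own signature, since secrets are per endpoint.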

Webhook Consumer Design

Signature Verification

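import hashlib
import hmac
import time

from flask import Flask, request


class WebhookVerificationError(Exception):
    """Raised when an incoming webhook fails verification"""


class WebhookVerifier:
    """Verify incoming webhook signatures"""

    TIMESTAMP_TOLERANCE = 300  # 5 minutes

    def __init__(self, secret):  # allow-secret
        self.secret = secret.encode()  # allow-secret

    def verify(self, payload, signature, timestamp):
        """Verify webhook authenticity (payload = raw request body as str)"""
        if not signature or not timestamp:
            raise WebhookVerificationError("Missing signature or timestamp header")
        try:
            sent_at = int(timestamp)
        except ValueError:
            raise WebhookVerificationError("Malformed timestamp") from None

        # Check timestamp freshness (rejects stale and far-future requests)
        if abs(int(time.time()) - sent_at) > self.TIMESTAMP_TOLERANCE:
            raise WebhookVerificationError("Timestamp outside tolerance")

        # Compute expected signature over "<timestamp>.<raw body>"
        signed_payload = f"{timestamp}.{payload}"
        expected = hmac.new(
            self.secret,
            signed_payload.encode(),
            hashlib.sha256
        ).hexdigest()

        expected_sig = f"sha256={expected}"

        # Constant-time comparison to prevent timing attacks; comparing bytes
        # avoids a TypeError if the header contains non-ASCII characters
        if not hmac.compare_digest(signature.encode(), expected_sig.encode()):
            raise WebhookVerificationError("Invalid signature")

        return True


app = Flask(__name__)

# Flask endpoint example (WEBHOOK_SECRET and process_webhook are app-defined)
@app.route('/webhooks/provider', methods=['POST'])
def handle_webhook():
    verifier = WebhookVerifier(WEBHOOK_SECRET)

    try:
        # Verify the raw body, not re-serialized JSON
        verifier.verify(
            request.data.decode(),
            request.headers.get('X-Webhook-Signature'),
            request.headers.get('X-Webhook-Timestamp')
        )
    except WebhookVerificationError:
        return 'Invalid signature', 401

    event = request.json
    process_webhook(event)

    return 'OK', 200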

Idempotent Processing

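import json


class IdempotentWebhookProcessor:
    """Deduplicate webhook processing by event ID"""

    # A crash between handling and marking can still cause a reprocess,
    # so handlers themselves should be safe to run twice

    def __init__(self, storage):
        # Assumed: a redis.asyncio.Redis client (other stores need
        # equivalents of exists, SET with NX/EX, and delete)
        self.storage = storage
        self.lock_ttl = 300  # 5 minute lock

    async def process(self, event):
        """Process webhook idempotently"""
        event_id = event['id']
        processed_key = f"webhook:processed:{event_id}"

        # Check if already processed
        if await self.storage.exists(processed_key):
            return {'status': 'duplicate', 'event_id': event_id}

        # Acquire lock to prevent concurrent processing (SET key value NX EX ttl)
        lock_key = f"webhook:lock:{event_id}"
        if not await self.storage.set(lock_key, "1", nx=True, ex=self.lock_ttl):
            return {'status': 'processing', 'event_id': event_id}

        try:
            # Re-check under the lock: another worker may have finished
            # between the first check and our lock acquisition
            if await self.storage.exists(processed_key):
                return {'status': 'duplicate', 'event_id': event_id}

            # Process the event
            result = await self._handle_event(event)

            # Mark as processed (with long TTL for deduplication)
            await self.storage.set(
                processed_key,
                json.dumps(result),
                ex=86400 * 7  # Keep for 7 days
            )

            return {'status': 'processed', 'event_id': event_id}

        finally:
            await self.storage.delete(lock_key)

    async def _handle_event(self, event):
        """Route event to appropriate handler"""
        handlers = {
            'payment.completed': self._handle_payment_completed,
            'subscription.cancelled': self._handle_subscription_cancelled,
            # ... more handlers
        }
        handler = handlers.get(event['type'])
        if handler:
            return await handler(event['data'])
        return {'skipped': True, 'reason': 'unknown_event_type'}

    # Placeholder handlers: replace with application logic
    async def _handle_payment_completed(self, data):
        return {'handled': 'payment.completed', 'payment_id': data.get('payment_id')}

    async def _handle_subscription_cancelled(self, data):
        return {'handled': 'subscription.cancelled'}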

Queue-Based Processing

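import asyncio
import logging


class QueuedWebhookHandler:
    """Decouple receipt from processing"""

    REQUIRED_FIELDS = ('id', 'type', 'data')

    def __init__(self, queue):
        # Redis, SQS, RabbitMQ, etc. (enqueue/dequeue/ack/nack wrapper assumed)
        self.queue = queue

    async def receive(self, event):
        """Acknowledge receipt quickly, process async"""
        # Validate immediately (cheap structural check; caller maps errors to 400)
        self._validate_event_structure(event)

        # Queue for processing
        await self.queue.enqueue(
            'webhook_processing',
            event,
            deduplication_id=event['id']
        )

        # Return 200 immediately (don't make sender wait)
        return {'status': 'accepted'}

    def _validate_event_structure(self, event):
        missing = [name for name in self.REQUIRED_FIELDS if name not in event]
        if missing:
            raise ValueError(f"Webhook event missing fields: {missing}")

    async def process_queue(self):
        """Worker that processes queued webhooks"""
        while True:
            event = await self.queue.dequeue('webhook_processing')
            if not event:
                await asyncio.sleep(1)  # avoid a busy loop on an empty queue
                continue
            try:
                await self._process_event(event)
                await self.queue.ack(event)
            except Exception:
                logging.exception("Webhook processing failed")
                # Requeue for another try; cap redeliveries (or route to a DLQ)
                # so a message that always fails can't loop forever
                await self.queue.nack(event, requeue=True)

    async def _process_event(self, event):
        # App-specific, e.g. delegate to IdempotentWebhookProcessor.process(event)
        raise NotImplementedError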
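Requeueing on every exception lets a single message that can never succeed (a "poison" message) cycle through the worker forever. One common remedy is to count attempts and park the message after a limit. The self-contained sketch below uses `asyncio.Queue` to stand in for the broker. `MAX_ATTEMPTS` and the list used as a dead letter store are assumptions.

```python
import asyncio
import logging

MAX_ATTEMPTS = 3  # assumption: park a message after three failed tries


async def worker(queue, handler, dead_letters):
    """Consume (event, attempts) pairs; park an event after MAX_ATTEMPTS failures."""
    while True:
        event, attempts = await queue.get()
        try:
            await handler(event)
        except Exception:
            logging.exception("Webhook %s failed (attempt %d)", event["id"], attempts + 1)
            if attempts + 1 >= MAX_ATTEMPTS:
                dead_letters.append(event)  # stop retrying; keep for manual review
            else:
                queue.put_nowait((event, attempts + 1))  # requeue with a higher count
        finally:
            queue.task_done()


async def main():
    queue, dead_letters, handled = asyncio.Queue(), [], []

    async def handler(event):
        if event["type"] == "poison":
            raise ValueError("cannot process this event")
        handled.append(event["id"])

    queue.put_nowait(({"id": "evt_1", "type": "ok"}, 0))
    queue.put_nowait(({"id": "evt_2", "type": "poison"}, 0))

    task = asyncio.create_task(worker(queue, handler, dead_letters))
    await queue.join()  # requeues count as new items, so this waits for them too
    task.cancel()
    return handled, [event["id"] for event in dead_letters]


print(asyncio.run(main()))  # (['evt_1'], ['evt_2'])
```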

Security Patterns

Secret Rotation

from datetime import datetime, timedelta, timezone

class SecretRotation:
    """Support rolling secret updates"""

    GRACE_PERIOD = timedelta(hours=24)  # how long the old secret stays valid

    def __init__(self):
        self.current_secret = None
        self.previous_secret = None
        self.rotation_timestamp = None

    def rotate(self, new_secret):
        """Rotate to new secret while supporting old"""
        self.previous_secret = self.current_secret
        self.current_secret = new_secret
        self.rotation_timestamp = datetime.now(timezone.utc)

    def get_verification_secrets(self):
        """Return secrets to try for verification, newest first"""
        # Named "candidates" to avoid shadowing the stdlib secrets module
        candidates = [self.current_secret] if self.current_secret else []

        # Accept previous secret for grace period
        if self.previous_secret and self.rotation_timestamp:
            if datetime.now(timezone.utc) - self.rotation_timestamp < self.GRACE_PERIOD:
                candidates.append(self.previous_secret)

        return candidates
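During the grace period, a consumer accepts a signature made with either secret. Below is a sketch of that check. `verify_with_any` is a hypothetical helper that takes the list returned by `get_verification_secrets()`. The signing scheme mirrors the `<timestamp>.<body>` format used above.

```python
import hashlib
import hmac


def _signature(secret, timestamp, body):
    """sha256=<hex> over "<timestamp>.<body>", matching the signer above."""
    mac = hmac.new(secret.encode(), f"{timestamp}.{body}".encode(), hashlib.sha256)
    return f"sha256={mac.hexdigest()}"


def verify_with_any(candidate_secrets, signature, timestamp, body):
    """True if the signature matches under any candidate secret."""
    return any(
        hmac.compare_digest(signature.encode(), _signature(s, timestamp, body).encode())
        for s in candidate_secrets
        if s
    )


old_secret, new_secret = "old-example-secret", "new-example-secret"  # allow-secret
body, timestamp = '{"id":"evt_12345"}', 1706616000
sent = _signature(old_secret, timestamp, body)  # provider still signing with the old secret

print(verify_with_any([new_secret, old_secret], sent, timestamp, body))  # True: inside grace period
print(verify_with_any([new_secret], sent, timestamp, body))              # False: old secret retired
```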

IP Allowlisting

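import ipaddress

# "..." marks ranges omitted here; provider ranges change over time, so
# load the currently published lists instead of hardcoding them
ALLOWED_IPS = {
    'stripe': ['3.18.12.63', '3.130.192.231', ...],
    'github': ['192.30.252.0/22', '185.199.108.0/22', ...],
    'twilio': ['54.172.60.0/23', '54.244.51.0/24', ...]
}

def ip_in_range(ip, allowed_range):
    """True if ip is inside a CIDR range (a bare address counts as /32 or /128)"""
    try:
        return ipaddress.ip_address(ip) in ipaddress.ip_network(allowed_range, strict=False)
    except (TypeError, ValueError):
        return False  # malformed address, or a placeholder such as ...

def verify_source_ip(request, provider):
    """Verify webhook comes from expected IP (defense in depth; still verify signatures)"""
    # Behind a proxy or load balancer this is the proxy's address
    client_ip = request.remote_addr

    allowed = ALLOWED_IPS.get(provider, [])
    for allowed_range in allowed:
        if ip_in_range(client_ip, allowed_range):
            return True

    return False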
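Behind a load balancer or reverse proxy, `request.remote_addr` is the proxy's address, not the sender's. The sketch below recovers the client address from `X-Forwarded-For` when the number of trusted proxies is known. `trusted_hops` is an assumption about your deployment. Any entries to the left of the trusted ones can be forged by the client.

```python
def client_ip_from_forwarded(remote_addr, forwarded_for, trusted_hops=1):
    """Return the address recorded by the outermost trusted proxy.

    Each proxy appends the address it received the request from, so with
    `trusted_hops` proxies in front of the app the client is the
    `trusted_hops`-th entry from the right.
    """
    if trusted_hops == 0 or not forwarded_for:
        return remote_addr
    hops = [h.strip() for h in forwarded_for.split(",") if h.strip()]
    if len(hops) < trusted_hops:
        return remote_addr  # shorter than expected: don't trust the header
    return hops[-trusted_hops]


# One proxy; the first entry was supplied (possibly forged) by the client
print(client_ip_from_forwarded("10.0.0.5", "203.0.113.9, 192.30.252.10"))
```

In Flask, Werkzeug's `ProxyFix` middleware (its `x_for` argument is the trusted hop count) applies the same adjustment to `request.remote_addr`.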

Error Handling

Response Codes

Code      Meaning        Provider action
2xx       Success        Mark delivered
400       Bad request    Don't retry (the request itself is malformed)
401/403   Auth failed    Disable endpoint
404       Not found      Disable endpoint
429       Rate limited   Retry with backoff
5xx       Server error   Retry
Timeout   No response    Retry
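The table maps directly onto a small decision function that a delivery worker could call after each attempt. In the sketch below, the `Action` names are hypothetical. Two behaviors go beyond what the table states: every 2xx counts as success, and unlisted 3xx/4xx codes are treated like 400.

```python
from enum import Enum


class Action(Enum):
    DELIVERED = "mark_delivered"
    RETRY = "retry"
    DROP = "do_not_retry"
    DISABLE = "disable_endpoint"


def classify(status_code):
    """Map an HTTP status (None = timeout/no response) to a provider action."""
    if status_code is None:
        return Action.RETRY  # timeout: no response
    if 200 <= status_code < 300:
        return Action.DELIVERED
    if status_code in (401, 403, 404):
        return Action.DISABLE
    if status_code == 429 or status_code >= 500:
        return Action.RETRY  # rate limited or server error
    return Action.DROP  # 400 and other client errors: retrying won't help
```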

Dead Letter Queue

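import uuid
from datetime import datetime, timedelta, timezone

class DeadLetterQueue:
    """Store and manage failed webhooks"""

    def __init__(self, storage, delivery_system, endpoints):
        self.storage = storage  # persistent store with add/get/remove/delete_before
        self.delivery_system = delivery_system  # e.g. a WebhookDeliverySystem
        self.endpoints = endpoints  # async endpoint lookup by ID

    async def add(self, endpoint, event, failure_reason, attempts):
        """Add failed webhook to DLQ"""
        await self.storage.add({
            'id': uuid.uuid4().hex,
            'endpoint_id': endpoint.id,
            'endpoint_url': endpoint.url,
            'event': event.to_payload(),
            'failure_reason': str(failure_reason),
            'attempts': attempts,
            'added_at': datetime.now(timezone.utc).isoformat()
        })

    async def retry(self, dlq_item_id):
        """Manually retry a DLQ item"""
        item = await self.storage.get(dlq_item_id)
        if item is None:
            raise KeyError(f"DLQ item not found: {dlq_item_id}")
        endpoint = await self.endpoints.get(item['endpoint_id'])
        # Rebuild with the original event ID so the consumer can deduplicate
        event = WebhookEvent.from_payload(item['event'])

        # deliver() applies the full retry schedule if this attempt fails
        result = await self.delivery_system.deliver(endpoint, event)

        if result['status'] == 'delivered':
            await self.storage.remove(dlq_item_id)

        return result

    async def purge_old(self, days=30):
        """Clean up old DLQ items"""
        cutoff = datetime.now(timezone.utc) - timedelta(days=days)
        await self.storage.delete_before(cutoff)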
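The `DeadLetterQueue` above relies on a storage object with `add`, `get`, `remove`, and `delete_before`, but that interface is not specified. A minimal in-memory version, useful for tests, might look like the sketch below. The method names come from the calls above; everything else is an assumption.

```python
import asyncio
from datetime import datetime, timedelta, timezone


class InMemoryDLQStorage:
    """Dict-backed stand-in for the storage that DeadLetterQueue calls."""

    def __init__(self):
        self._items = {}

    async def add(self, item):
        self._items[item["id"]] = item

    async def get(self, item_id):
        return self._items.get(item_id)

    async def remove(self, item_id):
        self._items.pop(item_id, None)

    async def delete_before(self, cutoff):
        # 'added_at' is an ISO-8601 string; it and cutoff must both be
        # timezone-aware (or both naive) for the comparison to work
        self._items = {
            key: item for key, item in self._items.items()
            if datetime.fromisoformat(item["added_at"]) >= cutoff
        }


async def demo():
    store = InMemoryDLQStorage()
    now = datetime.now(timezone.utc)
    await store.add({"id": "old", "added_at": (now - timedelta(days=40)).isoformat()})
    await store.add({"id": "new", "added_at": now.isoformat()})
    await store.delete_before(now - timedelta(days=30))  # what purge_old(days=30) computes
    return await store.get("old"), await store.get("new")


old, new = asyncio.run(demo())
print(old is None, new["id"])  # True new
```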

Best Practices

For Providers

  1. Include event ID for consumer deduplication
  2. Sign payloads with HMAC-SHA256 at minimum
  3. Sign a timestamp along with the body so consumers can reject replays
  4. Retry with exponential backoff (at least 5 attempts)
  5. Provide webhook logs in your dashboard
  6. Support event filtering by type
  7. Version your payloads for backward compatibility

For Consumers

  1. Verify signatures before processing
  2. Respond quickly (< 5 seconds)
  3. Process asynchronously via queue
  4. Implement idempotency using event ID
  5. Log everything for debugging
  6. Monitor for failures proactively

References

  • references/webhook-security.md - Detailed security implementation
  • references/provider-examples.md - Stripe, GitHub, Twilio patterns
  • references/testing-webhooks.md - Local development and testing