idempotency-patterns
SKILL.md
Idempotency Patterns (2026)
Patterns for ensuring operations can be safely retried without unintended side effects.
Overview
- Building payment or financial APIs
- Implementing webhook handlers
- Processing messages from queues
- Creating mutation endpoints (POST, PUT, DELETE)
- Building distributed systems with at-least-once delivery
Quick Reference
Idempotency Key Generation
import hashlib
import json
from typing import Any
def generate_idempotency_key(
*,
entity_id: str,
action: str,
params: dict[str, Any] | None = None,
) -> str:
"""
Generate deterministic idempotency key.
Args:
entity_id: Unique identifier of the entity
action: The action being performed
params: Optional parameters that affect the result
Returns:
32-character hex string
"""
content = f"{entity_id}:{action}"
if params:
# Sort keys for deterministic output
content += f":{json.dumps(params, sort_keys=True)}"
return hashlib.sha256(content.encode()).hexdigest()[:32]
# Examples
key1 = generate_idempotency_key(
entity_id="order-123",
action="create",
params={"amount": 100, "currency": "USD"},
)
key2 = generate_idempotency_key(
entity_id="payment-456",
action="refund",
)
FastAPI Idempotency Middleware
from fastapi import Request, Response, HTTPException
from starlette.middleware.base import BaseHTTPMiddleware
import redis.asyncio as redis
import json
class IdempotencyMiddleware(BaseHTTPMiddleware):
"""Handle Idempotency-Key header for POST/PUT/PATCH."""
def __init__(self, app, redis_client: redis.Redis, ttl: int = 86400):
super().__init__(app)
self.redis = redis_client
self.ttl = ttl
async def dispatch(self, request: Request, call_next):
# Only apply to mutation methods
if request.method not in ("POST", "PUT", "PATCH"):
return await call_next(request)
# Check for idempotency key
idempotency_key = request.headers.get("Idempotency-Key")
if not idempotency_key:
return await call_next(request)
cache_key = f"idem:{request.url.path}:{idempotency_key}"
# Check for cached response
cached = await self.redis.get(cache_key)
if cached:
data = json.loads(cached)
return Response(
content=data["body"],
status_code=data["status"],
media_type="application/json",
headers={"X-Idempotent-Replayed": "true"},
)
# Process request
response = await call_next(request)
# Cache successful responses
if 200 <= response.status_code < 300:
body = b"".join([chunk async for chunk in response.body_iterator])
await self.redis.setex(
cache_key,
self.ttl,
json.dumps({
"body": body.decode(),
"status": response.status_code,
}),
)
return Response(
content=body,
status_code=response.status_code,
media_type=response.media_type,
)
return response
Database-Backed Idempotency
from sqlalchemy import Column, String, DateTime, Text
from sqlalchemy.dialects.postgresql import JSONB, insert
from datetime import UTC, datetime, timedelta
class ProcessedRequest(Base):
"""Track processed requests for idempotency."""
__tablename__ = "processed_requests"
idempotency_key = Column(String(64), primary_key=True)
endpoint = Column(String(255), nullable=False)
status_code = Column(Integer, nullable=False)
response_body = Column(Text)
created_at = Column(DateTime, default=lambda: datetime.now(UTC))
expires_at = Column(DateTime)
async def idempotent_execute(
db: AsyncSession,
idempotency_key: str,
endpoint: str,
operation,
ttl_hours: int = 24,
) -> tuple[Any, int, bool]:
"""
Execute operation idempotently.
Returns: (response, status_code, was_replayed)
"""
# Check for existing
existing = await db.get(ProcessedRequest, idempotency_key)
if existing and existing.expires_at > datetime.now(UTC):
return json.loads(existing.response_body), existing.status_code, True
# Execute operation
result, status_code = await operation()
# Store result (upsert to handle races)
stmt = insert(ProcessedRequest).values(
idempotency_key=idempotency_key,
endpoint=endpoint,
status_code=status_code,
response_body=json.dumps(result),
expires_at=datetime.now(UTC) + timedelta(hours=ttl_hours),
).on_conflict_do_nothing()
await db.execute(stmt)
return result, status_code, False
Event Consumer Idempotency
class IdempotentConsumer:
"""Process events exactly once using idempotency keys."""
def __init__(self, db: AsyncSession, redis: redis.Redis):
self.db = db
self.redis = redis
async def process(
self,
event: dict,
handler,
) -> tuple[Any, bool]:
"""
Process event idempotently.
Returns: (result, was_duplicate)
"""
idempotency_key = event.get("idempotency_key")
if not idempotency_key:
# No key = always process (risky)
return await handler(event), False
# Fast path: check Redis cache
cache_key = f"processed:{idempotency_key}"
if await self.redis.exists(cache_key):
return None, True
# Slow path: check database
existing = await self.db.execute(
select(ProcessedEvent)
.where(ProcessedEvent.idempotency_key == idempotency_key)
)
if existing.scalar_one_or_none():
# Backfill cache
await self.redis.setex(cache_key, 86400, "1")
return None, True
# Process with database lock to prevent races
try:
async with self.db.begin_nested():
# Insert first to claim the key
self.db.add(ProcessedEvent(idempotency_key=idempotency_key))
await self.db.flush()
# Then process
result = await handler(event)
# Cache for fast future lookups
await self.redis.setex(cache_key, 86400, "1")
return result, False
except IntegrityError:
# Another process claimed it
return None, True
Key Decisions
| Aspect | Recommendation | Rationale |
|---|---|---|
| Key generation | Deterministic hash | Same input = same key always |
| Storage | Redis + DB | Redis for speed, DB for durability |
| TTL | 24-72 hours | Balance storage vs replay window |
| Lock strategy | DB unique constraint | Handles race conditions |
| Response caching | Status 2xx only | Don't cache errors |
Anti-Patterns (FORBIDDEN)
# NEVER use non-deterministic keys
def bad_key():
return str(uuid.uuid4()) # Different every time!
# NEVER include timestamps in keys
def bad_key(event):
return f"{event.id}:{datetime.now(UTC)}" # Timestamp varies!
# NEVER check-then-act without locking
async def bad_process(key):
if not await exists(key): # Race condition!
await process()
await mark_processed(key)
# NEVER skip idempotency for financial operations
@router.post("/payments")
async def create_payment(data: PaymentCreate):
return await process_payment(data) # No idempotency!
# NEVER cache error responses
if response.status_code >= 400:
await cache_response(key, response) # WRONG - errors should retry
Related Skills
outbox-pattern- Reliable event publishingmessage-queues- At-least-once message deliverycaching-strategies- Redis caching patternsauth-patterns- API key management
Capability Details
key-generation
Keywords: idempotency key, hash, deterministic, deduplication key Solves:
- How do I generate idempotency keys?
- Deterministic key generation
- Key format best practices
api-idempotency
Keywords: idempotency header, POST idempotent, retry safe, middleware Solves:
- How do I make POST endpoints idempotent?
- Implement Idempotency-Key header
- Cache and replay responses
consumer-idempotency
Keywords: exactly-once, event deduplication, message idempotency Solves:
- How do I process events exactly once?
- Deduplicate queue messages
- Handle at-least-once delivery
Weekly Installs
2
Repository
melodic-software/claude-code-pluginsInstalled on
windsurf1
trae1
opencode1
codex1
claude-code1
antigravity1