saga-patterns
SKILL.md
Saga Patterns for Distributed Transactions
Maintain consistency across microservices without distributed locks.
Overview
Use the saga pattern for:
- Multi-service business transactions (order -> payment -> inventory -> shipping)
- Operations that must eventually succeed or roll back completely
- Long-running business processes (minutes to days)
- Microservices avoiding 2PC (two-phase commit)
When NOT to Use
- Single database operations (use transactions)
- Real-time consistency requirements (use synchronous calls)
- When eventual consistency is unacceptable
Orchestration vs Choreography
| Aspect | Orchestration | Choreography |
|---|---|---|
| Control | Central orchestrator | Distributed events |
| Coupling | Services depend on orchestrator | Loosely coupled |
| Visibility | Single point of observation | Requires distributed tracing |
| Best for | Complex, ordered workflows | Simple, parallel flows |
Orchestration Pattern
from enum import Enum
from dataclasses import dataclass, field
from typing import Callable, Any
from datetime import datetime, timezone
class SagaStatus(Enum):
    """Lifecycle states used for both a whole saga and its individual steps."""

    # Forward path: not started -> in progress -> finished successfully.
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"

    # Rollback path: compensations in progress -> compensations finished.
    COMPENSATING = "compensating"
    COMPENSATED = "compensated"

    # The work raised an error.
    FAILED = "failed"
@dataclass
class SagaStep:
    """One forward action of a saga paired with the compensation that undoes it.

    Attributes:
        name: Human-readable identifier of the step.
        action: Awaitable callable invoked with the saga's data dict.
        compensation: Awaitable callable invoked with the saga's data dict to
            undo ``action``.
        status: Current state of this step; starts as ``PENDING``.
        result: Value returned by ``action`` once it has run.
        error: Description of the last failure recorded for this step, or
            ``None`` when nothing has failed.
    """

    name: str
    action: Callable
    compensation: Callable
    status: SagaStatus = SagaStatus.PENDING
    result: Any = None
    # Declared explicitly because the orchestrator assigns ``step.error``;
    # without the field the value is invisible to repr/eq/asdict and reading
    # it on a step that never failed raises AttributeError.
    # The annotation is quoted so it is not evaluated on Python < 3.10.
    error: "str | None" = None
@dataclass
class SagaContext:
    """State of a single saga run: its identity, shared data and ordered steps.

    Attributes:
        saga_id: Identifier of this saga instance.
        data: Dict handed to every step action and compensation.
        steps: The steps, in execution order.
        status: Overall saga state; starts as ``PENDING``.
        current_step: Index into ``steps`` of the step being worked on.
    """

    saga_id: str
    # default_factory gives every instance its own dict / list.
    data: dict = field(default_factory=dict)
    steps: list[SagaStep] = field(default_factory=list)
    status: SagaStatus = SagaStatus.PENDING
    current_step: int = 0
class SagaOrchestrator:
    """Runs a saga's steps in order and compensates completed steps on failure.

    The saga is saved through ``saga_repository`` when a run starts, after
    every completed step, and when the run or its compensation finishes.
    ``event_publisher`` is stored on the instance but not used by the methods
    of this class.
    """

    def __init__(self, saga_repository, event_publisher):
        self.repo = saga_repository
        self.publisher = event_publisher

    async def execute(self, saga: SagaContext) -> SagaContext:
        """Run every step of ``saga`` from the first one.

        Each action receives ``saga.data``; a truthy result is merged back
        into ``saga.data`` for later steps. If an action raises, the steps
        completed before it are compensated in reverse order.

        Returns:
            The same ``saga`` object, with ``status`` set to ``COMPLETED``,
            ``COMPENSATED`` or ``FAILED``.
        """
        return await self._run(saga, start=0, skip_completed=False)

    async def resume(self, saga: SagaContext, from_step: "int | None" = None) -> SagaContext:
        """Continue a saga that was interrupted part-way through.

        Args:
            saga: The saga to continue.
            from_step: Index of the first step to consider. Defaults to
                ``saga.current_step``.

        Steps already marked ``COMPLETED`` are skipped so their actions are
        not executed a second time. Failure handling and the return value are
        the same as for ``execute``.
        """
        start = saga.current_step if from_step is None else from_step
        return await self._run(saga, start=start, skip_completed=True)

    async def _run(self, saga: SagaContext, start: int, skip_completed: bool) -> SagaContext:
        """Execute steps ``start..end``; shared body of ``execute`` and ``resume``."""
        saga.status = SagaStatus.RUNNING
        await self.repo.save(saga)

        for index in range(max(start, 0), len(saga.steps)):
            step = saga.steps[index]
            if skip_completed and step.status == SagaStatus.COMPLETED:
                continue

            saga.current_step = index
            try:
                step.result = await step.action(saga.data)
                saga.data.update(step.result or {})
            except Exception as exc:
                step.status = SagaStatus.FAILED
                # Keep the cause instead of discarding it with the exception.
                step.error = f"Action failed: {exc}"
                await self._compensate(saga, index)
                return saga

            step.status = SagaStatus.COMPLETED
            # Checkpoint progress: recovery reads current_step and step
            # statuses from the stored saga.
            await self.repo.save(saga)

        saga.status = SagaStatus.COMPLETED
        await self.repo.save(saga)
        return saga

    async def _compensate(self, saga: SagaContext, failed_step: int):
        """Undo the completed steps that precede ``failed_step``, newest first.

        Only steps whose status is ``COMPLETED`` are compensated. A
        compensation that raises is recorded in ``step.error`` and the
        remaining compensations still run. The saga ends as ``COMPENSATED``
        when every compensation succeeded and as ``FAILED`` otherwise, and is
        then saved.
        """
        saga.status = SagaStatus.COMPENSATING
        fully_undone = True

        # Clamp so an index past the end compensates every completed step.
        last = min(failed_step, len(saga.steps)) - 1
        for index in range(last, -1, -1):
            step = saga.steps[index]
            if step.status != SagaStatus.COMPLETED:
                continue
            try:
                await step.compensation(saga.data)
            except Exception as e:
                step.error = f"Compensation failed: {e}"
                fully_undone = False
            else:
                step.status = SagaStatus.COMPENSATED

        # A partially rolled-back saga must not be reported as compensated.
        saga.status = SagaStatus.COMPENSATED if fully_undone else SagaStatus.FAILED
        await self.repo.save(saga)
Order Saga Example
class OrderSaga:
    """Builds the three-step order saga: reserve inventory, charge, ship.

    Every action returns the id of the resource it created so the
    orchestrator can merge it into the saga data. Every compensation reads
    that id back and does nothing when the id is absent.
    """

    def __init__(self, payment_service, inventory_service, shipping_service):
        self.payment = payment_service
        self.inventory = inventory_service
        self.shipping = shipping_service

    def create_saga(self, order: Order) -> SagaContext:
        """Return a ``SagaContext`` for ``order`` with its steps in execution order."""
        return SagaContext(
            saga_id=f"order-{order.id}",
            data={"order": order.dict()},
            steps=[
                SagaStep("reserve_inventory", self._reserve_inventory, self._release_inventory),
                SagaStep("process_payment", self._process_payment, self._refund_payment),
                SagaStep("create_shipment", self._create_shipment, self._cancel_shipment),
            ],
        )

    async def _reserve_inventory(self, data: dict) -> dict:
        """Reserve the order's items and return ``{"reservation_id": ...}``."""
        reservation = await self.inventory.reserve(items=data["order"]["items"])
        return {"reservation_id": reservation.id}

    async def _release_inventory(self, data: dict):
        """Release the reservation, if one was recorded in ``data``."""
        # Same guard as _cancel_shipment: a missing id means there is nothing
        # to undo, so skip instead of raising KeyError.
        if "reservation_id" in data:
            await self.inventory.release(data["reservation_id"])

    async def _process_payment(self, data: dict) -> dict:
        """Charge the order total and return ``{"payment_id": ...}``."""
        payment = await self.payment.charge(amount=data["order"]["total"])
        return {"payment_id": payment.id}

    async def _refund_payment(self, data: dict):
        """Refund the payment, if one was recorded in ``data``."""
        if "payment_id" in data:
            await self.payment.refund(data["payment_id"])

    async def _create_shipment(self, data: dict) -> dict:
        """Create a shipment for the order and return ``{"shipment_id": ...}``."""
        shipment = await self.shipping.create(order_id=data["order"]["id"])
        return {"shipment_id": shipment.id}

    async def _cancel_shipment(self, data: dict):
        """Cancel the shipment, if one was recorded in ``data``."""
        if "shipment_id" in data:
            await self.shipping.cancel(data["shipment_id"])
Choreography Pattern
class OrderChoreography:
    """Handlers that move the order saga forward by reacting to events.

    Each handler either publishes the next request on the event bus or
    updates the stored order; there is no central coordinator.
    """

    def __init__(self, event_bus, order_repo):
        self.repo = order_repo
        self.bus = event_bus

    async def _emit(self, topic, saga_id, **fields):
        """Publish ``fields`` on ``topic``, tagged with the saga id."""
        await self.bus.publish(topic, {"saga_id": saga_id, **fields})

    async def handle_order_created(self, event):
        """Ask for the order's items to be reserved."""
        await self._emit(
            "inventory.reserve.requested",
            event.saga_id,
            items=event.payload["order"]["items"],
        )

    async def handle_inventory_reserved(self, event):
        """Ask for the amount carried by the event to be charged."""
        await self._emit(
            "payment.charge.requested",
            event.saga_id,
            amount=event.payload["amount"],
        )

    async def handle_payment_failed(self, event):
        """Compensate a failed payment by asking for the reservation to be released."""
        await self._emit(
            "inventory.release.requested",
            event.saga_id,
            reservation_id=event.payload["reservation_id"],
        )

    async def handle_shipment_created(self, event):
        """Load the order named by the event, mark it shipped and save it."""
        order_id = event.payload["order_id"]
        shipped_order = await self.repo.get(order_id)
        shipped_order.status = "shipped"
        await self.repo.save(shipped_order)
Timeout and Recovery
from datetime import timedelta
import asyncio
class SagaRecovery:
    """Recovers sagas that stalled or whose current step failed."""

    def __init__(self, saga_repo, orchestrator):
        self.repo = saga_repo
        self.orchestrator = orchestrator

    async def recover_stuck_sagas(self, timeout: timedelta = timedelta(hours=1)):
        """Resume every saga found by the RUNNING-status / ``timeout`` age lookup.

        The cutoff passed to the repository is the current UTC time minus
        ``timeout``. If resuming a saga raises, that saga is compensated
        instead.
        """
        cutoff = datetime.now(timezone.utc) - timeout
        stuck_sagas = await self.repo.find_by_status_and_age(SagaStatus.RUNNING, cutoff)
        for saga in stuck_sagas:
            try:
                await self.orchestrator.resume(saga)
            except Exception:
                await self.orchestrator._compensate(saga, saga.current_step)

    async def retry_failed_step(self, saga_id: str, max_retries: int = 3):
        """Retry the saga's current step, then continue or compensate.

        The step's action is attempted up to ``max_retries`` times, waiting
        ``2 ** attempt`` seconds between attempts. On success the result is
        stored on the step and merged into ``saga.data``, and the saga
        resumes from the next step. If every attempt fails, the steps before
        the current one are compensated.
        """
        saga = await self.repo.get(saga_id)
        failed_step = saga.steps[saga.current_step]

        for attempt in range(max_retries):
            # Only the action sits inside the retry guard: a failure further
            # down the saga must not re-run an action that already succeeded.
            try:
                result = await failed_step.action(saga.data)
            except Exception:
                if attempt < max_retries - 1:  # no pointless wait after the last try
                    await asyncio.sleep(2 ** attempt)
                continue

            failed_step.result = result
            # Later steps and compensations read ids from saga.data, exactly
            # as after a normal run of the step.
            saga.data.update(result or {})
            failed_step.status = SagaStatus.COMPLETED
            try:
                await self.orchestrator.resume(saga, from_step=saga.current_step + 1)
            except Exception:
                # Passing the step count lets _compensate consider every
                # completed step, including the one just retried.
                await self.orchestrator._compensate(saga, len(saga.steps))
            return

        await self.orchestrator._compensate(saga, saga.current_step)
Idempotency
class IdempotentSagaStep:
    """Runs a step's action at most once per saga by caching its result.

    Args:
        step_name: Name of the step; combined with the saga id to form the key.
        idempotency_store: Object with awaitable ``get(key)`` and
            ``set(key, value, ttl=...)`` methods.
        ttl: How long a stored result is kept. Defaults to seven days; raise
            it for sagas that can be retried after a longer period.
    """

    def __init__(self, step_name: str, idempotency_store, ttl: timedelta = timedelta(days=7)):
        self.step_name = step_name
        self.store = idempotency_store
        self.ttl = ttl

    async def execute(self, saga_id: str, action: Callable, *args, **kwargs):
        """Return the stored result for this saga/step, or run ``action`` and store it.

        Args:
            saga_id: Identifier of the saga the step belongs to.
            action: Awaitable callable to run when no result is stored yet.
            *args, **kwargs: Passed through to ``action``.

        Returns:
            The result of ``action``, either freshly computed or read back
            from the store.
        """
        idempotency_key = f"{saga_id}:{self.step_name}"

        existing = await self.store.get(idempotency_key)
        if existing:
            return existing["result"]

        result = await action(*args, **kwargs)
        # Wrapped in a dict so a falsy result (None, 0, "") still yields a
        # truthy record on the next lookup.
        await self.store.set(idempotency_key, {"result": result}, ttl=self.ttl)
        return result
Key Decisions
| Decision | Recommendation |
|---|---|
| Pattern choice | Orchestration for complex flows, Choreography for simple |
| State storage | Persistent store (PostgreSQL) for saga state |
| Idempotency | Required for all saga steps |
| Timeouts | Per-step timeouts with recovery |
| Compensation | Always implement, test thoroughly |
| Observability | Trace saga ID across all services |
Anti-Patterns (FORBIDDEN)
# NEVER skip compensation logic
# The charge below has no matching refund/undo, so it cannot be rolled back.
async def _process_payment(self, data: dict):
    return await self.payment.charge(data) # Missing compensation!
# NEVER rely on synchronous calls across services
# Two service calls are chained in one function with no error handling
# between them: if the second raises, nothing undoes the first.
async def _reserve_and_pay(self, data: dict):
    await self.inventory.reserve(data)
    await self.payment.charge(data) # If fails, inventory stuck!
# NEVER ignore idempotency
# Nothing checks whether this order was already inserted, so every retry
# performs another insert.
async def _create_order(self, data: dict):
    return await self.db.insert(Order(**data)) # Duplicate on retry!
# NEVER use in-memory saga state
# A plain module-level dict exists only for the lifetime of the process.
sagas = {} # Lost on restart!
# ALWAYS persist saga state and test compensation paths
Related Skills
- temporal-io - Durable workflow execution
- event-sourcing - Event-driven state management
- idempotency-patterns - Idempotent operations