Originally from wshobson/agents
Microservices Patterns
Design microservices architectures with service boundaries, event-driven communication, and resilience patterns.
Service Decomposition Strategies
By Business Capability
- Organize services around business functions
- Each service owns its domain
- Example: OrderService, PaymentService, InventoryService
By Subdomain (DDD)
- Identify the core domain and its supporting subdomains
- Bounded contexts map to services
- Clear ownership and responsibility
Strangler Fig Pattern
- Gradually extract from monolith
- New functionality as microservices
- Proxy routes to old/new systems
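The proxy routing in the last bullet can be sketched as a prefix table: paths that have already been extracted go to the new service, and everything else still goes to the monolith. The URLs, the `MIGRATED_PREFIXES` table, and the `resolve_upstream` function are hypothetical names for illustration only.

```python
# Hypothetical routing table for a strangler-fig proxy. Path prefixes that have
# already been extracted map to the new service; all other paths stay on the monolith.
MONOLITH_URL = "http://monolith.internal"  # assumed address
MIGRATED_PREFIXES = {
    "/orders": "http://order-service.internal",      # assumed address
    "/payments": "http://payment-service.internal",  # assumed address
}


def resolve_upstream(path: str) -> str:
    """Return the base URL that should serve `path`."""
    # Check longer prefixes first so the most specific route wins.
    for prefix in sorted(MIGRATED_PREFIXES, key=len, reverse=True):
        # Match whole path segments only, so "/ordersummary" is not routed as "/orders".
        if path == prefix or path.startswith(prefix + "/"):
            return MIGRATED_PREFIXES[prefix]
    return MONOLITH_URL
```

Migrating another capability then means adding one entry to the table, and rolling back means removing it.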
Communication Patterns
Synchronous (Request/Response)
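```python
import httpx
from tenacity import retry, stop_after_attempt, wait_exponential


class ServiceClient:
    def __init__(self, base_url: str):
        self.base_url = base_url
        # One shared client reuses connections; the 5 s timeout bounds each attempt.
        self.client = httpx.AsyncClient(timeout=5.0)

    # Up to 3 attempts, with exponential backoff clamped to 2-10 s between them.
    @retry(stop=stop_after_attempt(3), wait=wait_exponential(min=2, max=10))
    async def get(self, path: str, **kwargs):
        response = await self.client.get(f"{self.base_url}{path}", **kwargs)
        response.raise_for_status()  # raises on 4xx/5xx, which triggers a retry
        return response.json()
```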
Asynchronous (Events/Messages)
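```python
import json
from dataclasses import asdict

from aiokafka import AIOKafkaProducer


class EventBus:
    def __init__(self, producer: AIOKafkaProducer):
        self.producer = producer  # assumed to be started already

    async def publish(self, event: "DomainEvent"):
        # DomainEvent is assumed to be a dataclass with event_type and aggregate_id.
        await self.producer.send_and_wait(
            event.event_type,  # topic
            value=json.dumps(asdict(event)).encode(),  # bytes: no value_serializer is set
            key=event.aggregate_id.encode(),
        )
```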
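The `publish` method above relies on a `DomainEvent` type that the source does not define. A minimal sketch of what it might look like, assuming a dataclass: only `event_type` and `aggregate_id` come from the original, while `payload`, `event_id`, `occurred_at`, and the `to_message` helper are hypothetical.

```python
import json
import uuid
from dataclasses import asdict, dataclass, field
from datetime import datetime, timezone


@dataclass(frozen=True)
class DomainEvent:
    """Hypothetical event shape; only event_type and aggregate_id appear in the source."""

    event_type: str    # used as the topic name by publish()
    aggregate_id: str  # used as the message key by publish()
    payload: dict = field(default_factory=dict)  # assumed field
    event_id: str = field(default_factory=lambda: str(uuid.uuid4()))  # assumed field
    occurred_at: str = field(  # assumed field, ISO 8601 in UTC
        default_factory=lambda: datetime.now(timezone.utc).isoformat()
    )


def to_message(event: DomainEvent) -> tuple[str, bytes, bytes]:
    """Return (topic, key, value) with key and value encoded as bytes."""
    value = json.dumps(asdict(event)).encode("utf-8")
    return event.event_type, event.aggregate_id.encode("utf-8"), value
```

Keying by `aggregate_id` means that, with Kafka's default partitioner, events for the same aggregate land on the same partition and keep their relative order.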
Resilience Patterns
Circuit Breaker
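```python
# CircuitState, CircuitBreakerOpenError, and the _should_attempt_reset,
# _on_success, and _on_failure helpers are assumed to be defined alongside this class.
class CircuitBreaker:
    def __init__(self, failure_threshold=5, recovery_timeout=30):
        self.failure_threshold = failure_threshold  # failures before the circuit opens
        self.recovery_timeout = recovery_timeout    # seconds before a trial call is allowed
        self.state = CircuitState.CLOSED

    async def call(self, func, *args, **kwargs):
        if self.state == CircuitState.OPEN:
            if self._should_attempt_reset():
                # Recovery timeout elapsed: let one trial call through.
                self.state = CircuitState.HALF_OPEN
            else:
                # Fail fast without touching the downstream service.
                raise CircuitBreakerOpenError()
        try:
            result = await func(*args, **kwargs)
            self._on_success()
            return result
        except Exception:
            self._on_failure()
            raise
```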
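The class above calls `CircuitState`, `CircuitBreakerOpenError`, `_should_attempt_reset`, `_on_success`, and `_on_failure` without defining them. One way to fill them in is sketched here as a self-contained version. The `failure_count` counter, the `opened_at` timestamp, and the injectable `clock` parameter are assumptions, not part of the original.

```python
import time
from enum import Enum


class CircuitState(Enum):
    CLOSED = "closed"        # calls pass through
    OPEN = "open"            # calls are rejected immediately
    HALF_OPEN = "half_open"  # one trial call decides the next state


class CircuitBreakerOpenError(Exception):
    """Raised when a call is rejected because the circuit is open."""


class CircuitBreaker:
    def __init__(self, failure_threshold=5, recovery_timeout=30, clock=time.monotonic):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout  # seconds before a trial call
        self.state = CircuitState.CLOSED
        self.failure_count = 0  # assumed: consecutive failures since the last success
        self.opened_at = 0.0    # assumed: clock reading when the circuit last opened
        self.clock = clock      # assumed: injectable so tests do not need to sleep

    async def call(self, func, *args, **kwargs):
        if self.state == CircuitState.OPEN:
            if self._should_attempt_reset():
                self.state = CircuitState.HALF_OPEN
            else:
                raise CircuitBreakerOpenError()
        try:
            result = await func(*args, **kwargs)
            self._on_success()
            return result
        except Exception:
            self._on_failure()
            raise

    def _should_attempt_reset(self) -> bool:
        return self.clock() - self.opened_at >= self.recovery_timeout

    def _on_success(self) -> None:
        self.failure_count = 0
        self.state = CircuitState.CLOSED

    def _on_failure(self) -> None:
        self.failure_count += 1
        # A failed trial call in HALF_OPEN reopens the circuit immediately.
        if self.state == CircuitState.HALF_OPEN or self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN
            self.opened_at = self.clock()
```

A caller would wrap each outbound request, for example `await breaker.call(client.get, "/orders/42")`, and treat `CircuitBreakerOpenError` as a signal to return a fallback.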
Bulkhead Pattern
- Isolate resources per service
- Limit impact of failures
- Prevent cascade failures
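The isolation described above can be sketched with one concurrency limit per downstream dependency. This is a minimal illustration built on `asyncio.Semaphore`. The `Bulkhead` class, `BulkheadFullError`, and the `max_concurrent` parameter are hypothetical names, and rejecting instead of queueing is one design choice among several.

```python
import asyncio


class BulkheadFullError(Exception):
    """Raised when a dependency's concurrency limit is already reached."""


class Bulkhead:
    """Caps concurrent calls to one dependency so it cannot exhaust shared capacity."""

    def __init__(self, max_concurrent: int):
        self._semaphore = asyncio.Semaphore(max_concurrent)

    async def call(self, func, *args, **kwargs):
        # Reject immediately instead of queueing, so a slow dependency
        # cannot pile up waiting callers.
        if self._semaphore.locked():
            raise BulkheadFullError()
        async with self._semaphore:
            return await func(*args, **kwargs)
```

In practice each downstream service would get its own `Bulkhead` instance, created inside the running application, so that a saturated payment service, say, leaves capacity for inventory calls untouched.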
API Gateway Pattern
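```python
import asyncio

import httpx
# Assumed origin of @circuit (the source omits the import); needs a release
# of the circuitbreaker package that supports async functions.
from circuitbreaker import circuit


class APIGateway:
    def __init__(self):
        self.http_client = httpx.AsyncClient(timeout=5.0)

    @circuit(failure_threshold=5, recovery_timeout=30)
    async def call_service(self, service_url: str, path: str, **kwargs):
        response = await self.http_client.request("GET", f"{service_url}{path}", **kwargs)
        response.raise_for_status()
        return response.json()

    async def aggregate(self, order_id: str) -> dict:
        # Parallel requests to multiple services. The call_*_service methods are not
        # defined in the source; they are assumed to wrap call_service per service.
        order, payment, inventory = await asyncio.gather(
            self.call_order_service(f"/orders/{order_id}"),
            self.call_payment_service(f"/payments/order/{order_id}"),
            self.call_inventory_service(f"/reservations/order/{order_id}"),
            return_exceptions=True,
        )
        # With return_exceptions=True, a failed call yields its exception object here.
        return {"order": order, "payment": payment, "inventory": inventory}
```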
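Because `asyncio.gather` is called with `return_exceptions=True`, a failed call shows up in the result as an exception object instead of raising, so the aggregated dict can end up holding exceptions. A sketch of one way to separate successes from failures before responding: `gather_named` and the `data`/`errors` response shape are hypothetical.

```python
import asyncio


async def gather_named(calls: dict) -> dict:
    """Run named awaitables concurrently and split results from failures."""
    results = await asyncio.gather(*calls.values(), return_exceptions=True)
    data, errors = {}, {}
    for name, result in zip(calls, results):
        if isinstance(result, BaseException):
            data[name] = None                     # degrade only this field
            errors[name] = type(result).__name__  # report what went wrong
        else:
            data[name] = result
    return {"data": data, "errors": errors}
```

With this shape, a payment service outage still returns the order and inventory data, and the client can see which part is missing.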
Data Management
Database Per Service
- Each service owns its data
- No shared databases
- Loose coupling
Saga Pattern for Distributed Transactions
- See saga-orchestration skill for details
Best Practices
- Service Boundaries: Align with business capabilities
- Database Per Service: No shared databases
- API Contracts: Versioned, backward compatible
- Async When Possible: Events over direct calls
- Circuit Breakers: Fail fast on service failures
- Distributed Tracing: Track requests across services
- Health Checks: Liveness and readiness probes
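The difference between the two probes in the last bullet can be sketched without any web framework: liveness only says the process can respond, while readiness also checks the dependencies the service needs before it should receive traffic. The function names, the `checks` mapping, and the response shape are hypothetical.

```python
import asyncio
from typing import Awaitable, Callable, Dict

Check = Callable[[], Awaitable[bool]]


async def liveness() -> dict:
    # Liveness: the process is up and able to respond; no dependency checks.
    return {"status": "alive"}


async def readiness(checks: Dict[str, Check], timeout: float = 1.0) -> dict:
    # Readiness: every required dependency answered successfully within the timeout.
    async def run(check: Check) -> bool:
        try:
            return bool(await asyncio.wait_for(check(), timeout))
        except Exception:
            return False  # errors and timeouts both count as "not ready"

    results = await asyncio.gather(*(run(check) for check in checks.values()))
    details = dict(zip(checks, results))
    return {"status": "ready" if all(results) else "not_ready", "checks": details}
```

Keeping dependency checks out of the liveness probe avoids restarting a healthy process just because a database is briefly unreachable.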
Common Pitfalls
- Distributed Monolith: Tightly coupled services
- Chatty Services: Too many inter-service calls
- No Circuit Breakers: Cascade failures
- Synchronous Everything: Tight coupling
- Ignoring Network Failures: Assuming reliable network
Repository: eyadsibai/ltk
First Seen: Jan 28, 2026