event-sourcing

SKILL.md

Event Sourcing Patterns

Store application state as immutable events rather than current state snapshots.

Overview

  • Full audit trail requirements (compliance, finance)
  • Temporal queries ("what was state at time X?")
  • CQRS implementations with separate read/write models
  • Systems requiring event replay and debugging
  • Microservices with eventual consistency

Quick Reference

Domain Event Base

from pydantic import BaseModel, Field
from datetime import datetime, timezone
from uuid import UUID, uuid4

class DomainEvent(BaseModel):
    event_id: UUID = Field(default_factory=uuid4)
    aggregate_id: UUID
    event_type: str
    version: int
    timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
    class Config:
        frozen = True  # Events are immutable

Event-Sourced Aggregate

class Account:
    def __init__(self):
        self._changes, self._version, self.balance = [], 0, 0.0

    def deposit(self, amount: float):
        self._raise_event(MoneyDeposited(aggregate_id=self.id, amount=amount, version=self._version + 1))

    def _apply(self, event):
        match event:
            case MoneyDeposited(): self.balance += event.amount
            case MoneyWithdrawn(): self.balance -= event.amount

    def load_from_history(self, events):
        for e in events: self._apply(e); self._version = e.version

Event Store Append

async def append_events(self, aggregate_id: UUID, events: list, expected_version: int):
    current = await self.get_version(aggregate_id)
    if current != expected_version:
        raise ConcurrencyError(f"Expected {expected_version}, got {current}")
    for event in events:
        await self.session.execute(insert(event_store).values(
            event_id=event.event_id, aggregate_id=aggregate_id,
            event_type=event.event_type, version=event.version, data=event.model_dump()
        ))

Key Decisions

Decision Recommendation
Event naming Past tense (OrderPlaced, not PlaceOrder)
Concurrency Optimistic locking with version check
Snapshots Every 100-500 events for large aggregates
Event schema Version events, support upcasting
Projections Async handlers, idempotent updates
Storage PostgreSQL + JSONB or dedicated event store

Anti-Patterns (FORBIDDEN)

# NEVER modify stored events
await event_store.update(event_id, new_data)  # Destroys audit trail

# NEVER include computed data in events
class OrderPlaced(DomainEvent):
    total: float  # WRONG - compute from line items

# NEVER ignore event ordering
async for event in events:  # May arrive out of order
    await handle(event)  # Must check version/sequence

# ALWAYS use immutable events
class Event(BaseModel):
    class Config:
        frozen = True  # Correct

# ALWAYS version your events
event_schema_version: int = 1  # Support schema evolution

Related Skills

  • message-queues - Distributed event delivery
  • database-schema-designer - Event store schema design
  • integration-testing - Testing event-sourced systems

Capability Details

event-store

Keywords: event store, append-only, event persistence, event log Solves:

  • Store events with optimistic concurrency
  • Query events by aggregate ID
  • Implement event versioning

aggregate-pattern

Keywords: aggregate, domain model, event sourcing aggregate, DDD Solves:

  • Model aggregates with event sourcing
  • Apply events to rebuild state
  • Handle commands and raise events

projections

Keywords: projection, read model, CQRS read side, denormalization Solves:

  • Create optimized read models from events
  • Implement async event handlers
  • Build materialized views

snapshots

Keywords: snapshot, performance, aggregate loading, checkpoint Solves:

  • Speed up aggregate loading with snapshots
  • Implement snapshot strategies
  • Balance snapshot frequency vs storage
Weekly Installs
10
GitHub Stars
95
First Seen
Jan 22, 2026
Installed on
claude-code7
opencode5
antigravity5
gemini-cli5
windsurf4
codex4