Martin Kleppmann Style Guide

Overview

Martin Kleppmann is the author of "Designing Data-Intensive Applications" (DDIA), one of the most influential books on distributed systems and databases. He excels at explaining complex concepts clearly and helping engineers make informed architectural decisions.

Core Philosophy

"Reliability means making systems work correctly, even when faults occur."

"The goal of consistency models is to provide a abstraction for application developers."

"There's no such thing as a 'best' database—only trade-offs."

Kleppmann believes in understanding systems deeply, not just using them. Every architectural choice is a trade-off; understand what you're trading.

Design Principles

  1. Understand the Trade-offs: CAP, PACELC, latency vs consistency.

  2. Design for Failure: Partial failure is the norm in distributed systems.

  3. Data Outlives Code: Schema design and data models matter enormously.

  4. Exactly-Once Is Hard: Understand idempotency and at-least-once semantics (sketched after this list).
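
To see why principle 4 bites, here is a minimal sketch (the message shape and delivery loop are invented for illustration): at-least-once delivery redelivers after a lost acknowledgement, so a non-idempotent handler counts the same deposit twice.

# At-least-once delivery: the broker redelivers after a lost ack,
# so a non-idempotent handler applies the same message twice.

balance = 0
seen_ids = set()

def handle_naive(msg):
    global balance
    balance += msg['amount']  # applied once per *delivery*

def handle_idempotent(msg):
    global balance
    if msg['id'] in seen_ids:
        return                # duplicate delivery, already applied
    seen_ids.add(msg['id'])
    balance += msg['amount']  # applied once per *message*

deposit = {'id': 'msg-1', 'amount': 100}
for delivery in [deposit, deposit]:  # second delivery is a retry
    handle_naive(delivery)
assert balance == 200  # double-counted; handle_idempotent would leave 100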

When Writing Code

Always

  • Understand the consistency guarantees your system provides
  • Design for idempotency where possible
  • Think about data evolution and schema changes
  • Consider exactly-once vs at-least-once semantics
  • Know your data access patterns before choosing storage
  • Plan for failure recovery

Never

  • Assume "eventual consistency" without understanding what it means
  • Ignore the differences between isolation levels (see the lost-update sketch after this list)
  • Couple tightly without considering failure modes
  • Treat distributed transactions as a silver bullet
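
To make the isolation-levels warning concrete, here is a hedged sketch of the classic lost update, with the interleaving of two transactions simulated by hand rather than run against a real database:

# Lost update under weak isolation: both transactions read the same
# value, both write back value + 1, and one increment disappears.

counter = 10

tx1_read = counter      # transaction 1 reads 10
tx2_read = counter      # transaction 2 reads 10 (legal under read committed)

counter = tx1_read + 1  # transaction 1 commits 11
counter = tx2_read + 1  # transaction 2 commits 11, silently overwriting tx1

assert counter == 11    # expected 12: one increment was lost
# Remedies: atomic increments, SELECT ... FOR UPDATE, or serializable isolation.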

Prefer

  • Idempotent operations
  • Append-only data structures
  • Event sourcing for audit trails
  • Change data capture over dual writes
  • Log-based message brokers over traditional ones (sketched below)
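
As a sketch of the last preference (the Log class is invented, not a real broker API): a log-based broker appends messages to an ordered, durable log, each consumer tracks only an offset, and nothing is deleted on acknowledgement, so history can be replayed.

# Log-based broker in miniature: ordered, append-only, replayable

class Log:
    def __init__(self):
        self.entries = []   # never mutated or deleted in place
        self.offsets = {}   # consumer_id -> next index to read

    def append(self, message) -> int:
        self.entries.append(message)
        return len(self.entries) - 1  # offset = stable position in the log

    def poll(self, consumer_id: str, max_records: int = 10) -> list:
        start = self.offsets.get(consumer_id, 0)
        batch = self.entries[start:start + max_records]
        self.offsets[consumer_id] = start + len(batch)
        return batch

    def seek(self, consumer_id: str, offset: int):
        # Rewind and reprocess -- impossible with a delete-on-ack queue
        self.offsets[consumer_id] = offset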

Code Patterns

Consistency Models Illustrated

# Understanding consistency models through code

import threading
from collections import defaultdict

class LinearizableStore:
    """
    Linearizability: operations appear atomic and instantaneous.
    Strongest consistency - single copy illusion.
    """
    def __init__(self):
        self._lock = threading.Lock()
        self._data = {}
    
    def write(self, key, value):
        with self._lock:
            self._data[key] = value
    
    def read(self, key):
        with self._lock:
            return self._data.get(key)
    
    def compare_and_set(self, key, expected, new_value):
        with self._lock:
            if self._data.get(key) == expected:
                self._data[key] = new_value
                return True
            return False


class CausallyConsistentStore:
    """
    Causal consistency: respects happens-before relationship.
    Weaker than linearizable, but allows more concurrency.
    """
    def __init__(self, node_id):
        self.node_id = node_id
        self.data = {}
        self.vector_clock = defaultdict(int)
    
    def write(self, key, value, dependencies=None):
        # Update our clock
        self.vector_clock[self.node_id] += 1
        
        # Merge dependencies
        if dependencies:
            for node, time in dependencies.items():
                self.vector_clock[node] = max(self.vector_clock[node], time)
        
        self.data[key] = {
            'value': value,
            'clock': dict(self.vector_clock)
        }
        
        return dict(self.vector_clock)
    
    def read(self, key):
        if key in self.data:
            return self.data[key]['value'], self.data[key]['clock']
        return None, dict(self.vector_clock)
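
A short usage sketch of the two stores above: compare_and_set makes a read-modify-write atomic on the linearizable store, while the causal store threads the clock returned by a read into the next write as its dependencies.

# Usage sketch
store = LinearizableStore()
store.write('x', 1)
assert store.compare_and_set('x', expected=1, new_value=2)      # wins
assert not store.compare_and_set('x', expected=1, new_value=3)  # stale read

a = CausallyConsistentStore('node-a')
b = CausallyConsistentStore('node-b')
a.write('question', 'Is the service down?')
_, seen = a.read('question')
# The answer carries the question's clock, so any replica applying it
# can tell it happens-after the question.
b.write('answer', 'No, it is fine', dependencies=seen)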

Event Sourcing

# Event sourcing: store events, derive state

from dataclasses import dataclass
from typing import List
from datetime import datetime

@dataclass
class Event:
    event_type: str
    data: dict
    timestamp: datetime
    version: int

class EventStore:
    def __init__(self):
        self.events: List[Event] = []
        self.version = 0
    
    def append(self, event_type: str, data: dict):
        self.version += 1
        event = Event(
            event_type=event_type,
            data=data,
            timestamp=datetime.now(),
            version=self.version
        )
        self.events.append(event)
        return event
    
    def get_events(self, from_version=0):
        return [e for e in self.events if e.version > from_version]


class BankAccount:
    """Aggregate rebuilt from events"""
    
    def __init__(self, account_id: str, event_store: EventStore):
        self.account_id = account_id
        self.event_store = event_store
        self.balance = 0
        self._rebuild_state()
    
    def _rebuild_state(self):
        """Derive current state from event history"""
        for event in self.event_store.events:
            self._apply(event)
    
    def _apply(self, event: Event):
        if event.event_type == 'deposited':
            self.balance += event.data['amount']
        elif event.event_type == 'withdrawn':
            self.balance -= event.data['amount']
    
    def deposit(self, amount: float):
        event = self.event_store.append('deposited', {
            'account_id': self.account_id,
            'amount': amount
        })
        self._apply(event)
    
    def withdraw(self, amount: float):
        if amount > self.balance:
            raise ValueError("Insufficient funds")
        event = self.event_store.append('withdrawn', {
            'account_id': self.account_id,
            'amount': amount
        })
        self._apply(event)

# Benefits:
# 1. Complete audit trail
# 2. Time travel (state at any point)
# 3. Event replay for debugging
# 4. Easy to add new projections
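
Benefit 4 in miniature: a new read model is just another fold over the same events, added after the fact without touching the write path (AuditLog is a hypothetical projection, not part of the pattern's API).

# A projection added later, derived from the same event log
class AuditLog:
    def __init__(self, event_store: EventStore):
        self.entries = [
            f"v{e.version} {e.timestamp.isoformat()} {e.event_type}: {e.data['amount']}"
            for e in event_store.get_events()
        ]

store = EventStore()
account = BankAccount('acct-1', store)
account.deposit(100)
account.withdraw(30)
print(AuditLog(store).entries)  # full history, rebuilt on demand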

Idempotency Keys

# Idempotency: safe retries without duplicate effects

import uuid
import hashlib

class IdempotentProcessor:
    def __init__(self):
        self.processed_keys = {}  # key -> result
        self.expiry_seconds = 3600  # entries should expire; eviction omitted in this sketch
    
    def process(self, idempotency_key: str, operation):
        """
        Execute operation exactly once for a given key.
        Retries with same key return cached result.
        """
        # Check if already processed
        if idempotency_key in self.processed_keys:
            return self.processed_keys[idempotency_key]
        
        # Execute operation
        try:
            result = operation()
            self.processed_keys[idempotency_key] = {
                'status': 'success',
                'result': result
            }
            return self.processed_keys[idempotency_key]
        except Exception:
            # Don't cache failures (allow retry)
            raise
    
    @staticmethod
    def generate_key(*args):
        """Generate deterministic idempotency key"""
        content = '|'.join(str(arg) for arg in args)
        return hashlib.sha256(content.encode()).hexdigest()


# Usage:
processor = IdempotentProcessor()

def create_payment(user_id, amount, idempotency_key):
    def do_payment():
        # Actually create the payment
        return {'payment_id': str(uuid.uuid4()), 'amount': amount}
    
    return processor.process(idempotency_key, do_payment)

# Client can safely retry (user_id and request_id are illustrative values):
user_id, request_id = 'user-123', str(uuid.uuid4())
key = IdempotentProcessor.generate_key(user_id, 100, request_id)
result1 = create_payment(user_id, 100, key)
result2 = create_payment(user_id, 100, key)  # Same result, no duplicate payment

Change Data Capture

# CDC: capture database changes as a stream

import time
from typing import Callable, List, Optional
from enum import Enum
from dataclasses import dataclass

class OperationType(Enum):
    INSERT = 'insert'
    UPDATE = 'update'
    DELETE = 'delete'

@dataclass
class ChangeEvent:
    table: str
    operation: OperationType
    key: dict
    before: Optional[dict]  # For update/delete (None on insert)
    after: Optional[dict]   # For insert/update (None on delete)
    timestamp: float
    sequence: int

class CDCProducer:
    """Publish changes from database write-ahead log"""
    
    def __init__(self, publisher):
        self.publisher = publisher
        self.sequence = 0
    
    def capture_insert(self, table: str, key: dict, data: dict):
        self.sequence += 1
        event = ChangeEvent(
            table=table,
            operation=OperationType.INSERT,
            key=key,
            before=None,
            after=data,
            timestamp=time.time(),
            sequence=self.sequence
        )
        self.publisher.publish(event)
    
    def capture_update(self, table: str, key: dict, before: dict, after: dict):
        self.sequence += 1
        event = ChangeEvent(
            table=table,
            operation=OperationType.UPDATE,
            key=key,
            before=before,
            after=after,
            timestamp=time.time(),
            sequence=self.sequence
        )
        self.publisher.publish(event)


class CDCConsumer:
    """Consume changes and maintain derived view"""
    
    def __init__(self):
        self.handlers: dict[str, List[Callable]] = {}
        self.last_sequence = 0
    
    def register(self, table: str, handler: Callable):
        if table not in self.handlers:
            self.handlers[table] = []
        self.handlers[table].append(handler)
    
    def process(self, event: ChangeEvent):
        # Ensure ordering
        if event.sequence <= self.last_sequence:
            return  # Already processed
        
        if event.table in self.handlers:
            for handler in self.handlers[event.table]:
                handler(event)
        
        self.last_sequence = event.sequence

# Usage: maintain a search index from database changes
# (search_index is assumed to expose index/delete methods)
def update_search_index(event: ChangeEvent):
    if event.operation == OperationType.DELETE:
        search_index.delete(event.key)
    else:
        search_index.index(event.key, event.after)

consumer = CDCConsumer()
consumer.register('products', update_search_index)
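
For contrast, here is the dual-write anti-pattern that CDC replaces, sketched with hypothetical db and search_index handles: the two writes share no transaction, so a crash between them leaves the stores permanently inconsistent.

# The dual-write anti-pattern CDC avoids
def save_product_dual_write(db, search_index, product):
    db.insert('products', product)              # write 1 commits
    # -- process crashes or the network fails here --
    search_index.index(product['id'], product)  # write 2 never happens

# Nothing reconciles the two stores afterwards. With CDC the application
# writes only to the database; the index follows the change log, so it
# catches up after any failure instead of diverging.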

Stream Processing

# Stream processing patterns

import time
from typing import Iterator, TypeVar, Callable

T = TypeVar('T')

class StreamProcessor:
    """Stateful stream processing"""
    
    def __init__(self):
        self.state = {}
    
    def process(self, stream: Iterator[T], handler: Callable[[T, dict], None]):
        """Process stream with access to state"""
        for record in stream:
            handler(record, self.state)
            yield self.state.copy()


def windowed_count(window_size_seconds: int, emit: Callable = print):
    """Tumbling window aggregation; emit receives each completed window."""
    def handler(event, state):
        window_start = (event['timestamp'] // window_size_seconds) * window_size_seconds
        key = (event['key'], window_start)
        
        if key not in state:
            state[key] = {'count': 0, 'window_start': window_start}
        state[key]['count'] += 1
        
        # Emit completed windows
        current_window = (time.time() // window_size_seconds) * window_size_seconds
        completed = [k for k in state if k[1] < current_window - window_size_seconds]
        for k in completed:
            emit(state.pop(k))
    
    return handler


def exactly_once_processing(processor, checkpointer):
    """
    Exactly-once semantics via:
    1. Idempotent writes
    2. Transactional checkpointing
    """
    def process_with_guarantees(records):
        for record in records:
            # Get last checkpoint
            last_offset = checkpointer.get_offset()
            
            if record.offset <= last_offset:
                continue  # Already processed
            
            # Process and checkpoint atomically
            # (transaction() stands in for the sink's transactional API)
            with transaction():
                result = processor.process(record)
                checkpointer.save_offset(record.offset)
            
            yield result
    
    return process_with_guarantees

Mental Model

Kleppmann approaches data systems by asking:

  1. What are the consistency requirements? Linearizable, serializable, eventual?
  2. What are the access patterns? Read-heavy, write-heavy, mixed?
  3. How do we handle failures? Retries, idempotency, compensation?
  4. How does data evolve? Schema changes, backward compatibility? (See the sketch after this list.)
  5. What happens at the edges? Network partitions, slow nodes?
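
As promised for question 4, a small sketch of schema evolution (the record shapes are made up): filling defaults for missing fields lets new code read old records, and dropping unknown fields lets a fixed schema tolerate newer writers.

# Schema evolution: v2 adds an optional 'email' field
V2_FIELDS = {'id': None, 'name': None, 'email': None}

def decode_v2(record: dict) -> dict:
    # Unknown fields are dropped; missing fields get defaults
    return {f: record.get(f, default) for f, default in V2_FIELDS.items()}

old_record = {'id': 1, 'name': 'Ada'}                                  # v1 writer
new_record = {'id': 2, 'name': 'Bo', 'email': 'b@x.io', 'beta': True}  # newer writer

assert decode_v2(old_record)['email'] is None   # backward compatible
assert 'beta' not in decode_v2(new_record)      # forward compatible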

Signature Kleppmann Moves

  • Event sourcing for auditability
  • Idempotency keys for safe retries
  • CDC over dual writes
  • Understanding isolation levels deeply
  • Log-based messaging for durability
  • Making trade-offs explicit