event-driven-architect
SKILL.md
Event-Driven Architect
Build decoupled, scalable systems with event-driven patterns.
Core Workflow
- Identify domain events: Define what happened
- Design event schema: Structure event payloads
- Implement event bus: Publish and subscribe
- Add event handlers: React to events
- Consider CQRS: Separate reads and writes
- Enable event sourcing: Store event history
Event Fundamentals
Event Structure
// events/base.ts
/**
 * Envelope shared by every domain event.
 *
 * `T` is the type of the event-specific payload; it defaults to `unknown` so
 * generic infrastructure can handle events without knowing their payload.
 */
export interface DomainEvent<T = unknown> {
/** Identifier of this individual event. */
id: string;
/** Event name, e.g. 'OrderCreated'. */
type: string;
/** Identifier of the aggregate the event is recorded against. */
aggregateId: string;
/** Kind of aggregate the event is recorded against, e.g. 'Order'. */
aggregateType: string;
/** Event-specific data. */
payload: T;
/** Bookkeeping data carried alongside the payload. */
metadata: {
/** When the event was created. */
timestamp: Date;
/** Version number; presumably the event schema version -- confirm with consumers. */
version: number;
/** Optional; presumably ties together events from one request/workflow -- confirm. */
correlationId?: string;
/** Optional; presumably the id of the event/command that caused this one -- confirm. */
causationId?: string;
/** Optional; presumably the user whose action produced the event -- confirm. */
userId?: string;
};
}
// Type-safe event creator
/**
 * Builds a domain event with a freshly generated id and default metadata.
 *
 * Defaults are `timestamp = now` and `version = 1`. Entries in `metadata`
 * override those defaults only when their value is not `undefined`, so passing
 * `{ version: undefined }` can no longer erase a required metadata field.
 *
 * The literal event name is preserved in the return type (`K`), which lets
 * results be assigned to discriminated unions keyed on `type`.
 *
 * @param type Event name, e.g. 'OrderCreated'.
 * @param aggregateType Kind of aggregate the event belongs to.
 * @param aggregateId Identifier of the aggregate the event belongs to.
 * @param payload Event-specific data, stored as given (not copied).
 * @param metadata Optional metadata overrides / additions.
 * @returns The assembled event.
 */
export function createEvent<T, K extends string = string>(
  type: K,
  aggregateType: string,
  aggregateId: string,
  payload: T,
  metadata?: Partial<DomainEvent['metadata']>
): DomainEvent<T> & { type: K } {
  // Drop explicitly-undefined entries: spreading them would overwrite the
  // defaults below with `undefined`.
  const overrides = Object.fromEntries(
    Object.entries(metadata ?? {}).filter(([, value]) => value !== undefined)
  ) as Partial<DomainEvent['metadata']>;

  return {
    id: crypto.randomUUID(),
    type,
    aggregateType,
    aggregateId,
    payload,
    metadata: {
      timestamp: new Date(),
      version: 1,
      ...overrides,
    },
  };
}
Define Domain Events
// events/order.events.ts
/** Payload of the 'OrderCreated' event: what was ordered, by whom, and where it ships. */
export interface OrderCreatedPayload {
/** Identifier of the customer who placed the order. */
customerId: string;
/** Ordered line items. */
items: Array<{
productId: string;
quantity: number;
/** Price per unit (the aggregate multiplies it by `quantity` to compute the total). */
price: number;
}>;
/** Order total. */
totalAmount: number;
/** Delivery address; the `Address` type is declared outside this snippet. */
shippingAddress: Address;
}
/** Payload of the 'OrderPaid' event. */
export interface OrderPaidPayload {
/** Identifier of the payment. */
paymentId: string;
/** Amount paid. */
amount: number;
/** Payment method used. */
method: 'card' | 'bank' | 'wallet';
}
/** Payload of the 'OrderShipped' event. */
export interface OrderShippedPayload {
/** Carrier tracking number for the shipment. */
trackingNumber: string;
/** Name of the shipping carrier. */
carrier: string;
/** Estimated delivery, as a string; the exact format is not defined here -- confirm with producers. */
estimatedDelivery: string;
}
/** Payload of the 'OrderCancelled' event. */
export interface OrderCancelledPayload {
/** Why the order was cancelled. */
reason: string;
/** Who cancelled the order. */
cancelledBy: string;
/** Amount to refund; absent when nothing is to be refunded. */
refundAmount?: number;
}
// Event types
/**
 * Union of the order lifecycle events. Each member pairs a payload type with
 * its literal `type` tag, so checking `type` narrows `payload`.
 */
export type OrderEvent =
| DomainEvent<OrderCreatedPayload> & { type: 'OrderCreated' }
| DomainEvent<OrderPaidPayload> & { type: 'OrderPaid' }
| DomainEvent<OrderShippedPayload> & { type: 'OrderShipped' }
| DomainEvent<OrderCancelledPayload> & { type: 'OrderCancelled' };
// Event creators
/**
 * Factory functions for order events. Every factory delegates to
 * `createEvent` with aggregate type 'Order' and the order id as aggregate id.
 */
export const OrderEvents = {
  /** Builds the 'OrderCreated' event for `orderId`. */
  created(orderId: string, payload: OrderCreatedPayload) {
    return createEvent('OrderCreated', 'Order', orderId, payload);
  },

  /** Builds the 'OrderPaid' event for `orderId`. */
  paid(orderId: string, payload: OrderPaidPayload) {
    return createEvent('OrderPaid', 'Order', orderId, payload);
  },

  /** Builds the 'OrderShipped' event for `orderId`. */
  shipped(orderId: string, payload: OrderShippedPayload) {
    return createEvent('OrderShipped', 'Order', orderId, payload);
  },

  /** Builds the 'OrderCancelled' event for `orderId`. */
  cancelled(orderId: string, payload: OrderCancelledPayload) {
    return createEvent('OrderCancelled', 'Order', orderId, payload);
  },
};
Event Bus
In-Memory Event Bus
// events/event-bus.ts
import { EventEmitter } from 'events';
import { DomainEvent } from './base';
/** Asynchronous callback invoked with a published event. */
type EventHandler<T = unknown> = (event: DomainEvent<T>) => Promise<void>;

/**
 * In-process event bus: persists each published event, then runs the handlers
 * subscribed to its type plus the wildcard ('*') handlers.
 *
 * `publish` resolves only after every handler has settled, so `publishAll`
 * really processes events one after another (a handler for event N+1 never
 * starts before the handlers for event N have finished). Handler failures are
 * logged and do not reject `publish`.
 */
class EventBus {
  // Used purely as a listener registry; dispatch goes through `listeners()`
  // so the handlers' promises can be awaited and so an event named 'error'
  // does not hit EventEmitter's special-cased throw-on-unhandled behaviour.
  private emitter = new EventEmitter();

  /**
   * Stores the event, then runs its handlers and waits for them.
   *
   * @param event Event to persist and dispatch.
   * @throws Whatever the persistence call throws; handlers are not run then.
   */
  async publish<T>(event: DomainEvent<T>): Promise<void> {
    console.log(`Publishing event: ${event.type}`, event);

    // Store event (for event sourcing)
    await this.storeEvent(event);

    // Type-specific handlers first, then wildcard handlers. Every registered
    // listener is a wrapper created in `subscribe`, which never rejects.
    const listeners = [
      ...this.emitter.listeners(event.type),
      ...this.emitter.listeners('*'),
    ] as Array<(event: DomainEvent<T>) => Promise<void>>;

    await Promise.all(listeners.map((listener) => listener(event)));
  }

  /** Publishes the events sequentially, in array order. */
  async publishAll(events: DomainEvent[]): Promise<void> {
    for (const event of events) {
      await this.publish(event);
    }
  }

  /**
   * Registers a handler for one event type.
   *
   * @param eventType Event name to listen for ('*' receives every event).
   * @param handler Callback; errors it throws are logged, not propagated.
   * @returns Function that removes this subscription.
   */
  subscribe<T>(eventType: string, handler: EventHandler<T>): () => void {
    const wrappedHandler = async (event: DomainEvent<T>) => {
      try {
        await handler(event);
      } catch (error) {
        console.error(`Error handling ${eventType}:`, error);
        // Could emit to dead letter queue here
      }
    };

    this.emitter.on(eventType, wrappedHandler);

    // Return unsubscribe function
    return () => {
      this.emitter.off(eventType, wrappedHandler);
    };
  }

  /** Registers a handler that receives every published event. */
  subscribeAll(handler: EventHandler): () => void {
    return this.subscribe('*', handler);
  }

  /**
   * Persists the event, keyed by its id.
   *
   * Uses an upsert with an empty update so that storing an event that is
   * already present (for example one a repository wrote before publishing it)
   * is a no-op instead of a duplicate-key failure; stored events are never
   * modified. Assumes `id` is the unique key of the event table.
   */
  private async storeEvent(event: DomainEvent): Promise<void> {
    await db.event.upsert({
      where: { id: event.id },
      update: {},
      create: {
        id: event.id,
        type: event.type,
        aggregateId: event.aggregateId,
        aggregateType: event.aggregateType,
        payload: event.payload as any,
        metadata: event.metadata as any,
        createdAt: event.metadata.timestamp,
      },
    });
  }
}

export const eventBus = new EventBus();
Redis-Based Event Bus
// events/redis-event-bus.ts
import { Redis } from 'ioredis';
import { DomainEvent } from './base';
// Two connections: the file publishes on one and subscribes on the other.
const publisher = new Redis(process.env.REDIS_URL!);
const subscriber = new Redis(process.env.REDIS_URL!);

/**
 * Event bus backed by Redis: events are published on the pub/sub channel
 * `events:<type>` and appended to the stream `stream:<aggregateType>`.
 */
class RedisEventBus {
  // channel name -> handlers registered for that channel
  private handlers = new Map<string, Set<(event: DomainEvent) => Promise<void>>>();

  constructor() {
    subscriber.on('message', async (channel, message) => {
      const handlers = this.handlers.get(channel);
      if (!handlers || handlers.size === 0) return;

      // A malformed message must not turn into an unhandled rejection of this
      // async listener; log it and drop it.
      let event: DomainEvent;
      try {
        event = RedisEventBus.deserialize(message);
      } catch (error) {
        console.error(`Discarding malformed message on ${channel}:`, error);
        return;
      }

      for (const handler of handlers) {
        try {
          await handler(event);
        } catch (error) {
          console.error(`Error handling ${event.type}:`, error);
        }
      }
    });
  }

  /** Parses a message and restores `metadata.timestamp`, which JSON carries as a string. */
  private static deserialize(message: string): DomainEvent {
    const event = JSON.parse(message) as DomainEvent;
    if (event.metadata?.timestamp !== undefined) {
      event.metadata.timestamp = new Date(event.metadata.timestamp);
    }
    return event;
  }

  /**
   * Publishes the event to its channel and appends it to the aggregate-type
   * stream.
   *
   * @param event Event to publish; it is serialized with `JSON.stringify`.
   */
  async publish(event: DomainEvent): Promise<void> {
    const channel = `events:${event.type}`;
    const serialized = JSON.stringify(event);

    await publisher.publish(channel, serialized);

    // Also store in stream for replay
    await publisher.xadd(`stream:${event.aggregateType}`, '*', 'event', serialized);
  }

  /**
   * Registers a handler for an event type, subscribing to the Redis channel on
   * the first registration for that type.
   *
   * @returns Function that removes the handler; when it was the last handler
   *          for the channel, the Redis subscription is dropped as well.
   */
  subscribe(eventType: string, handler: (event: DomainEvent) => Promise<void>): () => void {
    const channel = `events:${eventType}`;

    let channelHandlers = this.handlers.get(channel);
    if (!channelHandlers) {
      channelHandlers = new Set();
      this.handlers.set(channel, channelHandlers);
      // subscribe() returns a promise; surface failures instead of leaving an
      // unhandled rejection.
      subscriber.subscribe(channel).catch((error) => {
        console.error(`Failed to subscribe to ${channel}:`, error);
      });
    }
    channelHandlers.add(handler);

    return () => {
      const remaining = this.handlers.get(channel);
      if (!remaining) return;
      remaining.delete(handler);
      if (remaining.size === 0) {
        // Forget the channel so a later subscribe() re-subscribes in Redis.
        this.handlers.delete(channel);
        subscriber.unsubscribe(channel).catch((error) => {
          console.error(`Failed to unsubscribe from ${channel}:`, error);
        });
      }
    };
  }
}

export const eventBus = new RedisEventBus();
Event Handlers
Handler Registration
// handlers/order.handlers.ts
import { eventBus } from '../events/event-bus';
import { OrderEvent } from '../events/order.events';
// Email notification on order created
// OrderCreated -> send the order confirmation email to the customer.
eventBus.subscribe<OrderCreatedPayload>('OrderCreated', async ({ aggregateId, payload }) => {
  const recipient = await getUserEmail(payload.customerId);
  const confirmation = {
    to: recipient,
    template: 'order-confirmation',
    data: {
      orderId: aggregateId,
      items: payload.items,
      total: payload.totalAmount,
    },
  };
  await emailService.send(confirmation);
});

// OrderCreated -> reserve stock, one line item at a time.
eventBus.subscribe<OrderCreatedPayload>('OrderCreated', async ({ payload }) => {
  for (const { productId, quantity } of payload.items) {
    await inventoryService.reserve(productId, quantity);
  }
});

// OrderPaid -> record the completed order in analytics.
eventBus.subscribe<OrderPaidPayload>('OrderPaid', async ({ aggregateId, payload }) => {
  const { amount, method } = payload;
  await analytics.track('order_completed', {
    orderId: aggregateId,
    amount,
    paymentMethod: method,
  });
});

// OrderPaid -> ask the shipping service to create a shipment for the order.
eventBus.subscribe<OrderPaidPayload>('OrderPaid', async ({ aggregateId }) => {
  await shippingService.createShipment(aggregateId);
});
// Handle cancellation
// OrderCancelled -> release reserved stock, refund if required, notify the customer.
eventBus.subscribe<OrderCancelledPayload>('OrderCancelled', async (event) => {
  const order = await orderRepository.findById(event.aggregateId);
  if (!order) {
    // findById resolves to null when no events exist for the id; fail with a
    // clear message instead of a TypeError on `order.items`.
    throw new Error(`Order ${event.aggregateId} not found while handling OrderCancelled`);
  }

  // Release inventory
  for (const item of order.items) {
    await inventoryService.release(item.productId, item.quantity);
  }

  // Process refund (skipped when refundAmount is absent or 0)
  if (event.payload.refundAmount) {
    await paymentService.refund(event.aggregateId, event.payload.refundAmount);
  }

  // Send cancellation email
  // NOTE(review): relies on the loaded order exposing `customerId`; confirm the
  // aggregate provides that accessor.
  await emailService.send({
    to: await getUserEmail(order.customerId),
    template: 'order-cancelled',
    data: {
      orderId: event.aggregateId,
      reason: event.payload.reason,
    },
  });
});
Event Sourcing
Aggregate with Events
// aggregates/order.aggregate.ts
import { DomainEvent } from '../events/base';
import { OrderEvents, OrderCreatedPayload, OrderPaidPayload } from '../events/order.events';
/** One line of an order as held by the aggregate. */
interface OrderItem {
productId: string;
quantity: number;
/** Price per unit; the order total is the sum of `price * quantity`. */
price: number;
}
/** Lifecycle states of an order. */
type OrderStatus = 'pending' | 'paid' | 'shipped' | 'delivered' | 'cancelled';
/**
 * Event-sourced order aggregate.
 *
 * Commands (`create`, `pay`, `cancel`) validate the current state and record
 * an event; state only ever changes by applying events, so the same code path
 * serves live commands and replay from history (`fromEvents`).
 */
export class OrderAggregate {
  private _id: string;
  private _status: OrderStatus = 'pending';
  private _items: OrderItem[] = [];
  private _totalAmount: number = 0;
  private _customerId: string = '';
  // Number of events applied so far.
  private _version: number = 0;
  // Events recorded by commands and not yet handed out for persistence.
  private uncommittedEvents: DomainEvent[] = [];

  get id() { return this._id; }
  get status() { return this._status; }
  /** Copy of the item list; the array itself cannot be used to alter the aggregate. */
  get items() { return [...this._items]; }
  get version() { return this._version; }
  /** Customer the order belongs to ('' until an OrderCreated event has been applied). */
  get customerId() { return this._customerId; }
  /** Order total taken from the OrderCreated event (0 until then). */
  get totalAmount() { return this._totalAmount; }

  /** @param id Existing aggregate id; a random UUID is generated when omitted. */
  constructor(id?: string) {
    this._id = id || crypto.randomUUID();
  }

  /**
   * Command: create a new order.
   *
   * The items are copied so that later changes to the caller's array or item
   * objects affect neither the recorded event nor the aggregate.
   *
   * @returns A new aggregate holding one uncommitted OrderCreated event.
   */
  static create(customerId: string, items: OrderItem[], shippingAddress: Address): OrderAggregate {
    const order = new OrderAggregate();
    const ownedItems = items.map((item) => ({ ...item }));
    const totalAmount = ownedItems.reduce((sum, item) => sum + item.price * item.quantity, 0);

    order.apply(
      OrderEvents.created(order._id, {
        customerId,
        items: ownedItems,
        totalAmount,
        shippingAddress,
      })
    );
    return order;
  }

  /**
   * Command: pay the order.
   *
   * @throws Error when the order is not 'pending' or `amount` differs from the total.
   */
  pay(paymentId: string, amount: number, method: 'card' | 'bank' | 'wallet'): void {
    if (this._status !== 'pending') {
      throw new Error('Order cannot be paid in current status');
    }
    // NOTE(review): exact comparison of floating-point amounts; totals built
    // from fractional prices may not compare equal -- consider integer minor units.
    if (amount !== this._totalAmount) {
      throw new Error('Payment amount does not match order total');
    }
    this.apply(
      OrderEvents.paid(this._id, { paymentId, amount, method })
    );
  }

  /**
   * Command: cancel the order. A paid order is refunded its full total.
   *
   * @throws Error when the order is already shipped, delivered or cancelled.
   */
  cancel(reason: string, cancelledBy: string): void {
    if (['shipped', 'delivered', 'cancelled'].includes(this._status)) {
      throw new Error('Order cannot be cancelled in current status');
    }
    const refundAmount = this._status === 'paid' ? this._totalAmount : undefined;
    this.apply(
      OrderEvents.cancelled(this._id, { reason, cancelledBy, refundAmount })
    );
  }

  // Apply event and track for persistence
  private apply(event: DomainEvent): void {
    this.applyEvent(event);
    this.uncommittedEvents.push(event);
  }

  // Apply event to state (used for replay too). Unrecognised event types leave
  // the state untouched but still advance the version.
  private applyEvent(event: DomainEvent): void {
    switch (event.type) {
      case 'OrderCreated': {
        const created = event.payload as OrderCreatedPayload;
        this._customerId = created.customerId;
        // Own copy: aggregate state must not alias the (immutable) event payload.
        this._items = created.items.map((item) => ({ ...item }));
        this._totalAmount = created.totalAmount;
        this._status = 'pending';
        break;
      }
      case 'OrderPaid':
        this._status = 'paid';
        break;
      case 'OrderShipped':
        this._status = 'shipped';
        break;
      case 'OrderCancelled':
        this._status = 'cancelled';
        break;
    }
    this._version++;
  }

  /**
   * Returns the events recorded since the last call and clears the internal
   * list, so each event is handed out exactly once.
   */
  getUncommittedEvents(): DomainEvent[] {
    const events = [...this.uncommittedEvents];
    this.uncommittedEvents = [];
    return events;
  }

  /**
   * Rebuilds an aggregate by replaying its events in the given order. The id
   * is taken from the first event; replayed events are not marked uncommitted.
   *
   * @throws Error when `events` is empty.
   */
  static fromEvents(events: DomainEvent[]): OrderAggregate {
    if (events.length === 0) {
      throw new Error('Cannot rebuild aggregate from empty events');
    }
    const order = new OrderAggregate(events[0].aggregateId);
    for (const event of events) {
      order.applyEvent(event);
    }
    return order;
  }
}
Event Store Repository
// repositories/event-store.repository.ts
import { db } from '../lib/db';
import { DomainEvent } from '../events/base';
import { eventBus } from '../events/event-bus';
/** Shape of a stored event row as read back by this repository. */
interface StoredEventRow {
  id: string;
  type: string;
  aggregateId: string;
  aggregateType: string;
  payload: unknown;
  metadata: unknown;
  createdAt?: Date;
}

/**
 * Event-sourced repository: persists an aggregate's uncommitted events and
 * rebuilds aggregates by replaying their stored events.
 *
 * @typeParam T Aggregate type; must expose its id and its uncommitted events.
 */
export class EventStoreRepository<T extends { id: string; getUncommittedEvents(): DomainEvent[] }> {
  /**
   * @param aggregateType Value of `aggregateType` used to select stored events.
   * @param reconstruct Builds an aggregate from its ordered event history.
   */
  constructor(
    private aggregateType: string,
    private reconstruct: (events: DomainEvent[]) => T
  ) {}

  /**
   * Stores the aggregate's uncommitted events in one batch, then publishes
   * them in order. Does nothing when there are no uncommitted events.
   *
   * NOTE(review): the in-memory EventBus shown alongside this repository also
   * persists every event it publishes; confirm that write is idempotent,
   * otherwise the publish step fails on the duplicate id.
   */
  async save(aggregate: T): Promise<void> {
    const events = aggregate.getUncommittedEvents();
    if (events.length === 0) return;

    // Store events
    await db.event.createMany({
      data: events.map((event) => ({
        id: event.id,
        type: event.type,
        aggregateId: event.aggregateId,
        aggregateType: event.aggregateType,
        payload: event.payload as any,
        metadata: event.metadata as any,
        createdAt: event.metadata.timestamp,
      })),
    });

    // Publish events
    await eventBus.publishAll(events);
  }

  /**
   * Loads an aggregate by replaying all of its stored events.
   *
   * @returns The rebuilt aggregate, or null when no events exist for `id`.
   */
  async findById(id: string): Promise<T | null> {
    const events = await this.getEvents(id);
    return events.length === 0 ? null : this.reconstruct(events);
  }

  /**
   * Returns the stored events of one aggregate, oldest first.
   *
   * @param aggregateId Aggregate whose events to load.
   * @param fromVersion When given (and non-zero), only events whose
   *        `metadata.version` is at least this value are returned.
   */
  async getEvents(aggregateId: string, fromVersion?: number): Promise<DomainEvent[]> {
    // NOTE(review): ordering relies on createdAt alone; events stored with the
    // same timestamp have no guaranteed relative order.
    const rows = await db.event.findMany({
      where: {
        aggregateId,
        aggregateType: this.aggregateType,
        ...(fromVersion
          ? { metadata: { path: ['version'], gte: fromVersion } }
          : {}),
      },
      orderBy: { createdAt: 'asc' },
    });
    return rows.map((row: StoredEventRow) => this.toDomainEvent(row));
  }

  // Converts a stored row back into a DomainEvent. Metadata is kept as JSON,
  // so its timestamp comes back as a string; restore it to a Date (falling
  // back to the row's createdAt) to honour the DomainEvent type.
  private toDomainEvent(row: StoredEventRow): DomainEvent {
    const metadata = {
      ...(row.metadata as Record<string, unknown>),
    } as DomainEvent['metadata'];

    const rawTimestamp: unknown = metadata.timestamp ?? row.createdAt;
    if (rawTimestamp !== undefined && rawTimestamp !== null) {
      metadata.timestamp =
        rawTimestamp instanceof Date ? rawTimestamp : new Date(rawTimestamp as string | number);
    }

    return {
      id: row.id,
      type: row.type,
      aggregateId: row.aggregateId,
      aggregateType: row.aggregateType,
      payload: row.payload,
      metadata,
    };
  }
}
// Usage
/** Repository for 'Order' aggregates, rebuilt from history via `OrderAggregate.fromEvents`. */
export const orderRepository = new EventStoreRepository(
  'Order',
  (events) => OrderAggregate.fromEvents(events)
);
CQRS Pattern
Separate Command and Query
// commands/create-order.command.ts
/**
 * Input for creating an order. Items carry no price: the command handler
 * looks each product up and takes the price from it.
 */
export interface CreateOrderCommand {
/** Customer placing the order. */
customerId: string;
/** Products to order and how many of each. */
items: Array<{ productId: string; quantity: number }>;
/** Delivery address; the `Address` type is declared outside this snippet. */
shippingAddress: Address;
}
// Command handler
/**
 * Command handler: validates the command, prices the items from the product
 * catalogue, creates the order aggregate and saves it.
 *
 * @param command Customer, requested items and shipping address.
 * @returns Id of the newly created order.
 * @throws Error when the customer or a product does not exist, or when an
 *         item quantity is not a positive number.
 */
export async function handleCreateOrder(command: CreateOrderCommand): Promise<string> {
  // Validate
  const customer = await customerRepository.findById(command.customerId);
  if (!customer) throw new Error('Customer not found');

  // Reject zero, negative and NaN quantities before touching the catalogue;
  // they would otherwise produce a zero or negative order total.
  for (const item of command.items) {
    if (!Number.isFinite(item.quantity) || item.quantity <= 0) {
      throw new Error(`Invalid quantity for product ${item.productId}`);
    }
  }

  // Get product prices (lookups run in parallel)
  const items = await Promise.all(
    command.items.map(async (item) => {
      const product = await productRepository.findById(item.productId);
      if (!product) {
        throw new Error(`Product not found: ${item.productId}`);
      }
      return {
        productId: item.productId,
        quantity: item.quantity,
        price: product.price,
      };
    })
  );

  // Create aggregate and save
  const order = OrderAggregate.create(
    command.customerId,
    items,
    command.shippingAddress
  );
  await orderRepository.save(order);
  return order.id;
}
// queries/order.queries.ts
// Read model - denormalized for fast queries
/**
 * Denormalized order row used by the query side. It is written by the order
 * projector and read by the query handlers.
 */
export interface OrderReadModel {
  /** Order id (the aggregate id of the order events). */
  id: string;
  /** Current order status, e.g. 'pending', 'paid', 'shipped'. */
  status: string;
  /** Customer the order belongs to; written by the projector and used to filter orders by customer. */
  customerId: string;
  /** Customer name copied in at projection time. */
  customerName: string;
  /** Customer email copied in at projection time. */
  customerEmail: string;
  /** Line items enriched with the product name. */
  items: Array<{
    productName: string;
    quantity: number;
    price: number;
  }>;
  totalAmount: number;
  /** Timestamp of the OrderCreated event. */
  createdAt: Date;
  /** Timestamp of the OrderPaid event, once paid. */
  paidAt?: Date;
  /** Timestamp of the OrderShipped event, once shipped. */
  shippedAt?: Date;
  /** Tracking number written by the projector when the order ships. */
  trackingNumber?: string;
}
// Query handler
/**
 * Query: fetches one order from the read model by its id.
 *
 * @param orderId Id of the order to look up.
 * @returns Whatever the read-model lookup yields (typed as the row or null).
 */
export async function getOrderById(orderId: string): Promise<OrderReadModel | null> {
  const lookup = { where: { id: orderId } };
  const row = await db.orderReadModel.findUnique(lookup);
  return row;
}
/**
 * Query: lists a customer's orders from the read model, newest first.
 *
 * @param customerId Customer whose orders to list.
 * @returns The matching read-model rows ordered by `createdAt` descending.
 */
export async function getOrdersByCustomer(customerId: string): Promise<OrderReadModel[]> {
  const orders = await db.orderReadModel.findMany({
    where: { customerId: customerId },
    orderBy: { createdAt: 'desc' },
  });
  return orders;
}
Read Model Projector
// projectors/order.projector.ts
import { eventBus } from '../events/event-bus';
// Project events to read model
// OrderCreated -> insert the denormalized read-model row for the order.
eventBus.subscribe('OrderCreated', async (event) => {
  const { customerId, items, totalAmount } = event.payload;

  const customer = await db.customer.findUnique({ where: { id: customerId } });
  if (!customer) {
    // Fail with a clear message instead of a TypeError on `customer.name`.
    throw new Error(
      `Customer ${customerId} not found while projecting order ${event.aggregateId}`
    );
  }

  // Upsert with an empty update keeps the handler idempotent: a redelivered
  // OrderCreated neither fails on the existing row nor resets a status that a
  // later event has already advanced.
  await db.orderReadModel.upsert({
    where: { id: event.aggregateId },
    update: {},
    create: {
      id: event.aggregateId,
      status: 'pending',
      customerId,
      customerName: customer.name,
      customerEmail: customer.email,
      items: await enrichItems(items),
      totalAmount,
      createdAt: event.metadata.timestamp,
    },
  });
});

// OrderPaid -> mark the row as paid and record when.
eventBus.subscribe('OrderPaid', async (event) => {
  await db.orderReadModel.update({
    where: { id: event.aggregateId },
    data: {
      status: 'paid',
      paidAt: event.metadata.timestamp,
    },
  });
});

// OrderShipped -> mark the row as shipped and record when and the tracking number.
eventBus.subscribe('OrderShipped', async (event) => {
  await db.orderReadModel.update({
    where: { id: event.aggregateId },
    data: {
      status: 'shipped',
      shippedAt: event.metadata.timestamp,
      trackingNumber: event.payload.trackingNumber,
    },
  });
});
Best Practices
- Immutable events: Never modify stored events
- Descriptive event names: Past tense (OrderCreated, not CreateOrder)
- Include all context: Events should be self-contained
- Version events: Record a schema version on every event so the schema can evolve
- Idempotent handlers: Handle duplicate events gracefully
- Separate concerns: Commands mutate, queries read
- Event versioning: Keep consumers backward compatible so they can still read events written under older schema versions
- Dead letter queue: Handle failed events
Output Checklist
Every event-driven system should include:
- Well-defined domain events
- Type-safe event payloads
- Event bus (in-memory or distributed)
- Event handlers with error handling
- Event store for persistence
- Aggregate with event sourcing (if needed)
- CQRS separation (if needed)
- Read model projectors
- Dead letter handling
- Event replay capability
Weekly Installs
9
Repository
patricio0312rev/skills
First Seen
10 days ago
Installed on
- claude-code: 7
- trae: 6
- gemini-cli: 6
- antigravity: 6
- github-copilot: 6
- windsurf: 6