Originally from cosmix/loom
Event-Driven Architecture
Asynchronous communication through events and messages.
When to Use
- Decoupled systems
- Async processing
- Real-time notifications
- Audit trails
Quick Start
import { EventEmitter } from "events";
const eventBus = new EventEmitter();
// Subscribe (sendWelcomeEmail is an application-defined function)
eventBus.on("user:created", (user) => {
  sendWelcomeEmail(user);
});
// Publish: listeners run synchronously, in registration order
eventBus.emit("user:created", { id: "123", email: "user@example.com" });
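A plain EventEmitter accepts any event name and payload, and an exception thrown by one listener propagates out of `emit()` and stops later listeners from running. The following is a minimal sketch of one way to address both, assuming a hypothetical `AppEvents` map and a `TypedEventBus` wrapper (neither name comes from the original skill):

```typescript
import { EventEmitter } from "node:events";

// Hypothetical event map: event name -> payload type.
type AppEvents = {
  "user:created": { id: string; email: string };
  "user:deleted": { id: string };
};

class TypedEventBus<Events extends Record<string, unknown>> {
  private readonly emitter = new EventEmitter();

  // onError is an assumed hook; it defaults to logging the error.
  constructor(private readonly onError: (err: unknown) => void = console.error) {}

  on<K extends keyof Events & string>(
    name: K,
    handler: (payload: Events[K]) => unknown,
  ): void {
    this.emitter.on(name, (payload: Events[K]) => {
      try {
        // Promise.resolve lets us catch rejections from async handlers too.
        Promise.resolve(handler(payload)).catch(this.onError);
      } catch (err) {
        // Synchronous throw: report it instead of aborting emit().
        this.onError(err);
      }
    });
  }

  emit<K extends keyof Events & string>(name: K, payload: Events[K]): void {
    this.emitter.emit(name, payload);
  }
}

const bus = new TypedEventBus<AppEvents>();
const received: string[] = [];
bus.on("user:created", (user) => {
  received.push(user.email); // user is typed as { id: string; email: string }
});
bus.emit("user:created", { id: "123", email: "user@example.com" });
```

With this wrapper, a misspelled event name or a payload of the wrong shape is a compile-time error, and one failing subscriber does not prevent the others from running.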
Core Concepts
Message Queue (RabbitMQ)
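import amqp from "amqplib";
// The connection URL is an assumption; point it at your broker
const connection = await amqp.connect("amqp://localhost");
const channel = await connection.createChannel();
// Producer: a durable queue plus persistent messages survive a broker restart
await channel.assertQueue("orders", { durable: true });
channel.sendToQueue("orders", Buffer.from(JSON.stringify(order)), { persistent: true });
// Consumer
await channel.consume("orders", async (msg) => {
  if (msg === null) return; // the broker cancelled this consumer
  try {
    const order = JSON.parse(msg.content.toString());
    await processOrder(order);
    channel.ack(msg);
  } catch (err) {
    // Reject without requeue so the message is dropped or dead-lettered
    channel.nack(msg, false, false);
  }
});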
Kafka (kafkajs)
import { Kafka } from "kafkajs";
const kafka = new Kafka({ brokers: ["localhost:9092"] });
const producer = kafka.producer();
const consumer = kafka.consumer({ groupId: "order-service" });
// Produce (connect first; orderId and event come from the application)
await producer.connect();
await producer.send({
  topic: "orders",
  messages: [{ key: orderId, value: JSON.stringify(event) }],
});
// Consume
await consumer.connect();
await consumer.subscribe({ topic: "orders" });
await consumer.run({
  eachMessage: async ({ topic, partition, message }) => {
    if (message.value === null) return; // skip messages without a value
    const event = JSON.parse(message.value.toString());
    await handleEvent(event);
  },
});
Common Patterns
Event Sourcing
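// Named DomainEvent to avoid clashing with the global DOM Event type
interface DomainEvent {
  type: string;
  data: unknown;
  timestamp: Date;
  aggregateId: string;
}
// Assumed state shape; extend it with the fields your orders need
interface OrderState {
  status: "pending" | "placed" | "shipped";
}
class OrderAggregate {
  private events: DomainEvent[] = [];
  private state: OrderState = { status: "pending" };
  // Record the event, then derive the next state from it
  apply(event: DomainEvent) {
    this.events.push(event);
    this.state = this.reducer(this.state, event);
  }
  // Pure function: the same state and event always give the same result
  private reducer(state: OrderState, event: DomainEvent): OrderState {
    switch (event.type) {
      case "ORDER_PLACED":
        return { ...state, status: "placed" };
      case "ORDER_SHIPPED":
        return { ...state, status: "shipped" };
      default:
        return state;
    }
  }
}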
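Because the reducer is a pure function, current state never has to be stored: it can be rebuilt by replaying the event history from the initial state. The sketch below illustrates that replay step. The `OrderEvent` and `OrderState` shapes, the `version` counter, and the `rehydrate` helper are assumptions made for illustration.

```typescript
// Assumed shapes for illustration only.
type OrderStatus = "pending" | "placed" | "shipped";

interface OrderState {
  status: OrderStatus;
  version: number; // number of events applied so far
}

interface OrderEvent {
  type: "ORDER_PLACED" | "ORDER_SHIPPED";
  aggregateId: string;
  timestamp: Date;
}

const initialState: OrderState = { status: "pending", version: 0 };

// Pure reducer: never mutates its input, always returns a new state.
function reduceOrder(state: OrderState, event: OrderEvent): OrderState {
  switch (event.type) {
    case "ORDER_PLACED":
      return { ...state, status: "placed", version: state.version + 1 };
    case "ORDER_SHIPPED":
      return { ...state, status: "shipped", version: state.version + 1 };
    default:
      return state;
  }
}

// Rebuild state by folding the full history over the initial state.
function rehydrate(history: readonly OrderEvent[]): OrderState {
  return history.reduce(reduceOrder, initialState);
}

const history: OrderEvent[] = [
  { type: "ORDER_PLACED", aggregateId: "order-1", timestamp: new Date(0) },
  { type: "ORDER_SHIPPED", aggregateId: "order-1", timestamp: new Date(1000) },
];

const state = rehydrate(history);
console.log(state); // { status: 'shipped', version: 2 }
```

Replaying a prefix of the history gives the state as of that point in time, which is what makes an event log usable as an audit trail.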
Dead Letter Queue
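await channel.assertQueue("orders.dlq");
// The default exchange ("") routes by queue name, so dead-lettered
// messages from "orders" go straight to "orders.dlq".
// If "orders" already exists with different arguments, this call fails.
await channel.assertQueue("orders", {
  deadLetterExchange: "",
  deadLetterRoutingKey: "orders.dlq",
});
// Handle failed messages (rejected or nacked with requeue = false, or expired)
await channel.consume("orders.dlq", async (msg) => {
  if (msg === null) return; // the broker cancelled this consumer
  await notifyAdmin(msg);
  channel.ack(msg);
});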
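A dead letter queue is usually paired with a retry limit, so that a transient failure gets a few more attempts before the message is parked. The following in-memory sketch models that decision without a broker. The `Envelope` type, `MAX_ATTEMPTS`, and `handleWithRetry` are hypothetical names, and plain arrays stand in for the real queues.

```typescript
interface Envelope<T> {
  id: string;
  payload: T;
  attempts: number; // failed attempts so far
}

type Outcome = "acked" | "requeued" | "dead-lettered";

const MAX_ATTEMPTS = 3; // assumed limit; tune per workload

function handleWithRetry<T>(
  message: Envelope<T>,
  handler: (payload: T) => void,
  queue: Envelope<T>[],
  deadLetters: Envelope<T>[],
): Outcome {
  try {
    handler(message.payload);
    return "acked";
  } catch {
    const next = { ...message, attempts: message.attempts + 1 };
    if (next.attempts >= MAX_ATTEMPTS) {
      // Out of retries: park the message for inspection.
      deadLetters.push(next);
      return "dead-lettered";
    }
    // Put it back at the end of the queue for another attempt.
    queue.push(next);
    return "requeued";
  }
}

const queue: Envelope<string>[] = [
  { id: "m1", payload: "bad", attempts: 0 },
  { id: "m2", payload: "ok", attempts: 0 },
];
const deadLetters: Envelope<string>[] = [];
const processed: string[] = [];

const handler = (payload: string): void => {
  if (payload === "bad") throw new Error("cannot process");
  processed.push(payload);
};

while (queue.length > 0) {
  const message = queue.shift()!;
  handleWithRetry(message, handler, queue, deadLetters);
}

console.log(processed); // [ 'ok' ]
console.log(deadLetters.map((m) => m.id)); // [ 'm1' ]
```

Here "m1" fails three times and lands in `deadLetters`, while "m2" succeeds on its first attempt. With a real broker the attempt count would have to travel with the message, for example in a header.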
Best Practices
Do:
- Use idempotent consumers
- Implement dead letter queues
- Add correlation IDs
- Monitor queue depth
Don't:
- Rely on message order across partitions or competing consumers
- Skip error handling
- Ignore backpressure
- Forget acknowledgments
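Most brokers deliver at least once, so a consumer may see the same message again after a crash or a missed acknowledgment. Here is a minimal sketch of an idempotent consumer that also carries a correlation ID. The `Message` shape and `IdempotentConsumer` class are hypothetical, and the in-memory `Set` stands in for whatever durable store a real service would use.

```typescript
interface Message<T> {
  messageId: string; // unique per message; used as the idempotency key
  correlationId: string; // shared by every message in one workflow, for tracing
  payload: T;
}

class IdempotentConsumer<T> {
  // In-memory for illustration; this set is lost on restart.
  private readonly seen = new Set<string>();

  constructor(private readonly handler: (message: Message<T>) => void) {}

  // Returns true if the message was processed, false if it was a duplicate.
  consume(message: Message<T>): boolean {
    if (this.seen.has(message.messageId)) return false;
    this.handler(message);
    // Record only after success so a failed attempt can be retried.
    this.seen.add(message.messageId);
    return true;
  }
}

const charged: string[] = [];
const consumer = new IdempotentConsumer<{ orderId: string }>((m) => {
  charged.push(m.payload.orderId);
});

const msg: Message<{ orderId: string }> = {
  messageId: "msg-1",
  correlationId: "checkout-42",
  payload: { orderId: "order-1" },
};

const first = consumer.consume(msg);
const second = consumer.consume(msg); // simulated redelivery
console.log(first, second, charged); // true false [ 'order-1' ]
```

Logging `correlationId` in every service that touches the workflow makes it possible to follow one request across queues.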
Troubleshooting
| Issue | Cause | Solution |
|---|---|---|
| Message loss | No persistence | Enable durable queues and persistent messages |
| Duplicate processing | No idempotency | Add idempotency keys |
| Queue buildup | Slow consumers | Scale consumers |
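Scaling consumers helps with queue buildup, but a producer that can outrun its consumers also needs a signal to slow down. The sketch below shows the basic idea with a bounded in-memory buffer that refuses new items when it is full. `BoundedBuffer` and its methods are hypothetical names, not part of any broker client.

```typescript
class BoundedBuffer<T> {
  private readonly items: T[] = [];

  constructor(private readonly capacity: number) {}

  // Returns false when full: the caller should wait, retry later, or shed load.
  offer(item: T): boolean {
    if (this.items.length >= this.capacity) return false;
    this.items.push(item);
    return true;
  }

  // Removes and returns the oldest item, or undefined when empty.
  poll(): T | undefined {
    return this.items.shift();
  }

  // Current depth: the in-process equivalent of the queue depth to monitor.
  get depth(): number {
    return this.items.length;
  }
}

const buffer = new BoundedBuffer<string>(2);
const accepted = [buffer.offer("a"), buffer.offer("b"), buffer.offer("c")];
console.log(accepted); // [ true, true, false ]

const oldest = buffer.poll(); // a consumer frees one slot
const retried = buffer.offer("c"); // the producer's retry now succeeds
console.log(oldest, retried, buffer.depth); // a true 2
```

With a real broker the same role is played by settings such as a consumer prefetch limit or a maximum queue length.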
Repository: g1joshi/agent-skills
First seen: Feb 10, 2026