Message Queues
Implement asynchronous communication patterns for event-driven architectures, background job processing, and service decoupling.
When to Use This Skill
Use message queues when:
- Long-running operations block HTTP requests (report generation, video processing)
- Service decoupling required (microservices, event-driven architecture)
- Guaranteed delivery needed (payment processing, order fulfillment)
- Event streaming for analytics (log aggregation, metrics pipelines)
- Workflow orchestration for complex processes (multi-step sagas, human-in-the-loop)
- Background job processing (email sending, image resizing)
Broker Selection Decision Tree
Choose a message broker based on your primary need:
Event Streaming / Log Aggregation
→ Apache Kafka
- Throughput: 500K-1M msg/s
- Replay events (event sourcing)
- Exactly-once semantics
- Long-term retention
- Use: Analytics pipelines, CQRS, event sourcing
Simple Background Jobs
→ Task Queues
- Python → Celery + Redis
- TypeScript → BullMQ + Redis
- Go → Asynq + Redis
- Use: Email sending, report generation, webhooks
Complex Workflows / Sagas
→ Temporal
- Durable execution (survives restarts)
- Saga pattern support
- Human-in-the-loop workflows
- Use: Order processing, AI agent orchestration
Request-Reply / RPC Patterns
→ NATS
- Built-in request-reply
- Sub-millisecond latency
- Cloud-native, simple operations
- Use: Microservices RPC, IoT command/control
Complex Message Routing
→ RabbitMQ
- Exchanges (direct, topic, fanout, headers)
- Dead letter exchanges
- Message TTL, priorities
- Use: Multi-consumer patterns, pub/sub
Already Using Redis
→ Redis Streams
- No new infrastructure
- Simple consumer groups
- Moderate throughput (100K+ msg/s)
- Use: Notification queues, simple job queues
Performance Comparison
| Broker | Throughput | Latency (p99) | Best For |
|---|---|---|---|
| Kafka | 500K-1M msg/s | 10-50ms | Event streaming |
| NATS JetStream | 200K-400K msg/s | Sub-ms to 5ms | Cloud-native microservices |
| RabbitMQ | 50K-100K msg/s | 5-20ms | Task queues, complex routing |
| Redis Streams | 100K+ msg/s | Sub-ms | Simple queues, caching |
Quick Start Examples
Kafka Producer/Consumer (Python)
See examples/kafka-python/ for working code.
from confluent_kafka import Producer, Consumer

# Producer
producer = Producer({'bootstrap.servers': 'localhost:9092'})
producer.produce('orders', key='order_123', value='{"status": "created"}')
producer.flush()

# Consumer
consumer = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'order-processors',
    'auto.offset.reset': 'earliest'
})
consumer.subscribe(['orders'])
while True:
    msg = consumer.poll(1.0)
    if msg is None:
        continue
    if msg.error():
        continue  # log and skip broker errors instead of processing them
    process_order(msg.value())
Celery Background Jobs (Python)
See examples/celery-image-processing/ for full implementation.
from celery import Celery
app = Celery('tasks', broker='redis://localhost:6379')
@app.task(bind=True, max_retries=3)
def process_image(self, image_url: str):
    try:
        result = expensive_image_processing(image_url)
        return result
    except RecoverableError as e:
        raise self.retry(exc=e, countdown=60)
BullMQ Job Processing (TypeScript)
See examples/bullmq-webhook-processor/ for full implementation.
import { Queue, Worker } from 'bullmq'
const queue = new Queue('webhooks', {
  connection: { host: 'localhost', port: 6379 }
})

// Enqueue job
await queue.add('send-webhook', {
  url: 'https://example.com/webhook',
  payload: { event: 'order.created' }
})

// Process jobs
const worker = new Worker('webhooks', async job => {
  await fetch(job.data.url, {
    method: 'POST',
    body: JSON.stringify(job.data.payload)
  })
}, { connection: { host: 'localhost', port: 6379 } })
Temporal Workflow Orchestration
See examples/temporal-order-saga/ for saga pattern implementation.
from temporalio import workflow, activity
from datetime import timedelta
@workflow.defn
class OrderSagaWorkflow:
    @workflow.run
    async def run(self, order_id: str) -> str:
        # Step 1: Reserve inventory
        inventory_id = await workflow.execute_activity(
            reserve_inventory,
            order_id,
            start_to_close_timeout=timedelta(seconds=10),
        )
        # Step 2: Charge payment
        payment_id = await workflow.execute_activity(
            charge_payment,
            order_id,
            start_to_close_timeout=timedelta(seconds=30),
        )
        return f"Order {order_id} completed"
Core Patterns
Event Naming Convention
Use: domain.entity.action.version
Examples:
- order.created.v1
- user.profile.updated.v2
- payment.failed.v1
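A small validator can enforce this convention in CI or code review; a sketch (the exact regex strictness is an assumption — it requires lowercase segments and a trailing version suffix, matching the examples above):

```python
import re

# Matches names like "order.created.v1" or "user.profile.updated.v2":
# two or more dot-separated lowercase segments, ending in a ".vN" version.
EVENT_NAME_RE = re.compile(r"^[a-z][a-z0-9_]*(\.[a-z][a-z0-9_]*)+\.v\d+$")

def is_valid_event_name(name: str) -> bool:
    """Return True if the event name follows entity.action.version
    (optionally prefixed with a domain segment)."""
    return EVENT_NAME_RE.fullmatch(name) is not None
```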
Event Schema Structure
{
  "event_type": "order.created.v2",
  "event_id": "uuid-here",
  "timestamp": "2025-12-02T10:00:00Z",
  "version": "2.0",
  "data": {
    "order_id": "ord_123",
    "customer_id": "cus_456"
  },
  "metadata": {
    "producer": "order-service",
    "trace_id": "abc123",
    "correlation_id": "xyz789"
  }
}
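A helper that stamps out this envelope keeps producers consistent; a minimal sketch (the `make_event` name and the way the schema version is derived from the event type are assumptions):

```python
import uuid
from datetime import datetime, timezone
from typing import Optional

def make_event(event_type: str, data: dict, producer: str,
               trace_id: Optional[str] = None,
               correlation_id: Optional[str] = None) -> dict:
    """Build an event envelope matching the schema above."""
    # Derive the schema version ("2.0") from the ".vN" suffix ("...v2").
    version = event_type.rsplit(".v", 1)[-1]
    return {
        "event_type": event_type,
        "event_id": str(uuid.uuid4()),
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "version": f"{version}.0",
        "data": data,
        "metadata": {
            "producer": producer,
            "trace_id": trace_id,
            "correlation_id": correlation_id,
        },
    }
```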
Dead Letter Queue Pattern
Route failed messages to dead letter queue (DLQ) after max retries:
from celery.exceptions import Reject

@app.task(bind=True, max_retries=3)
def process_order(self, order_id: str):
    try:
        result = perform_processing(order_id)
        return result
    except UnrecoverableError as e:
        send_to_dlq(order_id, str(e))
        raise Reject(e, requeue=False)
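`send_to_dlq` is left undefined above; one possible implementation pushes the failed payload plus error context onto a Redis list (the `dlq:` key prefix and record fields are assumptions):

```python
import json
from datetime import datetime, timezone

def send_to_dlq(client, queue_name: str, payload, error: str) -> None:
    """Append a failed message, with error context, to a DLQ list.

    `client` is anything exposing rpush(key, value), e.g. redis.Redis.
    """
    record = json.dumps({
        "queue": queue_name,
        "payload": payload,
        "error": error,
        "failed_at": datetime.now(timezone.utc).isoformat(),
    })
    client.rpush(f"dlq:{queue_name}", record)
```

Operators can then inspect and replay entries from `dlq:<queue_name>` manually.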
Idempotency for Exactly-Once Processing
from fastapi import Header

@app.post("/process")
async def process_payment(
    payment_data: dict,
    idempotency_key: str = Header(None)
):
    # Check if already processed
    cached_result = redis_client.get(f"idempotency:{idempotency_key}")
    if cached_result:
        return {"status": "already_processed"}
    result = process_payment_logic(payment_data)
    redis_client.setex(f"idempotency:{idempotency_key}", 86400, result)
    return {"status": "processed", "result": result}
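The check-then-set above has a race: two concurrent requests with the same key can both pass the `get` before either writes. Redis `SET` with `NX` closes that gap atomically; a sketch of the claim step (the helper name is an assumption, the `set()` signature follows redis-py):

```python
def claim_idempotency_key(client, key: str, ttl_seconds: int = 86400) -> bool:
    """Atomically claim an idempotency key.

    Returns True if this caller is first (proceed with processing),
    False if the key was already claimed (skip / return cached result).
    With redis-py, set(..., nx=True) only writes if the key is absent
    and returns None otherwise.
    """
    return bool(client.set(f"idempotency:{key}", "1", nx=True, ex=ttl_seconds))
```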
Frontend Integration
Job Status Updates via SSE
# FastAPI endpoint for real-time job status
import asyncio
from sse_starlette.sse import EventSourceResponse

@app.get("/status/{task_id}")
async def task_status_stream(task_id: str):
    async def event_generator():
        while True:
            task = celery_app.AsyncResult(task_id)
            if task.state == 'PROGRESS':
                yield {"event": "progress", "data": task.info.get('progress', 0)}
            elif task.state == 'SUCCESS':
                yield {"event": "complete", "data": task.result}
                break
            await asyncio.sleep(0.5)
    return EventSourceResponse(event_generator())
React Component
export function JobStatus({ jobId }: { jobId: string }) {
  const [progress, setProgress] = useState(0)

  useEffect(() => {
    const eventSource = new EventSource(`/api/status/${jobId}`)
    eventSource.addEventListener('progress', (e) => {
      setProgress(JSON.parse(e.data))
    })
    eventSource.addEventListener('complete', (e) => {
      toast({ title: 'Job complete', description: JSON.parse(e.data) })
      eventSource.close()
    })
    return () => eventSource.close()
  }, [jobId])

  return <ProgressBar value={progress} />
}
Detailed Guides
For comprehensive documentation, see reference files:
Broker-Specific Guides
- Kafka: See references/kafka.md for partitioning, consumer groups, exactly-once semantics
- RabbitMQ: See references/rabbitmq.md for exchanges, bindings, routing patterns
- NATS: See references/nats.md for JetStream, request-reply patterns
- Redis Streams: See references/redis-streams.md for consumer groups, acknowledgments
Task Queue Guides
- Celery: See references/celery.md for periodic tasks, canvas (workflows), monitoring
- BullMQ: See references/bullmq.md for job prioritization, flows, Bull Board monitoring
- Temporal: See references/temporal-workflows.md for saga patterns, signals, queries
Pattern Guides
- Event Patterns: See references/event-patterns.md for event sourcing, CQRS, outbox pattern
Common Anti-Patterns to Avoid
1. Synchronous API for Long Operations
# ❌ BAD: Blocks request thread
@app.post("/generate-report")
def generate_report(user_id: str):
    report = expensive_computation(user_id)  # 5 minutes!
    return report

# ✅ GOOD: Enqueue background job
@app.post("/generate-report")
async def generate_report(user_id: str):
    task = generate_report_task.delay(user_id)
    return {"task_id": task.id}
2. Non-Idempotent Consumers
# ❌ BAD: Processes duplicates
@app.task
def send_email(email: str):
    send_email_service(email)  # Sends twice if retried!

# ✅ GOOD: Idempotent with deduplication
@app.task
def send_email(email: str, idempotency_key: str):
    if redis.exists(f"sent:{idempotency_key}"):
        return "already_sent"
    send_email_service(email)
    redis.setex(f"sent:{idempotency_key}", 86400, "1")
3. Ignoring Dead Letter Queues
# ❌ BAD: Failed messages lost forever
@app.task(max_retries=3)
def risky_task(data):
    process(data)  # If all retries fail, data disappears

# ✅ GOOD: DLQ for manual inspection (bind=True makes self available)
@app.task(bind=True, max_retries=3)
def risky_task(self, data):
    try:
        process(data)
    except Exception as e:
        if self.request.retries >= 3:
            send_to_dlq(data, str(e))
        raise
4. Using Kafka for Request-Reply
# ❌ BAD: Kafka is not designed for RPC
def get_user_profile(user_id: str):
    kafka_producer.send("user_requests", {"user_id": user_id})
    # How to correlate the response? Kafka is asynchronous!

# ✅ GOOD: Use NATS request-reply or HTTP/gRPC
response = await nats.request("user.profile", user_id.encode())
Library Recommendations
Context7 Research
Confluent Kafka (Python)
- Context7 ID: /confluentinc/confluent-kafka-python
- Trust Score: 68.8/100
- Code Snippets: 192+
- Production-ready Python Kafka client
Temporal
- Context7 ID: /websites/temporal_io
- Trust Score: 80.9/100
- Code Snippets: 3,769+
- Workflow orchestration for durable execution
Installation
Python:
pip install confluent-kafka celery[redis] temporalio aio-pika redis
TypeScript/Node.js:
npm install kafkajs bullmq @temporalio/client amqplib ioredis
Rust:
cargo add rdkafka lapin async-nats redis
Go:
go get github.com/confluentinc/confluent-kafka-go
go get github.com/hibiken/asynq
go get go.temporal.io/sdk
Utilities
Use scripts for setup automation:
- Kafka setup: Run python scripts/kafka_producer_consumer.py for test utilities
- Schema validation: Run python scripts/validate_message_schema.py to validate event schemas
Related Skills
- api-patterns: API design for async job submission
- realtime-sync: WebSocket/SSE for job status updates
- feedback: Toast notifications for job completion
- databases-*: Persistent storage for event logs

Message Queues v1.1 - Enhanced
🔄 Workflow
Source: Enterprise Integration Patterns & Confluent Kafka Guide
Phase 1: Design
- Pattern Selection: Decide between point-to-point (queue) and publish-subscribe (topic).
- Schema Registry: Define the message format (Avro/Protobuf) and versioning up front.
- Partitioning: Plan data distribution (key selection to preserve ordering guarantees).
Phase 2: Implementation Checklist
- Idempotency: Establish an exactly-once or at-least-once strategy on the consumer side.
- DLQ: Set up a dead letter queue and alerting for unprocessable messages.
- Backpressure: Plan a mechanism to slow the producer when consumers fall behind.
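In-process, the backpressure idea can be illustrated with a bounded buffer: once consumers fall behind, producers either block or fail fast and throttle themselves. A broker-agnostic sketch using Python's `queue.Queue` (real systems get the same effect from broker-side flow control):

```python
import queue

# A bounded buffer: once 100 messages are waiting, put() would block
# and put_nowait() raises queue.Full - the signal to slow the producer.
buffer: "queue.Queue" = queue.Queue(maxsize=100)

def produce(message: str) -> bool:
    """Try to enqueue without blocking; False tells the caller to back off."""
    try:
        buffer.put_nowait(message)
        return True
    except queue.Full:
        return False
```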
Phase 3: Operations
- Lag Monitoring: Track consumer lag (production rate vs. consumption rate).
- Retention: Set a retention policy (time- or size-based) to keep disks from filling up.
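Consumer lag is simply the gap between the newest offset and the committed offset, summed over partitions; a sketch of the computation (the dict shapes are assumptions — real clients expose these offsets via admin APIs):

```python
def total_consumer_lag(end_offsets: dict, committed: dict) -> int:
    """Sum (latest offset - committed offset) across partitions.

    Both dicts map partition id -> offset; a partition with no commit
    yet counts as lagging from offset 0.
    """
    return sum(
        max(end - committed.get(partition, 0), 0)
        for partition, end in end_offsets.items()
    )
```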
Checkpoints
| Phase | Verification |
|---|---|
| 1 | Does message reordering break the business logic? |
| 2 | Can the system tolerate 24 hours of log loss (durability)? |
| 3 | Can a poison message (malformed payload) lock up the system? |
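The poison-message check can be addressed by validating before processing and diverting malformed payloads instead of retrying them forever; a sketch (the callback-based shape is an assumption):

```python
import json

def handle_message(raw: bytes, process, dead_letter) -> bool:
    """Parse and process one message; divert poison messages to a DLQ.

    Returns True if processed, False if diverted. `process` and
    `dead_letter` are caller-supplied callbacks.
    """
    try:
        payload = json.loads(raw)
    except (json.JSONDecodeError, UnicodeDecodeError) as exc:
        # A malformed payload will never parse on retry: divert, don't requeue.
        dead_letter(raw, str(exc))
        return False
    process(payload)
    return True
```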