routing-strategies
Celery Task Routing Strategies Skill
This skill provides comprehensive templates, scripts, and patterns for implementing advanced task routing and queue management in Celery applications, including priority queues, topic-based routing, and worker-specific queue assignments.
Overview
Effective task routing is crucial for:
- Performance Optimization - Route compute-intensive tasks to dedicated workers
- Priority Management - High-priority tasks bypass slower queues
- Resource Isolation - Separate critical operations from background jobs
- Scalability - Independent scaling of different task types
This skill covers routing with RabbitMQ, Redis, and custom broker configurations.
Available Scripts
1. Test Routing Configuration
Script: scripts/test-routing.sh <config-file>
Purpose: Validates routing configuration and tests queue connectivity
Checks:
- Broker connectivity (RabbitMQ/Redis)
- Queue declarations
- Exchange configurations
- Routing key patterns
- Worker queue bindings
- Priority queue setup
Usage:
# Test routing configuration
./scripts/test-routing.sh ./celery_config.py
# Test with custom broker URL
BROKER_URL=amqp://user:password@localhost:5672// ./scripts/test-routing.sh ./celery_config.py
# Verbose output
VERBOSE=1 ./scripts/test-routing.sh ./celery_config.py
Exit Codes:
- 0: All routing tests passed
- 1: Configuration errors detected
- 2: Broker connection failed
2. Validate Queue Configuration
Script: scripts/validate-queues.sh <project-dir>
Purpose: Validates queue setup across application code
Checks:
- Task decorators use valid queues
- No hardcoded queue names (use config)
- All queues defined in routing configuration
- Priority settings are valid (0-255)
- Exchange types match routing patterns
- Worker configurations reference valid queues
Usage:
# Validate current project
./scripts/validate-queues.sh .
# Validate specific directory
./scripts/validate-queues.sh /path/to/celery-app
# Generate detailed report
REPORT=1 ./scripts/validate-queues.sh . > queue-validation-report.md
Exit Codes:
- 0: Validation passed
- 1: Validation failed (must fix issues)
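Two of the checks listed above (routes must target declared queues, priorities must be in range) can be sketched in plain Python. This is only an illustration of the idea, not the contents of validate-queues.sh; the function name find_routing_problems and the sample data are hypothetical.

```python
def find_routing_problems(declared_queues, routes, max_priority=255):
    """Return a list of human-readable problems found in a routing table."""
    problems = []
    declared = set(declared_queues)
    for task_name, options in sorted(routes.items()):
        queue = options.get('queue')
        if queue is not None and queue not in declared:
            problems.append(f"{task_name}: queue '{queue}' is not declared")
        priority = options.get('priority')
        if priority is not None and not 0 <= priority <= max_priority:
            problems.append(
                f"{task_name}: priority {priority} outside 0-{max_priority}"
            )
    return problems


# Hypothetical sample data with two deliberate mistakes.
sample_queues = ['default', 'emails']
sample_routes = {
    'myapp.tasks.send_email': {'queue': 'emails'},
    'myapp.tasks.generate_report': {'queue': 'reports'},  # queue not declared
    'myapp.tasks.urgent_task': {'queue': 'default', 'priority': 300},  # out of range
}

problems = find_routing_problems(sample_queues, sample_routes)
for problem in problems:
    print(problem)

# Mirror the documented exit codes: 0 = validation passed, 1 = validation failed.
exit_code = 1 if problems else 0
```

Keeping the check as a pure function over plain dicts makes it easy to run in CI before any broker is available.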
Available Templates
1. Basic Queue Configuration
Template: templates/queue-config.py
Features:
- Default queue setup
- Named queues for different task types
- Queue-to-exchange bindings
- Priority settings
- Worker routing configuration
Usage:
from celery import Celery
from templates.queue_config import CELERY_ROUTES, CELERY_QUEUES
app = Celery('myapp')
app.conf.task_routes = CELERY_ROUTES
app.conf.task_queues = CELERY_QUEUES
Configuration Example:
from kombu import Exchange, Queue

# Wildcard routing keys such as 'email.*' only match on topic exchanges,
# so the 'emails' and 'reports' exchanges are declared with type='topic'.
CELERY_QUEUES = (
    Queue('default', Exchange('default'), routing_key='default'),
    Queue('high_priority', Exchange('default'), routing_key='high'),
    Queue('low_priority', Exchange('default'), routing_key='low'),
    Queue('emails', Exchange('emails', type='topic'), routing_key='email.*'),
    Queue('reports', Exchange('reports', type='topic'), routing_key='report.*'),
)
CELERY_ROUTES = {
    'myapp.tasks.send_email': {'queue': 'emails', 'routing_key': 'email.send'},
    'myapp.tasks.generate_report': {'queue': 'reports', 'routing_key': 'report.generate'},
    'myapp.tasks.urgent_task': {'queue': 'high_priority', 'priority': 9},
}
2. Dynamic Routing Rules
Template: templates/routing-rules.py
Features:
- Pattern-based routing
- Conditional routing logic
- Dynamic queue selection
- Routing by task name patterns
- Routing by task arguments
Key Functions:
def route_task(name, args, kwargs, options, task=None, **kw):
    """
    Dynamic routing based on task name or arguments
    """
    kwargs = kwargs or {}  # guard against tasks called without keyword arguments
    if name.startswith('urgent.'):
        return {'queue': 'high_priority', 'priority': 9}
    if kwargs.get('priority') == 'high':
        return {'queue': 'high_priority'}
    if name.startswith('email.'):
        return {'queue': 'emails', 'exchange': 'emails'}
    return {'queue': 'default'}

app.conf.task_routes = (route_task,)
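Because a router is an ordinary function, its decisions can be checked without a broker or a running worker. The sketch below repeats the router rules in self-contained form and calls the function directly; the task names in the samples are hypothetical.

```python
def route_task(name, args, kwargs, options, task=None, **kw):
    """Router sketch: pick a queue from the task name or its keyword arguments."""
    kwargs = kwargs or {}
    if name.startswith('urgent.'):
        return {'queue': 'high_priority', 'priority': 9}
    if kwargs.get('priority') == 'high':
        return {'queue': 'high_priority'}
    if name.startswith('email.'):
        return {'queue': 'emails', 'exchange': 'emails'}
    return {'queue': 'default'}


# (task name, keyword arguments) pairs; names are made up for illustration.
samples = [
    ('urgent.rebuild_index', {}),
    ('reports.monthly', {'priority': 'high'}),
    ('email.send_welcome', {}),
    ('cleanup.tmp_files', None),
]

for task_name, task_kwargs in samples:
    route = route_task(task_name, (), task_kwargs, {})
    print(f"{task_name:24s} -> {route['queue']}")
```

The rules are evaluated top to bottom, so a task named urgent.* always lands in high_priority even when it also carries other hints.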
3. Priority Queue Setup
Template: templates/priority-queues.py
Features:
- Multi-level priority queues (0-255)
- Priority inheritance
- Default priority configuration
- Queue priority enforcement
Priority Levels:
from kombu import Exchange, Queue

# Priority queue configuration
CELERY_QUEUES = (
    Queue('critical', Exchange('tasks'), routing_key='critical',
          queue_arguments={'x-max-priority': 10}),
    Queue('high', Exchange('tasks'), routing_key='high',
          queue_arguments={'x-max-priority': 10}),
    Queue('normal', Exchange('tasks'), routing_key='normal',
          queue_arguments={'x-max-priority': 10}),
    Queue('low', Exchange('tasks'), routing_key='low',
          queue_arguments={'x-max-priority': 10}),
)
# Task priority mapping
PRIORITY_LEVELS = {
    'critical': 10,  # Highest priority
    'high': 7,
    'normal': 5,
    'low': 2,
}
# Apply priority to task
@app.task(priority=PRIORITY_LEVELS['high'])
def urgent_processing():
    pass
4. Topic Exchange Routing
Template: templates/topic-exchange.py
Features:
- Topic-based routing patterns
- Wildcard routing keys
- Multi-queue routing
- Pattern matching
Topic Patterns:
from kombu import Exchange, Queue
# Topic exchange setup
task_exchange = Exchange('tasks', type='topic', durable=True)
CELERY_QUEUES = (
    # Match specific patterns
    Queue('user.notifications', exchange=task_exchange,
          routing_key='user.notification.*'),
    # Match all email types
    Queue('emails', exchange=task_exchange,
          routing_key='email.#'),
    # Match processing tasks
    Queue('processing', exchange=task_exchange,
          routing_key='*.processing.*'),
    # Match reports with exactly one word after 'report.' (use 'report.#' for any depth)
    Queue('reports', exchange=task_exchange,
          routing_key='report.*'),
)
# Routing configuration
CELERY_ROUTES = {
    'myapp.tasks.send_welcome_email': {
        'exchange': 'tasks',
        'routing_key': 'email.welcome.send'
    },
    'myapp.tasks.notify_user': {
        'exchange': 'tasks',
        'routing_key': 'user.notification.send'
    },
}
5. Worker-Specific Routing
Template: templates/worker-routing.py
Features:
- Dedicated worker pools
- Worker-specific queues
- CPU vs I/O task separation
- Geographic routing
- Resource-based routing
Worker Configuration:
# Worker pool definitions
WORKER_POOLS = {
    'cpu_intensive': {
        'queues': ['ml_training', 'video_processing', 'data_analysis'],
        'concurrency': 4,
        'prefetch_multiplier': 1,
    },
    'io_intensive': {
        'queues': ['api_calls', 'file_uploads', 'emails'],
        'concurrency': 50,
        'prefetch_multiplier': 10,
    },
    'general': {
        'queues': ['default', 'background'],
        'concurrency': 10,
        'prefetch_multiplier': 4,
    },
}
# Start workers
# celery -A myapp worker --queues=ml_training,video_processing -c 4 -n cpu_worker@%h
# celery -A myapp worker --queues=api_calls,file_uploads -c 50 -n io_worker@%h
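One way to keep the start commands in sync with the pool definitions is to generate them from the dictionary. The helper below is a hypothetical sketch (worker_command is not part of the template); it only builds command strings and uses the standard celery worker options --queues, --concurrency, --prefetch-multiplier and -n.

```python
WORKER_POOLS = {
    'cpu_intensive': {
        'queues': ['ml_training', 'video_processing', 'data_analysis'],
        'concurrency': 4,
        'prefetch_multiplier': 1,
    },
    'io_intensive': {
        'queues': ['api_calls', 'file_uploads', 'emails'],
        'concurrency': 50,
        'prefetch_multiplier': 10,
    },
    'general': {
        'queues': ['default', 'background'],
        'concurrency': 10,
        'prefetch_multiplier': 4,
    },
}


def worker_command(app_name, pool_name, pool):
    """Build the shell command that starts one worker for the given pool."""
    queues = ','.join(pool['queues'])
    return (
        f"celery -A {app_name} worker"
        f" --queues={queues}"
        f" --concurrency={pool['concurrency']}"
        f" --prefetch-multiplier={pool['prefetch_multiplier']}"
        f" -n {pool_name}@%h"
    )


for pool_name, pool in WORKER_POOLS.items():
    print(worker_command('myapp', pool_name, pool))
```

Generating the commands this way means a queue added to a pool definition is picked up by the matching worker without editing a separate start script.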
Available Examples
1. Priority Queue Setup Guide
Example: examples/priority-queue-setup.md
Demonstrates:
- Configuring RabbitMQ priority queues
- Setting task priorities
- Priority inheritance patterns
- Testing priority routing
- Monitoring priority queue performance
Key Concepts:
- Priority range: 0 (lowest) to 255 (highest)
- RabbitMQ x-max-priority argument
- Priority at task definition vs runtime
- Queue argument configuration
2. Topic-Based Routing Implementation
Example: examples/topic-routing.md
Demonstrates:
- Topic exchange setup
- Routing key patterns (* and # wildcards)
- Multi-queue routing
- Pattern matching strategies
- Consumer binding patterns
Routing Key Patterns:
- * - Matches exactly one word
- # - Matches zero or more words
- Example: email.*.send matches email.welcome.send, email.notification.send
- Example: user.# matches user.create, user.update.profile
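The wildcard rules above can be tried out locally with a small matcher. This is a pure-Python sketch of topic matching for experimentation only; the broker performs the real matching, and topic_matches is a hypothetical helper name.

```python
def topic_matches(pattern, routing_key):
    """Return True if a topic binding pattern matches a routing key.

    '*' stands for exactly one dot-separated word,
    '#' stands for zero or more words.
    """
    pattern_words = pattern.split('.')
    key_words = routing_key.split('.') if routing_key else []

    def match(i, j):
        if i == len(pattern_words):
            return j == len(key_words)
        word = pattern_words[i]
        if word == '#':
            # Let '#' consume zero, one, or more of the remaining words.
            return any(match(i + 1, k) for k in range(j, len(key_words) + 1))
        if j == len(key_words):
            return False
        if word == '*' or word == key_words[j]:
            return match(i + 1, j + 1)
        return False

    return match(0, 0)


checks = [
    ('email.*.send', 'email.welcome.send'),
    ('email.*.send', 'email.send'),
    ('user.#', 'user.update.profile'),
    ('report.*', 'report.generate.pdf'),
]
for pattern, key in checks:
    print(f"{pattern:14s} {key:22s} {topic_matches(pattern, key)}")
```

Note the difference the last check illustrates: report.* stops at one word after the prefix, while report.# would also accept deeper keys.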
3. Worker Queue Assignment Strategy
Example: examples/worker-queue-assignment.md
Demonstrates:
- CPU-bound vs I/O-bound task separation
- Worker pool configuration
- Queue-to-worker mapping
- Scaling strategies per worker type
- Resource allocation patterns
Worker Types:
# CPU-intensive workers (low concurrency)
celery -A myapp worker -Q ml_training,video_processing -c 4 -n cpu@%h
# I/O-intensive workers (high concurrency)
celery -A myapp worker -Q api_calls,emails -c 50 -n io@%h
# General purpose workers
celery -A myapp worker -Q default,background -c 10 -n general@%h
Routing Strategies Comparison
1. Direct Exchange (Default)
- Use Case: Simple queue-to-task mapping
- Pros: Simple, predictable, fast
- Cons: Limited flexibility
- Example: Each task type goes to one specific queue
2. Topic Exchange
- Use Case: Pattern-based routing, hierarchical task categories
- Pros: Flexible, supports wildcards, multi-queue routing
- Cons: More complex configuration
- Example: email.*.send routes all email types to the email queue
3. Fanout Exchange
- Use Case: Broadcasting tasks to multiple queues
- Pros: Simple broadcast mechanism
- Cons: No routing logic, all queues receive all messages
- Example: Notifications sent to multiple consumer types
4. Headers Exchange
- Use Case: Complex routing based on message headers
- Pros: Very flexible, metadata-based routing
- Cons: Performance overhead, complex configuration
- Example: Route by priority=high and region=us-east
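Headers exchanges are the least familiar of the four, so a small simulation may help. The sketch below assumes the usual AMQP convention that a binding carries an x-match argument of 'all' (the default) or 'any'; headers_match is a hypothetical helper, and the real comparison happens inside the broker.

```python
_MISSING = object()  # sentinel for headers the message does not carry


def headers_match(binding_args, message_headers):
    """Return True if a message's headers satisfy a headers-exchange binding."""
    mode = binding_args.get('x-match', 'all')
    # Arguments starting with 'x-' configure the binding and are not compared.
    wanted = {k: v for k, v in binding_args.items() if not k.startswith('x-')}
    hits = [message_headers.get(k, _MISSING) == v for k, v in wanted.items()]
    return all(hits) if mode == 'all' else any(hits)


binding_all = {'x-match': 'all', 'priority': 'high', 'region': 'us-east'}
binding_any = {'x-match': 'any', 'priority': 'high', 'region': 'us-east'}

message = {'priority': 'high', 'region': 'eu-west'}
print(headers_match(binding_all, message))  # both headers must match
print(headers_match(binding_any, message))  # one matching header is enough
```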
Performance Considerations
1. Queue Configuration
- Durable queues: Messages persist across broker restarts (use for critical tasks)
- Transient queues: Faster but messages lost on restart (use for disposable tasks)
- Queue length limits: Prevent memory issues with x-max-length
2. Prefetch Settings
- CPU-bound tasks: Low prefetch (1-2) to prevent blocking
- I/O-bound tasks: High prefetch (10+) to keep workers busy
- Configure per worker: celery worker --prefetch-multiplier=4
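The effect of these settings is easier to judge as arithmetic: a worker reserves up to concurrency times prefetch multiplier messages at once. The sketch below applies that to the pool sizes used earlier in this skill; reserved_task_limit is a hypothetical helper, and the special case of a multiplier of 0 is not modelled.

```python
def reserved_task_limit(concurrency, prefetch_multiplier):
    """Upper bound on messages one worker holds (running plus prefetched)."""
    return concurrency * prefetch_multiplier


# (concurrency, prefetch_multiplier) per worker type, taken from the pool examples.
pools = {
    'cpu_intensive': (4, 1),
    'io_intensive': (50, 10),
    'general': (10, 4),
}

limits = {name: reserved_task_limit(c, m) for name, (c, m) in pools.items()}
for name, limit in limits.items():
    print(f"{name:14s} reserves up to {limit} tasks")
```

With long CPU-bound tasks, every reserved-but-waiting message is work another idle worker cannot pick up, which is the reason for keeping the multiplier at 1 there.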
3. Priority Queue Performance
- RabbitMQ: Native priority support, efficient
- Redis: Priority emulation via separate queues, less efficient
- Trade-off: Priority checking adds overhead, use only when needed
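For Redis, the emulation is configured through broker transport options. The settings below follow the pattern shown in Celery's routing documentation; the helper emulated_queue_names is hypothetical and only illustrates the queue names the emulation is expected to create. Celery's documentation also notes that Redis priorities are reversed relative to RabbitMQ, with 0 being the highest.

```python
# Apply with: app.conf.broker_transport_options = BROKER_TRANSPORT_OPTIONS
BROKER_TRANSPORT_OPTIONS = {
    'priority_steps': list(range(10)),   # one emulated level per step
    'sep': ':',                          # separator between queue name and step
    'queue_order_strategy': 'priority',  # consume levels in priority order
}


def emulated_queue_names(base, steps, sep):
    """Illustrative only: step 0 keeps the base name, other steps get a suffix."""
    return [base if step == 0 else f'{base}{sep}{step}' for step in steps]


names = emulated_queue_names(
    'celery',
    BROKER_TRANSPORT_OPTIONS['priority_steps'],
    BROKER_TRANSPORT_OPTIONS['sep'],
)
print(names)
```

Each extra step is another Redis list the worker has to poll, which is where the efficiency cost mentioned above comes from.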
Security Compliance
This skill follows strict security rules:
- All code examples use placeholder values only
- No real broker credentials, passwords, or secrets
- Environment variable references in all code (BROKER_URL, BACKEND_URL)
- .gitignore protection documented
- Broker URLs use placeholder format: amqp://user:password@localhost:5672//
Never hardcode:
# ❌ WRONG
BROKER_URL = 'amqp://myuser:secretpass123@rabbitmq.example.com:5672//'
# ✅ CORRECT
import os
BROKER_URL = os.environ.get('CELERY_BROKER_URL', 'amqp://guest:guest@localhost:5672//')
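Reading the URL from the environment keeps it out of source control, but it can still leak through log output. A possible complement, sketched with the standard library only, is to mask the password before logging; masked_broker_url is a hypothetical helper name.

```python
import os
from urllib.parse import urlsplit, urlunsplit


def masked_broker_url(url):
    """Return the URL with any password replaced by '***' for safe logging."""
    parts = urlsplit(url)
    if parts.password is None:
        return url
    host = parts.hostname or ''
    if parts.port is not None:
        host = f'{host}:{parts.port}'
    netloc = f"{parts.username or ''}:***@{host}"
    return urlunsplit(
        (parts.scheme, netloc, parts.path, parts.query, parts.fragment)
    )


# Placeholder default only; real credentials come from the environment.
broker_url = os.environ.get(
    'CELERY_BROKER_URL', 'amqp://guest:guest@localhost:5672//'
)
print(masked_broker_url(broker_url))
```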
Best Practices
- Use Environment-Based Configuration - Different queues for dev/staging/prod
- Separate Critical Tasks - High-priority queue for time-sensitive operations
- Match Worker to Task Type - CPU workers for compute, I/O workers for network/disk
- Monitor Queue Lengths - Alert on queue buildup indicating bottlenecks
- Use Topic Exchanges for Hierarchical Tasks - Cleaner routing than multiple direct exchanges
- Test Routing Configuration - Validate routes before deploying to production
- Document Routing Logic - Especially for complex pattern-based routing
- Use Priority Sparingly - Overuse defeats the purpose and adds overhead
- Configure Prefetch Per Worker Type - Optimize based on task characteristics
- Plan for Scaling - Design routing to allow independent queue scaling
Requirements
- Celery 5.0+
- Message broker (RabbitMQ 3.8+ or Redis 6.0+)
- Python 3.8+
- kombu library (included with Celery)
- Environment variables:
  - CELERY_BROKER_URL (required)
  - CELERY_RESULT_BACKEND (optional)
- For RabbitMQ priority queues: RabbitMQ 3.5+
- For testing scripts: netcat/telnet for connectivity checks
Progressive Disclosure
For advanced routing patterns, see:
- examples/priority-queue-setup.md - Priority queue implementation
- examples/topic-routing.md - Topic exchange patterns
- examples/worker-queue-assignment.md - Worker pool strategies
Troubleshooting
Queue Not Receiving Tasks
- Check routing configuration matches task name
- Verify queue declaration in CELERY_QUEUES
- Ensure workers are listening to correct queues
- Check broker connectivity with test script
Priority Not Working
- Verify x-max-priority set on queue (RabbitMQ only)
- Check tasks are setting priority correctly
- Confirm workers consuming from priority queue
- Redis: Implement separate high/low priority queues
Worker Not Processing Tasks
- Verify worker queue list matches routed queues
- Check prefetch_multiplier: a high value lets one busy worker reserve tasks that idle workers could otherwise run
- Ensure no task failures blocking queue
- Monitor worker logs for errors
Plugin: celery Version: 1.0.0 Last Updated: 2025-11-16