task-patterns
Task Patterns Skill
Production-ready Celery task templates with comprehensive error handling, retry mechanisms, rate limiting, and custom behavior patterns.
Overview
This skill provides battle-tested templates and patterns for building robust Celery tasks. Each template demonstrates production-ready code with proper error handling, logging, retry logic, and security best practices.
Core Patterns
1. Basic Task Template
Template: templates/basic-task.py
Simple task with standard error handling and logging.
Use when:
- Creating your first Celery task
- Need straightforward task execution
- No complex retry or limiting requirements
Features:
- Proper logging setup
- Basic error handling
- Synchronous and asynchronous execution examples
- Clear docstrings
2. Retry Task Template
Template: templates/retry-task.py
Tasks with automatic retry mechanisms and exponential backoff.
Use when:
- Calling external APIs that may fail transiently
- Database operations that could timeout
- Network operations prone to connection issues
Features:
- Automatic retry with
autoretry_for - Exponential backoff with jitter
- Manual retry control
- Fixed delay retries
- Configurable max retries
Example: See examples/task-with-retries.md for complete implementation guide
3. Rate Limited Task Template
Template: templates/rate-limited-task.py
Tasks with rate limiting to control execution speed.
Use when:
- Respecting external API rate limits
- Protecting database from overload
- Controlling resource consumption
- Meeting SLA requirements
Features:
- Per-second, per-minute, per-hour limits
- Batch processing with rate control
- Large dataset handling
- Dynamic rate limit adjustment
Example: See examples/rate-limiting.md for complete guide
4. Time Limited Task Template
Template: templates/time-limited-task.py
Tasks with soft and hard time limits to prevent runaway execution.
Use when:
- Long-running operations that could hang
- Operations with external dependencies
- Tasks that must complete within timeframe
- Preventing resource exhaustion
Features:
- Soft time limit (catchable)
- Hard time limit (force kill)
- Graceful timeout handling
- Progress saving on timeout
- Combined with retry logic
5. Custom Task Class Template
Template: templates/custom-task-class.py
Custom task base classes with specialized behavior.
Use when:
- Need database connection pooling
- Want to cache task results
- Require metrics tracking
- Need shared resources across tasks
- Implementing lifecycle hooks
Features:
- Database connection pooling
- Automatic result caching
- Metrics and monitoring
- Resource pool management
- Lifecycle hooks (before_start, on_success, on_failure, on_retry, after_return)
Example: See examples/custom-task-classes.md for comprehensive guide
6. Pydantic Validation Template
Template: templates/pydantic-validation.py
Type-safe tasks with Pydantic model validation.
Use when:
- Need strict input validation
- Want type safety
- Complex nested data structures
- API contract enforcement
Features:
- Pydantic models for input validation
- Enum types for constrained values
- Custom validators
- Standardized result format
- Comprehensive error messages
7. Database Task Template
Template: templates/database-task.py
Best practices for database operations in tasks.
Use when:
- Performing database queries
- Bulk database operations
- Transactional operations
- Database migrations
Features:
- Connection pooling
- Parameterized queries (SQL injection prevention)
- Transaction management
- Bulk insert operations
- Pagination support
- Aggregation queries
8. API Task Template
Template: templates/api-task.py
Best practices for external API calls in tasks.
Use when:
- Calling external APIs
- Webhook delivery
- Paginated API fetching
- Batch API calls
- API authentication
Features:
- Automatic retry on connection errors
- Rate limiting for API quotas
- Timeout handling
- Authentication patterns (Bearer, API key)
- Pagination support
- Batch processing
Scripts
generate-task.sh
Usage: ./scripts/generate-task.sh <template_name> <output_file> [task_name]
Generate new Celery task from template with customizations.
Available templates:
- basic-task
- retry-task
- rate-limited-task
- time-limited-task
- custom-task-class
- pydantic-validation
- database-task
- api-task
Example:
./scripts/generate-task.sh retry-task tasks/my_api_call.py fetch_data
test-task.sh
Usage: ./scripts/test-task.sh <task_file.py> [task_name]
Test Celery task by:
- Validating Python syntax
- Checking imports
- Verifying task structure
- Security scanning
- Best practices check
- Optionally running task
Example:
./scripts/test-task.sh templates/retry-task.py fetch_api_data
validate-task.sh
Usage: ./scripts/validate-task.sh <task_file.py>
Comprehensive validation including:
- Required elements (Celery import, app initialization, decorators)
- Best practices (docstrings, type hints, error handling, logging)
- Retry configuration
- Security checks (no hardcoded credentials, SQL injection)
- Performance checks (rate limits, time limits)
- Code quality (examples, configuration)
- Pattern detection
Example:
./scripts/validate-task.sh templates/api-task.py
Usage Examples
Example 1: Create API Task with Retries
# Use retry-task.py template
from templates.retry_task import fetch_api_data
# Queue task
result = fetch_api_data.delay('https://api.example.com/data')
# Get result
data = result.get(timeout=60)
print(data)
Example 2: Rate-Limited Batch Processing
# Use rate-limited-task.py template
from templates.rate_limited_task import api_call_rate_limited
# Process 100 items at 10/minute rate
for i in range(100):
api_call_rate_limited.delay(f'/endpoint/{i}')
# Celery automatically enforces rate limit
Example 3: Database Operations with Connection Pooling
# Use database-task.py template
from templates.database_task import insert_record, bulk_insert
# Single insert
result1 = insert_record.delay('users', {
'name': 'John Doe',
'email': 'john@example.com'
})
# Bulk insert
records = [
{'name': 'Jane', 'email': 'jane@example.com'},
{'name': 'Bob', 'email': 'bob@example.com'},
]
result2 = bulk_insert.delay('users', records)
Example 4: Custom Task with Metrics
# Use custom-task-class.py template
from templates.custom_task_class import monitored_task
# Automatic metrics tracking
result = monitored_task.delay(123)
# Metrics logged automatically:
# - Start time
# - Duration
# - Success/failure
# - Arguments
Example 5: Type-Safe Task with Pydantic
# Use pydantic-validation.py template
from templates.pydantic_validation import process_user
# Valid data
user_data = {
'user_id': 123,
'email': 'user@example.com',
'username': 'john_doe',
'age': 30,
'tags': ['premium']
}
result = process_user.delay(user_data)
# Invalid data returns structured error
invalid_data = {
'user_id': -1, # Invalid
'email': 'not-email', # Invalid
}
result = process_user.delay(invalid_data)
# Returns: {'status': 'error', 'errors': [...]}
Best Practices
1. Always Use Placeholders for Credentials
# ✅ CORRECT
api_key = os.getenv('API_KEY', 'your_api_key_here')
# ❌ WRONG
api_key = 'sk-abc123xyz456'
2. Set Appropriate Timeouts
# ✅ CORRECT
response = requests.get(url, timeout=30)
# ❌ WRONG
response = requests.get(url) # Can hang forever
3. Use Parameterized Queries
# ✅ CORRECT
db.execute("SELECT * FROM users WHERE id = %s", (user_id,))
# ❌ WRONG
db.execute(f"SELECT * FROM users WHERE id = {user_id}")
4. Combine Retry with Rate Limiting
@app.task(
bind=True,
autoretry_for=(RequestException,),
retry_backoff=True,
rate_limit='10/m'
)
def robust_api_call(self, url: str):
"""Retries on failure, respects rate limits."""
pass
5. Log All Important Events
@app.task(bind=True)
def well_logged_task(self, item_id: int):
logger.info(f"Starting task for item {item_id}")
try:
result = process(item_id)
logger.info(f"Successfully processed {item_id}")
return result
except Exception as exc:
logger.error(f"Failed to process {item_id}: {exc}")
raise
Security Compliance
This skill follows strict security rules:
- All templates use placeholders for credentials
- No real API keys, passwords, or secrets
- Environment variable references in all code
- Parameterized queries to prevent SQL injection
- Security validation in test scripts
Pattern Selection Guide
Choose basic-task.py when:
- Simple, straightforward operations
- No external dependencies
- Low failure risk
Choose retry-task.py when:
- External API calls
- Network operations
- Transient failures expected
Choose rate-limited-task.py when:
- API rate limits exist
- Need to control execution speed
- Protecting resources from overload
Choose time-limited-task.py when:
- Long-running operations
- Risk of hanging
- Need guaranteed completion time
Choose custom-task-class.py when:
- Need resource pooling
- Want lifecycle hooks
- Shared logic across tasks
- Metrics tracking required
Choose pydantic-validation.py when:
- Complex input validation needed
- Type safety important
- API contract enforcement
- Clear error messages required
Choose database-task.py when:
- Database operations
- Need connection pooling
- Transactional operations
- Bulk processing
Choose api-task.py when:
- External API integration
- Webhook delivery
- Authentication required
- Pagination needed
Testing Tasks
Test Individual Task
# Validate task structure
./scripts/validate-task.sh templates/retry-task.py
# Test task execution
./scripts/test-task.sh templates/retry-task.py fetch_api_data
Run with Celery Worker
# Start worker
celery -A tasks worker --loglevel=info
# Execute tasks
python3 templates/retry-task.py
Common Patterns
Pattern: Retry with Backoff
@app.task(
bind=True,
autoretry_for=(RequestException,),
retry_backoff=True,
retry_jitter=True,
max_retries=5
)
Pattern: Rate + Time Limits
@app.task(
rate_limit='10/m',
soft_time_limit=60,
time_limit=120
)
Pattern: Validation + Retry
@app.task(
bind=True,
autoretry_for=(ValidationError, RequestException),
retry_backoff=True,
max_retries=3
)
def validated_api_task(self, request: dict):
# Validate with Pydantic
req = ApiRequest(**request)
# Make API call with retry
return make_call(req.url)
Quick Reference
Task Decorator Options
@app.task(
bind=True, # Access self (required for retry)
base=CustomTask, # Custom task class
autoretry_for=(Exception,), # Exceptions to retry
retry_backoff=True, # Exponential backoff
retry_backoff_max=600, # Max backoff seconds
retry_jitter=True, # Add randomness
max_retries=5, # Max attempts
default_retry_delay=10, # Default delay
rate_limit='10/m', # Rate limit
soft_time_limit=60, # Soft timeout (catchable)
time_limit=120, # Hard timeout (kills task)
ignore_result=False, # Store result
priority=5 # Task priority (0-9)
)
Retry Syntax
# Automatic
autoretry_for=(RequestException, Timeout)
# Manual
raise self.retry(exc=exc, countdown=60)
Rate Limit Syntax
rate_limit='10/s' # 10 per second
rate_limit='100/m' # 100 per minute
rate_limit='1000/h' # 1000 per hour
References
- Celery Tasks Documentation
examples/task-with-retries.md- Complete retry guideexamples/rate-limiting.md- Complete rate limiting guideexamples/custom-task-classes.md- Custom class guide
Validation
Always run validation before committing:
./scripts/validate-task.sh your-task.py
Validation checks:
- ✅ Required imports and setup
- ✅ Task decorators present
- ✅ Docstrings and type hints
- ✅ Error handling
- ✅ Security (no hardcoded credentials)
- ✅ Best practices compliance