# Django + Celery

You are a Celery and async task expert. Your goal is to help set up reliable background processing.
## Initial Assessment

Check for project context first: if `.agents/django-project-context.md` exists, read it for the existing broker (Redis/RabbitMQ), cache backend, and deployment setup.
## Installation & Setup

```bash
pip install celery redis django-celery-results django-celery-beat
```
## Celery Configuration

```python
# config/celery.py
import os

from celery import Celery

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'config.settings.base')

app = Celery('myproject')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()
```

```python
# config/__init__.py
from .celery import app as celery_app

__all__ = ('celery_app',)
```
## Settings

```python
# settings.py
CELERY_BROKER_URL = env('CELERY_BROKER_URL', default='redis://localhost:6379/0')
CELERY_RESULT_BACKEND = env('CELERY_RESULT_BACKEND', default='redis://localhost:6379/0')
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TIMEZONE = TIME_ZONE

# Or store task results in the Django DB (optional — requires django-celery-results;
# this overrides the Redis result backend above)
CELERY_RESULT_BACKEND = 'django-db'
INSTALLED_APPS += ['django_celery_results']

# Periodic tasks via django-celery-beat
INSTALLED_APPS += ['django_celery_beat']
```
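Both add-ons create their own database tables, so after extending `INSTALLED_APPS`, run their migrations before starting workers:

```bash
python manage.py migrate django_celery_results
python manage.py migrate django_celery_beat
```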
## Writing Tasks

### Basic Task

```python
# articles/tasks.py
from celery import shared_task
from django.core.mail import send_mail


@shared_task
def send_welcome_email(user_id):
    from django.contrib.auth import get_user_model

    User = get_user_model()
    user = User.objects.get(pk=user_id)
    send_mail(
        subject='Welcome!',
        message=f'Hi {user.first_name}, welcome aboard.',
        from_email='hello@example.com',
        recipient_list=[user.email],
    )
```

Call it from a view:

```python
# Never pass model instances to tasks — pass PKs
send_welcome_email.delay(user.pk)
```
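`.delay()` is shorthand for `.apply_async()`, which also exposes scheduling and routing options. A sketch (the `emails` queue name is illustrative):

```python
# Same call, delayed 10 seconds and routed to a dedicated queue
send_welcome_email.apply_async(args=[user.pk], countdown=10, queue='emails')
```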
### Task with Retry

```python
@shared_task(
    bind=True,
    max_retries=3,
    default_retry_delay=60,  # seconds
)
def sync_to_external_api(self, record_id):
    try:
        result = call_external_api(record_id)
        return result
    except ExternalAPIError as exc:
        raise self.retry(exc=exc, countdown=2 ** self.request.retries * 60)
### Task with Configuration

```python
@shared_task(
    name='articles.generate_thumbnail',  # Explicit name (avoids rename issues)
    queue='images',                      # Route to specific queue
    time_limit=300,                      # Hard time limit (seconds)
    soft_time_limit=240,                 # Soft limit (raises SoftTimeLimitExceeded)
    acks_late=True,                      # Acknowledge after completion (safer)
    reject_on_worker_lost=True,
)
def generate_thumbnail(image_id):
    ...
```
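Per-task `queue=` works, but routing can also live centrally in settings via the standard `CELERY_TASK_ROUTES` setting, so every queue assignment sits in one place (a sketch using task names from this document):

```python
# settings.py
CELERY_TASK_ROUTES = {
    'articles.generate_thumbnail': {'queue': 'images'},
    'articles.tasks.send_welcome_email': {'queue': 'emails'},
}
```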
## Task Patterns

### Fan-out (Parallel Processing)

```python
from celery import group


@shared_task
def process_all_articles():
    ids = list(Article.objects.values_list('pk', flat=True))
    job = group(process_article.s(pk) for pk in ids)
    job.apply_async()
```
### Chain (Sequential Steps)

```python
from celery import chain

pipeline = chain(
    fetch_data.s(source_url),
    process_data.s(),
    save_results.s(),
)
pipeline.delay()
```
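In a chain, each task's return value becomes the first argument of the next task. A plain-Python sketch of the same dataflow (hypothetical task bodies, just to show how results thread through):

```python
def fetch_data(source_url):  # hypothetical stand-in for the Celery task
    return [1, 2, 3]

def process_data(items):     # receives fetch_data's return value
    return [i * 2 for i in items]

def save_results(items):     # receives process_data's return value
    return len(items)

print(save_results(process_data(fetch_data('https://example.com/feed'))))  # 3
```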
### Chord (Fan-out + Callback)

```python
from celery import chord

callback = aggregate_results.s()
job = chord(
    (process_chunk.s(chunk) for chunk in chunks),
    callback,
)
job.delay()
```
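A chord callback receives a single list containing every group member's result. Expressed synchronously, the computation above looks like this (hypothetical task bodies):

```python
chunks = [[1, 2], [3, 4], [5, 6]]

def process_chunk(chunk):        # hypothetical: each member returns a partial result
    return sum(chunk)

def aggregate_results(results):  # the callback gets the list of all member results
    return sum(results)

print(aggregate_results([process_chunk(c) for c in chunks]))  # 21
```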
## Periodic Tasks (Celery Beat)

### Option A: Settings-Based Schedule

```python
# settings.py
from celery.schedules import crontab

CELERY_BEAT_SCHEDULE = {
    'send-daily-digest': {
        'task': 'articles.tasks.send_daily_digest',
        'schedule': crontab(hour=8, minute=0),  # Every day at 8 AM
    },
    'cleanup-old-sessions': {
        'task': 'accounts.tasks.cleanup_sessions',
        # minute=0 matters: omitting it means "every minute" during that hour
        'schedule': crontab(hour=2, minute=0, day_of_week=0),  # Sundays at 2 AM
    },
    'sync-every-5-minutes': {
        'task': 'integrations.tasks.sync_data',
        'schedule': 300.0,  # Every 5 minutes
    },
}
```
### Option B: Database-Backed (django-celery-beat)

Manage schedules via the Django admin at `/admin/django_celery_beat/`.
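Schedules can also be created in code via the models django-celery-beat exposes. A sketch (the task name comes from the Option A example; the display name is arbitrary but must be unique):

```python
from django_celery_beat.models import CrontabSchedule, PeriodicTask

# Crontab fields are stored as strings
schedule, _ = CrontabSchedule.objects.get_or_create(minute='0', hour='8')

PeriodicTask.objects.get_or_create(
    crontab=schedule,
    name='Send daily digest',                 # human-readable, unique
    task='articles.tasks.send_daily_digest',  # registered task name
)
```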
## Running Workers

```bash
# Development
celery -A config worker --loglevel=info

# With Beat scheduler (run as separate processes)
celery -A config worker --loglevel=info &
celery -A config beat --loglevel=info --scheduler django_celery_beat.schedulers:DatabaseScheduler

# Named queues
celery -A config worker -Q default,emails,images --loglevel=info
```
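Worker concurrency is also worth pinning in production; both flags below are standard Celery worker options:

```bash
# Fixed pool of 4 worker processes
celery -A config worker --concurrency=4 --loglevel=info

# Or autoscale between 2 and 10 processes based on load
celery -A config worker --autoscale=10,2 --loglevel=info
```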
## Docker Compose Setup

```yaml
# docker-compose.yml (the db and web services are assumed to be defined elsewhere)
services:
  redis:
    image: redis:7-alpine
  worker:
    build: .
    command: celery -A config worker --loglevel=info -Q default
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0
    depends_on: [redis, db]
  beat:
    build: .
    command: celery -A config beat --loglevel=info --scheduler django_celery_beat.schedulers:DatabaseScheduler
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0
    depends_on: [redis, db]
```
## Best Practices

- Never pass model instances to tasks — always pass PKs and fetch inside the task
- Make tasks idempotent — safe to run twice if something fails
- Keep tasks small — one clear responsibility per task
- Use `acks_late=True` for critical tasks (don't lose them on worker crash)
- Set time limits on all tasks to prevent runaway processes
- Use `bind=True` when you need `self` (for retries, task ID, etc.)
- Log inside tasks for observability
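Idempotency usually comes down to checking for prior work before doing it again. A minimal pure-Python sketch of the pattern (in a real task the "already done?" check would be durable, e.g. a `get_or_create` against a DB table, not an in-memory set):

```python
processed = set()  # stand-in for durable state

def send_receipt(order_id):
    """Safe to run twice: the second call for the same order is a no-op."""
    if order_id in processed:
        return False           # already handled, do nothing
    processed.add(order_id)
    return True                # did the work

print(send_receipt(42), send_receipt(42))  # True False
```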
## Monitoring

```bash
# Flower (web UI for Celery monitoring)
pip install flower
celery -A config flower --port=5555
```
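Flower aside, the Celery CLI can answer quick questions directly via its standard `inspect` and `status` subcommands:

```bash
celery -A config inspect active     # tasks currently executing
celery -A config inspect scheduled  # ETA/countdown tasks waiting
celery -A config status             # ping all workers
```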
## Related Skills

- django-deployment: Docker and supervisor setup for workers in production
- django-tests: Testing Celery tasks synchronously in tests
- django-debug: Logging and monitoring task execution