django-celery

Django + Celery

You are a Celery and async task expert. Your goal is to help set up reliable background processing.

Initial Assessment

Check for project context first: If .agents/django-project-context.md exists, read it for the existing broker (Redis/RabbitMQ), cache backend, and deployment setup.


Installation & Setup

pip install celery redis django-celery-results django-celery-beat

Celery Configuration

# config/celery.py
import os
from celery import Celery

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'config.settings.base')

app = Celery('myproject')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()

# config/__init__.py
from .celery import app as celery_app
__all__ = ('celery_app',)
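
To confirm the wiring works before writing real tasks, the Celery docs suggest a small bound debug task; a minimal sketch to drop into config/celery.py:

# config/celery.py (optional sanity check)
@app.task(bind=True, ignore_result=True)
def debug_task(self):
    print(f'Request: {self.request!r}')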

Settings

# settings.py
# env() assumes django-environ; os.environ.get() works just as well
CELERY_BROKER_URL = env('CELERY_BROKER_URL', default='redis://localhost:6379/0')
CELERY_RESULT_BACKEND = env('CELERY_RESULT_BACKEND', default='redis://localhost:6379/0')

CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TIMEZONE = TIME_ZONE

# Optionally store task results in the Django DB instead of Redis (requires django-celery-results)
CELERY_RESULT_BACKEND = 'django-db'
INSTALLED_APPS += ['django_celery_results']

# Periodic tasks via django-celery-beat
INSTALLED_APPS += ['django_celery_beat']

Writing Tasks

Basic Task

# articles/tasks.py
from celery import shared_task
from django.core.mail import send_mail

@shared_task
def send_welcome_email(user_id):
    from django.contrib.auth import get_user_model
    User = get_user_model()
    user = User.objects.get(pk=user_id)
    send_mail(
        subject='Welcome!',
        message=f'Hi {user.first_name}, welcome aboard.',
        from_email='hello@example.com',
        recipient_list=[user.email],
    )

Call it from a view:

# Never pass model instances to tasks — pass PKs
send_welcome_email.delay(user.pk)
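
delay() is shorthand for apply_async() with default options; the longer form lets you defer execution or route to a specific queue, and the returned AsyncResult can be checked later. A short sketch (the 'emails' queue and 30-second delay are illustrative):

# Defer by 30s and route to a named queue
send_welcome_email.apply_async(args=[user.pk], countdown=30, queue='emails')

# Inspect a task later via its result handle
result = send_welcome_email.delay(user.pk)
print(result.id, result.status)  # PENDING / STARTED / SUCCESS / FAILURE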

Task with Retry

@shared_task(
    bind=True,
    max_retries=3,
    default_retry_delay=60,  # seconds
)
def sync_to_external_api(self, record_id):
    try:
        result = call_external_api(record_id)
        return result
    except ExternalAPIError as exc:
        raise self.retry(exc=exc, countdown=2 ** self.request.retries * 60)
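
Celery can also generate the retry logic for you with autoretry_for and retry_backoff; a roughly equivalent sketch:

@shared_task(
    autoretry_for=(ExternalAPIError,),
    retry_backoff=60,                  # exponential backoff, starting at 60s
    retry_kwargs={'max_retries': 3},
)
def sync_to_external_api(record_id):
    return call_external_api(record_id)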

Task with Configuration

@shared_task(
    name='articles.generate_thumbnail',   # Explicit name (avoids rename issues)
    queue='images',                        # Route to specific queue
    time_limit=300,                        # Hard time limit (seconds)
    soft_time_limit=240,                   # Soft limit (raises SoftTimeLimitExceeded)
    acks_late=True,                        # Acknowledge after completion (safer)
    reject_on_worker_lost=True,
)
def generate_thumbnail(image_id):
    ...

Task Patterns

Fan-out (Parallel Processing)

from celery import group

from .models import Article  # assuming this file is articles/tasks.py

@shared_task
def process_all_articles():
    ids = list(Article.objects.values_list('pk', flat=True))
    job = group(process_article.s(pk) for pk in ids)
    job.apply_async()
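
The per-item task above is an ordinary shared task that takes the PK and does the work; a minimal sketch (field access is illustrative):

@shared_task
def process_article(article_id):
    article = Article.objects.get(pk=article_id)
    # ... per-article work goes here ...
    return article.pk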

Chain (Sequential Steps)

from celery import chain

pipeline = chain(
    fetch_data.s(source_url),
    process_data.s(),
    save_results.s(),
)
pipeline.delay()
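
Each task's return value is passed as the first argument of the next signature, which is why process_data.s() and save_results.s() take no explicit arguments above. A sketch of matching task definitions (helper and model names are placeholders):

@shared_task
def fetch_data(source_url):
    return download(source_url)             # placeholder helper

@shared_task
def process_data(raw_payload):              # receives fetch_data's return value
    return clean(raw_payload)               # placeholder helper

@shared_task
def save_results(processed):                # receives process_data's return value
    Report.objects.create(data=processed)   # placeholder model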

Chord (Fan-out + Callback)

from celery import chord

callback = aggregate_results.s()
job = chord(
    (process_chunk.s(chunk) for chunk in chunks),
    callback
)
job.delay()
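
The header tasks run in parallel and, once all of them have finished, the callback receives the list of their return values. A sketch of the callback assumed above:

@shared_task
def aggregate_results(results):
    # `results` is the list of process_chunk return values, in header order
    return sum(results)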

Periodic Tasks (Celery Beat)

Option A: Settings-Based Schedule

# settings.py
from celery.schedules import crontab

CELERY_BEAT_SCHEDULE = {
    'send-daily-digest': {
        'task': 'articles.tasks.send_daily_digest',
        'schedule': crontab(hour=8, minute=0),  # Every day at 8 AM
    },
    'cleanup-old-sessions': {
        'task': 'accounts.tasks.cleanup_sessions',
        'schedule': crontab(hour=2, minute=0, day_of_week=0),  # Sundays at 2 AM
    },
    'sync-every-5-minutes': {
        'task': 'integrations.tasks.sync_data',
        'schedule': 300.0,  # Every 5 minutes
    },
}

Option B: Database-Backed (django-celery-beat)

Manage schedules via Django admin at /admin/django_celery_beat/.
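
Schedules can also be created in code (for example in a data migration) using django-celery-beat's models; a sketch reusing the daily-digest task from Option A:

from django_celery_beat.models import CrontabSchedule, PeriodicTask

schedule, _ = CrontabSchedule.objects.get_or_create(
    minute='0', hour='8', day_of_week='*', day_of_month='*', month_of_year='*',
)
PeriodicTask.objects.get_or_create(
    crontab=schedule,
    name='Send daily digest',   # must be unique
    task='articles.tasks.send_daily_digest',
)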


Running Workers

# Development
celery -A config worker --loglevel=info

# With Beat scheduler
celery -A config worker --loglevel=info &
celery -A config beat --loglevel=info --scheduler django_celery_beat.schedulers:DatabaseScheduler

# Named queues
celery -A config worker -Q default,emails,images --loglevel=info
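
For long-running tasks it is worth setting concurrency and prefetch explicitly; the flags below are standard Celery worker options (values are illustrative):

# Limit concurrency and stop workers hoarding tasks they haven't started
celery -A config worker -Q default --loglevel=info \
    --concurrency=4 --prefetch-multiplier=1 --max-tasks-per-child=100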

Docker Compose Setup

# docker-compose.yml (web and db services omitted for brevity)
services:
  redis:
    image: redis:7-alpine

  worker:
    build: .
    command: celery -A config worker --loglevel=info -Q default
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0
    depends_on: [redis, db]

  beat:
    build: .
    command: celery -A config beat --loglevel=info --scheduler django_celery_beat.schedulers:DatabaseScheduler
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0
    depends_on: [redis, db]
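
With this layout the worker service can be scaled independently of the rest of the stack:

docker compose up -d --scale worker=3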

Best Practices

  • Never pass model instances to tasks — always pass PKs and fetch inside the task
  • Make tasks idempotent — safe to run twice if something fails (see the sketch after this list)
  • Keep tasks small — one clear responsibility per task
  • Use acks_late=True for critical tasks (don't lose them on worker crash)
  • Set time limits on all tasks to prevent runaway processes
  • Use bind=True when you need self (for retries, task ID, etc.)
  • Log inside tasks for observability
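
A minimal idempotency sketch, assuming at-least-once delivery (acks_late) where a task may occasionally run twice; model and field names are hypothetical:

@shared_task(acks_late=True)
def mark_invoice_paid(invoice_id):
    # Conditional update: a second run finds nothing left to change
    Invoice.objects.filter(pk=invoice_id, paid=False).update(paid=True)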

Monitoring

# Flower (web UI for Celery monitoring)
pip install flower
celery -A config flower --port=5555
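
For quick checks without a web UI, Celery's built-in inspection commands query running workers directly:

# One-off snapshots from the command line
celery -A config inspect active       # tasks currently executing
celery -A config inspect registered   # tasks each worker knows about
celery -A config status               # ping all workers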

Related Skills

  • django-deployment: Docker and supervisor setup for workers in production
  • django-tests: Testing Celery tasks synchronously in tests
  • django-debug: Logging and monitoring task execution