skills/lobbi-docs/claude/FastAPI Background Tasks

FastAPI Background Tasks

SKILL.md

FastAPI Background Task Processing

This skill provides patterns for background task processing with multiple frameworks: ARQ (recommended for async), Celery, and Dramatiq.

ARQ (Async Redis Queue) - Recommended

Installation

pip install arq

Configuration

# app/workers/config.py
from arq.connections import RedisSettings
from app.config import get_settings

settings = get_settings()

class WorkerSettings:
    """Base ARQ worker configuration consumed by the ``arq`` CLI."""
    # Use a dedicated Redis database so queued jobs never collide with
    # whatever is cached in database 0.
    redis_settings = RedisSettings(
        host=settings.redis_host,
        port=settings.redis_port,
        password=settings.redis_password,
        database=1  # Separate from cache
    )

    # Job settings
    max_jobs = 10  # max tasks executed concurrently by one worker process
    job_timeout = 300  # 5 minutes
    keep_result = 3600  # 1 hour
    queue_name = "default"

    # Cron jobs (populated by subclasses with arq CronJob entries)
    cron_jobs = []

Task Definitions

# app/workers/tasks.py
from arq import cron
from typing import Dict, Any
import asyncio

async def send_email(ctx: Dict[str, Any], to: str, subject: str, body: str):
    """Deliver one email through the worker's shared email service.

    The service instance is injected into ``ctx`` by the worker startup hook.
    Returns a small status dict recorded as the job result.
    """
    service = ctx.get("email_service")
    await service.send(to=to, subject=subject, body=body)
    return {"status": "sent", "to": to}

async def process_upload(ctx: Dict[str, Any], file_id: str, user_id: str):
    """Fetch an uploaded file from storage, transform it, and store the result.

    NOTE(review): ``process_file`` is not defined in this snippet — presumably
    a sibling helper; confirm its import.
    """
    storage = ctx.get("storage")

    raw = await storage.get(file_id)
    transformed = await process_file(raw)
    await storage.put(f"processed/{file_id}", transformed)

    return {"status": "processed", "file_id": file_id}

async def cleanup_expired(ctx: Dict[str, Any]):
    """Purge expired rows via the database handle injected at startup.

    Returns the number of deleted records for the job result.
    """
    database = ctx.get("db")
    outcome = await database.delete_expired()
    return {"deleted": outcome.deleted_count}

# Cron job example.  NOTE: arq's cron() is NOT a decorator — it takes the
# coroutine as its first positional argument and returns a CronJob entry,
# registered in WorkerSettings, e.g.:
#   cron_jobs = [cron(daily_report, hour=2, minute=0)]  # run at 2 AM daily
async def daily_report(ctx: Dict[str, Any]):
    """Generate the daily report using the service injected at worker startup."""
    report_service = ctx.get("report_service")
    await report_service.generate_daily()

Worker Entry Point

# app/workers/main.py
from typing import Any, Dict

from arq import create_pool, cron
from arq.connections import RedisSettings

from app.infrastructure.database import init_database
from app.services.email import EmailService
from app.workers.config import WorkerSettings
from app.workers.tasks import send_email, process_upload, cleanup_expired, daily_report

async def startup(ctx: Dict[str, Any]):
    """Worker startup - initialize services.

    Services stored in ``ctx`` here are retrieved by tasks via ``ctx.get(...)``.
    """
    await init_database()
    ctx["email_service"] = EmailService()
    # NOTE(review): get_db is not imported in this snippet — presumably from
    # app.infrastructure.database; confirm and add the import.
    ctx["db"] = get_db()

async def shutdown(ctx: Dict[str, Any]):
    """Worker shutdown - cleanup resources acquired in startup().

    NOTE(review): close_database is not imported in this snippet — confirm
    its origin (likely app.infrastructure.database).
    """
    await close_database()

class WorkerSettings(WorkerSettings):  # extends the base settings imported from app.workers.config
    """Concrete worker settings: task functions, cron schedule, lifecycle hooks."""
    functions = [send_email, process_upload, cleanup_expired]
    # cron_jobs entries must be CronJob objects produced by arq's cron(),
    # not bare coroutines — cron() attaches the schedule to the function.
    cron_jobs = [cron(daily_report, hour=2, minute=0)]  # 2 AM daily
    on_startup = startup
    on_shutdown = shutdown

# Run with: arq app.workers.main.WorkerSettings

Enqueueing Tasks from FastAPI

# app/dependencies.py
from arq import ArqRedis, create_pool
from arq.connections import RedisSettings

# Shared pool, created lazily on first use.  Creating a fresh connection
# pool per request would leak connections and defeat pooling entirely.
_task_pool = None

async def get_task_queue() -> ArqRedis:
    """FastAPI dependency returning a process-wide shared ARQ Redis pool."""
    global _task_pool
    if _task_pool is None:
        # TODO(review): pass the application's RedisSettings (host/port/
        # password/database) so this matches WorkerSettings, instead of
        # the library defaults.
        _task_pool = await create_pool(RedisSettings())
    return _task_pool

# app/routes/users.py
from fastapi import Depends
from arq import ArqRedis

@router.post("/users/{user_id}/welcome")
async def send_welcome_email(
    user_id: str,
    queue: ArqRedis = Depends(get_task_queue)
):
    """Queue a welcome email for *user_id* and return the queued job id."""
    user = await get_user(user_id)

    # Enqueue background task — executed by the ARQ worker, not this process.
    job = await queue.enqueue_job(
        "send_email",
        to=user.email,
        subject="Welcome!",
        body="Thanks for signing up."
    )

    # enqueue_job returns None when a job with the same id is already
    # queued/running; guard before dereferencing job.job_id.
    if job is None:
        return {"job_id": None, "status": "already_queued"}

    return {"job_id": job.job_id, "status": "queued"}

@router.post("/uploads")
async def upload_file(
    file: UploadFile,
    user: User = Depends(get_current_user),
    queue: ArqRedis = Depends(get_task_queue)
):
    """Persist the upload, then queue asynchronous processing for it."""
    stored_id = await save_file(file)

    # Hand off processing to the worker.  _defer_by delays pickup by 5
    # seconds — presumably to let the stored file settle; confirm intent.
    await queue.enqueue_job(
        "process_upload",
        file_id=stored_id,
        user_id=str(user.id),
        _defer_by=5  # Delay 5 seconds
    )

    return {"file_id": stored_id, "status": "processing"}

Celery (Battle-Tested)

Configuration

# app/workers/celery_app.py
from celery import Celery
from app.config import get_settings

settings = get_settings()

# Celery application shared by the worker process and by enqueueing code.
celery_app = Celery(
    "worker",
    broker=settings.celery_broker_url,
    backend=settings.celery_result_backend,
    include=["app.workers.celery_tasks"]  # task modules imported by the worker at boot
)

celery_app.conf.update(
    task_serializer="json",
    accept_content=["json"],
    result_serializer="json",
    timezone="UTC",
    enable_utc=True,
    task_track_started=True,  # report STARTED state, not just PENDING/SUCCESS
    task_time_limit=300,  # hard limit: task killed after 5 minutes
    worker_prefetch_multiplier=1,  # fetch one task at a time — fairer for long tasks
)

# Periodic tasks (Celery Beat)
from celery.schedules import crontab  # required by the crontab() schedule below

celery_app.conf.beat_schedule = {
    # Interval schedule: run cleanup once per hour (value is seconds).
    "cleanup-every-hour": {
        "task": "app.workers.celery_tasks.cleanup_expired",
        "schedule": 3600.0,
    },
    # Crontab schedule: run the daily report at 02:00 (UTC per conf above).
    "daily-report": {
        "task": "app.workers.celery_tasks.generate_daily_report",
        "schedule": crontab(hour=2, minute=0),
    },
}

Celery Tasks

# app/workers/celery_tasks.py
from app.workers.celery_app import celery_app
import asyncio

def run_async(coro):
    """Run *coro* to completion from a synchronous Celery task.

    asyncio.get_event_loop() is deprecated outside a running loop and raises
    in worker threads that never created one; asyncio.run() creates and
    tears down a fresh event loop per call, which is the supported way to
    bridge sync code to a coroutine.
    """
    return asyncio.run(coro)

@celery_app.task(bind=True, max_retries=3)
def send_email(self, to: str, subject: str, body: str):
    """Send an email; on failure, retry up to 3 times with a 60s countdown.

    NOTE(review): _send_email_async is not shown in this snippet — confirm
    it is defined/imported alongside these tasks.
    """
    try:
        run_async(_send_email_async(to, subject, body))
        return {"status": "sent", "to": to}
    except Exception as exc:
        # self.retry() raises celery.exceptions.Retry; re-raising makes the
        # control flow explicit and avoids an implicit `return None` path.
        raise self.retry(exc=exc, countdown=60)

@celery_app.task
def process_upload(file_id: str, user_id: str):
    """Run the async upload-processing pipeline from a sync Celery task."""
    # NOTE(review): _process_upload_async is not shown in this snippet —
    # presumably defined alongside these tasks; confirm.
    run_async(_process_upload_async(file_id, user_id))
    return {"status": "processed", "file_id": file_id}

Dramatiq (Modern Celery Alternative)

Configuration

# app/workers/dramatiq_app.py
import dramatiq
from dramatiq.brokers.redis import RedisBroker
from dramatiq.results import Results
from dramatiq.results.backends import RedisBackend

# NOTE(review): broker/backend URLs are hard-coded here, unlike the ARQ and
# Celery examples which read from app settings — consider get_settings().
redis_broker = RedisBroker(url="redis://localhost:6379/0")
result_backend = RedisBackend(url="redis://localhost:6379/1")  # results in a separate Redis DB

# Results middleware lets callers fetch actor return values.
redis_broker.add_middleware(Results(backend=result_backend))
dramatiq.set_broker(redis_broker)  # make this the process-wide default broker

Dramatiq Tasks

# app/workers/dramatiq_tasks.py
import dramatiq

@dramatiq.actor(max_retries=3, min_backoff=1000)  # backoff is in milliseconds
def send_email(to: str, subject: str, body: str):
    """Send an email (stub); retried up to 3 times on failure."""
    # Sync implementation
    return {"status": "sent", "to": to}

@dramatiq.actor(time_limit=300000)  # 5 min timeout (time_limit is in milliseconds)
def process_upload(file_id: str, user_id: str):
    """Process an uploaded file (stub); killed if it exceeds the time limit."""
    return {"status": "processed", "file_id": file_id}

FastAPI Built-in Background Tasks

For simple fire-and-forget tasks (no persistence — tasks run inside the server process and are lost if it restarts before they complete):

from fastapi import BackgroundTasks

def write_log(message: str, path: str = "log.txt"):
    """Append *message* (newline-terminated) to the log file at *path*.

    Deliberately a plain ``def``: FastAPI executes *sync* background tasks
    in a threadpool, so this blocking file I/O cannot stall the event loop.
    The original ``async def`` version performed blocking open/write on the
    loop itself.  *path* defaults to the original hard-coded "log.txt".
    """
    with open(path, "a") as f:
        f.write(f"{message}\n")

@router.post("/log")
async def create_log(message: str, background_tasks: BackgroundTasks):
    """Accept a log message; it is written after the response is sent."""
    # add_task schedules write_log to run once the response has gone out;
    # nothing persists the task — it is lost if the process dies first.
    background_tasks.add_task(write_log, message)
    return {"status": "logged"}

Additional Resources

Reference Files

For detailed patterns:

  • references/arq-advanced.md - ARQ advanced patterns, retries, priorities
  • references/celery-patterns.md - Celery best practices, chains, groups
  • references/monitoring.md - Flower, task monitoring

Example Files

Working examples in examples/:

  • examples/arq_worker.py - Complete ARQ worker
  • examples/celery_app.py - Celery configuration
  • examples/task_service.py - Task enqueueing service
Weekly Installs
0
GitHub Stars
9
First Seen
Jan 1, 1970