FastAPI Background Tasks
SKILL.md
FastAPI Background Task Processing
This skill provides patterns for background task processing with multiple frameworks: ARQ (recommended for async), Celery, and Dramatiq.
ARQ (Async Redis Queue) - Recommended
Installation
pip install arq
Configuration
# app/workers/config.py
from arq.connections import RedisSettings
from app.config import get_settings
settings = get_settings()
class WorkerSettings:
redis_settings = RedisSettings(
host=settings.redis_host,
port=settings.redis_port,
password=settings.redis_password,
database=1 # Separate from cache
)
# Job settings
max_jobs = 10
job_timeout = 300 # 5 minutes
keep_result = 3600 # 1 hour
queue_name = "default"
# Cron jobs
cron_jobs = []
Task Definitions
# app/workers/tasks.py
from arq import cron
from typing import Dict, Any
import asyncio
async def send_email(ctx: Dict[str, Any], to: str, subject: str, body: str):
"""Send email asynchronously."""
email_service = ctx.get("email_service")
await email_service.send(to=to, subject=subject, body=body)
return {"status": "sent", "to": to}
async def process_upload(ctx: Dict[str, Any], file_id: str, user_id: str):
"""Process uploaded file (resize, convert, etc.)."""
storage = ctx.get("storage")
file_data = await storage.get(file_id)
# Process file
processed = await process_file(file_data)
# Save processed file
await storage.put(f"processed/{file_id}", processed)
return {"status": "processed", "file_id": file_id}
async def cleanup_expired(ctx: Dict[str, Any]):
"""Periodic cleanup of expired data."""
db = ctx.get("db")
result = await db.delete_expired()
return {"deleted": result.deleted_count}
# Cron job example
@cron(hour=2, minute=0) # Run at 2 AM daily
async def daily_report(ctx: Dict[str, Any]):
"""Generate daily report."""
report_service = ctx.get("report_service")
await report_service.generate_daily()
Worker Entry Point
# app/workers/main.py
from arq import create_pool
from arq.connections import RedisSettings
from app.workers.config import WorkerSettings
from app.workers.tasks import send_email, process_upload, cleanup_expired, daily_report
from app.infrastructure.database import init_database
from app.services.email import EmailService
async def startup(ctx: Dict[str, Any]):
"""Worker startup - initialize services."""
await init_database()
ctx["email_service"] = EmailService()
ctx["db"] = get_db()
async def shutdown(ctx: Dict[str, Any]):
"""Worker shutdown - cleanup."""
await close_database()
class WorkerSettings(WorkerSettings):
functions = [send_email, process_upload, cleanup_expired]
cron_jobs = [daily_report]
on_startup = startup
on_shutdown = shutdown
# Run with: arq app.workers.main.WorkerSettings
Enqueueing Tasks from FastAPI
# app/dependencies.py
from arq import ArqRedis, create_pool
from arq.connections import RedisSettings
async def get_task_queue() -> ArqRedis:
return await create_pool(RedisSettings())
# app/routes/users.py
from fastapi import Depends
from arq import ArqRedis
@router.post("/users/{user_id}/welcome")
async def send_welcome_email(
user_id: str,
queue: ArqRedis = Depends(get_task_queue)
):
user = await get_user(user_id)
# Enqueue background task
job = await queue.enqueue_job(
"send_email",
to=user.email,
subject="Welcome!",
body="Thanks for signing up."
)
return {"job_id": job.job_id, "status": "queued"}
@router.post("/uploads")
async def upload_file(
file: UploadFile,
user: User = Depends(get_current_user),
queue: ArqRedis = Depends(get_task_queue)
):
# Save file
file_id = await save_file(file)
# Enqueue processing
await queue.enqueue_job(
"process_upload",
file_id=file_id,
user_id=str(user.id),
_defer_by=5 # Delay 5 seconds
)
return {"file_id": file_id, "status": "processing"}
Celery (Battle-Tested)
Configuration
# app/workers/celery_app.py
from celery import Celery
from app.config import get_settings
settings = get_settings()
celery_app = Celery(
"worker",
broker=settings.celery_broker_url,
backend=settings.celery_result_backend,
include=["app.workers.celery_tasks"]
)
celery_app.conf.update(
task_serializer="json",
accept_content=["json"],
result_serializer="json",
timezone="UTC",
enable_utc=True,
task_track_started=True,
task_time_limit=300,
worker_prefetch_multiplier=1,
)
# Periodic tasks (Celery Beat)
celery_app.conf.beat_schedule = {
"cleanup-every-hour": {
"task": "app.workers.celery_tasks.cleanup_expired",
"schedule": 3600.0,
},
"daily-report": {
"task": "app.workers.celery_tasks.generate_daily_report",
"schedule": crontab(hour=2, minute=0),
},
}
Celery Tasks
# app/workers/celery_tasks.py
from app.workers.celery_app import celery_app
import asyncio
def run_async(coro):
"""Helper to run async code in sync Celery tasks."""
loop = asyncio.get_event_loop()
return loop.run_until_complete(coro)
@celery_app.task(bind=True, max_retries=3)
def send_email(self, to: str, subject: str, body: str):
try:
run_async(_send_email_async(to, subject, body))
return {"status": "sent", "to": to}
except Exception as exc:
self.retry(exc=exc, countdown=60)
@celery_app.task
def process_upload(file_id: str, user_id: str):
run_async(_process_upload_async(file_id, user_id))
return {"status": "processed", "file_id": file_id}
Dramatiq (Modern Celery Alternative)
Configuration
# app/workers/dramatiq_app.py
import dramatiq
from dramatiq.brokers.redis import RedisBroker
from dramatiq.results import Results
from dramatiq.results.backends import RedisBackend
redis_broker = RedisBroker(url="redis://localhost:6379/0")
result_backend = RedisBackend(url="redis://localhost:6379/1")
redis_broker.add_middleware(Results(backend=result_backend))
dramatiq.set_broker(redis_broker)
Dramatiq Tasks
# app/workers/dramatiq_tasks.py
import dramatiq
@dramatiq.actor(max_retries=3, min_backoff=1000)
def send_email(to: str, subject: str, body: str):
# Sync implementation
return {"status": "sent", "to": to}
@dramatiq.actor(time_limit=300000) # 5 min timeout
def process_upload(file_id: str, user_id: str):
return {"status": "processed", "file_id": file_id}
FastAPI Built-in Background Tasks
For simple fire-and-forget tasks (no persistence):
from fastapi import BackgroundTasks
async def write_log(message: str):
with open("log.txt", "a") as f:
f.write(f"{message}\n")
@router.post("/log")
async def create_log(message: str, background_tasks: BackgroundTasks):
background_tasks.add_task(write_log, message)
return {"status": "logged"}
Additional Resources
Reference Files
For detailed patterns:
references/arq-advanced.md- ARQ advanced patterns, retries, prioritiesreferences/celery-patterns.md- Celery best practices, chains, groupsreferences/monitoring.md- Flower, task monitoring
Example Files
Working examples in examples/:
examples/arq_worker.py- Complete ARQ workerexamples/celery_app.py- Celery configurationexamples/task_service.py- Task enqueueing service