queue-job-processor
SKILL.md
Queue Job Processor
Build robust background job processing with BullMQ and Redis.
Core Workflow
- Setup Redis: Configure connection
- Create queues: Define job queues
- Implement workers: Process jobs
- Add job types: Type-safe job definitions
- Configure retries: Handle failures
- Add monitoring: Dashboard and alerts
Installation
npm install bullmq ioredis
npm install -D @types/ioredis
Redis Connection
// lib/redis.ts
import IORedis from 'ioredis';
export const redis = new IORedis(process.env.REDIS_URL!, {
maxRetriesPerRequest: null, // Required for BullMQ
enableReadyCheck: false,
});
export const redisSubscriber = new IORedis(process.env.REDIS_URL!, {
maxRetriesPerRequest: null,
enableReadyCheck: false,
});
Queue Setup
Define Job Types
// jobs/types.ts
export interface EmailJobData {
to: string;
subject: string;
template: string;
variables: Record<string, string>;
}
export interface ImageProcessingJobData {
imageId: string;
userId: string;
operations: Array<{
type: 'resize' | 'crop' | 'watermark';
params: Record<string, any>;
}>;
}
export interface ReportJobData {
reportId: string;
userId: string;
type: 'daily' | 'weekly' | 'monthly';
dateRange: {
start: string;
end: string;
};
}
export interface WebhookJobData {
url: string;
payload: Record<string, any>;
headers?: Record<string, string>;
retryCount?: number;
}
export type JobData =
| { type: 'email'; data: EmailJobData }
| { type: 'image-processing'; data: ImageProcessingJobData }
| { type: 'report'; data: ReportJobData }
| { type: 'webhook'; data: WebhookJobData };
Create Queues
// queues/index.ts
import { Queue, QueueOptions } from 'bullmq';
import { redis } from '../lib/redis';
import {
EmailJobData,
ImageProcessingJobData,
ReportJobData,
WebhookJobData,
} from './types';
const defaultOptions: QueueOptions = {
connection: redis,
defaultJobOptions: {
attempts: 3,
backoff: {
type: 'exponential',
delay: 1000,
},
removeOnComplete: {
count: 1000, // Keep last 1000 completed jobs
age: 24 * 3600, // Keep for 24 hours
},
removeOnFail: {
count: 5000, // Keep last 5000 failed jobs
},
},
};
export const emailQueue = new Queue<EmailJobData>('email', defaultOptions);
export const imageQueue = new Queue<ImageProcessingJobData>('image-processing', {
...defaultOptions,
defaultJobOptions: {
...defaultOptions.defaultJobOptions,
attempts: 5,
timeout: 5 * 60 * 1000, // 5 minutes
},
});
export const reportQueue = new Queue<ReportJobData>('reports', {
...defaultOptions,
defaultJobOptions: {
...defaultOptions.defaultJobOptions,
timeout: 30 * 60 * 1000, // 30 minutes
},
});
export const webhookQueue = new Queue<WebhookJobData>('webhooks', {
...defaultOptions,
defaultJobOptions: {
...defaultOptions.defaultJobOptions,
attempts: 5,
backoff: {
type: 'exponential',
delay: 5000,
},
},
});
Workers
Email Worker
// workers/email.worker.ts
import { Worker, Job } from 'bullmq';
import { redis } from '../lib/redis';
import { EmailJobData } from '../jobs/types';
import { sendEmail } from '../lib/email';
const emailWorker = new Worker<EmailJobData>(
'email',
async (job: Job<EmailJobData>) => {
const { to, subject, template, variables } = job.data;
console.log(`Processing email job ${job.id} to ${to}`);
// Update progress
await job.updateProgress(10);
// Render template
const html = await renderTemplate(template, variables);
await job.updateProgress(50);
// Send email
const result = await sendEmail({
to,
subject,
html,
});
await job.updateProgress(100);
return { messageId: result.messageId, sentAt: new Date() };
},
{
connection: redis,
concurrency: 10, // Process 10 emails at a time
limiter: {
max: 100, // Max 100 jobs
duration: 60000, // Per minute
},
}
);
// Event handlers
emailWorker.on('completed', (job, result) => {
console.log(`Email job ${job.id} completed:`, result);
});
emailWorker.on('failed', (job, error) => {
console.error(`Email job ${job?.id} failed:`, error);
});
emailWorker.on('progress', (job, progress) => {
console.log(`Email job ${job.id} progress: ${progress}%`);
});
export { emailWorker };
Image Processing Worker
// workers/image.worker.ts
import { Worker, Job } from 'bullmq';
import { redis } from '../lib/redis';
import { ImageProcessingJobData } from '../jobs/types';
import sharp from 'sharp';
import { S3Client, PutObjectCommand } from '@aws-sdk/client-s3';
const s3 = new S3Client({ region: process.env.AWS_REGION });
const imageWorker = new Worker<ImageProcessingJobData>(
'image-processing',
async (job: Job<ImageProcessingJobData>) => {
const { imageId, userId, operations } = job.data;
console.log(`Processing image ${imageId} for user ${userId}`);
// Download original image
const originalBuffer = await downloadImage(imageId);
let image = sharp(originalBuffer);
// Apply operations
for (let i = 0; i < operations.length; i++) {
const op = operations[i];
switch (op.type) {
case 'resize':
image = image.resize(op.params.width, op.params.height, {
fit: op.params.fit || 'cover',
});
break;
case 'crop':
image = image.extract({
left: op.params.left,
top: op.params.top,
width: op.params.width,
height: op.params.height,
});
break;
case 'watermark':
image = image.composite([
{ input: op.params.watermarkPath, gravity: 'southeast' },
]);
break;
}
await job.updateProgress(((i + 1) / operations.length) * 80);
}
// Convert and upload
const processedBuffer = await image.webp({ quality: 85 }).toBuffer();
const key = `processed/${userId}/${imageId}.webp`;
await s3.send(
new PutObjectCommand({
Bucket: process.env.S3_BUCKET,
Key: key,
Body: processedBuffer,
ContentType: 'image/webp',
})
);
await job.updateProgress(100);
return {
url: `https://${process.env.S3_BUCKET}.s3.amazonaws.com/${key}`,
size: processedBuffer.length,
};
},
{
connection: redis,
concurrency: 5,
}
);
imageWorker.on('failed', async (job, error) => {
// Notify user of failure
if (job) {
await notifyUser(job.data.userId, {
type: 'image-processing-failed',
imageId: job.data.imageId,
error: error.message,
});
}
});
export { imageWorker };
Webhook Worker with Retries
// workers/webhook.worker.ts
import { Worker, Job } from 'bullmq';
import { redis } from '../lib/redis';
import { WebhookJobData } from '../jobs/types';
const webhookWorker = new Worker<WebhookJobData>(
'webhooks',
async (job: Job<WebhookJobData>) => {
const { url, payload, headers = {} } = job.data;
const response = await fetch(url, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'X-Webhook-Signature': generateSignature(payload),
...headers,
},
body: JSON.stringify(payload),
signal: AbortSignal.timeout(30000), // 30s timeout
});
if (!response.ok) {
// Retry for server errors
if (response.status >= 500) {
throw new Error(`Webhook failed: ${response.status}`);
}
// Don't retry for client errors
return {
success: false,
status: response.status,
message: 'Client error, not retrying',
};
}
return {
success: true,
status: response.status,
};
},
{
connection: redis,
concurrency: 20,
}
);
export { webhookWorker };
Adding Jobs
Service Layer
// services/jobs.service.ts
import { emailQueue, imageQueue, reportQueue, webhookQueue } from '../queues';
import { JobsOptions } from 'bullmq';
export class JobService {
// Send email
static async sendEmail(data: EmailJobData, options?: JobsOptions) {
return emailQueue.add('send-email', data, {
...options,
priority: data.template === 'password-reset' ? 1 : 10,
});
}
// Bulk emails
static async sendBulkEmails(emails: EmailJobData[]) {
const jobs = emails.map((data, index) => ({
name: 'send-email',
data,
opts: {
delay: index * 100, // Stagger by 100ms
},
}));
return emailQueue.addBulk(jobs);
}
// Process image
static async processImage(data: ImageProcessingJobData) {
return imageQueue.add('process', data, {
jobId: `image-${data.imageId}`, // Prevent duplicates
});
}
// Schedule report
static async scheduleReport(data: ReportJobData, runAt: Date) {
return reportQueue.add('generate', data, {
delay: runAt.getTime() - Date.now(),
});
}
// Send webhook
static async sendWebhook(data: WebhookJobData) {
return webhookQueue.add('deliver', data);
}
}
API Usage
// app/api/users/route.ts
import { JobService } from '@/services/jobs.service';
export async function POST(req: Request) {
const data = await req.json();
// Create user
const user = await db.user.create({ data });
// Queue welcome email
await JobService.sendEmail({
to: user.email,
subject: 'Welcome!',
template: 'welcome',
variables: { name: user.name },
});
return Response.json(user);
}
Scheduled Jobs (Cron)
// schedulers/index.ts
import { Queue, QueueScheduler } from 'bullmq';
import { redis } from '../lib/redis';
// Daily report scheduler
export async function setupSchedulers() {
// Clean up old jobs daily
await reportQueue.add(
'cleanup',
{},
{
repeat: {
pattern: '0 0 * * *', // Every day at midnight
},
}
);
// Hourly metrics aggregation
await metricsQueue.add(
'aggregate',
{},
{
repeat: {
pattern: '0 * * * *', // Every hour
},
}
);
// Weekly digest
await emailQueue.add(
'weekly-digest',
{ template: 'weekly-digest' },
{
repeat: {
pattern: '0 9 * * 1', // Every Monday at 9 AM
},
}
);
}
Job Events & Monitoring
Event Listeners
// monitoring/events.ts
import { QueueEvents } from 'bullmq';
import { redis } from '../lib/redis';
const emailQueueEvents = new QueueEvents('email', { connection: redis });
emailQueueEvents.on('completed', ({ jobId, returnvalue }) => {
console.log(`Job ${jobId} completed with:`, returnvalue);
metrics.increment('email.completed');
});
emailQueueEvents.on('failed', ({ jobId, failedReason }) => {
console.error(`Job ${jobId} failed:`, failedReason);
metrics.increment('email.failed');
// Alert on repeated failures
alertOnFailure(jobId, failedReason);
});
emailQueueEvents.on('delayed', ({ jobId, delay }) => {
console.log(`Job ${jobId} delayed by ${delay}ms`);
});
emailQueueEvents.on('progress', ({ jobId, data }) => {
console.log(`Job ${jobId} progress:`, data);
});
emailQueueEvents.on('stalled', ({ jobId }) => {
console.warn(`Job ${jobId} stalled`);
metrics.increment('email.stalled');
});
Bull Board Dashboard
// app/api/admin/queues/route.ts
import { createBullBoard } from '@bull-board/api';
import { BullMQAdapter } from '@bull-board/api/bullMQAdapter';
import { ExpressAdapter } from '@bull-board/express';
import { emailQueue, imageQueue, reportQueue, webhookQueue } from '@/queues';
const serverAdapter = new ExpressAdapter();
serverAdapter.setBasePath('/api/admin/queues');
createBullBoard({
queues: [
new BullMQAdapter(emailQueue),
new BullMQAdapter(imageQueue),
new BullMQAdapter(reportQueue),
new BullMQAdapter(webhookQueue),
],
serverAdapter,
});
export const GET = serverAdapter.getRouter();
export const POST = serverAdapter.getRouter();
Error Handling
// workers/base.worker.ts
import { Worker, Job, UnrecoverableError } from 'bullmq';
// Custom error for non-retryable failures
export class NonRetryableError extends UnrecoverableError {
constructor(message: string) {
super(message);
this.name = 'NonRetryableError';
}
}
// Worker with error handling
const worker = new Worker(
'queue-name',
async (job: Job) => {
try {
// Validate input
if (!job.data.requiredField) {
throw new NonRetryableError('Missing required field');
}
// Process job
return await processJob(job.data);
} catch (error) {
if (error instanceof NonRetryableError) {
throw error; // Won't retry
}
// Log and rethrow for retry
console.error(`Job ${job.id} error:`, error);
throw error;
}
},
{
connection: redis,
}
);
// Handle worker errors
worker.on('error', (error) => {
console.error('Worker error:', error);
});
Graceful Shutdown
// server.ts
import { emailWorker, imageWorker, reportWorker } from './workers';
const workers = [emailWorker, imageWorker, reportWorker];
async function gracefulShutdown() {
console.log('Shutting down workers...');
// Close workers gracefully
await Promise.all(
workers.map((worker) =>
worker.close().catch((err) => {
console.error('Error closing worker:', err);
})
)
);
// Close Redis connections
await redis.quit();
await redisSubscriber.quit();
console.log('Workers shut down');
process.exit(0);
}
process.on('SIGTERM', gracefulShutdown);
process.on('SIGINT', gracefulShutdown);
Best Practices
- Idempotent jobs: Jobs should be safe to retry
- Unique job IDs: Prevent duplicate processing
- Set timeouts: Prevent stuck jobs
- Use progress updates: For long-running jobs
- Handle failures gracefully: Alert and log
- Clean up old jobs: Remove completed/failed jobs
- Graceful shutdown: Wait for jobs to complete
- Monitor queues: Use Bull Board or similar
Output Checklist
Every queue implementation should include:
- Redis connection with proper config
- Typed job data interfaces
- Queue with default options
- Worker with concurrency limits
- Retry and backoff configuration
- Event handlers for monitoring
- Error handling (retryable vs non-retryable)
- Graceful shutdown handling
- Bull Board or monitoring dashboard
- Scheduled/recurring jobs (if needed)
Weekly Installs
11
Repository
patricio0312rev/skillsFirst Seen
10 days ago
Installed on
claude-code9
gemini-cli8
opencode8
antigravity7
windsurf7
github-copilot7