queue-job-processor
Queue Job Processor
Build robust background job processing with BullMQ and Redis.
Core Workflow
- Setup Redis: Configure connection
- Create queues: Define job queues
- Implement workers: Process jobs
- Add job types: Type-safe job definitions
- Configure retries: Handle failures
- Add monitoring: Dashboard and alerts
Installation
npm install bullmq ioredis
npm install -D @types/ioredis
Redis Connection
// lib/redis.ts
import IORedis from 'ioredis';
export const redis = new IORedis(process.env.REDIS_URL!, {
maxRetriesPerRequest: null, // Required for BullMQ
enableReadyCheck: false,
});
export const redisSubscriber = new IORedis(process.env.REDIS_URL!, {
maxRetriesPerRequest: null,
enableReadyCheck: false,
});
Queue Setup
Define Job Types
// jobs/types.ts
export interface EmailJobData {
to: string;
subject: string;
template: string;
variables: Record<string, string>;
}
export interface ImageProcessingJobData {
imageId: string;
userId: string;
operations: Array<{
type: 'resize' | 'crop' | 'watermark';
params: Record<string, any>;
}>;
}
export interface ReportJobData {
reportId: string;
userId: string;
type: 'daily' | 'weekly' | 'monthly';
dateRange: {
start: string;
end: string;
};
}
export interface WebhookJobData {
url: string;
payload: Record<string, any>;
headers?: Record<string, string>;
retryCount?: number;
}
export type JobData =
| { type: 'email'; data: EmailJobData }
| { type: 'image-processing'; data: ImageProcessingJobData }
| { type: 'report'; data: ReportJobData }
| { type: 'webhook'; data: WebhookJobData };
Create Queues
// queues/index.ts
import { Queue, QueueOptions } from 'bullmq';
import { redis } from '../lib/redis';
import {
EmailJobData,
ImageProcessingJobData,
ReportJobData,
WebhookJobData,
} from './types';
const defaultOptions: QueueOptions = {
connection: redis,
defaultJobOptions: {
attempts: 3,
backoff: {
type: 'exponential',
delay: 1000,
},
removeOnComplete: {
count: 1000, // Keep last 1000 completed jobs
age: 24 * 3600, // Keep for 24 hours
},
removeOnFail: {
count: 5000, // Keep last 5000 failed jobs
},
},
};
export const emailQueue = new Queue<EmailJobData>('email', defaultOptions);
export const imageQueue = new Queue<ImageProcessingJobData>('image-processing', {
...defaultOptions,
defaultJobOptions: {
...defaultOptions.defaultJobOptions,
attempts: 5,
timeout: 5 * 60 * 1000, // 5 minutes
},
});
export const reportQueue = new Queue<ReportJobData>('reports', {
...defaultOptions,
defaultJobOptions: {
...defaultOptions.defaultJobOptions,
timeout: 30 * 60 * 1000, // 30 minutes
},
});
export const webhookQueue = new Queue<WebhookJobData>('webhooks', {
...defaultOptions,
defaultJobOptions: {
...defaultOptions.defaultJobOptions,
attempts: 5,
backoff: {
type: 'exponential',
delay: 5000,
},
},
});
Workers
Email Worker
// workers/email.worker.ts
import { Worker, Job } from 'bullmq';
import { redis } from '../lib/redis';
import { EmailJobData } from '../jobs/types';
import { sendEmail } from '../lib/email';
const emailWorker = new Worker<EmailJobData>(
'email',
async (job: Job<EmailJobData>) => {
const { to, subject, template, variables } = job.data;
console.log(`Processing email job ${job.id} to ${to}`);
// Update progress
await job.updateProgress(10);
// Render template
const html = await renderTemplate(template, variables);
await job.updateProgress(50);
// Send email
const result = await sendEmail({
to,
subject,
html,
});
await job.updateProgress(100);
return { messageId: result.messageId, sentAt: new Date() };
},
{
connection: redis,
concurrency: 10, // Process 10 emails at a time
limiter: {
max: 100, // Max 100 jobs
duration: 60000, // Per minute
},
}
);
// Event handlers
emailWorker.on('completed', (job, result) => {
console.log(`Email job ${job.id} completed:`, result);
});
emailWorker.on('failed', (job, error) => {
console.error(`Email job ${job?.id} failed:`, error);
});
emailWorker.on('progress', (job, progress) => {
console.log(`Email job ${job.id} progress: ${progress}%`);
});
export { emailWorker };
Image Processing Worker
// workers/image.worker.ts
import { Worker, Job } from 'bullmq';
import { redis } from '../lib/redis';
import { ImageProcessingJobData } from '../jobs/types';
import sharp from 'sharp';
import { S3Client, PutObjectCommand } from '@aws-sdk/client-s3';
const s3 = new S3Client({ region: process.env.AWS_REGION });
const imageWorker = new Worker<ImageProcessingJobData>(
'image-processing',
async (job: Job<ImageProcessingJobData>) => {
const { imageId, userId, operations } = job.data;
console.log(`Processing image ${imageId} for user ${userId}`);
// Download original image
const originalBuffer = await downloadImage(imageId);
let image = sharp(originalBuffer);
// Apply operations
for (let i = 0; i < operations.length; i++) {
const op = operations[i];
switch (op.type) {
case 'resize':
image = image.resize(op.params.width, op.params.height, {
fit: op.params.fit || 'cover',
});
break;
case 'crop':
image = image.extract({
left: op.params.left,
top: op.params.top,
width: op.params.width,
height: op.params.height,
});
break;
case 'watermark':
image = image.composite([
{ input: op.params.watermarkPath, gravity: 'southeast' },
]);
break;
}
await job.updateProgress(((i + 1) / operations.length) * 80);
}
// Convert and upload
const processedBuffer = await image.webp({ quality: 85 }).toBuffer();
const key = `processed/${userId}/${imageId}.webp`;
await s3.send(
new PutObjectCommand({
Bucket: process.env.S3_BUCKET,
Key: key,
Body: processedBuffer,
ContentType: 'image/webp',
})
);
await job.updateProgress(100);
return {
url: `https://${process.env.S3_BUCKET}.s3.amazonaws.com/${key}`,
size: processedBuffer.length,
};
},
{
connection: redis,
concurrency: 5,
}
);
imageWorker.on('failed', async (job, error) => {
// Notify user of failure
if (job) {
await notifyUser(job.data.userId, {
type: 'image-processing-failed',
imageId: job.data.imageId,
error: error.message,
});
}
});
export { imageWorker };
Webhook Worker with Retries
// workers/webhook.worker.ts
import { Worker, Job } from 'bullmq';
import { redis } from '../lib/redis';
import { WebhookJobData } from '../jobs/types';
const webhookWorker = new Worker<WebhookJobData>(
'webhooks',
async (job: Job<WebhookJobData>) => {
const { url, payload, headers = {} } = job.data;
const response = await fetch(url, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'X-Webhook-Signature': generateSignature(payload),
...headers,
},
body: JSON.stringify(payload),
signal: AbortSignal.timeout(30000), // 30s timeout
});
if (!response.ok) {
// Retry for server errors
if (response.status >= 500) {
throw new Error(`Webhook failed: ${response.status}`);
}
// Don't retry for client errors
return {
success: false,
status: response.status,
message: 'Client error, not retrying',
};
}
return {
success: true,
status: response.status,
};
},
{
connection: redis,
concurrency: 20,
}
);
export { webhookWorker };
Adding Jobs
Service Layer
// services/jobs.service.ts
import { emailQueue, imageQueue, reportQueue, webhookQueue } from '../queues';
import { JobsOptions } from 'bullmq';
export class JobService {
// Send email
static async sendEmail(data: EmailJobData, options?: JobsOptions) {
return emailQueue.add('send-email', data, {
...options,
priority: data.template === 'password-reset' ? 1 : 10,
});
}
// Bulk emails
static async sendBulkEmails(emails: EmailJobData[]) {
const jobs = emails.map((data, index) => ({
name: 'send-email',
data,
opts: {
delay: index * 100, // Stagger by 100ms
},
}));
return emailQueue.addBulk(jobs);
}
// Process image
static async processImage(data: ImageProcessingJobData) {
return imageQueue.add('process', data, {
jobId: `image-${data.imageId}`, // Prevent duplicates
});
}
// Schedule report
static async scheduleReport(data: ReportJobData, runAt: Date) {
return reportQueue.add('generate', data, {
delay: runAt.getTime() - Date.now(),
});
}
// Send webhook
static async sendWebhook(data: WebhookJobData) {
return webhookQueue.add('deliver', data);
}
}
API Usage
// app/api/users/route.ts
import { JobService } from '@/services/jobs.service';
export async function POST(req: Request) {
const data = await req.json();
// Create user
const user = await db.user.create({ data });
// Queue welcome email
await JobService.sendEmail({
to: user.email,
subject: 'Welcome!',
template: 'welcome',
variables: { name: user.name },
});
return Response.json(user);
}
Scheduled Jobs (Cron)
// schedulers/index.ts
import { Queue, QueueScheduler } from 'bullmq';
import { redis } from '../lib/redis';
// Daily report scheduler
export async function setupSchedulers() {
// Clean up old jobs daily
await reportQueue.add(
'cleanup',
{},
{
repeat: {
pattern: '0 0 * * *', // Every day at midnight
},
}
);
// Hourly metrics aggregation
await metricsQueue.add(
'aggregate',
{},
{
repeat: {
pattern: '0 * * * *', // Every hour
},
}
);
// Weekly digest
await emailQueue.add(
'weekly-digest',
{ template: 'weekly-digest' },
{
repeat: {
pattern: '0 9 * * 1', // Every Monday at 9 AM
},
}
);
}
Job Events & Monitoring
Event Listeners
// monitoring/events.ts
import { QueueEvents } from 'bullmq';
import { redis } from '../lib/redis';
const emailQueueEvents = new QueueEvents('email', { connection: redis });
emailQueueEvents.on('completed', ({ jobId, returnvalue }) => {
console.log(`Job ${jobId} completed with:`, returnvalue);
metrics.increment('email.completed');
});
emailQueueEvents.on('failed', ({ jobId, failedReason }) => {
console.error(`Job ${jobId} failed:`, failedReason);
metrics.increment('email.failed');
// Alert on repeated failures
alertOnFailure(jobId, failedReason);
});
emailQueueEvents.on('delayed', ({ jobId, delay }) => {
console.log(`Job ${jobId} delayed by ${delay}ms`);
});
emailQueueEvents.on('progress', ({ jobId, data }) => {
console.log(`Job ${jobId} progress:`, data);
});
emailQueueEvents.on('stalled', ({ jobId }) => {
console.warn(`Job ${jobId} stalled`);
metrics.increment('email.stalled');
});
Bull Board Dashboard
// app/api/admin/queues/route.ts
import { createBullBoard } from '@bull-board/api';
import { BullMQAdapter } from '@bull-board/api/bullMQAdapter';
import { ExpressAdapter } from '@bull-board/express';
import { emailQueue, imageQueue, reportQueue, webhookQueue } from '@/queues';
const serverAdapter = new ExpressAdapter();
serverAdapter.setBasePath('/api/admin/queues');
createBullBoard({
queues: [
new BullMQAdapter(emailQueue),
new BullMQAdapter(imageQueue),
new BullMQAdapter(reportQueue),
new BullMQAdapter(webhookQueue),
],
serverAdapter,
});
export const GET = serverAdapter.getRouter();
export const POST = serverAdapter.getRouter();
Error Handling
// workers/base.worker.ts
import { Worker, Job, UnrecoverableError } from 'bullmq';
// Custom error for non-retryable failures
export class NonRetryableError extends UnrecoverableError {
constructor(message: string) {
super(message);
this.name = 'NonRetryableError';
}
}
// Worker with error handling
const worker = new Worker(
'queue-name',
async (job: Job) => {
try {
// Validate input
if (!job.data.requiredField) {
throw new NonRetryableError('Missing required field');
}
// Process job
return await processJob(job.data);
} catch (error) {
if (error instanceof NonRetryableError) {
throw error; // Won't retry
}
// Log and rethrow for retry
console.error(`Job ${job.id} error:`, error);
throw error;
}
},
{
connection: redis,
}
);
// Handle worker errors
worker.on('error', (error) => {
console.error('Worker error:', error);
});
Graceful Shutdown
// server.ts
import { emailWorker, imageWorker, reportWorker } from './workers';
const workers = [emailWorker, imageWorker, reportWorker];
async function gracefulShutdown() {
console.log('Shutting down workers...');
// Close workers gracefully
await Promise.all(
workers.map((worker) =>
worker.close().catch((err) => {
console.error('Error closing worker:', err);
})
)
);
// Close Redis connections
await redis.quit();
await redisSubscriber.quit();
console.log('Workers shut down');
process.exit(0);
}
process.on('SIGTERM', gracefulShutdown);
process.on('SIGINT', gracefulShutdown);
Best Practices
- Idempotent jobs: Jobs should be safe to retry
- Unique job IDs: Prevent duplicate processing
- Set timeouts: Prevent stuck jobs
- Use progress updates: For long-running jobs
- Handle failures gracefully: Alert and log
- Clean up old jobs: Remove completed/failed jobs
- Graceful shutdown: Wait for jobs to complete
- Monitor queues: Use Bull Board or similar
Output Checklist
Every queue implementation should include:
- Redis connection with proper config
- Typed job data interfaces
- Queue with default options
- Worker with concurrency limits
- Retry and backoff configuration
- Event handlers for monitoring
- Error handling (retryable vs non-retryable)
- Graceful shutdown handling
- Bull Board or monitoring dashboard
- Scheduled/recurring jobs (if needed)
More from monkey1sai/openai-cli
multi-tenant-safety-checker
Ensures tenant isolation at query and policy level using Row Level Security, automated testing, and security audits. Prevents data leakage between tenants. Use for "multi-tenancy", "tenant isolation", "RLS", or "data security".
10modal-drawer-system
Implements accessible modals and drawers with focus trap, ESC to close, scroll lock, portal rendering, and ARIA attributes. Includes sample implementations for common use cases like edit forms, confirmations, and detail views. Use when building "modals", "dialogs", "drawers", "sidebars", or "overlays".
10eslint-prettier-config
Configures ESLint and Prettier for consistent code quality with TypeScript, React, and modern best practices. Use when users request "ESLint setup", "Prettier config", "linting configuration", "code formatting", or "lint rules".
9api-security-hardener
Hardens API security with rate limiting, input validation, authentication, and protection against common attacks. Use when users request "API security", "secure API", "rate limiting", "input validation", or "API protection".
9secure-headers-csp-builder
Implements security headers and Content Security Policy with safe rollout strategy (report-only → enforce), testing, and compatibility checks. Use for "security headers", "CSP", "HTTP headers", or "XSS protection".
9security-incident-playbook-generator
Creates response procedures for security incidents with containment steps, communication templates, and evidence collection. Use for "incident response", "security playbook", "breach response", or "IR plan".
9