bullmq
BullMQ
Redis-backed queue system for Node.js. Four core classes: Queue, Worker, QueueEvents, FlowProducer.
Table of Contents
- Install
- Quick Start
- Connections
- Queue
- Worker
- TypeScript Generics
- Events
- Job Lifecycle States
- Advanced Topics
Install
yarn add bullmq — requires Redis 5.0+ with maxmemory-policy=noeviction.
<quick_reference>
Quick Start
import { Queue, Worker, QueueEvents } from "bullmq";
// --- Producer ---
const queue = new Queue("my-queue", {
connection: { host: "localhost", port: 6379 },
});
await queue.add("job-name", { foo: "bar" });
// --- Consumer ---
const worker = new Worker(
"my-queue",
async (job) => {
// process job
await job.updateProgress(50);
return { result: "done" };
},
{ connection: { host: "localhost", port: 6379 } },
);
worker.on("completed", (job, returnvalue) => {
console.log(`${job.id} completed with`, returnvalue);
});
worker.on("failed", (job, err) => {
console.error(`${job.id} failed with`, err.message);
});
// IMPORTANT: always attach an error handler
worker.on("error", (err) => {
console.error(err);
});
// --- Global event listener (all workers) ---
const queueEvents = new QueueEvents("my-queue", {
connection: { host: "localhost", port: 6379 },
});
queueEvents.on("completed", ({ jobId, returnvalue }) => {
console.log(`Job ${jobId} completed`);
});
queueEvents.on("failed", ({ jobId, failedReason }) => {
console.error(`Job ${jobId} failed: ${failedReason}`);
});
Job Lifecycle States
add() → wait / prioritized / delayed
↓
active → completed
↓
failed → (retry) → wait/delayed
With FlowProducer: jobs can also be in waiting-children state until all children complete.
</quick_reference>
Connections
BullMQ uses ioredis internally. Pass connection options or an existing ioredis instance.
import { Queue, Worker } from "bullmq";
import { Redis } from "ioredis";
// Option 1: connection config (new connection per instance)
const queue = new Queue("q", {
connection: { host: "redis.example.com", port: 6379 },
});
// Option 2: reuse ioredis instance (Queue and multiple Queues can share)
const connection = new Redis();
const q1 = new Queue("q1", { connection });
const q2 = new Queue("q2", { connection });
// Option 3: reuse for Workers (BullMQ internally duplicates for blocking)
const workerConn = new Redis({ maxRetriesPerRequest: null });
const w1 = new Worker("q1", async (job) => {}, { connection: workerConn });
Critical rules:
- Workers REQUIRE
maxRetriesPerRequest: nullon the ioredis instance. BullMQ enforces this and will warn/throw if not set. - Do NOT use ioredis
keyPrefixoption — use BullMQ'sprefixoption instead. QueueEventscannot share connections (uses blocking Redis commands).- Redis MUST have
maxmemory-policy=noeviction.
Queue
const queue = new Queue("paint", { connection });
// Add a job
await queue.add("job-name", { color: "red" });
// Add with options
await queue.add(
"job-name",
{ color: "blue" },
{
delay: 5000, // wait 5s before processing
priority: 1, // lower = higher priority (0 is highest, max 2^21)
attempts: 3, // retry up to 3 times
backoff: { type: "exponential", delay: 1000 },
removeOnComplete: true, // or { count: 100 } to keep last 100
removeOnFail: 1000, // keep last 1000 failed jobs
},
);
// Add bulk
await queue.addBulk([
{ name: "job1", data: { x: 1 } },
{ name: "job2", data: { x: 2 }, opts: { priority: 1 } },
]);
// Queue operations
await queue.pause();
await queue.resume();
await queue.obliterate({ force: true }); // remove all data
await queue.close();
Worker
const worker = new Worker<MyData, MyReturn>(
"paint",
async (job) => {
await job.updateProgress(42);
return { cost: 100 };
},
{
connection,
concurrency: 5, // process 5 jobs concurrently
autorun: false, // don't start immediately
},
);
worker.run(); // start when ready
// Update concurrency at runtime
worker.concurrency = 10;
Processor receives 3 args: (job, token?, signal?) — signal is an AbortSignal for cancellation support.
TypeScript Generics
interface JobData {
color: string;
}
interface JobReturn {
cost: number;
}
const queue = new Queue<JobData, JobReturn>("paint");
const worker = new Worker<JobData, JobReturn>("paint", async (job) => {
// job.data is typed as JobData
return { cost: 100 }; // must match JobReturn
});
Events
Worker events (local to that worker instance):
| Event | Callback signature |
|---|---|
completed |
(job, returnvalue) |
failed |
(job | undefined, error, prev) |
progress |
(job, progress: number | object) |
drained |
() — queue is empty |
error |
(error) — MUST attach this handler |
QueueEvents (global, all workers, uses Redis Streams):
| Event | Callback signature |
|---|---|
completed |
({ jobId, returnvalue }) |
failed |
({ jobId, failedReason }) |
progress |
({ jobId, data }) |
waiting |
({ jobId }) |
active |
({ jobId, prev }) |
delayed |
({ jobId, delay }) |
deduplicated |
({ jobId, deduplicationId, deduplicatedJobId }) |
Event stream is auto-trimmed (~10,000 events). Configure via streams.events.maxLen.
Advanced Topics
- Job types and options (delayed, prioritized, deduplication, repeatable): See references/job-types-and-options.md
- Flows and schedulers (FlowProducer, parent-child, job schedulers, cron): See references/flows-and-schedulers.md
- Patterns (step jobs, idempotent, throttle, manual rate-limit): See references/patterns.md
- Production (shutdown, Redis config, retries, backoff, monitoring): See references/production.md
More from jgamaraalv/ts-dev-kit
ioredis
ioredis v5 reference for Node.js Redis client — connection setup, RedisOptions, pipelines, transactions, Pub/Sub, Lua scripting, Cluster, and Sentinel. Use when: (1) creating or configuring Redis connections (standalone, cluster, sentinel), (2) writing Redis commands with ioredis (get/set, pipelines, multi/exec), (3) setting up Pub/Sub or Streams, (4) configuring retryStrategy, TLS, or auto-pipelining, (5) working with Redis Cluster options (scaleReads, NAT mapping), or (6) debugging ioredis connection issues. Important: use named import `import { Redis } from 'ioredis'` for correct TypeScript types with NodeNext.
35nextjs-best-practices
Next.js App Router best practices — file conventions, RSC boundaries, data patterns, async APIs, metadata, error handling, route handlers, image/font optimization, bundling. Use when writing, reviewing, or debugging Next.js App Router code.
29ui-ux-guidelines
Review UI code for Web Interface Guidelines compliance. Use when asked to review UI, check accessibility, audit design, review UX, or check against best practices.
26service-worker
Service Worker API implementation guide — registration, lifecycle management, caching strategies, push notifications, and background sync. Use when: (1) creating or modifying service worker files (sw.js), (2) implementing offline-first caching (cache-first, network-first, stale-while-revalidate), (3) setting up push notifications or background sync, (4) debugging service worker registration, scope, or update issues, (5) implementing navigation preload, (6) user mentions 'service worker', 'sw.js', 'offline support', 'cache strategy', 'push notification', 'background sync', 'workbox alternative', or 'PWA caching'.
25react-best-practices
React and Next.js performance patterns. Use when writing, reviewing, or optimizing React components.
21typescript-conventions
TypeScript coding conventions for strict, type-safe projects. Use when: (1) writing or reviewing TypeScript code, (2) choosing between `any` and `unknown`, (3) structuring imports with verbatimModuleSyntax, (4) defining types, interfaces, unions, or generics, (5) naming functions, booleans, queries, and commands, (6) handling errors with guard clauses and early returns, (7) narrowing types with guards and discriminated unions, or (8) avoiding common anti-patterns like primitive obsession, magic strings, and premature abstraction.
21