pg-boss
PG Boss Job Queue
PG Boss is a PostgreSQL-based job queue for Node.js. It stores jobs in the database, providing persistence and reliability.
Quick Setup
import { PgBoss } from 'pg-boss';
const boss = new PgBoss({
  connectionString: process.env.DATABASE_URL,
  schema: 'pgboss',
  monitorIntervalSeconds: 30,
});
await boss.start();
Common Patterns
1. Scheduled Jobs (Cron-like)
// v12: queue must exist before schedule/work
await boss.createQueue('my-job', {
  retryLimit: 3,
  retryDelay: 60,
  expireInSeconds: 600,
  deleteAfterSeconds: 7 * 24 * 60 * 60,
});
// Schedule job to run every minute
await boss.schedule('my-job', '* * * * *', {}, {
  tz: 'UTC',
  singletonKey: 'my-job-schedule', // Prevents duplicates on restart
});
// Register handler
await boss.work('my-job', { pollingIntervalSeconds: 10 }, async (jobs) => {
  for (const job of jobs) {
    // Process job
  }
});
2. Queuing Jobs
// Queue a single job
await boss.send('process-user', { userId: '123' });
// Queue with options
await boss.send('send-email', { to: 'user@example.com' }, {
  retryLimit: 5,
  expireInSeconds: 300,
});
3. Batch Processing with Concurrency
await boss.work(
  'batch-job',
  { batchSize: 10 }, // Fetch up to 10 jobs per poll and process them together
  async (jobs) => {
    await Promise.all(jobs.map(job => processJob(job)));
  }
);
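With Promise.all, one rejected job rejects the whole handler, so the outcome of the other jobs in the batch is lost. The sketch below shows one way to collect per-job outcomes with Promise.allSettled. The names settleBatch, JobLike, and BatchResult are hypothetical and not part of pg-boss; JobLike only assumes that each job carries an id and a data payload.

```typescript
// Hypothetical minimal job shape: only the fields this sketch needs.
interface JobLike<T> {
  id: string;
  data: T;
}

// Hypothetical result type: ids that succeeded, and ids that failed with their errors.
interface BatchResult {
  succeeded: string[];
  failed: { id: string; error: unknown }[];
}

// Runs processJob for every job concurrently and records each outcome
// instead of rejecting on the first failure.
export async function settleBatch<T>(
  jobs: JobLike<T>[],
  processJob: (job: JobLike<T>) => Promise<void>,
): Promise<BatchResult> {
  const outcomes = await Promise.allSettled(jobs.map((job) => processJob(job)));
  const result: BatchResult = { succeeded: [], failed: [] };

  outcomes.forEach((outcome, index) => {
    const id = jobs[index].id;
    if (outcome.status === 'fulfilled') {
      result.succeeded.push(id);
    } else {
      result.failed.push({ id, error: outcome.reason });
    }
  });

  return result;
}
```

Inside a work() handler, the caller could log result.failed and then throw if it is non-empty so the retry policy still applies. How to fail individual jobs depends on the pg-boss version in use, so that part is left out here.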
Reliability Patterns
Singleton Pattern (Prevent Duplicates)
// Use getPgBoss() singleton instead of creating new instances
let boss: PgBoss | null = null;
export async function getPgBoss(): Promise<PgBoss> {
  if (boss) return boss;
  boss = new PgBoss({ connectionString: process.env.DATABASE_URL });
  await boss.start();
  return boss;
}
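One caveat with the version above: boss is assigned before start() resolves, so a second caller arriving during startup can receive an instance that has not finished starting. A possible guard is to cache the startup promise rather than the instance. The sketch below is a generic helper under that assumption; createSingleton and Startable are hypothetical names, and Startable only assumes an async start() method.

```typescript
// Minimal shape this sketch relies on: anything with an async start().
interface Startable {
  start(): Promise<unknown>;
}

// Hypothetical helper: returns a getter that creates and starts the instance
// at most once, even when called concurrently.
export function createSingleton<T extends Startable>(factory: () => T): () => Promise<T> {
  let pending: Promise<T> | null = null;

  return () => {
    if (!pending) {
      const instance = factory();
      pending = instance.start().then(
        () => instance,
        (err) => {
          // Clear the cache so a later call can retry after a failed start.
          pending = null;
          throw err;
        },
      );
    }
    return pending;
  };
}
```

Usage would look like const getPgBoss = createSingleton(() => new PgBoss({ connectionString: process.env.DATABASE_URL })); every caller then awaits the same startup.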
Watchdog for Auto-Recovery
PG Boss can stop unexpectedly (connection drops, crashes). Add a watchdog:
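let boss: PgBoss | null = null;
let watchdogInterval: NodeJS.Timeout | null = null;
// Call once after boss.start(); attaching the listener while boss is still null would throw
function watchForStop(instance: PgBoss) {
  instance.on('stopped', () => {
    console.error('[PG Boss] Stopped unexpectedly');
    boss = null; // Watchdog will attempt recovery
  });
}
// Watchdog checks every 60s; clear any old interval before starting a new one
// workerRegistrationFn is assumed to be your own function that restarts PG Boss and re-registers workers
if (watchdogInterval) clearInterval(watchdogInterval);
watchdogInterval = setInterval(async () => {
  if (!boss && workerRegistrationFn) {
    console.log('[PG Boss] Attempting recovery...');
    try {
      await workerRegistrationFn();
    } catch (err) {
      console.error('[PG Boss] Recovery failed:', err);
    }
  }
}, 60000);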
Health Check Integration
Make health check fail when PG Boss is dead (triggers container restart):
app.get('/health', (c) => {
  const pgBossRunning = boss !== null;
  if (!pgBossRunning) {
    return c.json({ status: 'degraded', pgBossRunning: false }, 503);
  }
  return c.json({ status: 'ok', pgBossRunning: true });
});
Debugging
Check PG Boss State in Database
-- Check scheduled jobs
SELECT name, cron FROM pgboss.schedule;
-- Check recent jobs
SELECT name, state, created_on, completed_on
FROM pgboss.job
ORDER BY created_on DESC
LIMIT 20;
-- Check job queue size
SELECT name, COUNT(*)
FROM pgboss.job
WHERE state = 'created'
GROUP BY name;
Via Node.js
const boss = await getPgBoss();
// Get queue size
const pending = await boss.getQueueSize('my-job');
// Get job by ID (v10+ also takes the queue name)
const job = await boss.getJobById('my-job', jobId);
Railway-Specific Debugging
# Check environment variables
railway variables --service api
# Get public DATABASE_URL for direct access
railway variables --service Postgres | grep DATABASE_PUBLIC_URL
# Query database directly
DATABASE_URL="postgresql://..." node -e "
const { Pool } = require('pg');
const pool = new Pool({ connectionString: process.env.DATABASE_URL, ssl: { rejectUnauthorized: false } });
// Run queries
"
Common Issues
| Issue | Cause | Fix |
|---|---|---|
| Jobs not running | PG Boss not started | Check health endpoint, verify DATABASE_URL |
| Duplicate jobs | Missing singletonKey | Add singletonKey to schedule() call |
| Queue not found | schedule() called before createQueue() | Call boss.createQueue(name) first |
| Jobs stuck in 'created' | No worker registered | Call boss.work() for the queue |
| Connection errors | Database URL wrong/expired | Check DATABASE_URL, SSL settings |
| Jobs not persisting | Wrong schema | Verify schema: 'pgboss' option |
| Worker over-polling | Wrong option name | Use pollingIntervalSeconds (not newJobCheckIntervalSeconds) |
| Next.js build fails (stream/net/tls) | instrumentation bundles server-only deps | Use /* webpackIgnore: true */ on instrumentation dynamic imports |
Best Practices
- Always use singletonKey for scheduled jobs to prevent duplicates on restart
- Create queues explicitly in PG Boss v10+: await boss.createQueue('my-job')
- Handle the 'stopped' event to detect unexpected shutdowns
- Use a watchdog to auto-recover from crashes (clear old interval before starting a new one)
- Return 503 in health check when PG Boss is dead (triggers container restart)
- Log job errors but don't swallow them - let PG Boss retry
- Use separate workers for different job types (rollover, email, etc.)
- Graceful shutdown: call boss.stop({ graceful: true, timeout: 30000 }) on SIGTERM/SIGINT
- Next.js instrumentation: load server-only job modules with import(/* webpackIgnore: true */ ...)
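The graceful-shutdown practice can be sketched as a small helper. This is an illustration under stated assumptions, not pg-boss's own API: registerShutdown, Stoppable, and SignalSource are hypothetical names, and the stop() options are the ones quoted in the list above. The signal source and exit function are passed in so the helper can be tested without touching the real process.

```typescript
// Minimal shapes so the sketch does not depend on pg-boss or Node typings.
interface Stoppable {
  stop(options: { graceful: boolean; timeout: number }): Promise<unknown>;
}

interface SignalSource {
  on(signal: string, listener: () => void): unknown;
}

// Hypothetical helper: stops the queue once on SIGTERM or SIGINT,
// then reports an exit code (0 on a clean stop, 1 if stop() rejected).
export function registerShutdown(
  boss: Stoppable,
  signals: SignalSource,
  exit: (code: number) => void,
): void {
  let shuttingDown = false;

  const shutdown = (): void => {
    if (shuttingDown) return; // ignore repeated signals
    shuttingDown = true;
    boss.stop({ graceful: true, timeout: 30000 }).then(
      () => exit(0),
      () => exit(1),
    );
  };

  signals.on('SIGTERM', shutdown);
  signals.on('SIGINT', shutdown);
}
```

In an application this would presumably be wired up as registerShutdown(boss, process, (code) => process.exit(code)) after boss.start() has resolved.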
File Structure Example
apps/api/src/
├── lib/
│ └── pgboss.ts # Singleton, getPgBoss(), stopPgBoss()
├── workers/
│ ├── index.ts # registerAllWorkers()
│ ├── rollover/
│ │ ├── index.ts # registerRolloverWorkers()
│ │ ├── timezone-check.ts
│ │ └── batch-processor.ts
│ └── email/
│ ├── index.ts # registerEmailWorkers()
│ └── send-email.ts
└── index.ts # Calls registerAllWorkers() on startup
Timezone-Aware Scheduling
For jobs that need to run at specific local times (like midnight rollover):
import { toZonedTime } from 'date-fns-tz';
// Check if it's midnight in a timezone
const zonedNow = toZonedTime(new Date(), userTimezone);
const hour = zonedNow.getHours();
const minute = zonedNow.getMinutes();
// 10-minute window for reliability (minutes 0-9)
const isMidnightWindow = hour === 0 && minute < 10;
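For comparison, the same check can be written without a date library by using the built-in Intl.DateTimeFormat. This swaps date-fns-tz for Intl, so it is an alternative sketch and not the approach shown above. The function name inMidnightWindow and the windowMinutes parameter are assumptions, and the window is treated as strict (minutes 0 through windowMinutes - 1).

```typescript
// Returns true when the wall-clock time in `timeZone` falls within the first
// `windowMinutes` minutes after local midnight.
export function inMidnightWindow(now: Date, timeZone: string, windowMinutes = 10): boolean {
  const parts = new Intl.DateTimeFormat('en-US', {
    timeZone,
    hour: 'numeric',
    minute: 'numeric',
    hourCycle: 'h23', // hours run 0-23, so midnight is reported as 0
  }).formatToParts(now);

  // Pull a numeric field (hour or minute) out of the formatted parts.
  const read = (type: string): number =>
    Number(parts.find((part) => part.type === type)?.value);

  return read('hour') === 0 && read('minute') < windowMinutes;
}
```

Passing the current time in as an argument keeps the function easy to test with fixed instants.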
Idempotency
Use a log table to prevent duplicate processing:
// Check if already processed
const existing = await db.query.logs.findFirst({
  where: and(
    eq(logs.timezone, timezone),
    eq(logs.date, targetDate)
  ),
});
if (existing) return; // Already processed
// Process and log
await processJobs();
await db.insert(logs).values({ timezone, date: targetDate, status: 'completed' });
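Check-then-insert can still process twice if two workers pass the check at the same moment. One common alternative is to claim the key first and do the work only if the claim succeeded. The sketch below illustrates that ordering with an in-memory store. ClaimStore, InMemoryClaimStore, and runOnce are hypothetical names. In a real database the claim would presumably be an insert guarded by a unique constraint on (timezone, date), which is an assumption about the schema and not something the log table above is known to have.

```typescript
// Hypothetical store interface; a database-backed version would rely on a
// unique constraint so that only one caller can claim a given key.
interface ClaimStore {
  // Resolves to true if the key was newly claimed, false if it already existed.
  claim(key: string): Promise<boolean>;
  release(key: string): Promise<void>;
}

// In-memory stand-in used only to make the sketch runnable.
export class InMemoryClaimStore implements ClaimStore {
  private readonly keys = new Set<string>();

  async claim(key: string): Promise<boolean> {
    if (this.keys.has(key)) return false;
    this.keys.add(key);
    return true;
  }

  async release(key: string): Promise<void> {
    this.keys.delete(key);
  }
}

// Runs `work` at most once per (timezone, date). Returns false when skipped.
export async function runOnce(
  store: ClaimStore,
  timezone: string,
  date: string,
  work: () => Promise<void>,
): Promise<boolean> {
  const key = `${timezone}:${date}`;
  if (!(await store.claim(key))) return false; // already processed or in progress

  try {
    await work();
    return true;
  } catch (err) {
    // Release the claim so a retry can attempt the work again.
    await store.release(key);
    throw err;
  }
}
```

Releasing the claim on failure keeps the job retryable, at the cost of allowing a second attempt after a partial failure.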