A Message Queue decouples producers (HTTP request handlers) from consumers (background workers), enabling asynchronous processing of work that does not need to happen within the HTTP request lifecycle: sending emails, generating PDFs, processing images, syncing with external systems, running heavy reports. Bull is the most widely used Redis-backed job queue for Node.js, providing job prioritisation, retry logic, concurrency control, progress reporting, and a dashboard for monitoring queue health. This lesson builds a complete job queue system for the task manager’s background operations.
Bull Queue Concepts
| Concept | Description |
|---|---|
| Queue | Named channel for a type of work — email, imageProcess, report |
| Job | A unit of work with a payload, options, and lifecycle state |
| Producer | Adds jobs to the queue — typically an Express route handler |
| Worker | Processes jobs from the queue — a function that does the actual work |
| Concurrency | How many jobs a worker processes simultaneously |
| Retry | Automatically re-queue failed jobs with backoff strategy |
| Priority | Lower number = higher priority; high-priority jobs jump the queue |
| Delay | Schedule a job to run after a delay — useful for reminders |
| Repeatable | Cron-like scheduled jobs that repeat on a schedule |
waiting or active state at the moment of a process crash are automatically recovered when the process restarts. This durability is why Redis-backed queues are preferred over in-process queues (like simple arrays) for production use.if (await EmailLog.exists({ jobId: job.id })) return;. This prevents duplicate emails, duplicate database records, and duplicate charges in payment processing.Complete Bull Queue Implementation
// ── src/queues/index.js — queue definitions ───────────────────────────────
const Bull = require('bull');
const redisConfig = { url: process.env.REDIS_URL || 'redis://localhost:6379' };
const emailQueue = new Bull('email', { redis: redisConfig, defaultJobOptions: {
attempts: 3,
backoff: { type: 'exponential', delay: 2000 },
removeOnComplete: 100, // keep last 100 completed jobs
removeOnFail: 50, // keep last 50 failed jobs
}});
const reportQueue = new Bull('report', { redis: redisConfig, defaultJobOptions: {
attempts: 2,
timeout: 120_000, // 2-minute timeout for heavy reports
removeOnComplete: 20,
}});
const notifyQueue = new Bull('notify', { redis: redisConfig });
module.exports = { emailQueue, reportQueue, notifyQueue };
// ── src/workers/email.worker.js — processes email jobs ────────────────────
const { emailQueue } = require('../queues');
const nodemailer = require('nodemailer');
const EmailLog = require('../models/email-log.model');
const { logger } = require('../config/logger');
const transporter = nodemailer.createTransport({ /* SES / SMTP config */ });
// Process up to 5 emails concurrently
emailQueue.process('*', 5, async job => {
const { type, to, data } = job.data;
logger.info('Processing email job', { jobId: job.id, type, to });
// Idempotency check — was this already sent?
const existing = await EmailLog.findOne({ jobId: job.id.toString() });
if (existing) {
logger.warn('Email job already processed — skipping', { jobId: job.id });
return { skipped: true };
}
// Update progress
await job.progress(10);
let subject, html;
switch (type) {
case 'WELCOME':
subject = 'Welcome to Task Manager!';
html = renderTemplate('welcome', data);
break;
case 'PASSWORD_RESET':
subject = 'Reset your password';
html = renderTemplate('password-reset', { resetUrl: data.resetUrl });
break;
case 'TASK_REMINDER':
subject = `Reminder: "${data.taskTitle}" is due soon`;
html = renderTemplate('task-reminder', data);
break;
default:
throw new Error(`Unknown email type: ${type}`);
}
await job.progress(50);
const info = await transporter.sendMail({ from: process.env.FROM_EMAIL, to, subject, html });
// Record successful send for idempotency
await EmailLog.create({
jobId: job.id.toString(),
type,
to,
messageId: info.messageId,
sentAt: new Date(),
});
await job.progress(100);
logger.info('Email sent', { jobId: job.id, messageId: info.messageId });
return { messageId: info.messageId };
});
// Job lifecycle events
emailQueue.on('completed', (job, result) => {
logger.debug('Email job completed', { jobId: job.id, ...result });
});
emailQueue.on('failed', (job, err) => {
logger.error('Email job failed', {
jobId: job.id,
attempt: job.attemptsMade,
maxAttempts: job.opts.attempts,
error: err.message,
data: job.data.type,
});
});
emailQueue.on('stalled', job => {
logger.warn('Email job stalled (worker crashed?)', { jobId: job.id });
});
// ── src/services/email.service.js — producer ──────────────────────────────
const { emailQueue } = require('../queues');
async function queueWelcomeEmail(user) {
const job = await emailQueue.add('welcome', {
type: 'WELCOME',
to: user.email,
data: { name: user.name },
}, {
priority: 1, // high priority — welcome emails should go out fast
delay: 0,
});
return job.id;
}
async function queuePasswordResetEmail(user, resetUrl) {
return emailQueue.add('passwordReset', {
type: 'PASSWORD_RESET',
to: user.email,
data: { name: user.name, resetUrl },
}, {
priority: 1,
attempts: 2, // only retry password reset twice — sensitive operation
removeOnComplete: true,
});
}
// Schedule a task reminder 24 hours before due date
async function scheduleTaskReminder(task, user) {
const delay = new Date(task.dueDate) - Date.now() - (24 * 60 * 60 * 1000);
if (delay < 0) return; // already past
return emailQueue.add('taskReminder', {
type: 'TASK_REMINDER',
to: user.email,
data: { taskTitle: task.title, taskId: task._id, dueDate: task.dueDate },
}, {
delay,
jobId: `reminder:${task._id}`, // deterministic ID — prevents duplicate schedules
removeOnComplete: true,
});
}
// Cancel a scheduled reminder (task due date changed or task completed)
async function cancelTaskReminder(taskId) {
const job = await emailQueue.getJob(`reminder:${taskId}`);
if (job) await job.remove();
}
// ── Repeatable jobs — cron-like scheduled tasks ───────────────────────────
const { reportQueue } = require('../queues');
// Daily summary report at 8am
reportQueue.add('dailySummary', {}, {
repeat: { cron: '0 8 * * *', tz: 'Europe/London' },
jobId: 'daily-summary', // stable ID — prevents duplicate schedules on restart
});
// Process report
reportQueue.process('dailySummary', 1, async job => {
const stats = await generateDailySummary();
await sendSummaryToAdmins(stats);
return stats;
});
module.exports = { queueWelcomeEmail, queuePasswordResetEmail, scheduleTaskReminder, cancelTaskReminder };
How It Works
Step 1 — Bull Persists Jobs in Redis Sorted Sets
Bull uses multiple Redis keys per queue: bull:email:wait (waiting jobs), bull:email:active (currently processing), bull:email:completed, bull:email:failed, and bull:email:delayed (scheduled for future). Jobs are stored as serialised JSON with their options. When a worker picks up a job, it moves from wait to active. Completion moves it to completed (or failed). This structure survives process restarts.
Step 2 — Job Types Route to Different Handlers
queue.process('jobType', concurrency, handler) registers a handler for jobs of a specific type (the first argument to queue.add()). Using '*' handles all job types. Routing by type within one handler (the switch statement) allows one worker process to handle multiple email variants while keeping the queue-level organisation simple. More complex cases can have separate process calls per job type.
Step 3 — Deterministic Job IDs Prevent Duplicate Scheduling
Setting jobId: 'reminder:taskId' creates a job with a known, deterministic ID. If emailQueue.add() is called again with the same jobId and the job already exists, Bull updates it rather than creating a duplicate. This is essential for scheduled reminders that might be recreated on app restart — without deterministic IDs, each restart would add a new reminder job for every active task.
Step 4 — Exponential Backoff Spaces Out Retries
backoff: { type: 'exponential', delay: 2000 } makes retry delays grow: 2s, 4s, 8s, 16s… This prevents repeatedly hammering a temporarily unavailable email service. The attempts: 3 option caps total attempts. For transient failures (SMTP server briefly unavailable), the first retry usually succeeds. For permanent failures (invalid email address), all retries fail and the job moves to the failed queue for investigation.
Step 5 — Stalled Job Detection Recovers from Worker Crashes
When a worker process crashes while processing a job, the job remains in the active state with no worker processing it — a “stalled” job. Bull periodically checks for stalled jobs (jobs in active state with no heartbeat) and moves them back to the waiting queue for re-processing. The 'stalled' event on the queue fires when this happens, allowing logging and monitoring of worker crashes.
Quick Reference
| Task | Code |
|---|---|
| Create queue | new Bull('name', { redis: redisConfig }) |
| Add job | queue.add('type', data, { priority, delay, attempts }) |
| Process jobs | queue.process('type', concurrency, async job => { ... }) |
| Report progress | await job.progress(percentage) |
| Delayed job | queue.add('type', data, { delay: milliseconds }) |
| Repeatable job | queue.add('type', {}, { repeat: { cron: '0 8 * * *' } }) |
| Cancel job | const job = await queue.getJob(id); await job.remove() |
| Idempotency check | if (await Log.exists({ jobId: job.id })) return { skipped: true } |
| Deterministic job ID | queue.add('type', data, { jobId: 'stable-id' }) |