Job Queues with Bull — Background Workers, Retry, and Scheduled Jobs

A Message Queue decouples producers (HTTP request handlers) from consumers (background workers), enabling asynchronous processing of work that does not need to happen within the HTTP request lifecycle: sending emails, generating PDFs, processing images, syncing with external systems, running heavy reports. Bull is the most widely used Redis-backed job queue for Node.js, providing job prioritisation, retry logic, concurrency control, progress reporting, and a dashboard for monitoring queue health. This lesson builds a complete job queue system for the task manager’s background operations.

Bull Queue Concepts

Concept       Description
Queue         Named channel for a type of work — email, imageProcess, report
Job           A unit of work with a payload, options, and lifecycle state
Producer      Adds jobs to the queue — typically an Express route handler
Worker        Processes jobs from the queue — a function that does the actual work
Concurrency   How many jobs a worker processes simultaneously
Retry         Automatically re-queue failed jobs with a backoff strategy
Priority      Lower number = higher priority; high-priority jobs jump the queue
Delay         Schedule a job to run after a delay — useful for reminders
Repeatable    Cron-like scheduled jobs that repeat on a schedule
Note: Bull (and its modern successor BullMQ) uses Redis as the job store. Job payloads live in Redis hashes, and queue state is tracked in lists and sorted sets — so jobs survive process restarts. Waiting and delayed jobs simply remain in Redis across a crash; jobs that were active when the process died are picked up again by Bull's stalled-job recovery. This durability is why Redis-backed queues are preferred over in-process queues (like simple arrays) for production use.
Tip: Separate your queue workers into separate Node.js processes from your HTTP server. If the HTTP server and workers run in the same process and a worker crashes the process, the HTTP server goes down too. Running workers separately means each can be scaled, monitored, and restarted independently. The producer (HTTP server) just adds jobs to Redis; the consumer (worker process) reads and processes them. They share only the Redis connection.
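A minimal worker entry point following this tip might look like the sketch below. The file name and paths are illustrative, and the graceful-shutdown handler uses Bull's real queue.close(), which waits for in-flight jobs to finish before disconnecting from Redis.

```javascript
// worker.js — hypothetical standalone worker process, run with `node worker.js`
// (separately from the HTTP server, which only *adds* jobs to Redis)
require('./src/workers/email.worker');   // registers the queue.process handlers

const { emailQueue } = require('./src/queues');

// Graceful shutdown: finish the jobs currently being processed,
// then close the Redis connections and exit.
process.on('SIGTERM', async () => {
    await emailQueue.close();
    process.exit(0);
});
```

Because the producer and consumer share only Redis, this process can be scaled to several replicas without touching the HTTP server.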
Warning: Always implement idempotency in job handlers. Jobs can be processed more than once in rare failure scenarios (worker crashes after processing but before acknowledging). Design handlers so that processing a job twice produces the same result as once — check if the work was already done before doing it: if (await EmailLog.exists({ jobId: job.id })) return;. This prevents duplicate emails, duplicate database records, and duplicate charges in payment processing.

Complete Bull Queue Implementation

// ── src/queues/index.js — queue definitions ───────────────────────────────
const Bull = require('bull');

// Bull accepts a Redis connection string directly as its second argument
const REDIS_URL = process.env.REDIS_URL || 'redis://localhost:6379';

const emailQueue   = new Bull('email', REDIS_URL, { defaultJobOptions: {
    attempts:     3,
    backoff:      { type: 'exponential', delay: 2000 },
    removeOnComplete: 100,   // keep last 100 completed jobs
    removeOnFail:      50,   // keep last 50 failed jobs
}});

const reportQueue  = new Bull('report', REDIS_URL, { defaultJobOptions: {
    attempts:     2,
    timeout:      120_000,   // 2-minute timeout for heavy reports
    removeOnComplete: 20,
}});

const notifyQueue  = new Bull('notify', REDIS_URL);

module.exports = { emailQueue, reportQueue, notifyQueue };

// ── src/workers/email.worker.js — processes email jobs ────────────────────
const { emailQueue } = require('../queues');
const nodemailer     = require('nodemailer');
const EmailLog       = require('../models/email-log.model');
const { logger }     = require('../config/logger');

const transporter = nodemailer.createTransport({ /* SES / SMTP config */ });

// renderTemplate(name, data) — HTML template helper, assumed to be defined elsewhere

// Process up to 5 emails concurrently
emailQueue.process('*', 5, async job => {
    const { type, to, data } = job.data;
    logger.info('Processing email job', { jobId: job.id, type, to });

    // Idempotency check — was this already sent?
    const existing = await EmailLog.findOne({ jobId: job.id.toString() });
    if (existing) {
        logger.warn('Email job already processed — skipping', { jobId: job.id });
        return { skipped: true };
    }

    // Update progress
    await job.progress(10);

    let subject, html;
    switch (type) {
        case 'WELCOME':
            subject = 'Welcome to Task Manager!';
            html    = renderTemplate('welcome', data);
            break;
        case 'PASSWORD_RESET':
            subject = 'Reset your password';
            html    = renderTemplate('password-reset', { resetUrl: data.resetUrl });
            break;
        case 'TASK_REMINDER':
            subject = `Reminder: "${data.taskTitle}" is due soon`;
            html    = renderTemplate('task-reminder', data);
            break;
        default:
            throw new Error(`Unknown email type: ${type}`);
    }

    await job.progress(50);

    const info = await transporter.sendMail({ from: process.env.FROM_EMAIL, to, subject, html });

    // Record successful send for idempotency
    await EmailLog.create({
        jobId:     job.id.toString(),
        type,
        to,
        messageId: info.messageId,
        sentAt:    new Date(),
    });

    await job.progress(100);
    logger.info('Email sent', { jobId: job.id, messageId: info.messageId });
    return { messageId: info.messageId };
});

// Job lifecycle events
emailQueue.on('completed', (job, result) => {
    logger.debug('Email job completed', { jobId: job.id, ...result });
});

emailQueue.on('failed', (job, err) => {
    logger.error('Email job failed', {
        jobId:    job.id,
        attempt:  job.attemptsMade,
        maxAttempts: job.opts.attempts,
        error:    err.message,
        type:     job.data.type,
    });
});

emailQueue.on('stalled', job => {
    logger.warn('Email job stalled (worker crashed?)', { jobId: job.id });
});

// ── src/services/email.service.js — producer ──────────────────────────────
const { emailQueue } = require('../queues');

async function queueWelcomeEmail(user) {
    const job = await emailQueue.add('welcome', {
        type: 'WELCOME',
        to:   user.email,
        data: { name: user.name },
    }, {
        priority: 1,   // high priority — welcome emails should go out fast
        delay:    0,
    });
    return job.id;
}

async function queuePasswordResetEmail(user, resetUrl) {
    return emailQueue.add('passwordReset', {
        type: 'PASSWORD_RESET',
        to:   user.email,
        data: { name: user.name, resetUrl },
    }, {
        priority:  1,
        attempts:  2,   // only retry password reset twice — sensitive operation
        removeOnComplete: true,
    });
}

// Schedule a task reminder 24 hours before due date
async function scheduleTaskReminder(task, user) {
    const delay = new Date(task.dueDate) - Date.now() - (24 * 60 * 60 * 1000);
    if (delay < 0) return;  // due within 24 hours (or already past) — skip the reminder

    return emailQueue.add('taskReminder', {
        type: 'TASK_REMINDER',
        to:   user.email,
        data: { taskTitle: task.title, taskId: task._id, dueDate: task.dueDate },
    }, {
        delay,
        jobId: `reminder:${task._id}`,   // deterministic ID — prevents duplicate schedules
        removeOnComplete: true,
    });
}

// Cancel a scheduled reminder (task due date changed or task completed)
async function cancelTaskReminder(taskId) {
    const job = await emailQueue.getJob(`reminder:${taskId}`);
    if (job) await job.remove();
}

// ── Repeatable jobs — cron-like scheduled tasks ───────────────────────────
const { reportQueue } = require('../queues');

// Daily summary report at 8am
reportQueue.add('dailySummary', {}, {
    repeat: { cron: '0 8 * * *', tz: 'Europe/London' },
    jobId: 'daily-summary',   // stable ID — prevents duplicate schedules on restart
});

// Process report — generateDailySummary / sendSummaryToAdmins are app-level helpers defined elsewhere
reportQueue.process('dailySummary', 1, async job => {
    const stats = await generateDailySummary();
    await sendSummaryToAdmins(stats);
    return stats;
});

module.exports = { queueWelcomeEmail, queuePasswordResetEmail, scheduleTaskReminder, cancelTaskReminder };

How It Works

Step 1 — Bull Persists Jobs in Redis

Bull uses multiple Redis keys per queue: bull:email:wait (waiting job IDs), bull:email:active (currently processing), bull:email:delayed (a sorted set scored by scheduled run time), plus bull:email:completed and bull:email:failed. Each job's payload and options are serialised into a Redis hash keyed by job ID. When a worker picks up a job, its ID moves from wait to active; completion moves it to completed (or failed). This structure survives process restarts.
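These per-state counts can be inspected from application code with Bull's getJobCounts(); the sketch below assumes the same email queue and Redis URL as the lesson.

```javascript
const Bull = require('bull');

const emailQueue = new Bull('email', process.env.REDIS_URL || 'redis://localhost:6379');

async function printQueueHealth() {
    // Returns one count per lifecycle state, e.g.
    // { waiting: 3, active: 1, completed: 120, failed: 2, delayed: 7 }
    const counts = await emailQueue.getJobCounts();
    console.log(counts);
}
```

Polling these counts (or exporting them to a metrics system) is a simple way to alert on a growing failed or waiting backlog.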

Step 2 — Job Types Route to Different Handlers

queue.process('jobType', concurrency, handler) registers a handler for jobs of a specific type (the first argument to queue.add()). Using '*' handles all job types. Routing by type within one handler (the switch statement) allows one worker process to handle multiple email variants while keeping the queue-level organisation simple. More complex cases can have separate process calls per job type.
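For the "separate process calls per job type" case, each job name registered via queue.add() gets its own handler — a sketch reusing the lesson's email queue:

```javascript
const { emailQueue } = require('../queues');

// One named processor per job type — Bull routes each job by the name
// passed as the first argument to queue.add().
emailQueue.process('welcome', 5, async job => {
    // handles only jobs added as emailQueue.add('welcome', ...)
});

emailQueue.process('passwordReset', 5, async job => {
    // handles only jobs added as emailQueue.add('passwordReset', ...)
});
```

This trades the single switch statement for per-type handlers, which keeps each handler small and lets each job type carry its own concurrency setting.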

Step 3 — Deterministic Job IDs Prevent Duplicate Scheduling

Setting jobId: 'reminder:taskId' creates a job with a known, deterministic ID. If emailQueue.add() is called again with the same jobId while that job still exists (in any state, including completed), Bull silently ignores the new add rather than creating a duplicate — which is why the reminder jobs also set removeOnComplete: true, so a finished reminder does not block a future one. This is essential for scheduled reminders that might be recreated on app restart — without deterministic IDs, each restart would add a new reminder job for every active task.
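Changing an already-scheduled reminder (say, after a due-date edit) therefore means removing the existing job before adding a fresh one. The sketch below combines the lesson's cancel and schedule logic; newDelay is a hypothetical parameter.

```javascript
const { emailQueue } = require('../queues');

// Reschedule a reminder: drop any job with the deterministic ID,
// then add a new delayed job under the same ID.
async function rescheduleTaskReminder(task, user, newDelay) {
    const existing = await emailQueue.getJob(`reminder:${task._id}`);
    if (existing) await existing.remove();

    return emailQueue.add('taskReminder', {
        type: 'TASK_REMINDER',
        to:   user.email,
        data: { taskTitle: task.title, taskId: task._id, dueDate: task.dueDate },
    }, {
        delay: newDelay,
        jobId: `reminder:${task._id}`,
        removeOnComplete: true,
    });
}
```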

Step 4 — Exponential Backoff Spaces Out Retries

backoff: { type: 'exponential', delay: 2000 } makes retry delays grow: 2s, 4s, 8s, 16s… This prevents repeatedly hammering a temporarily unavailable email service. The attempts: 3 option caps total attempts. For transient failures (SMTP server briefly unavailable), the first retry usually succeeds. For permanent failures (invalid email address), all retries fail and the job moves to the failed queue for investigation.
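The doubling schedule described above can be computed directly. The helper below is a standalone illustration of the pattern, not part of Bull's API; retry numbering is 1-based.

```javascript
// Delay (ms) before the n-th retry under a doubling exponential backoff:
// retry 1 waits baseDelay, retry 2 waits 2x, retry 3 waits 4x, and so on.
function backoffDelay(baseDelay, retryNumber) {
    return baseDelay * 2 ** (retryNumber - 1);
}

// With { type: 'exponential', delay: 2000 } and attempts: 3,
// there are two retries after the initial attempt:
const schedule = [1, 2].map(n => backoffDelay(2000, n));
console.log(schedule);   // [2000, 4000]
```

Note that attempts counts total attempts, not retries — attempts: 3 means one initial attempt plus at most two retries.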

Step 5 — Stalled Job Detection Recovers from Worker Crashes

When a worker process crashes while processing a job, the job remains in the active state with no worker processing it — a “stalled” job. Bull periodically checks for stalled jobs (jobs in active state with no heartbeat) and moves them back to the waiting queue for re-processing. The 'stalled' event on the queue fires when this happens, allowing logging and monitoring of worker crashes.
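The timing of this detection is tunable through Bull's advanced settings; the values below are illustrative defaults-range numbers, not recommendations.

```javascript
const Bull = require('bull');

const emailQueue = new Bull('email', process.env.REDIS_URL || 'redis://localhost:6379', {
    settings: {
        lockDuration:    30000,  // ms a worker holds a job's lock before it can be considered stalled
        stalledInterval: 30000,  // how often (ms) Bull scans for stalled jobs
        maxStalledCount: 1,      // times a job may stall before being moved to failed
    },
});
```

Lowering stalledInterval recovers crashed jobs faster at the cost of more Redis traffic; maxStalledCount prevents a job that reliably crashes its worker from being retried forever.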

Quick Reference

Task                   Code
Create queue           new Bull('name', redisUrl)
Add job                queue.add('type', data, { priority, delay, attempts })
Process jobs           queue.process('type', concurrency, async job => { ... })
Report progress        await job.progress(percentage)
Delayed job            queue.add('type', data, { delay: milliseconds })
Repeatable job         queue.add('type', {}, { repeat: { cron: '0 8 * * *' } })
Cancel job             const job = await queue.getJob(id); await job.remove()
Idempotency check      if (await Log.exists({ jobId: job.id })) return { skipped: true }
Deterministic job ID   queue.add('type', data, { jobId: 'stable-id' })

🧠 Test Yourself

A worker crashes while processing a “send welcome email” job. The job is in the active state with no worker running. What does Bull do with this stalled job?