Background Jobs and Task Queues with Bull and Redis

Not every operation triggered by an HTTP request should complete before sending the response. Sending a welcome email, generating a PDF report, processing an uploaded image, or sending push notifications can take seconds โ€” making the client wait is poor UX and ties up your Node.js event loop. Background jobs offload these operations to a separate worker process that runs independently. Bull (and its modern successor BullMQ) is the industry-standard Node.js job queue backed by Redis. It provides reliable job persistence, retry logic, scheduled jobs, progress tracking, and a clean API for defining workers. This lesson builds a complete background job system for the MEAN Stack task manager.

Bull / BullMQ Core Concepts

Concept Description
Queue Named channel for a category of jobs โ€” e.g. email, notifications
Job Unit of work โ€” a plain JS object with an optional name and data payload
Producer Code that adds jobs to the queue โ€” typically an Express controller or service
Worker (Consumer) Process that listens to the queue and executes jobs
Processor Function that handles a job โ€” receives job data, returns result
Concurrency How many jobs the worker processes simultaneously
Retry Automatically re-enqueue a failed job up to N times with backoff
Delay Schedule a job to run after N milliseconds
Repeat (Cron) Schedule a job to run on a cron schedule
Priority Higher priority jobs run before lower priority ones

Job Lifecycle States

State Meaning
waiting Added to queue, waiting for a worker to pick it up
active Being processed by a worker right now
completed Worker finished successfully
failed Worker threw an error โ€” may retry depending on config
delayed Scheduled for a future time โ€” not yet in waiting state
paused Queue is paused โ€” jobs wait until resumed
Note: Bull and BullMQ require a running Redis instance. Redis acts as both the job store (persisting jobs so they survive server restarts) and the coordination layer (ensuring a job is processed by only one worker even in a multi-worker setup). In development, run Redis locally with docker run -p 6379:6379 redis:alpine. In production, use a managed Redis service โ€” AWS ElastiCache, Upstash, or Redis Cloud โ€” to get automatic failover and persistence.
Tip: Always run workers in a separate process from your Express API server. If the worker’s job processor throws an unhandled error and crashes the process, it should not take down your API. Use a separate entry point file for workers โ€” node src/workers/index.js โ€” managed by PM2 or Docker with automatic restart. Bull persists jobs in Redis, so jobs added by the API before the worker crashes are still there when the worker restarts.
Warning: Do not perform database queries or HTTP requests synchronously inside a job processor without try/catch. A processor that throws causes the job to fail. BullMQ will retry based on your attempts setting โ€” without retries, the job silently disappears. Always set meaningful attempts (3-5 for most jobs), use exponential backoff for transient failures, and add job failure event listeners to log and alert on repeated failures.

Complete Bull/BullMQ Setup

// npm install bullmq ioredis

const { Queue, Worker, QueueEvents, FlowProducer } = require('bullmq');
const Redis = require('ioredis');

// โ”€โ”€ Redis connection โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
const redisConnection = new Redis(process.env.REDIS_URL || 'redis://localhost:6379', {
    maxRetriesPerRequest: null,   // required by BullMQ
});

// โ”€โ”€ Queue definitions โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
// queues/index.js

const defaultJobOptions = {
    attempts:    3,
    backoff: {
        type:  'exponential',
        delay: 2000,   // 2s, 4s, 8s
    },
    removeOnComplete: { count: 100 },  // keep last 100 completed jobs
    removeOnFail:     { count: 500 },  // keep last 500 failed jobs for debugging
};

const emailQueue = new Queue('email', {
    connection:     redisConnection,
    defaultJobOptions,
});

const notificationQueue = new Queue('notifications', {
    connection:     redisConnection,
    defaultJobOptions,
});

const reportsQueue = new Queue('reports', {
    connection:        redisConnection,
    defaultJobOptions: { ...defaultJobOptions, attempts: 1 },  // reports don't retry
});

module.exports = { emailQueue, notificationQueue, reportsQueue };

// โ”€โ”€ Job producers (called from services/controllers) โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
// services/email.service.js

const { emailQueue } = require('../queues');

async function sendWelcomeEmail(user) {
    const job = await emailQueue.add('welcome', {
        userId:   user._id,
        email:    user.email,
        name:     user.name,
        loginUrl: process.env.FRONTEND_URL + '/auth/login',
    }, {
        priority: 1,    // high priority
        delay:    0,    // send immediately
    });
    return job.id;
}

async function sendTaskAssignedEmail(task, assignee) {
    return emailQueue.add('task-assigned', {
        to:       assignee.email,
        taskId:   task._id,
        taskTitle:task.title,
    });
}

async function sendDailyDigest(userId) {
    return emailQueue.add('daily-digest', { userId }, {
        delay:    10 * 60 * 1000,   // send 10 minutes from now
        priority: 5,                 // low priority
    });
}

// Schedule a cron job โ€” runs every day at 8am UTC
async function scheduleDailyDigests() {
    await emailQueue.add('digest-batch', {}, {
        repeat: { cron: '0 8 * * *' },
    });
}

// โ”€โ”€ Workers (separate process: node src/workers/index.js) โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
// workers/email.worker.js

const { Worker } = require('bullmq');
const nodemailer  = require('nodemailer');
const ejs         = require('ejs');
const path        = require('path');
const logger      = require('../utils/logger');

const transporter = nodemailer.createTransport({
    host: process.env.SMTP_HOST,
    port: parseInt(process.env.SMTP_PORT),
    auth: { user: process.env.SMTP_USER, pass: process.env.SMTP_PASS },
});

const emailWorker = new Worker('email', async (job) => {
    logger.info('Processing email job', { jobId: job.id, name: job.name });

    const templatePath = path.join(__dirname, '../views/emails', `${job.name}.ejs`);
    const html         = await ejs.renderFile(templatePath, job.data);

    const mailOptions = {
        from:    '"Task Manager" <noreply@taskmanager.io>',
        to:      job.data.email || job.data.to,
        subject: getSubject(job.name),
        html,
    };

    const result = await transporter.sendMail(mailOptions);
    logger.info('Email sent', { jobId: job.id, messageId: result.messageId });
    return { messageId: result.messageId };

}, {
    connection: redisConnection,
    concurrency: 5,   // process 5 emails simultaneously
});

function getSubject(jobName) {
    const subjects = {
        'welcome':        'Welcome to Task Manager!',
        'task-assigned':  'A task has been assigned to you',
        'daily-digest':   'Your daily task summary',
    };
    return subjects[jobName] || 'Notification from Task Manager';
}

// โ”€โ”€ QueueEvents โ€” monitor job lifecycle โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
const emailQueueEvents = new QueueEvents('email', { connection: redisConnection });

emailQueueEvents.on('completed', ({ jobId, returnvalue }) => {
    logger.info('Email job completed', { jobId, result: JSON.parse(returnvalue) });
});

emailQueueEvents.on('failed', ({ jobId, failedReason }) => {
    logger.error('Email job failed', { jobId, reason: failedReason });
});

emailQueueEvents.on('stalled', ({ jobId }) => {
    logger.warn('Email job stalled', { jobId });
});

// โ”€โ”€ Workers entry point โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€โ”€
// workers/index.js
require('./email.worker');
require('./notification.worker');
console.log('Workers started');

Bull Board โ€” Queue Monitoring UI

// npm install @bull-board/express @bull-board/api

const { createBullBoard }    = require('@bull-board/api');
const { BullMQAdapter }      = require('@bull-board/api/bullMQAdapter');
const { ExpressAdapter }     = require('@bull-board/express');
const { emailQueue, notificationQueue, reportsQueue } = require('./queues');

function setupBullBoard(app) {
    if (process.env.NODE_ENV === 'production') return;  // disable in prod

    const serverAdapter = new ExpressAdapter();
    serverAdapter.setBasePath('/admin/queues');

    createBullBoard({
        queues: [
            new BullMQAdapter(emailQueue),
            new BullMQAdapter(notificationQueue),
            new BullMQAdapter(reportsQueue),
        ],
        serverAdapter,
    });

    app.use('/admin/queues', serverAdapter.getRouter());
    console.log('Bull Board: http://localhost:3000/admin/queues');
}

module.exports = setupBullBoard;
// Visit /admin/queues to see all queues, jobs, retry failed jobs

How It Works

Step 1 โ€” Redis Provides Reliable Job Persistence

When a producer adds a job, BullMQ serialises it to JSON and stores it in a Redis sorted set. The job remains in Redis until a worker processes it โ€” surviving server restarts, crashes, and deployments. If the API server crashes between adding a job and the worker picking it up, the job is still there when everything comes back online. This is the core reliability guarantee that in-memory queues cannot provide.

Step 2 โ€” Workers Compete for Jobs Atomically

When multiple worker processes are running, they all listen to the same Redis queue. BullMQ uses Redis atomic operations (ZPOPMIN, BRPOPLPUSH) to ensure that when multiple workers try to claim the same job simultaneously, exactly one succeeds. The winning worker moves the job to an “active” state and begins processing. Other workers see the job is already active and skip it.

Step 3 โ€” Exponential Backoff Handles Transient Failures

If an email fails because the SMTP server is temporarily unavailable, retrying immediately will probably fail again. Exponential backoff increases the delay between each retry: 2s, 4s, 8s. By the third attempt, the SMTP server has likely recovered. BullMQ manages this automatically based on the attempts and backoff job options โ€” the processor just needs to throw for failures.

Step 4 โ€” Concurrency Controls Parallel Processing

Setting concurrency: 5 on a worker means it processes up to 5 jobs simultaneously within the same process, using the event loop’s non-blocking I/O. For I/O-bound jobs (sending emails, making API calls), high concurrency is efficient. For CPU-bound jobs (image processing, PDF generation), use concurrency 1-2 per process and run multiple worker processes to utilise multiple CPU cores.

Step 5 โ€” Cron Jobs Replace setInterval for Scheduled Tasks

Using setInterval(fn, 24 * 60 * 60 * 1000) to schedule daily tasks is unreliable โ€” it resets on restart, runs on every instance in a multi-instance deployment (causing duplicates), and cannot survive crashes. BullMQ’s repeat/cron option persists the schedule in Redis. Only one worker processes the job at the scheduled time, regardless of how many worker processes are running.

Common Mistakes

Mistake 1 โ€” Running workers in the same process as the API server

โŒ Wrong โ€” a crashed worker takes down the API:

// In app.js
const worker = new Worker('email', processEmail, { connection: redis });
app.listen(3000);  // API and worker in same process โ€” worker crash = API down

✅ Correct โ€” separate entry points, managed by PM2 or Docker:

node src/index.js    # API server process
node src/workers/index.js  # Worker process โ€” separate, restarts independently

Mistake 2 โ€” Not handling job failure events

โŒ Wrong โ€” failed jobs disappear silently:

const worker = new Worker('email', processEmail, { connection: redis });
// No failure handler โ€” failed jobs pile up, nobody knows

✅ Correct โ€” monitor failures and alert:

worker.on('failed', (job, err) => {
    logger.error('Job failed', { jobId: job.id, name: job.name, error: err.message, attempts: job.attemptsMade });
    if (job.attemptsMade >= job.opts.attempts) {
        // All retries exhausted โ€” alert the team
        alertService.critical('Job permanently failed', { jobId: job.id });
    }
});

Mistake 3 โ€” Storing large objects as job data

โŒ Wrong โ€” entire document serialised to Redis:

emailQueue.add('welcome', { user: fullUserDocument });  // may be 10KB+ serialised
// Redis stores full document โ€” queue bloat, slow serialisation

✅ Correct โ€” store only the identifier and fetch fresh data in the worker:

emailQueue.add('welcome', { userId: user._id.toString() });
// Worker:
const user = await User.findById(job.data.userId);  // fresh data at processing time

Quick Reference

Task Code
Create queue new Queue('email', { connection: redis })
Add job queue.add('jobName', { ...data })
Add delayed job queue.add('name', data, { delay: 5000 })
Schedule cron job queue.add('name', data, { repeat: { cron: '0 8 * * *' } })
Create worker new Worker('email', async job => { ... }, { connection, concurrency: 5 })
Handle failure worker.on('failed', (job, err) => { ... })
Retry config { attempts: 3, backoff: { type: 'exponential', delay: 2000 } }
Monitor UI @bull-board/express โ€” visual queue dashboard

🧠 Test Yourself

After a user registers, your API currently calls emailService.sendWelcomeEmail(user) synchronously and the response is delayed by 2 seconds. What is the best architectural fix?