Notification System — In-App, Email, Real-Time, and Due-Tomorrow Cron

Notifications bridge the gap between server-side events and user awareness: a task assigned to you, a comment mentioning you, a task due tomorrow. The notification system combines three mechanisms: persistent database notifications (the in-app bell icon with its unread count), real-time Socket.io delivery for instant in-app alerts, and a Bull email queue for out-of-app delivery when the user is offline. This lesson builds the complete notification pipeline from event detection to user display.

Notification Types

Event In-App Email Socket
Task assigned to you
Task you created — status changed
Due date tomorrow ❌ (cron)
Workspace invitation
Mentioned in task description (@user)
Task completed by assignee
Note: The unread notification count displayed in the bell icon must be fast — it is fetched on every page load and polled or updated via Socket.io. Store the unread count as a denormalised field on the User document (unreadNotifications: Number) rather than running Notification.countDocuments({ userId, read: false }) on every request. Increment it when a notification is created, decrement it when notifications are marked as read. The denormalised count trades a slightly more complex write for a dramatically faster read.
Tip: Batch “mark all as read” in a single update: Notification.updateMany({ userId, read: false }, { $set: { read: true, readAt: new Date() } }). Then reset the user’s unread count to zero: User.findByIdAndUpdate(userId, { $set: { unreadNotifications: 0 } }). Using two fast bulk operations avoids N individual updates and keeps the UI responsive when a user has many unread notifications. Wrap both in a transaction for consistency.
Warning: Notification fanout to many users can be slow. A task assigned to 20 people creates 20 notifications, each requiring a DB insert and a Socket.io emit. Do this asynchronously — dispatch a Bull job for notification fanout rather than creating all 20 notifications in the HTTP request handler. The request returns immediately with the assignment confirmation; the notification job processes in the background, creates the notifications, and emits the Socket.io events. This prevents the assignment response time from scaling with the number of assignees.
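
The "mark all as read" tip above suggests wrapping both writes in a transaction. A minimal sketch of how that might look, assuming Mongoose 6 or later connected to a replica set (MongoDB transactions are not available on a standalone server). The name markAllReadAtomically and the injected deps object are hypothetical; the models are passed in so the logic can be exercised without a database.

```javascript
// Hypothetical helper: marks every unread notification as read and resets the
// denormalised counter inside one MongoDB transaction.
// `deps` carries the mongoose instance and the two models (assumed names).
async function markAllReadAtomically(deps, userId) {
    const { mongoose, Notification, User } = deps;
    const session = await mongoose.startSession();
    try {
        // withTransaction commits on success and aborts if the callback throws.
        await session.withTransaction(async () => {
            await Notification.updateMany(
                { userId, read: false },
                { $set: { read: true, readAt: new Date() } },
                { session }
            );
            await User.findByIdAndUpdate(
                userId,
                { $set: { unreadNotifications: 0 } },
                { session }
            );
        });
    } finally {
        // Always release the session, whether the transaction committed or not.
        await session.endSession();
    }
}
```

In a route this would be called as markAllReadAtomically({ mongoose, Notification, User }, req.user.sub). Passing { session } to both writes is what ties them to the same transaction; a write issued without it runs outside the transaction.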

Complete Notification System

// ── Notification model ────────────────────────────────────────────────────
const mongoose = require('mongoose');

const notificationSchema = new mongoose.Schema({
    userId:     { type: mongoose.Schema.Types.ObjectId, ref: 'User', required: true },
    type:       { type: String, enum: ['task_assigned','task_status_changed',
                  'due_tomorrow','workspace_invitation','mentioned','task_completed'],
                  required: true },
    title:      { type: String, required: true },
    body:       { type: String },
    link:       { type: String },   // deep link: /w/slug/tasks/taskId
    read:       { type: Boolean,  default: false },
    readAt:     { type: Date },
    data:       { type: mongoose.Schema.Types.Mixed },  // extra context (taskId, workspaceId)
}, { timestamps: true });

notificationSchema.index({ userId: 1, createdAt: -1 });  // user's notifications feed
notificationSchema.index({ userId: 1, read: 1 });         // unread count query
// TTL: auto-delete read notifications after 30 days
notificationSchema.index({ readAt: 1 }, { expireAfterSeconds: 30 * 24 * 60 * 60, sparse: true });

const Notification = mongoose.model('Notification', notificationSchema);

// ── Notification service ──────────────────────────────────────────────────
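const { notifyQueue, emailQueue } = require('../../queues');  // emailQueue is used by the fanout processor
const { io }          = require('../../config/socket');
const SOCKET_EVENTS   = require('@taskmanager/shared/dist/constants/socket-events');
const User            = require('../users/user.model');  // assumed path; point this at your User model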

async function createAndDeliver(userId, notification) {
    // 1. Persist to DB
    const doc = await Notification.create({ userId, ...notification });

    // 2. Increment the unread count and read back the new value in one round trip
    const user = await User.findByIdAndUpdate(
        userId,
        { $inc: { unreadNotifications: 1 } },
        { new: true }   // return the document as it is after the increment
    ).select('unreadNotifications');

    // 3. Real-time delivery via Socket.io (a no-op if the user has no connected sockets)
    io.to(`user:${userId}`).emit(SOCKET_EVENTS.NOTIFICATION, {
        id:        doc._id,
        type:      doc.type,
        title:     doc.title,
        body:      doc.body,
        link:      doc.link,
        createdAt: doc.createdAt,
        unreadCount: user?.unreadNotifications,
    });

    return doc;
}

exports.notifyTaskAssigned = async (task, assigneeIds, assignerName) => {
    // Queue fanout job to avoid blocking request
    await notifyQueue.add('taskAssigned', {
        taskId:      task._id.toString(),
        taskTitle:   task.title,
        workspaceId: task.workspace.toString(),
        workspaceSlug: task.workspaceSlug,
        assigneeIds,
        assignerName,
    });
};

// Notification queue processor
notifyQueue.process('taskAssigned', 5, async (job) => {
    const { taskId, taskTitle, workspaceSlug, assigneeIds, assignerName } = job.data;

    for (const userId of assigneeIds) {
        await createAndDeliver(userId, {
            type:  'task_assigned',
            title: `${assignerName} assigned you a task`,
            body:  taskTitle,
            link:  `/w/${workspaceSlug}/tasks/${taskId}`,
            data:  { taskId },
        });

        // Email notification (async via email queue)
        const user = await User.findById(userId).select('email name');
        if (user?.email) {
            await emailQueue.add('taskAssigned', {
                type: 'TASK_ASSIGNED',
                to:   user.email,
                data: { name: user.name, taskTitle, assignerName,
                        taskUrl: `${process.env.CLIENT_URL}/w/${workspaceSlug}/tasks/${taskId}` },
            });
        }
    }
});

// ── Due tomorrow — cron job ───────────────────────────────────────────────
const { reportQueue } = require('../../queues');

reportQueue.add('dueTomorrowNotifications', {}, {
    repeat:  { cron: '0 9 * * *' },  // every day at 9am
    jobId:   'due-tomorrow-cron',
});

reportQueue.process('dueTomorrowNotifications', 1, async () => {
    // Local-time window covering all of tomorrow (no Date is mutated after use)
    const now   = new Date();
    const start = new Date(now.getFullYear(), now.getMonth(), now.getDate() + 1);
    const end   = new Date(start);
    end.setHours(23, 59, 59, 999);

    const tasks = await Task.find({
        dueDate:   { $gte: start, $lte: end },
        status:    { $nin: ['done', 'cancelled'] },
        deletedAt: { $exists: false },
    }).populate('assignees', 'email name').populate('workspace', 'slug');

    for (const task of tasks) {
        for (const assignee of task.assignees) {
            await createAndDeliver(assignee._id, {
                type:  'due_tomorrow',
                title: 'Task due tomorrow',
                body:  task.title,
                link:  `/w/${task.workspace.slug}/tasks/${task._id}`,
                data:  { taskId: task._id },
            });
        }
    }
    return { processed: tasks.length };
});

// ── Notification routes ───────────────────────────────────────────────────
exports.getNotifications = asyncHandler(async (req, res) => {
    // Query-string values arrive as strings: parse them and cap the page size
    const page  = Math.max(parseInt(req.query.page, 10) || 1, 1);
    const limit = Math.min(Math.max(parseInt(req.query.limit, 10) || 20, 1), 100);  // 100 is an arbitrary cap
    const filter = { userId: req.user.sub };
    if (req.query.unreadOnly === 'true') filter.read = false;

    const [notifications, total] = await Promise.all([
        Notification.find(filter).sort('-createdAt')
            .skip((page - 1) * limit).limit(limit),
        Notification.countDocuments(filter),
    ]);

    res.json({ success: true, data: notifications, meta: { total, page, limit } });
});

exports.markAllRead = asyncHandler(async (req, res) => {
    await Notification.updateMany(
        { userId: req.user.sub, read: false },
        { $set: { read: true, readAt: new Date() } }
    );
    await User.findByIdAndUpdate(req.user.sub, { $set: { unreadNotifications: 0 } });
    res.json({ success: true });
});

How It Works

Step 1 — Three-Channel Delivery Reaches Users Wherever They Are

In-app notifications (persisted to DB + Socket.io emit) reach users currently using the application. Email notifications reach users who are offline. The unread count badge on the bell icon tells users there are notifications waiting when they return. Each notification channel serves a different user state — the combination ensures no notification is missed regardless of when the user is active.
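
On the receiving side, the Socket.io payload has to end up in whatever state drives the bell icon. The following is a minimal sketch of that step as a pure function. The state shape ({ items, unreadCount }) and the name applyNotificationEvent are assumptions for illustration; only the payload fields come from the emit in createAndDeliver.

```javascript
// Hypothetical client-side reducer. `state` is assumed to look like
// { items: [...], unreadCount: number }; `payload` is the object emitted
// with SOCKET_EVENTS.NOTIFICATION by createAndDeliver.
function applyNotificationEvent(state, payload) {
    const { unreadCount, ...notification } = payload;
    return {
        // Newest first, matching the server's sort('-createdAt') feed order.
        items: [{ ...notification, read: false }, ...state.items],
        // Prefer the server's count; fall back to a local increment if it is absent.
        unreadCount: typeof unreadCount === 'number' ? unreadCount : state.unreadCount + 1,
    };
}
```

A client would call this from its socket listener, for example socket.on(SOCKET_EVENTS.NOTIFICATION, (payload) => setState((s) => applyNotificationEvent(s, payload))), where setState stands in for whatever state container the front end uses.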

Step 2 — Background Queue Prevents Request Blocking

A task assigned to 20 people would require 20 database inserts, 20 Socket.io emits, and 20 email queue entries if done synchronously in the request handler. Dispatching a single Bull job and returning immediately keeps the assignment response time roughly constant regardless of assignee count: the handler pays for one enqueue, not for 20 rounds of work. The Bull worker processes the fanout asynchronously, so the 20 operations happen in the background after the HTTP response has already been sent.
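
To make the request side concrete, here is a minimal sketch of an assignment handler that enqueues exactly one job. The factory name makeAssignTaskHandler, the taskService.assign call, the route shape, and req.user.name are all assumptions for illustration; notifyQueue only needs Bull's add(name, data) method.

```javascript
// Hypothetical Express-style handler factory. `taskService.assign` and the
// request shape are assumptions; `notifyQueue` is any object exposing
// Bull's add(name, data).
function makeAssignTaskHandler({ taskService, notifyQueue }) {
    return async function assignTask(req, res) {
        const { assigneeIds } = req.body;
        const task = await taskService.assign(req.params.taskId, assigneeIds);

        // One enqueue, however many assignees there are. The worker performs
        // the per-user inserts, socket emits, and email jobs later.
        await notifyQueue.add('taskAssigned', {
            taskId:        String(task._id),
            taskTitle:     task.title,
            workspaceSlug: task.workspaceSlug,
            assigneeIds,
            assignerName:  req.user.name,
        });

        res.json({ success: true, data: { taskId: String(task._id) } });
    };
}
```

The handler's work does not depend on assigneeIds.length: 5 assignees and 500 assignees both cost one task update and one queue write before the response is sent.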

Step 3 — Denormalised Unread Count Avoids Expensive Aggregations

Displaying an unread count badge on every page load without a denormalised field would require Notification.countDocuments({ userId, read: false }) on every request. Even with the { userId: 1, read: 1 } index, that count walks every matching index entry, so its cost grows with the number of unread notifications, and it adds a query to every page load. The unreadNotifications field on the User document is a single field read whose cost does not depend on how many notifications exist. The consistency rule: always increment on create, and always decrement or reset on read. A brief inconsistency (the count might be off by one in a race) is acceptable for a notification badge.
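
The listing covers increment-on-create and reset-on-mark-all, but not the decrement for a single notification. A minimal sketch of that path follows, assuming Mongoose 6 or later, where updateOne resolves to an object with a modifiedCount property. The name markOneRead and the injected models are hypothetical.

```javascript
// Hypothetical helper for marking ONE notification as read.
// Assumes Mongoose 6+, where updateOne resolves to { modifiedCount, ... }.
async function markOneRead({ Notification, User }, userId, notificationId) {
    // Filtering on read: false makes the call idempotent: a repeated request
    // matches nothing, so the counter is not decremented twice.
    const result = await Notification.updateOne(
        { _id: notificationId, userId, read: false },
        { $set: { read: true, readAt: new Date() } }
    );
    if (result.modifiedCount !== 1) return false;

    // The $gt guard keeps the counter from going negative if it has drifted.
    await User.updateOne(
        { _id: userId, unreadNotifications: { $gt: 0 } },
        { $inc: { unreadNotifications: -1 } }
    );
    return true;
}
```

Tying the decrement to modifiedCount is what keeps the denormalised count honest: the counter only moves when a notification actually changed from unread to read.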

Step 4 — TTL Cleans Up Read Notifications Automatically

The sparse TTL index on readAt with expireAfterSeconds: 2592000 (30 days) automatically deletes notifications that were read more than 30 days ago. Unread notifications have no readAt and the sparse index excludes them — they are never auto-deleted. This prevents the notification collection from growing unboundedly while preserving the unread inbox indefinitely until the user reads them.
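
The arithmetic behind the TTL can be checked with a small helper. This is only an illustration of when a document becomes eligible for deletion; the name earliestExpiry is hypothetical, and MongoDB itself does the deleting. Its TTL background task runs periodically (about once a minute), so actual removal can lag slightly behind the computed time.

```javascript
const THIRTY_DAYS_SECONDS = 30 * 24 * 60 * 60; // 2592000

// Hypothetical helper: earliest moment MongoDB may delete a notification.
// Returns null for unread notifications, which have no readAt and are
// therefore never matched by the TTL index.
function earliestExpiry(notification, ttlSeconds = THIRTY_DAYS_SECONDS) {
    if (!notification.readAt) return null;
    return new Date(notification.readAt.getTime() + ttlSeconds * 1000);
}

// A notification read at midnight UTC on 1 January becomes eligible 30 days later.
const expiry = earliestExpiry({ readAt: new Date('2024-01-01T00:00:00Z') });
console.log(expiry.toISOString()); // 2024-01-31T00:00:00.000Z
```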

Step 5 — Cron Job Sends Daily Due-Tomorrow Reminders

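The Bull repeatable job with cron: '0 9 * * *' runs at 9am every day (in the server timezone unless the repeat options include tz). It queries all tasks due the following day that are not done or cancelled and creates an in-app notification for each assignee. The listing populates each assignee's email but does not queue an email reminder; one can be added in the same loop with emailQueue.add. Registering the job with a fixed jobId: 'due-tomorrow-cron' keeps server restarts from piling up duplicate schedules: Bull does not add a repeatable job again when its name, jobId, and repeat options are unchanged. Changing the cron expression, however, registers a second schedule alongside the old one, so remove the old one with removeRepeatable when the schedule changes. For multi-timezone applications a single server-time schedule is not enough; the cron timezone should follow each workspace's timezone.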
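
Two details of the cron step are easy to get wrong: the date window and the timezone. A minimal sketch of both, under stated assumptions: dueTomorrowWindow is a hypothetical helper that works in the process's local timezone, and 'Europe/London' is only an example value for the tz field of Bull's cron repeat options.

```javascript
// Returns the half-open window [start, end) covering "tomorrow" in the
// process's local timezone. Neither `now` nor any intermediate Date is mutated.
// The Date constructor normalises day overflow (e.g. 31 January + 1 day).
function dueTomorrowWindow(now = new Date()) {
    const start = new Date(now.getFullYear(), now.getMonth(), now.getDate() + 1);
    const end   = new Date(now.getFullYear(), now.getMonth(), now.getDate() + 2);
    return { start, end };
}

// Repeat options with an explicit timezone, so 9am means 9am in that zone
// rather than wherever the server happens to run. Example value only.
const dueTomorrowRepeat = {
    repeat: { cron: '0 9 * * *', tz: 'Europe/London' },
    jobId:  'due-tomorrow-cron',
};
```

With a half-open window the query becomes dueDate: { $gte: start, $lt: end }, which avoids the 23:59:59.999 boundary. The options object would be passed as the third argument to reportQueue.add('dueTomorrowNotifications', {}, dueTomorrowRepeat). Note that the window helper still uses the server's local time; for the two to agree, the server timezone and the tz value need to match, or the window has to be computed in the target zone.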

Quick Reference

Task | Code
Create notification | Notification.create({ userId, type, title, body, link })
Real-time emit | io.to(`user:${userId}`).emit(NOTIFICATION, data)
Increment unread | User.findByIdAndUpdate(userId, { $inc: { unreadNotifications: 1 } })
Mark all read | Notification.updateMany({ userId, read: false }, { $set: { read: true, readAt: new Date() } })
Reset count | User.findByIdAndUpdate(userId, { $set: { unreadNotifications: 0 } })
Async fanout | Queue Bull job → process in worker → createAndDeliver per user
Daily cron | queue.add('job', {}, { repeat: { cron: '0 9 * * *' }, jobId: 'stable-id' })
TTL on read | schema.index({ readAt: 1 }, { expireAfterSeconds: 2592000, sparse: true })

🧠 Test Yourself

A task is assigned to 50 people. The notification fanout is done synchronously in the request handler. What is the user experience problem and how does the Bull queue pattern solve it?