Notifications bridge the gap between server-side events and user awareness — a task assigned to you, a comment mentioning you, a task due tomorrow. The notification system combines three mechanisms: persistent database notifications (in-app bell icon with unread count), real-time Socket.io delivery for instant in-app alerts, and Bull email queue for out-of-app delivery when the user is offline. This lesson builds the complete notification pipeline from event detection to user display.
Notification Types
| Event | In-App | Socket | |
|---|---|---|---|
| Task assigned to you | ✅ | ✅ | ✅ |
| Task you created — status changed | ✅ | ❌ | ✅ |
| Due date tomorrow | ✅ | ✅ | ❌ (cron) |
| Workspace invitation | ✅ | ✅ | ✅ |
| Mentioned in task description (@user) | ✅ | ✅ | ✅ |
| Task completed by assignee | ✅ | ❌ | ✅ |
unreadNotifications: Number) rather than running Notification.countDocuments({ userId, read: false }) on every request. Increment it when a notification is created, decrement it when notifications are marked as read. The denormalised count trades a slightly more complex write for a dramatically faster read.Notification.updateMany({ userId, read: false }, { $set: { read: true, readAt: new Date() } }). Then reset the user’s unread count to zero: User.findByIdAndUpdate(userId, { $set: { unreadNotifications: 0 } }). Using two fast bulk operations avoids N individual updates and keeps the UI responsive when a user has many unread notifications. Wrap both in a transaction for consistency.Complete Notification System
// ── Notification model ────────────────────────────────────────────────────
const notificationSchema = new mongoose.Schema({
userId: { type: mongoose.Schema.Types.ObjectId, ref: 'User', required: true },
type: { type: String, enum: ['task_assigned','task_status_changed',
'due_tomorrow','workspace_invitation','mentioned','task_completed'],
required: true },
title: { type: String, required: true },
body: { type: String },
link: { type: String }, // deep link: /w/slug/tasks/taskId
read: { type: Boolean, default: false },
readAt: { type: Date },
data: { type: mongoose.Schema.Types.Mixed }, // extra context (taskId, workspaceId)
}, { timestamps: true });
notificationSchema.index({ userId: 1, createdAt: -1 }); // user's notifications feed
notificationSchema.index({ userId: 1, read: 1 }); // unread count query
// TTL: auto-delete read notifications after 30 days
notificationSchema.index({ readAt: 1 }, { expireAfterSeconds: 30 * 24 * 60 * 60, sparse: true });
const Notification = mongoose.model('Notification', notificationSchema);
// ── Notification service ──────────────────────────────────────────────────
const { notifyQueue } = require('../../queues');
const { io } = require('../../config/socket');
const SOCKET_EVENTS = require('@taskmanager/shared/dist/constants/socket-events');
async function createAndDeliver(userId, notification) {
// 1. Persist to DB
const doc = await Notification.create({ userId, ...notification });
// 2. Increment unread count
await User.findByIdAndUpdate(userId, { $inc: { unreadNotifications: 1 } });
// 3. Real-time delivery via Socket.io (if user is connected)
io.to(`user:${userId}`).emit(SOCKET_EVENTS.NOTIFICATION, {
id: doc._id,
type: doc.type,
title: doc.title,
body: doc.body,
link: doc.link,
createdAt: doc.createdAt,
unreadCount: (await User.findById(userId).select('unreadNotifications'))?.unreadNotifications,
});
return doc;
}
exports.notifyTaskAssigned = async (task, assigneeIds, assignerName) => {
// Queue fanout job to avoid blocking request
await notifyQueue.add('taskAssigned', {
taskId: task._id.toString(),
taskTitle: task.title,
workspaceId: task.workspace.toString(),
workspaceSlug: task.workspaceSlug,
assigneeIds,
assignerName,
});
};
// Notification queue processor
notifyQueue.process('taskAssigned', 5, async (job) => {
const { taskId, taskTitle, workspaceSlug, assigneeIds, assignerName } = job.data;
for (const userId of assigneeIds) {
await createAndDeliver(userId, {
type: 'task_assigned',
title: `${assignerName} assigned you a task`,
body: taskTitle,
link: `/w/${workspaceSlug}/tasks/${taskId}`,
data: { taskId },
});
// Email notification (async via email queue)
const user = await User.findById(userId).select('email name');
if (user?.email) {
await emailQueue.add('taskAssigned', {
type: 'TASK_ASSIGNED',
to: user.email,
data: { name: user.name, taskTitle, assignerName,
taskUrl: `${process.env.CLIENT_URL}/w/${workspaceSlug}/tasks/${taskId}` },
});
}
}
});
// ── Due tomorrow — cron job ───────────────────────────────────────────────
const { reportQueue } = require('../../queues');
reportQueue.add('dueTomorrowNotifications', {}, {
repeat: { cron: '0 9 * * *' }, // every day at 9am
jobId: 'due-tomorrow-cron',
});
reportQueue.process('dueTomorrowNotifications', 1, async () => {
const tomorrow = new Date();
tomorrow.setDate(tomorrow.getDate() + 1);
const start = new Date(tomorrow.setHours(0,0,0,0));
const end = new Date(tomorrow.setHours(23,59,59,999));
const tasks = await Task.find({
dueDate: { $gte: start, $lte: end },
status: { $nin: ['done', 'cancelled'] },
deletedAt: { $exists: false },
}).populate('assignees', 'email name').populate('workspace', 'slug');
for (const task of tasks) {
for (const assignee of task.assignees) {
await createAndDeliver(assignee._id, {
type: 'due_tomorrow',
title: 'Task due tomorrow',
body: task.title,
link: `/w/${task.workspace.slug}/tasks/${task._id}`,
data: { taskId: task._id },
});
}
}
return { processed: tasks.length };
});
// ── Notification routes ───────────────────────────────────────────────────
exports.getNotifications = asyncHandler(async (req, res) => {
const { page = 1, limit = 20, unreadOnly = false } = req.query;
const filter = { userId: req.user.sub };
if (unreadOnly === 'true') filter.read = false;
const [notifications, total] = await Promise.all([
Notification.find(filter).sort('-createdAt')
.skip((page - 1) * limit).limit(parseInt(limit)),
Notification.countDocuments(filter),
]);
res.json({ success: true, data: notifications, meta: { total, page, limit } });
});
exports.markAllRead = asyncHandler(async (req, res) => {
await Notification.updateMany(
{ userId: req.user.sub, read: false },
{ $set: { read: true, readAt: new Date() } }
);
await User.findByIdAndUpdate(req.user.sub, { $set: { unreadNotifications: 0 } });
res.json({ success: true });
});
How It Works
Step 1 — Three-Channel Delivery Reaches Users Wherever They Are
In-app notifications (persisted to DB + Socket.io emit) reach users currently using the application. Email notifications reach users who are offline. The unread count badge on the bell icon tells users there are notifications waiting when they return. Each notification channel serves a different user state — the combination ensures no notification is missed regardless of when the user is active.
Step 2 — Background Queue Prevents Request Blocking
A task assigned to 20 people would require 20 database inserts, 20 Socket.io emits, and 20 email queue entries if done synchronously in the request handler. Dispatching a single Bull job and returning immediately keeps the assignment response time at ~50ms regardless of assignee count. The Bull worker processes the fanout asynchronously — the 20 operations happen in the background after the HTTP response is already sent.
Step 3 — Denormalised Unread Count Avoids Expensive Aggregations
Displaying an unread count badge on every page load without a denormalised field would require Notification.countDocuments({ userId, read: false }) on every request. For a user with thousands of notifications, this is an O(n) scan. The unreadNotifications field on the User document is O(1) — a single field read. The consistency guarantee: always increment on create, always decrement or reset on read. Eventual consistency (the count might be off by 1 in a race) is acceptable for a notification badge.
Step 4 — TTL Cleans Up Read Notifications Automatically
The sparse TTL index on readAt with expireAfterSeconds: 2592000 (30 days) automatically deletes notifications that were read more than 30 days ago. Unread notifications have no readAt and the sparse index excludes them — they are never auto-deleted. This prevents the notification collection from growing unboundedly while preserving the unread inbox indefinitely until the user reads them.
Step 5 — Cron Job Sends Daily Due-Tomorrow Reminders
The Bull repeatable job with cron: '0 9 * * *' runs at 9am every day (in the server timezone). It queries all non-completed tasks due the following day, creates in-app notifications for each assignee, and queues email reminders. The jobId: 'due-tomorrow-cron' prevents duplicate schedules on server restart — if the job already exists with that ID, Bull updates rather than creates. The cron timezone should be set to the workspace timezone for multi-timezone applications.
Quick Reference
| Task | Code |
|---|---|
| Create notification | Notification.create({ userId, type, title, body, link }) |
| Real-time emit | io.to(`user:${userId}`).emit(NOTIFICATION, data) |
| Increment unread | User.findByIdAndUpdate(userId, { $inc: { unreadNotifications: 1 } }) |
| Mark all read | Notification.updateMany({ userId, read: false }, { $set: { read: true, readAt: new Date() } }) |
| Reset count | User.findByIdAndUpdate(userId, { $set: { unreadNotifications: 0 } }) |
| Async fanout | Queue Bull job → process in worker → createAndDeliver per user |
| Daily cron | queue.add('job', {}, { repeat: { cron: '0 9 * * *' }, jobId: 'stable-id' }) |
| TTL on read | schema.index({ readAt: 1 }, { expireAfterSeconds: 2592000, sparse: true }) |