Change Streams are MongoDB's real-time data change notification system — a subscribable cursor that emits events whenever documents in a collection, database, or entire cluster are inserted, updated, replaced, or deleted. They are built on the replica set oplog, providing ordered, at-least-once delivery with resumability after connection interruptions (exactly-once *processing* is achievable only when your handlers are idempotent). Change Streams are the MongoDB-native alternative to polling, enabling event-driven architectures where the database pushes changes rather than the application pulling them. In a MEAN Stack application, Change Streams power real-time task updates, audit trails, cache invalidation, and event-sourcing patterns.
Change Stream Event Types
| Operation | When Fired | Key Fields |
|---|---|---|
| `insert` | New document created | `fullDocument` — the inserted document |
| `update` | Document updated | `updateDescription.updatedFields`, `fullDocument` (if `fullDocument: 'updateLookup'`) |
| `replace` | Document fully replaced | `fullDocument` — the new complete document |
| `delete` | Document deleted | `documentKey._id` — only the ID is available after deletion |
| `drop` | Collection dropped | Collection namespace |
| `invalidate` | Change stream invalidated (collection dropped/renamed) | Stream should be closed |
Every change event carries an `_id` field called the resume token. Store the most recently processed resume token persistently (in a database or file). If your application restarts or loses its connection, pass the stored token to `collection.watch(pipeline, { resumeAfter: savedToken })` to pick up from where you left off — no events are missed, and each event is delivered at least once (handlers must be idempotent to tolerate the occasional replay after a crash). Without this, a reconnection starts the stream from the current point, missing any events that occurred during the downtime.

Filter server-side with a pipeline: `collection.watch([{ $match: { 'fullDocument.user': userId, operationType: { $in: ['insert', 'update'] } } }])` sends only the events your application cares about. Without a pipeline filter, MongoDB sends every change event to your application — your application discards most of them, wasting network bandwidth and application CPU. The `$match` runs on the MongoDB server, before events are sent over the network.

Change Streams require a replica set. For local development, add `command: ["mongod", "--replSet", "rs0"]` in docker-compose.yml, then run `rs.initiate()` in the mongo shell once. The mongodb-memory-server package also supports replica sets for testing Change Streams.

Complete Change Stream Implementation
const mongoose = require('mongoose');
const Task = require('../models/task.model');
const ResumeToken = require('../models/resume-token.model');
const { logger } = require('../config/logger');
// ── Start a resumable change stream ───────────────────────────────────────
/**
 * Opens a resumable change stream on the Task collection.
 *
 * Loads the last persisted resume token (if any) so the stream resumes
 * where the previous run left off, then saves a new token only after each
 * event is successfully processed. Delivery is therefore at-least-once:
 * a crash between processing and token persistence replays the last event,
 * so `onEvent` must be idempotent.
 *
 * @param {(event: object) => Promise<void>} onEvent - async handler invoked per change event
 * @returns {Promise<object>} the live change stream (mongodb ChangeStream)
 */
async function startTaskChangeStream(onEvent) {
  // Load the last saved resume token so we resume instead of starting fresh
  const tokenDoc = await ResumeToken.findOne({ stream: 'tasks' });
  const options = { fullDocument: 'updateLookup' };
  if (tokenDoc?.token) options.resumeAfter = tokenDoc.token;
  // Server-side pipeline: only the operation types we actually handle
  // (insert, update, replace, delete) are sent over the wire.
  const pipeline = [
    { $match: {
      operationType: { $in: ['insert', 'update', 'replace', 'delete'] },
    }},
  ];
  const changeStream = Task.watch(pipeline, options);
  // Guard so the 'error' and 'end' handlers cannot both schedule a restart
  // (closing the stream in the error path also emits 'end', which would
  // otherwise spawn a second, duplicate stream).
  let restarting = false;
  const restart = async delayMs => {
    if (restarting) return;
    restarting = true;
    try {
      await changeStream.close();
    } catch (closeErr) {
      logger.warn('Error closing change stream', { error: closeErr.message });
    }
    if (delayMs > 0) await new Promise(r => setTimeout(r, delayMs));
    try {
      await startTaskChangeStream(onEvent); // re-reads the saved resume token
    } catch (err) {
      logger.error('Change stream restart failed', { error: err.message });
    }
  };
  changeStream.on('change', async event => {
    try {
      // Process the event first…
      await onEvent(event);
      // …then persist the resume token, so a crash replays (never skips)
      // the in-flight event. Handlers must tolerate the duplicate.
      await ResumeToken.findOneAndUpdate(
        { stream: 'tasks' },
        { token: event._id, updatedAt: new Date() },
        { upsert: true }
      );
    } catch (err) {
      logger.error('Change stream handler failed', {
        eventId: event._id,
        operation: event.operationType,
        error: err.message,
      });
    }
  });
  changeStream.on('error', err => {
    logger.error('Change stream error', { error: err.message });
    // Reconnect after a delay; resumeAfter picks up from the saved token.
    void restart(5000);
  });
  changeStream.on('end', () => {
    logger.warn('Change stream ended — reconnecting');
    void restart(0);
  });
  logger.info('Task change stream started', { resumed: !!tokenDoc?.token });
  return changeStream;
}
// ── Change stream event handler ───────────────────────────────────────────
/**
 * Routes one Task change event to Socket.io broadcasts and cache
 * invalidation. Must be idempotent — the stream delivers at-least-once.
 *
 * @param {object} event - raw change stream event
 */
async function handleTaskChange(event) {
  const { operationType, fullDocument, documentKey, updateDescription } = event;
  switch (operationType) {
    case 'insert':
      // Broadcast to workspace via Socket.io
      broadcastTaskCreated(fullDocument.workspace?.toString(), fullDocument);
      // Invalidate user's task list cache
      await cache.invalidateTag(`user:${fullDocument.user}`);
      break;
    case 'update':
    case 'replace':
      // fullDocument may be null even with 'updateLookup': the post-image
      // is fetched separately and the document can be deleted in between.
      broadcastTaskUpdated(
        fullDocument?.workspace?.toString(),
        documentKey._id.toString(),
        updateDescription?.updatedFields ?? fullDocument
      );
      await cache.del(`task:${documentKey._id}`);
      // Only invalidate the user tag when the user is actually known —
      // otherwise we would invalidate a bogus `user:undefined` tag.
      if (fullDocument?.user != null) {
        await cache.invalidateTag(`user:${fullDocument.user}`);
      }
      break;
    case 'delete':
      // Delete events carry only documentKey._id; the workspace is no
      // longer available, so broadcast with an empty room identifier.
      broadcastTaskDeleted('', documentKey._id.toString());
      await cache.del(`task:${documentKey._id}`);
      break;
  }
}
// ── Audit trail using change stream ──────────────────────────────────────
const TaskAudit = require('../models/task-audit.model');
/**
 * Starts a change stream that records every Task insert/update/delete
 * as a TaskAudit document. Audit-write failures are logged rather than
 * left as unhandled promise rejections.
 *
 * @returns {Promise<object>} the live change stream (mongodb ChangeStream)
 */
async function startAuditStream() {
  const stream = Task.watch([
    { $match: { operationType: { $in: ['insert', 'update', 'delete'] } } },
  ], { fullDocument: 'updateLookup' });
  stream.on('change', async event => {
    try {
      await TaskAudit.create({
        taskId: event.documentKey._id,
        operation: event.operationType,
        userId: event.fullDocument?.user ?? null,
        changes: event.updateDescription?.updatedFields ?? null,
        // Deletes have no post-image; store null instead of a stale snapshot.
        snapshot: event.operationType !== 'delete' ? event.fullDocument : null,
        timestamp: new Date(),
      });
    } catch (err) {
      // Log and continue — one failed audit write must not crash the
      // process via an unhandled rejection or stop the stream.
      logger.error('Audit write failed', {
        eventId: event._id,
        operation: event.operationType,
        error: err.message,
      });
    }
  });
  // Without an 'error' listener, a stream error becomes an uncaught
  // 'error' event and crashes the process. Log it here; reconnection
  // policy is left to the caller.
  stream.on('error', err => {
    logger.error('Audit change stream error', { error: err.message });
  });
  return stream;
}
// ── Initialise streams on app startup ─────────────────────────────────────
// src/server.js — after mongoose.connect():
/**
 * Starts all change streams and wires graceful shutdown.
 * Change Streams require the deployment to be a replica set.
 */
async function initChangeStreams() {
  // Sanity-check that the server is reachable before opening streams.
  // NOTE(review): serverStatus() does not actually verify replica-set
  // readiness — use db.admin().command({ replSetGetStatus: 1 }) if a
  // hard check is required.
  await mongoose.connection.db.admin().serverStatus();
  // The two streams are independent — start them in parallel.
  const [taskStream, auditStream] = await Promise.all([
    startTaskChangeStream(handleTaskChange),
    startAuditStream(),
  ]);
  // Graceful shutdown — close streams before disconnecting.
  // NOTE(review): if startTaskChangeStream reconnects internally,
  // taskStream may reference an already-closed stream while the
  // replacement stays open; track the live stream if shutdown must
  // be airtight.
  process.on('SIGTERM', async () => {
    await Promise.all([taskStream.close(), auditStream.close()]);
  });
}
How It Works
Step 1 — Change Streams Are Built on the Oplog
MongoDB replica sets maintain an operations log (oplog) — a capped collection in the local database that records every write operation. Change Streams subscribe to this oplog and emit filtered events. Because the oplog is ordered and append-only, events are delivered in the exact order they were written. The oplog also provides the resume token — a position marker in the oplog that enables resumption after reconnection.
Step 2 — Resume Tokens Enable Reliable At-Least-Once Processing
Storing the resume token after successfully processing each event and passing it on reconnection ensures no events are missed; occasional duplicates remain possible. This is a durable checkpoint mechanism: if the application crashes after processing event #42 but before storing token #42, it will reprocess event #42 on restart — so the handler must be idempotent (processing the same event twice produces the same result as once). Exactly-once *effects* are therefore achieved by combining at-least-once delivery with idempotent handlers.
Step 3 — fullDocument: ‘updateLookup’ Fetches the Full Document on Update
By default, update events contain only the changed fields (updateDescription.updatedFields), not the complete document. Setting fullDocument: 'updateLookup' makes MongoDB perform an additional read to fetch the complete current document and include it as fullDocument. This is useful when your event handlers need the full document context (e.g. to find the user ID for cache invalidation). Note that this adds latency to event delivery and the fetched document may be slightly newer than the change itself if another update occurred between the write and the lookup.
Step 4 — Server-Side Pipeline Filtering Saves Network Bandwidth
The pipeline argument to .watch([pipeline], options) is an aggregation pipeline that runs on the MongoDB server before events are sent. Using { $match: { operationType: 'update' } } filters delete and insert events at the source. For high-write collections, filtering server-side is critical — without it, the application receives every event and discards most, consuming significant network bandwidth and application CPU.
Step 5 — Change Stream Error Handling Must Include Reconnection
Change Stream connections can be lost due to network interruptions, replica set failovers, or MongoDB server restarts. The error handler closes the stream and schedules a reconnection with a delay. On reconnection, resumeAfter: savedToken picks up from the last successfully processed event — no events are missed during the downtime. Without reconnection logic, a temporary network issue would permanently stop the change stream.
Quick Reference
| Task | Code |
|---|---|
| Watch collection | Task.watch(pipeline, { fullDocument: 'updateLookup' }) |
| Filter events server-side | [{ $match: { operationType: 'update' } }] |
| Listen for changes | stream.on('change', event => { ... }) |
| Resume after reconnect | Task.watch([], { resumeAfter: savedToken }) |
| Access changed fields | event.updateDescription.updatedFields |
| Access full document | event.fullDocument (with updateLookup) |
| Access deleted ID | event.documentKey._id |
| Close stream | await stream.close() |