Change Streams — Real-Time Data Events and Resumable Subscriptions

Change Streams are MongoDB’s real-time data change notification system: a subscribable cursor that emits events whenever documents in a collection, database, or entire deployment are inserted, updated, replaced, or deleted. They are built on the replica set oplog and can be resumed after connection interruptions. When the application stores resume tokens, this gives at-least-once delivery; exactly-once processing is not guaranteed and depends on idempotent handlers. Change Streams are the MongoDB-native alternative to polling, enabling event-driven architectures where the database pushes changes rather than the application pulling them. In a MEAN Stack application, Change Streams power real-time task updates, audit trails, cache invalidation, and event-sourcing patterns.

Change Stream Event Types

| Operation | When Fired | Key Fields |
| --- | --- | --- |
| insert | New document created | fullDocument: the inserted document |
| update | Document updated | updateDescription.updatedFields, fullDocument (if fullDocument: 'updateLookup') |
| replace | Document fully replaced | fullDocument: the new complete document |
| delete | Document deleted | documentKey._id: only the ID is available after deletion |
| drop | Collection dropped | Collection namespace |
| invalidate | Change stream invalidated (collection dropped/renamed) | Stream should be closed |

Note: Change Streams are resumable. Each event has an _id field, which is its resume token. Store the most recently processed resume token persistently (in a database or file). If your application restarts or loses its connection, pass the stored resume token to collection.watch(pipeline, { resumeAfter: savedToken }) to continue with the first event after the one you recorded. No events are missed as long as the token's position is still present in the oplog; an event can be delivered a second time if the application stopped after processing it but before saving its token. Without a stored token, a reconnection starts the stream from the current point, missing any events that occurred during the downtime.
Tip: Use a $match stage in the Change Stream pipeline to filter events server-side. collection.watch([{ $match: { 'fullDocument.user': userId, operationType: { $in: ['insert', 'update'] } } }]) sends only the events your application cares about. For update events, fullDocument is only present when the stream is opened with fullDocument: 'updateLookup', so a filter on fullDocument.user needs that option to match updates. Without a pipeline filter, MongoDB sends every change event to your application, which then discards most of them, wasting network bandwidth and application CPU. The $match runs on the MongoDB server, before events are sent over the network.
Warning: Change Streams require a replica set or sharded cluster; they are not available on standalone MongoDB instances. In production this is fine (you should never run a standalone MongoDB). In development, configure your Docker MongoDB container as a single-node replica set: command: ["mongod", "--replSet", "rs0"] in docker-compose.yml, then run rs.initiate() once in mongosh (or the legacy mongo shell on older images). The mongodb-memory-server package also supports replica sets for testing Change Streams.
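
The filter described in the Tip above can be wrapped in a small helper so that every caller builds the same pipeline. This is a minimal sketch: buildTaskPipeline is a hypothetical name, and it assumes tasks store their owner in a user field, as the rest of this section does.

```javascript
// Hypothetical helper: builds a server-side filter for one user's task events.
// `userId` must have the same type as the stored field (for example an
// ObjectId rather than its string form), otherwise nothing will match.
function buildTaskPipeline(userId, operations = ['insert', 'update']) {
    return [
        {
            $match: {
                operationType: { $in: operations },
                // On update events this field exists only when the stream is
                // opened with { fullDocument: 'updateLookup' }.
                'fullDocument.user': userId,
            },
        },
    ];
}

// Usage (Task is the Mongoose model used in this section):
// const stream = Task.watch(buildTaskPipeline(userId), { fullDocument: 'updateLookup' });
```

Delete events carry no fullDocument, so a filter on fullDocument.user cannot select them; they would need a separate stream or a pipeline keyed on documentKey.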

Complete Change Stream Implementation

const mongoose = require('mongoose');
const Task     = require('../models/task.model');
const ResumeToken = require('../models/resume-token.model');
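const { logger }  = require('../config/logger');
// The helpers below are used by handleTaskChange; these module paths are assumptions,
// adjust them to wherever your project defines the cache and Socket.io broadcasters
const cache = require('../services/cache');
const { broadcastTaskCreated, broadcastTaskUpdated, broadcastTaskDeleted } = require('../sockets/task.events');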

// ── Start a resumable change stream ───────────────────────────────────────
async function startTaskChangeStream(onEvent) {
    // Load the last saved resume token
    const tokenDoc  = await ResumeToken.findOne({ stream: 'tasks' });
    const options   = { fullDocument: 'updateLookup' };
    if (tokenDoc?.token) options.resumeAfter = tokenDoc.token;

    // Pipeline: deliver only document-level events (insert, update, replace, delete)
    const pipeline = [
        { $match: {
            operationType: { $in: ['insert', 'update', 'replace', 'delete'] },
        }},
    ];

    const changeStream = Task.watch(pipeline, options);

    changeStream.on('change', async event => {
        try {
            // Process the event
            await onEvent(event);

            // Persist the resume token after successful processing
            await ResumeToken.findOneAndUpdate(
                { stream: 'tasks' },
                { token: event._id, updatedAt: new Date() },
                { upsert: true }
            );
        } catch (err) {
            logger.error('Change stream handler failed', {
                eventId:   event._id,
                operation: event.operationType,
                error:     err.message,
            });
        }
    });

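    // Schedule at most one restart, even if both 'error' and 'end' fire
    let restarting = false;
    const restart = async reason => {
        if (restarting) return;
        restarting = true;
        logger.warn('Change stream stopped, reconnecting', { reason });
        try { await changeStream.close(); } catch (_) { /* already closed */ }
        await new Promise(r => setTimeout(r, 5000));
        // Restart with the saved resume token; log instead of leaving the promise unhandled
        startTaskChangeStream(onEvent).catch(err =>
            logger.error('Change stream restart failed', { error: err.message }));
    };

    changeStream.on('error', err => {
        logger.error('Change stream error', { error: err.message });
        restart('error');
    });

    changeStream.on('end', () => restart('end'));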

    logger.info('Task change stream started', { resumed: !!tokenDoc?.token });
    return changeStream;
}

// ── Change stream event handler ───────────────────────────────────────────
async function handleTaskChange(event) {
    const { operationType, fullDocument, documentKey, updateDescription } = event;

    switch (operationType) {
        case 'insert':
            // Broadcast to workspace via Socket.io
            broadcastTaskCreated(fullDocument.workspace?.toString(), fullDocument);
            // Invalidate user's task list cache
            await cache.invalidateTag(`user:${fullDocument.user}`);
            break;

        case 'update':
        case 'replace':
            broadcastTaskUpdated(
                fullDocument?.workspace?.toString(),
                documentKey._id.toString(),
                updateDescription?.updatedFields ?? fullDocument
            );
            await cache.del(`task:${documentKey._id}`);
            // fullDocument is null if the task was deleted before the lookup ran
            if (fullDocument?.user) await cache.invalidateTag(`user:${fullDocument.user}`);
            break;

        case 'delete':
            broadcastTaskDeleted('', documentKey._id.toString());
            await cache.del(`task:${documentKey._id}`);
            break;
    }
}

// ── Audit trail using change stream ──────────────────────────────────────
const TaskAudit = require('../models/task-audit.model');

async function startAuditStream() {
    const stream = Task.watch([
        { $match: { operationType: { $in: ['insert', 'update', 'delete'] } } },
    ], { fullDocument: 'updateLookup' });

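    stream.on('change', async event => {
        try {
            await TaskAudit.create({
                taskId:    event.documentKey._id,
                operation: event.operationType,
                userId:    event.fullDocument?.user ?? null,
                changes:   event.updateDescription?.updatedFields ?? null,
                snapshot:  event.operationType !== 'delete' ? event.fullDocument : null,
                timestamp: new Date(),
            });
        } catch (err) {
            // Without this, a failed audit write becomes an unhandled promise rejection
            logger.error('Audit write failed', { eventId: event._id, error: err.message });
        }
    });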

    return stream;
}

// ── Initialise streams on app startup ─────────────────────────────────────
// src/server.js — after mongoose.connect():
async function initChangeStreams() {
    // Confirm the server responds before opening streams
    // (this call does not by itself verify that a replica set is initiated)
    await mongoose.connection.db.admin().serverStatus();

    const taskStream  = await startTaskChangeStream(handleTaskChange);
    const auditStream = await startAuditStream();

    // Graceful shutdown — close streams before disconnecting
    process.on('SIGTERM', async () => {
        await taskStream.close();
        await auditStream.close();
    });
}

How It Works

Step 1 — Change Streams Are Built on the Oplog

MongoDB replica sets maintain an operations log (oplog) — a capped collection in the local database that records every write operation. Change Streams subscribe to this oplog and emit filtered events. Because the oplog is ordered and append-only, events are delivered in the exact order they were written. The oplog also provides the resume token — a position marker in the oplog that enables resumption after reconnection.
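
The resume token described above is handed back to MongoDB through the watch options. The sketch below shows one way to build those options from a stored record. The record shape ({ token, lastOperation }) and the helper name are assumptions made for illustration; resumeAfter and startAfter are real change stream options, and startAfter (MongoDB 4.2+) is the one that accepts the token of an invalidate event.

```javascript
// Hypothetical helper. `saved` is whatever the application persisted;
// the field names `token` and `lastOperation` are assumptions.
function buildResumeOptions(saved, base = { fullDocument: 'updateLookup' }) {
    const options = { ...base };
    if (!saved || !saved.token) return options;   // first run: start from the current point

    if (saved.lastOperation === 'invalidate') {
        // resumeAfter rejects the token of an invalidate event; startAfter accepts it.
        options.startAfter = saved.token;
    } else {
        options.resumeAfter = saved.token;
    }
    return options;
}

// Usage: Task.watch(pipeline, buildResumeOptions(tokenDoc));
```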

Step 2 — Resume Tokens Enable At-Least-Once Processing

Storing the resume token after successfully processing each event, and passing it on reconnection, ensures that no events are missed, provided the token's position is still within the oplog window. It does not by itself prevent reprocessing: if the application crashes after processing event #42 but before storing token #42, it will receive event #42 again on restart. The handler must therefore be idempotent (processing the same event twice produces the same result as processing it once).
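
One way to make a handler tolerate the duplicate delivery described above is to remember which resume tokens have already been handled. The following is a minimal sketch, not a production design: makeIdempotent is a hypothetical name, and the in-memory Set stands in for a durable store (for example a collection with a unique index on the token), which a real deployment would need in order to survive restarts.

```javascript
// Wraps a handler so that an event whose resume token was already handled is skipped.
// The resume token is treated as opaque and serialised with JSON.stringify.
function makeIdempotent(handler, seen = new Set()) {
    return async function handleOnce(event) {
        const key = JSON.stringify(event._id);
        if (seen.has(key)) return false;   // duplicate delivery: skip
        await handler(event);
        seen.add(key);                     // record only after the handler succeeded
        return true;
    };
}

// Usage: startTaskChangeStream(makeIdempotent(handleTaskChange));
```

Handlers whose effects are naturally idempotent, such as deleting a cache key, do not need this wrapper; it matters for effects like sending notifications or appending audit rows.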

Step 3 — fullDocument: 'updateLookup' Fetches the Full Document on Update

By default, update events contain only the changed fields (updateDescription.updatedFields), not the complete document. Setting fullDocument: 'updateLookup' makes MongoDB perform an additional read to fetch the complete current document and include it as fullDocument. This is useful when your event handlers need the full document context (e.g. to find the user ID for cache invalidation). Note that this adds latency to event delivery and the fetched document may be slightly newer than the change itself if another update occurred between the write and the lookup.
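
Because fullDocument is absent on delete events, and can be null on an update if the document was removed before the lookup ran, handlers are easier to write against a normalised view of the event. The helper below is an illustrative sketch; describeChange and the shape it returns are assumptions, while the event fields it reads (documentKey, fullDocument, updateDescription.updatedFields, updateDescription.removedFields) are standard change event fields.

```javascript
// Hypothetical helper: extracts what a handler can rely on for each event type.
function describeChange(event) {
    const doc = event.fullDocument ?? null;   // null for deletes or when the lookup found nothing
    return {
        id: String(event.documentKey._id),
        operation: event.operationType,
        userId: doc && doc.user != null ? String(doc.user) : null,
        changedFields: Object.keys(event.updateDescription?.updatedFields ?? {}),
        removedFields: event.updateDescription?.removedFields ?? [],
        hasFullDocument: doc !== null,
    };
}
```

A handler can then branch on hasFullDocument instead of repeating optional chaining at every access.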

Step 4 — Server-Side Pipeline Filtering Saves Network Bandwidth

The pipeline argument to .watch(pipeline, options) is an array of aggregation stages that runs on the MongoDB server before events are sent. Using { $match: { operationType: 'update' } } filters out delete and insert events at the source. For high-write collections, filtering server-side is critical: without it, the application receives every event and discards most of them, consuming significant network bandwidth and application CPU.
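
Server-side filtering can go further than the operation type. The sketch below narrows update events to those that touched particular fields by testing for keys under updateDescription.updatedFields. The helper name is hypothetical, and the approach is assumed to be used for top-level field names only, since nested updates are reported under dotted keys that this path expression would not match.

```javascript
// Hypothetical helper: match only updates that changed at least one of `fields`.
function buildFieldChangePipeline(fields) {
    if (!Array.isArray(fields) || fields.length === 0) {
        // MongoDB rejects an empty $or array, so fail early with a clearer message.
        throw new Error('buildFieldChangePipeline needs at least one field name');
    }
    return [
        {
            $match: {
                operationType: 'update',
                $or: fields.map(field => ({
                    [`updateDescription.updatedFields.${field}`]: { $exists: true },
                })),
            },
        },
    ];
}

// Usage: Task.watch(buildFieldChangePipeline(['status', 'priority']));
```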

Step 5 — Change Stream Error Handling Must Include Reconnection

Change Stream connections can be lost due to network interruptions, replica set failovers, or MongoDB server restarts. The error handler closes the stream and schedules a reconnection after a delay. On reconnection, resumeAfter: savedToken picks up from the last successfully processed event, so events written during the downtime are still delivered as long as the oplog has not rolled past the saved token. Without reconnection logic, a temporary network issue would permanently stop the change stream.
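
The reconnection behaviour described above can be separated from any particular stream. The following sketch supervises an EventEmitter-like stream and reopens it with exponential backoff. superviseStream, backoffDelay, and the delay values are illustrative assumptions; open is assumed to be a synchronous function that returns a new stream, such as () => Task.watch(pipeline, options), and schedule is injectable so the logic can be exercised without real timers.

```javascript
// Exponential backoff capped at maxMs: 1s, 2s, 4s, ... (values are illustrative).
function backoffDelay(attempt, baseMs = 1000, maxMs = 30000) {
    return Math.min(maxMs, baseMs * 2 ** attempt);
}

// Hypothetical supervisor: reopens the stream after 'error' or 'end'.
function superviseStream(open, onChange, schedule = setTimeout) {
    let attempt = 0;
    let stopped = false;
    let current = null;

    function start() {
        if (stopped) return;
        const stream = open();
        current = stream;
        let handled = false;   // 'error' and 'end' may both fire for one failure

        const retry = () => {
            if (handled || stopped) return;
            handled = true;
            schedule(start, backoffDelay(attempt++));
        };

        stream.on('change', event => {
            attempt = 0;       // a delivered event means the stream is healthy again
            onChange(event);
        });
        stream.on('error', retry);
        stream.on('end', retry);
    }

    start();
    return {
        // Stops supervising and closes the current stream if it exposes close()
        stop() {
            stopped = true;
            if (current && typeof current.close === 'function') return current.close();
        },
    };
}
```

The stopped flag is what keeps a deliberate shutdown from being treated as a failure and triggering another restart. In practice open would read the most recent resume token from wherever the change handler last recorded it.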

Quick Reference

| Task | Code |
| --- | --- |
| Watch collection | Task.watch(pipeline, { fullDocument: 'updateLookup' }) |
| Filter events server-side | [{ $match: { operationType: 'update' } }] |
| Listen for changes | stream.on('change', event => { ... }) |
| Resume after reconnect | Task.watch([], { resumeAfter: savedToken }) |
| Access changed fields | event.updateDescription.updatedFields |
| Access full document | event.fullDocument (with updateLookup) |
| Access deleted ID | event.documentKey._id |
| Close stream | await stream.close() |

🧠 Test Yourself

A Change Stream processes 1000 events, stores the resume token after each, then the application restarts. What token does it pass to resumeAfter and what events does it receive after reconnection?