Real-Time with Socket.io — WebSockets, Rooms, and Angular NgZone Integration

Real-time features such as live task updates when a teammate completes something, instant notifications when a new comment is added, and live presence indicators require a persistent bidirectional connection between the server and client. Socket.io is a widely used real-time library for Node.js. It uses WebSocket as its primary transport and adds automatic fallback to HTTP long-polling, room-based broadcasting, and reconnection logic. This lesson builds a complete real-time collaboration layer on top of the MEAN Stack task manager, with Express and Socket.io on the server and an NgZone-aware Socket.io client wrapper in Angular.

Socket.io Event Architecture

| Event | Direction | Payload | Use Case |
| --- | --- | --- | --- |
| task:created | Server → clients in workspace | { task: Task } | Show new task in all open task lists |
| task:updated | Server → clients in workspace | { taskId, changes } | Update task card in real-time |
| task:deleted | Server → clients in workspace | { taskId } | Remove task from all lists |
| user:typing | Client → server → other clients | { taskId } from the client; { userId, taskId, userName } to other clients | “Alice is editing this task” |
| workspace:join | Client → server | { workspaceId } | Subscribe to workspace events |
| notification | Server → specific client | { type, message, data } | Personal notifications |
| online_users | Server → clients in workspace | { users: OnlineUser[] } | Live presence indicator |

Note: Socket.io runs alongside the HTTP server by attaching to the same Node.js HTTP server instance, so HTTP requests and WebSocket upgrades share one port. Socket.io handles the WebSocket upgrade handshake, the fallback to long-polling for environments that block WebSockets, and room management. Authentication is handled by Socket.io middleware that reads the JWT from the handshake auth payload; the client supplies it through the auth option, io(url, { auth: { token } }), when connecting.

Tip: In Angular, run socket event handlers through NgZone.run(() => { ... }) so that change detection is notified when socket events update component state. Socket callbacks can fire outside Angular’s zone, for example when the socket is created inside runOutsideAngular(). In that case, template bindings driven by those events may not re-render until something else triggers change detection. A simple wrapper service that calls ngZone.run() on every incoming event handles this transparently. Outgoing emits do not touch Angular state and do not need the zone.

Warning: Always authenticate Socket.io connections using JWT middleware; do not rely on HTTP session authentication. Browsers do not apply the same-origin policy to WebSocket handshakes, so CORS settings alone do not stop other sites or clients from connecting. Add a middleware function in io.use() that reads and verifies the JWT from socket.handshake.auth.token, attaches the decoded user to socket.data.user, and calls next() with an Error if verification fails. Without it, any client that can reach the server can connect, join rooms, and receive events meant for your users.
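
The event table above can also be written down as TypeScript types, so that the server and the Angular client share one contract. The following is a minimal sketch: the Task and OnlineUser shapes are hypothetical placeholders (the lesson's real models may have more fields), workspace:leave is taken from the server code rather than the table, and the ServerPayload helper type is introduced here only for illustration.

```typescript
// Hypothetical minimal shapes; the lesson's real Task and OnlineUser models may differ.
interface Task { _id: string; title: string; completed?: boolean; }
interface OnlineUser { id: string; email: string; }

// Events the server emits to clients.
interface ServerToClientEvents {
    'task:created': (payload: { task: Task }) => void;
    'task:updated': (payload: { taskId: string; changes: Partial<Task> }) => void;
    'task:deleted': (payload: { taskId: string }) => void;
    'user:typing': (payload: { userId: string; taskId: string; userName: string }) => void;
    'notification': (payload: { type: string; message: string; data?: unknown }) => void;
    'online_users': (payload: { users: OnlineUser[] }) => void;
}

// Events clients emit to the server.
interface ClientToServerEvents {
    'workspace:join': (payload: { workspaceId: string }) => void;
    'workspace:leave': (payload: { workspaceId: string }) => void;
    'user:typing': (payload: { taskId: string }) => void;
}

// Illustrative helper: the payload type of a given server event.
type ServerPayload<E extends keyof ServerToClientEvents> =
    Parameters<ServerToClientEvents[E]>[0];

// Illustrative helper: the payload type of a given client event.
type ClientPayload<E extends keyof ClientToServerEvents> =
    Parameters<ClientToServerEvents[E]>[0];

// Payloads that type-check against the contract.
const sampleUpdate: ServerPayload<'task:updated'> = {
    taskId: 'abc123',
    changes: { completed: true },
};
const sampleJoin: ClientPayload<'workspace:join'> = { workspaceId: 'ws1' };
```

Socket.io v4 accepts such interfaces as type parameters, for example new Server<ClientToServerEvents, ServerToClientEvents>(httpServer) on the server and Socket<ServerToClientEvents, ClientToServerEvents> on the client, which lets the compiler flag a misspelled event name or a payload with the wrong shape.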

Complete Real-Time Implementation

// ── Express + Socket.io server setup ─────────────────────────────────────
// server.js
const http = require('http');
const { Server } = require('socket.io');
const jwt  = require('jsonwebtoken');
const app  = require('./app');

const httpServer = http.createServer(app);

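const io = new Server(httpServer, {
    cors: {
        // Comma-separated list of allowed origins; an unset variable yields an empty list
        origin:      (process.env.ALLOWED_ORIGINS ?? '').split(',').filter(Boolean),
        credentials: true,
    },
    pingTimeout:  60000,   // ms to wait for a pong before treating the connection as closed
    pingInterval: 25000,   // ms between server pings
});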

// ── Socket.io authentication middleware ───────────────────────────────────
io.use((socket, next) => {
    const token = socket.handshake.auth?.token;
    if (!token) return next(new Error('Authentication error: no token'));

    try {
        const decoded = jwt.verify(token, process.env.JWT_SECRET);
        socket.data.user = decoded;
        next();
    } catch {
        next(new Error('Authentication error: invalid token'));
    }
});

// ── Connection handler ────────────────────────────────────────────────────
io.on('connection', (socket) => {
    const userId = socket.data.user.sub;
    console.log(`User ${userId} connected: ${socket.id}`);

    // Auto-join user's personal room (for direct notifications)
    socket.join(`user:${userId}`);

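    // Join a workspace room
    socket.on('workspace:join', ({ workspaceId } = {}) => {
        if (typeof workspaceId !== 'string') return;   // ignore malformed payloads
        // NOTE: check here that this user belongs to the workspace before joining;
        // a valid JWT alone does not prove membership.
        socket.join(`workspace:${workspaceId}`);
        socket.data.workspaceId = workspaceId;

        // Broadcast updated online users list to workspace
        const users = getOnlineUsersInWorkspace(io, workspaceId);
        io.to(`workspace:${workspaceId}`).emit('online_users', { users });
    });

    socket.on('workspace:leave', ({ workspaceId } = {}) => {
        if (typeof workspaceId !== 'string') return;
        socket.leave(`workspace:${workspaceId}`);
        if (socket.data.workspaceId === workspaceId) socket.data.workspaceId = undefined;

        // Tell the remaining members that this user left
        const users = getOnlineUsersInWorkspace(io, workspaceId);
        io.to(`workspace:${workspaceId}`).emit('online_users', { users });
    });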

    socket.on('user:typing', ({ taskId }) => {
        const workspaceId = socket.data.workspaceId;
        if (workspaceId) {
            socket.to(`workspace:${workspaceId}`).emit('user:typing', {
                userId,
                taskId,
                userName: socket.data.user.email,
            });
        }
    });

    socket.on('disconnect', () => {
        const workspaceId = socket.data.workspaceId;
        if (workspaceId) {
            const users = getOnlineUsersInWorkspace(io, workspaceId);
            io.to(`workspace:${workspaceId}`).emit('online_users', { users });
        }
    });
});

// ── Broadcast helpers used by task controllers ────────────────────────────
function broadcastTaskCreated(workspaceId, task) {
    io.to(`workspace:${workspaceId}`).emit('task:created', { task });
}
function broadcastTaskUpdated(workspaceId, taskId, changes) {
    io.to(`workspace:${workspaceId}`).emit('task:updated', { taskId, changes });
}
function broadcastTaskDeleted(workspaceId, taskId) {
    io.to(`workspace:${workspaceId}`).emit('task:deleted', { taskId });
}
function sendNotification(userId, notification) {
    io.to(`user:${userId}`).emit('notification', notification);
}

function getOnlineUsersInWorkspace(io, workspaceId) {
    const room = io.sockets.adapter.rooms.get(`workspace:${workspaceId}`);
    if (!room) return [];
    const users = new Map();   // keyed by user id so several tabs count as one user
    for (const socketId of room) {
        const user = io.sockets.sockets.get(socketId)?.data.user;
        if (user) users.set(user.sub, { id: user.sub, email: user.email });
    }
    return [...users.values()];
}

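module.exports = {
    httpServer, io,
    broadcastTaskCreated, broadcastTaskUpdated, broadcastTaskDeleted, sendNotification,
};

// ── Task controller (separate file): emit after DB write ──────────────────
// The broadcast helpers exported above must be required in the controller file.
exports.create = asyncHandler(async (req, res) => {
    const task = await Task.create({ ...req.body, user: req.user.sub });
    broadcastTaskCreated(req.user.workspaceId, task);  // broadcast to all workspace members
    created(res, task);
});
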
// ── Angular Socket.io service ─────────────────────────────────────────────
// npm install socket.io-client
import { Injectable, inject, NgZone, OnDestroy } from '@angular/core';
import { io, Socket }   from 'socket.io-client';
import { Observable }   from 'rxjs';
import { AuthStore }    from '../stores/auth.store';
import { environment }  from '../../../environments/environment';

@Injectable({ providedIn: 'root' })
export class SocketService implements OnDestroy {
    private ngZone   = inject(NgZone);
    private authStore= inject(AuthStore);
    private socket!: Socket;

    connect(): void {
        if (this.socket?.connected) return;

        this.socket = io(environment.wsUrl, {
            auth:             { token: this.authStore.accessToken() },
            transports:       ['websocket', 'polling'],
            reconnection:     true,
            reconnectionDelay:1000,
            reconnectionAttempts: 5,
        });

        this.socket.on('connect', () =>
            console.log('Socket connected:', this.socket.id));
        this.socket.on('connect_error', err =>
            console.error('Socket error:', err.message));
        this.socket.on('disconnect', reason =>
            console.warn('Socket disconnected:', reason));
    }

    joinWorkspace(workspaceId: string): void {
        this.socket?.emit('workspace:join', { workspaceId });
    }

    leaveWorkspace(workspaceId: string): void {
        this.socket?.emit('workspace:leave', { workspaceId });
    }

    emitTyping(taskId: string): void {
        this.socket?.emit('user:typing', { taskId });
    }

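    // Generic observable wrapper: delivers each event inside the Angular zone.
    // Call connect() first; without a socket the observable never emits.
    on<T>(event: string): Observable<T> {
        return new Observable<T>(subscriber => {
            const handler = (data: T) => this.ngZone.run(() => subscriber.next(data));
            this.socket?.on(event, handler);
            // Remove only this handler; off(event) alone would drop every listener.
            return () => { this.socket?.off(event, handler); };
        });
    }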

    disconnect(): void { this.socket?.disconnect(); }

    ngOnDestroy(): void { this.disconnect(); }
}

// ── Task store: integrate real-time updates ───────────────────────────────
// Also needs: signal from '@angular/core', Subscription from 'rxjs',
// and the app's own Task model and TaskService.
@Injectable({ providedIn: 'root' })
export class TaskStore {
    private taskService  = inject(TaskService);
    private socketService= inject(SocketService);

    private _tasks = signal<Task[]>([]);
    readonly tasks  = this._tasks.asReadonly();
    private realtime?: Subscription;

    connectRealTime(workspaceId: string): void {
        this.realtime?.unsubscribe();   // avoid duplicate listeners on repeated calls
        this.socketService.connect();
        this.socketService.joinWorkspace(workspaceId);

        // takeUntilDestroyed() without arguments needs an injection context, which a
        // plain method call does not have, so the subscriptions are tracked explicitly.
        const subs = new Subscription();

        // Listen for real-time events and update local state
        subs.add(this.socketService.on<{ task: Task }>('task:created')
            .subscribe(({ task }) => {
                this._tasks.update(tasks => [task, ...tasks]);
            }));

        subs.add(this.socketService.on<{ taskId: string; changes: Partial<Task> }>('task:updated')
            .subscribe(({ taskId, changes }) => {
                this._tasks.update(tasks =>
                    tasks.map(t => t._id === taskId ? { ...t, ...changes } : t)
                );
            }));

        subs.add(this.socketService.on<{ taskId: string }>('task:deleted')
            .subscribe(({ taskId }) => {
                this._tasks.update(tasks => tasks.filter(t => t._id !== taskId));
            }));

        this.realtime = subs;
    }

    disconnectRealTime(workspaceId: string): void {
        this.realtime?.unsubscribe();   // stop handling events for this workspace
        this.realtime = undefined;
        this.socketService.leaveWorkspace(workspaceId);
    }
}

How It Works

Step 1 — Socket.io Upgrades HTTP to WebSocket

By default, a Socket.io client starts with an HTTP long-polling connection and then upgrades to WebSocket if the environment supports it. The Angular client in this lesson sets transports: ['websocket', 'polling'], so it tries WebSocket first and falls back to long-polling when WebSockets are blocked, for example by a corporate firewall or proxy. Both transports use the same port as the Express server because the same Node.js HTTP server handles both. The choice of transport is transparent to the application code.

Step 2 — Rooms Enable Targeted Broadcasting

Socket.io rooms are named groupings of socket connections. When a user joins a workspace, their socket is added to the room workspace:workspaceId. Broadcasting with io.to('workspace:abc').emit('task:created', data) sends to all sockets in that room — only the users who are currently viewing that workspace receive the event. Personal rooms (user:userId) enable sending to one specific user.
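
To make the room semantics concrete, here is a toy in-memory model of rooms. It is only a sketch of the idea (a map from room name to socket ids), not Socket.io's actual adapter; the RoomRegistry class and its method names are invented for this illustration.

```typescript
// Toy model of rooms: room name -> set of socket ids. Not Socket.io's real adapter.
class RoomRegistry {
    private rooms = new Map<string, Set<string>>();

    join(socketId: string, room: string): void {
        if (!this.rooms.has(room)) this.rooms.set(room, new Set());
        this.rooms.get(room)!.add(socketId);
    }

    leave(socketId: string, room: string): void {
        const members = this.rooms.get(room);
        if (!members) return;
        members.delete(socketId);
        if (members.size === 0) this.rooms.delete(room); // drop empty rooms
    }

    // Comparable to io.to(room): every socket currently in the room.
    recipients(room: string): string[] {
        return Array.from(this.rooms.get(room) ?? []);
    }

    // Comparable to socket.to(room): everyone in the room except the sender.
    recipientsExcept(room: string, senderId: string): string[] {
        return this.recipients(room).filter(id => id !== senderId);
    }
}

const registry = new RoomRegistry();
registry.join('socket-a', 'workspace:abc');
registry.join('socket-b', 'workspace:abc');
registry.join('socket-c', 'workspace:xyz');

// A task event for workspace abc reaches a and b, never c.
const everyoneInAbc = registry.recipients('workspace:abc');
// A typing indicator sent by socket-a reaches only the other members.
const othersInAbc = registry.recipientsExcept('workspace:abc', 'socket-a');
```

The same distinction appears in the server code above: task events use io.to(...) so the author's own tabs update too, while user:typing uses socket.to(...) so the sender does not see their own typing indicator.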

Step 3 — NgZone.run() Triggers Angular Change Detection

Angular relies on Zone.js to learn that asynchronous work has finished and that change detection should run. A Socket.io callback that runs outside Angular’s zone, for example because the socket was created inside runOutsideAngular(), can update state without Angular noticing, so the template may not re-render until something else triggers change detection. Wrapping the update in ngZone.run(() => signal.set(newValue)) re-enters Angular’s zone, which notifies the change detection system that an update occurred. The on<T>(event) wrapper in SocketService does this automatically for every event it delivers.
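
The wrapper idea can be sketched without Angular or socket.io-client by using structural stand-ins. In the sketch below, ZoneLike mirrors the run() method of NgZone and EmitterLike mirrors the on()/off() pair of a socket; both interfaces, the listenInZone helper, and the two fake classes are assumptions made for this illustration, not part of either library.

```typescript
// Stand-ins so the sketch runs on its own; they only mirror the shape of NgZone and a socket.
interface ZoneLike { run<T>(fn: () => T): T; }
type Listener = (data: unknown) => void;
interface EmitterLike {
    on(event: string, listener: Listener): unknown;
    off(event: string, listener: Listener): unknown;
}

// Registers a listener whose callback always runs inside the zone.
// Returns a cleanup function that removes only this listener.
function listenInZone<T>(
    emitter: EmitterLike,
    zone: ZoneLike,
    event: string,
    callback: (data: T) => void,
): () => void {
    const handler: Listener = data => { zone.run(() => callback(data as T)); };
    emitter.on(event, handler);
    return () => { emitter.off(event, handler); };
}

// Minimal fakes used to exercise the helper.
class FakeEmitter implements EmitterLike {
    private listeners = new Map<string, Set<Listener>>();
    on(event: string, listener: Listener): void {
        if (!this.listeners.has(event)) this.listeners.set(event, new Set());
        this.listeners.get(event)!.add(listener);
    }
    off(event: string, listener: Listener): void {
        this.listeners.get(event)?.delete(listener);
    }
    emit(event: string, data: unknown): void {
        this.listeners.get(event)?.forEach(listener => listener(data));
    }
}

class FakeZone implements ZoneLike {
    runs = 0; // counts how many callbacks entered the zone
    run<T>(fn: () => T): T { this.runs++; return fn(); }
}

const fakeSocket = new FakeEmitter();
const fakeZone = new FakeZone();
const deletedIds: string[] = [];

const stopListening = listenInZone<{ taskId: string }>(
    fakeSocket, fakeZone, 'task:deleted', d => { deletedIds.push(d.taskId); });

fakeSocket.emit('task:deleted', { taskId: 't1' }); // handled inside the zone
stopListening();
fakeSocket.emit('task:deleted', { taskId: 't2' }); // ignored after cleanup
```

Keeping a reference to the handler matters for cleanup: passing it to off() removes just this listener, whereas calling off(event) with no handler would remove every listener registered for that event.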

Step 4 — JWT Authentication Prevents Unauthorised Room Access

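The io.use() middleware runs during the handshake, before the connection event fires. It verifies the JWT from socket.handshake.auth.token, the same JWT the Angular HTTP interceptor attaches to API requests. If verification fails, calling next() with an Error rejects the connection, and the client receives a connect_error event. The decoded user is stored on socket.data.user and is available in all event handlers. Authentication only establishes who the user is: the workspace:join handler should still check that the user belongs to the requested workspace before calling socket.join().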
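
A valid token identifies the user but does not by itself decide which workspace rooms that user may join. The sketch below shows one way to separate that check into a pure function. It assumes the decoded JWT carries a workspaceIds claim, which is hypothetical; a real application might instead look up membership in the database. The AuthUser shape and resolveWorkspaceRoom name are invented for this example.

```typescript
// Hypothetical shape of the decoded JWT payload; the workspaceIds claim is an assumption.
interface AuthUser { sub: string; email: string; workspaceIds: string[]; }

// Returns the room name to join, or null when the request must be rejected.
function resolveWorkspaceRoom(user: AuthUser, payload: unknown): string | null {
    // Client payloads are untrusted: validate the shape before using it.
    if (typeof payload !== 'object' || payload === null) return null;
    const workspaceId = (payload as { workspaceId?: unknown }).workspaceId;
    if (typeof workspaceId !== 'string' || workspaceId.length === 0) return null;
    // Only members may subscribe to a workspace's events.
    if (user.workspaceIds.indexOf(workspaceId) === -1) return null;
    return `workspace:${workspaceId}`;
}

const alice: AuthUser = { sub: 'u1', email: 'alice@example.com', workspaceIds: ['abc'] };

const allowedRoom = resolveWorkspaceRoom(alice, { workspaceId: 'abc' }); // member
const deniedRoom = resolveWorkspaceRoom(alice, { workspaceId: 'xyz' });  // not a member
const malformedRoom = resolveWorkspaceRoom(alice, undefined);            // no payload
```

In the workspace:join handler this could be used as a guard: compute the room from socket.data.user and the incoming payload, and call socket.join(room) only when the result is not null.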

Step 5 — Broadcast After DB Write Ensures Consistency

The server only broadcasts task:created after the task is successfully saved to MongoDB. This prevents ghost events, where a client receives a “task created” event but the database write subsequently fails. The broadcast happens in the same request handler, immediately after await Task.create() resolves; if the write throws, the handler never reaches the broadcast. Clients that are disconnected at that moment simply miss the event. The data is still in the database, so they will see it the next time they load the task list.
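
The ordering can be illustrated with a small self-contained sketch in which the database write and the broadcast are passed in as functions. SavedTask, SaveTask, Broadcast, and createTaskThenBroadcast are names made up for this example; in the lesson's code the corresponding pieces are Task.create() and broadcastTaskCreated().

```typescript
// Hypothetical minimal types standing in for the Mongoose model and the Socket.io emit.
interface SavedTask { _id: string; title: string; }
type SaveTask = (title: string) => Promise<SavedTask>;
type Broadcast = (event: string, payload: unknown) => void;

async function createTaskThenBroadcast(
    title: string,
    save: SaveTask,
    broadcast: Broadcast,
): Promise<SavedTask> {
    const task = await save(title);        // rejects on failure, so nothing below runs
    broadcast('task:created', { task });   // only reached after a successful write
    return task;
}

// Record emitted event names instead of sending them over a socket.
const emittedEvents: string[] = [];
const recordEvent: Broadcast = event => { emittedEvents.push(event); };

const workingSave: SaveTask = async title => ({ _id: '1', title });
const failingSave: SaveTask = async () => { throw new Error('write failed'); };

// One successful create followed by one failed create.
const broadcastDemo: Promise<string[]> =
    createTaskThenBroadcast('Write docs', workingSave, recordEvent)
        .then(() => createTaskThenBroadcast('Broken', failingSave, recordEvent))
        .catch(() => undefined)            // the failed write rejects without emitting
        .then(() => emittedEvents.slice());
```

After both calls settle, only the successful write has produced a task:created event, which is the guarantee the ordering is meant to give.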

Common Mistakes

Mistake 1 — Not authenticating socket connections

❌ Wrong — any visitor can connect and receive task events:

io.on('connection', socket => {
    socket.join('workspace:123');  // no auth — anyone can join!
});

✅ Correct — verify JWT in io.use() middleware:

io.use((socket, next) => {
    try { socket.data.user = jwt.verify(socket.handshake.auth.token, JWT_SECRET); next(); }
    catch { next(new Error('Unauthorized')); }
});

Mistake 2 — Socket events not triggering Angular change detection

❌ Wrong — signal updated outside zone, template does not re-render:

this.socket.on('task:created', (data) => {
    this._tasks.update(tasks => [data.task, ...tasks]);  // outside Angular zone!
    // If this callback runs outside Angular's zone, the template may not update
    // until something else triggers change detection.
});

✅ Correct — enter zone before updating signals:

this.socket.on('task:created', (data) => {
    this.ngZone.run(() => this._tasks.update(tasks => [data.task, ...tasks]));
});

Mistake 3 — Not disconnecting socket on component destroy

❌ Wrong — socket subscriptions continue after navigation away:

ngOnInit(): void {
    this.store.connectRealTime(this.workspaceId);
    // No cleanup — events fire on destroyed component
}

✅ Correct — leave workspace room on destroy:

ngOnDestroy(): void { this.store.disconnectRealTime(this.workspaceId); }

Quick Reference

| Task | Code |
| --- | --- |
| Create Socket.io server | const io = new Server(httpServer, { cors: { ... } }) |
| Auth middleware | io.use((socket, next) => { jwt.verify(socket.handshake.auth.token, ...) }) |
| Join room | socket.join('workspace:id') |
| Broadcast to room | io.to('workspace:id').emit('task:created', data) |
| Broadcast except sender | socket.to('workspace:id').emit('event', data) |
| Connect (Angular) | io(wsUrl, { auth: { token: accessToken() } }) |
| Listen (Angular) | this.on<T>('event').pipe(takeUntilDestroyed()).subscribe(...) |
| Zone-aware update | ngZone.run(() => signal.set(value)) |

🧠 Test Yourself

An Angular component updates a signal inside a Socket.io event callback. The component uses ChangeDetectionStrategy.OnPush. The template does not re-render. What is the fix?