WebSockets with Socket.io — Real-Time Bidirectional Communication

HTTP is request-response: the client asks, the server answers. But many modern features require the server to push data to clients without a client request: live notifications, collaborative editing, real-time task updates, chat, presence indicators, and dashboard counters. WebSockets establish a persistent, bidirectional connection between client and server. Socket.io is one of the most widely used real-time libraries for Node.js. It builds on WebSockets and adds rooms, namespaces, automatic reconnection, event broadcasting, middleware, and a fallback to HTTP long-polling. In the MEAN Stack, Socket.io runs alongside Express on the same HTTP server, sharing the same port and authentication infrastructure.

Socket.io Core Concepts

| Concept | Description |
|---|---|
| io | The Socket.io server instance; manages all connections |
| socket | An individual client connection; one per connected client |
| Event | Named message type; emitted by client or server with optional data |
| Room | Named channel; sockets can join/leave; emit to all sockets in a room |
| Namespace | Multiplexing; separate logical channels on one connection (/chat, /tasks) |
| Broadcast | Emit to all connected sockets except the sender |
| Acknowledgement | Callback-based response to confirm an event was received |
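
The Acknowledgement row can be made concrete with a short sketch. The event name task:rename, the payload shape, and both helper names are assumptions for illustration. The code relies only on two documented behaviours: Socket.io passes a callback as the last handler argument when the sender supplied one, and socket.timeout(ms).emit(...) (Socket.io v4.4+) calls back with an error first if no acknowledgement arrives in time.

```javascript
// Server side: confirm receipt through the acknowledgement callback.
// `socket` is a connected Socket.io socket; 'task:rename' is a hypothetical event.
function registerRenameHandler(socket) {
    socket.on('task:rename', (payload, ack) => {
        // The sender may have emitted without a callback, so guard before calling it.
        const reply = typeof ack === 'function' ? ack : () => {};
        if (!payload || typeof payload.title !== 'string' || payload.title.trim() === '') {
            return reply({ ok: false, error: 'title is required' });
        }
        reply({ ok: true, title: payload.title.trim() });
    });
}

// Client side: emit with an acknowledgement and a 5 second timeout.
// `onDone` is a hypothetical Node-style callback: (error, response).
function renameTask(socket, title, onDone) {
    socket.timeout(5000).emit('task:rename', { title }, (err, response) => {
        if (err) return onDone(new Error('No acknowledgement within 5 s'));
        onDone(null, response);
    });
}
```

Acknowledgements suit request-style events where the sender needs a result or a validation error; plain emits remain fine for fire-and-forget notifications.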

Emit Targets

| Target | Code | Reaches |
|---|---|---|
| One socket | socket.emit('event', data) | Only this client |
| All clients | io.emit('event', data) | Every connected client |
| All except sender | socket.broadcast.emit('event', data) | Everyone except this socket |
| One room | io.to('roomName').emit('event', data) | All sockets in the room |
| Room except sender | socket.to('roomName').emit('event', data) | Room members except this socket |
| Namespace | io.of('/tasks').emit('event', data) | All clients in namespace |

Built-in Events

| Event | Side | Fired When |
|---|---|---|
| connection | Server | A client successfully connects |
| disconnect | Server | A client disconnects (any reason) |
| disconnecting | Server | Client is about to disconnect; still in rooms |
| connect | Client | Socket connected to server |
| connect_error | Client | Connection attempt failed |
| reconnect | Client (Manager) | Successfully reconnected; since Socket.io v3 this is emitted on the Manager, so listen with socket.io.on('reconnect', ...) |

Note: Socket.io is not a pure WebSocket library. It adds a transport negotiation layer and its own protocol on top. A Socket.io client cannot connect to a plain WebSocket server, and a plain WebSocket client cannot connect to a Socket.io server. Both the server (socket.io npm package) and client (socket.io-client) must use the Socket.io library. In Angular, install the official client: npm install socket.io-client.

Tip: Always authenticate WebSocket connections in the Socket.io middleware; do not wait until after the connection is established. Pass the JWT in the auth object during connection: const socket = io('http://localhost:3000', { auth: { token: accessToken } }). On the server, the io.use() middleware reads the token from socket.handshake.auth.token, verifies it, and attaches the user to the socket before any event handlers run.

Warning: WebSocket connections are persistent. A client that connects stays connected until it explicitly disconnects or a network error occurs. In a multi-instance deployment (multiple Node.js servers), Socket.io broadcasts (like io.emit()) only reach clients connected to the same server instance. To broadcast across all instances, configure the Socket.io Redis Adapter: io.adapter(createAdapter(pubClient, subClient)). Without it, a task update made on instance A will not notify the user if they are connected to instance B.

Complete Socket.io Implementation

// npm install socket.io
// npm install @socket.io/redis-adapter ioredis  (for multi-instance)

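const { Server }        = require('socket.io');
const { createAdapter } = require('@socket.io/redis-adapter');
const jwt               = require('jsonwebtoken');
const logger            = require('./utils/logger');
const User              = require('./models/user.model');   // assumed path to the Mongoose User model used in the auth middleware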

function createSocketServer(httpServer) {
    const io = new Server(httpServer, {
        cors: {
            origin:      process.env.FRONTEND_URL || 'http://localhost:4200',
            methods:     ['GET', 'POST'],
            credentials: true,
        },
        transports:        ['websocket', 'polling'],
        pingTimeout:       20000,
        pingInterval:      25000,
        upgradeTimeout:    30000,
        maxHttpBufferSize: 1e6,   // 1 MB max message size
    });

    // ── Redis adapter for multi-instance broadcasts ───────────────────────
    if (process.env.REDIS_URL) {
        const Redis = require('ioredis');
        const pub   = new Redis(process.env.REDIS_URL);
        const sub   = pub.duplicate();
        io.adapter(createAdapter(pub, sub));
        logger.info('Socket.io: Redis adapter configured');
    }

    // ── Authentication middleware ─────────────────────────────────────────
    io.use(async (socket, next) => {
        try {
            const token = socket.handshake.auth?.token;
            if (!token) return next(new Error('Authentication required'));

            const decoded  = jwt.verify(token, process.env.JWT_SECRET);
            const user     = await User.findById(decoded.id).select('_id name email role').lean();
            if (!user) return next(new Error('User not found'));

            socket.user = user;   // attach user to socket for all event handlers
            next();
        } catch (err) {
            next(new Error('Invalid or expired token'));
        }
    });

    // ── Connection handler ────────────────────────────────────────────────
    io.on('connection', (socket) => {
        const { id: socketId, user } = socket;
        logger.info('Socket connected', { socketId, userId: user._id });

        // Join a personal room for targeted notifications
        socket.join(`user:${user._id}`);

        // ── Task events ───────────────────────────────────────────────────
        socket.on('task:subscribe', ({ projectId } = {}) => {
            if (projectId) {
                socket.join(`project:${projectId}`);
                logger.info('Socket joined project room', { socketId, projectId });
            }
        });

        socket.on('task:unsubscribe', ({ projectId } = {}) => {
            if (projectId) socket.leave(`project:${projectId}`);
        });

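        // ── Presence tracking ─────────────────────────────────────────────
        // Note: the room user:<id> contains only this user's own sockets, so this
        // emit reaches the user's other tabs/devices. To show presence to teammates,
        // emit to a shared room (for example a project room) instead.
        socket.on('presence:online', () => {
            socket.to(`user:${user._id}`).emit('presence:update', {
                userId: user._id,
                status: 'online',
            });
        });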

        // ── Disconnect handler ────────────────────────────────────────────
        socket.on('disconnect', (reason) => {
            logger.info('Socket disconnected', { socketId, userId: user._id, reason });
        });

        socket.on('error', (err) => {
            logger.error('Socket error', { socketId, userId: user._id, error: err.message });
        });
    });

    return io;
}

// ── Helper: emit events from Express controllers ──────────────────────────
// Make io accessible in controllers
function createSocketEmitter(io) {
    return {
        // Notify a specific user
        notifyUser: (userId, event, data) => {
            io.to(`user:${userId}`).emit(event, data);
        },

        // Broadcast to a project room
        notifyProject: (projectId, event, data) => {
            io.to(`project:${projectId}`).emit(event, data);
        },

        // Broadcast to all connected clients
        broadcast: (event, data) => {
            io.emit(event, data);
        },
    };
}

module.exports = { createSocketServer, createSocketEmitter };

// ── index.js — attach Socket.io to the HTTP server ────────────────────────
const express = require('express');
const http    = require('http');
const app     = require('./app');
const { createSocketServer, createSocketEmitter } = require('./socket');

const httpServer = http.createServer(app);
const io         = createSocketServer(httpServer);
const socketEmitter = createSocketEmitter(io);

// Make socketEmitter available to controllers via app.locals
app.locals.socketEmitter = socketEmitter;

httpServer.listen(3000, () => console.log('Server + WebSocket on port 3000'));

Emitting Events from Express Controllers

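// controllers/task.controller.js: emit socket events after mutations
// asyncHandler and taskService are assumed to exist elsewhere in the project;
// the require paths below are placeholders.
const asyncHandler = require('../utils/asyncHandler');
const taskService  = require('../services/task.service');

exports.create = asyncHandler(async (req, res) => {
    const task = await taskService.createTask(req.user.id, req.body);

    // Emit to the user's personal room (reaches every tab/device of this user)
    const { socketEmitter } = req.app.locals;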
    socketEmitter.notifyUser(req.user.id, 'task:created', {
        task,
        timestamp: new Date().toISOString(),
    });

    res.status(201).json({ success: true, data: task });
});

exports.update = asyncHandler(async (req, res) => {
    const task = await taskService.updateTask(req.params.id, req.user.id, req.body);

    const { socketEmitter } = req.app.locals;
    socketEmitter.notifyUser(req.user.id, 'task:updated', { task });

    res.json({ success: true, data: task });
});

exports.remove = asyncHandler(async (req, res) => {
    await taskService.deleteTask(req.params.id, req.user.id);

    const { socketEmitter } = req.app.locals;
    socketEmitter.notifyUser(req.user.id, 'task:deleted', { taskId: req.params.id });

    res.status(204).end();
});

Angular Socket.io Client

// npm install socket.io-client
// frontend/src/app/core/services/socket.service.ts

import { Injectable, OnDestroy }     from '@angular/core';
import { io, Socket }                from 'socket.io-client';
import { Observable, Subject }       from 'rxjs';
import { takeUntil }                 from 'rxjs/operators';
import { environment }               from '../../../environments/environment';
import { AuthService }               from './auth.service';

@Injectable({ providedIn: 'root' })
export class SocketService implements OnDestroy {
    private socket: Socket | null = null;
    private destroy$ = new Subject<void>();

    constructor(private auth: AuthService) {}

    connect(): void {
        if (this.socket?.connected) return;

        this.socket = io(environment.wsUrl || 'http://localhost:3000', {
            auth:       { token: this.auth.getAccessToken() },
            transports: ['websocket', 'polling'],
            autoConnect: true,
        });

        this.socket.on('connect', () => {
            console.log('[Socket] Connected:', this.socket?.id);
        });

        this.socket.on('connect_error', (err) => {
            console.error('[Socket] Connection error:', err.message);
            if (err.message === 'Invalid or expired token') {
                this.auth.refreshToken().subscribe(() => this.reconnect());
            }
        });
    }

    disconnect(): void {
        this.socket?.disconnect();
        this.socket = null;
    }

    reconnect(): void {
        this.disconnect();
        this.connect();
    }

    // Subscribe to a Socket.io room for project updates
    subscribeToProject(projectId: string): void {
        this.socket?.emit('task:subscribe', { projectId });
    }

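    // Listen to an event; returns an Observable that removes its own listener on unsubscribe.
    // Call connect() first: if no socket exists yet, the Observable emits nothing.
    on<T>(event: string): Observable<T> {
        return new Observable<T>(observer => {
            const socket  = this.socket;
            const handler = (data: T) => observer.next(data);
            socket?.on(event, handler);
            // Pass the handler so only this listener is removed, not every listener for the event
            return () => socket?.off(event, handler);
        }).pipe(takeUntil(this.destroy$));
    }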

    emit(event: string, data?: unknown): void {
        this.socket?.emit(event, data);
    }

    ngOnDestroy(): void {
        this.destroy$.next();
        this.destroy$.complete();
        this.disconnect();
    }
}

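// Usage in a component
// (imports assumed: Component, OnInit, OnDestroy, signal from '@angular/core';
//  Subject from 'rxjs'; takeUntil from 'rxjs/operators')
@Component({ ... })
export class TaskListComponent implements OnInit, OnDestroy {
    tasks = signal<Task[]>([]);
    private destroy$ = new Subject<void>();

    constructor(private socket: SocketService, private taskService: TaskService) {}

    ngOnInit() {
        this.socket.connect();

        // Listen for real-time task events; each stream ends when the component is destroyed
        this.socket.on<{ task: Task }>('task:created')
            .pipe(takeUntil(this.destroy$))
            .subscribe(({ task }) => {
                this.tasks.update(tasks => [task, ...tasks]);
            });

        this.socket.on<{ task: Task }>('task:updated')
            .pipe(takeUntil(this.destroy$))
            .subscribe(({ task }) => {
                this.tasks.update(tasks => tasks.map(t => t._id === task._id ? task : t));
            });

        this.socket.on<{ taskId: string }>('task:deleted')
            .pipe(takeUntil(this.destroy$))
            .subscribe(({ taskId }) => {
                this.tasks.update(tasks => tasks.filter(t => t._id !== taskId));
            });
    }

    // Remove the socket listeners registered above
    ngOnDestroy() {
        this.destroy$.next();
        this.destroy$.complete();
    }
}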

How It Works

Step 1 — Socket.io Upgrades HTTP to WebSocket

With Socket.io's default settings, the client first opens an HTTP long-polling connection and then, if WebSocket is available (almost always), upgrades to a persistent WebSocket connection. If a corporate firewall or proxy blocks WebSocket, the upgrade fails and the connection simply stays on long-polling, which is fully functional. The upgrade happens transparently, without any application code changes. Note that the SocketService above lists 'websocket' first in transports, so that client attempts a WebSocket connection directly instead of starting with polling.
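
To see which transport a connection is actually using, the client can inspect the underlying Engine.IO connection. The following is a minimal sketch; the function name watchTransport and the report callback are assumptions, while socket.io.engine, engine.transport.name, and the engine's upgrade event are part of the socket.io-client API.

```javascript
// Report the low-level transport in use, and again after an upgrade.
// `socket` is a socket.io-client Socket; `report` is a hypothetical logging callback.
function watchTransport(socket, report = console.log) {
    socket.on('connect', () => {
        const engine = socket.io.engine;              // underlying Engine.IO connection
        report(`connected via ${engine.transport.name}`);
        // Fires once if the connection is upgraded (typically polling -> websocket)
        engine.once('upgrade', () => {
            report(`upgraded to ${engine.transport.name}`);
        });
    });
}
```

If the client is configured to try WebSocket first, the connection may already report websocket on connect and the upgrade event never fires.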

Step 2 — Rooms Enable Targeted Broadcasting

A room is a named channel. When a socket joins user:64a1f..., it subscribes to that channel. When the server calls io.to('user:64a1f...').emit('task:updated', data), every socket in that room receives the event — which might be multiple browser tabs for the same user. Rooms are created automatically on first join and destroyed automatically when the last socket leaves.
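
A few small helpers illustrate how rooms behave in practice. This is a sketch under assumptions: the helper names are hypothetical, and the room naming follows the user:<id> and project:<id> convention used in this section. It relies on io.in(room).fetchSockets() (Socket.io v4, returns a Promise) and on socket.rooms being a Set that always contains the socket's own id.

```javascript
// Room-name helpers keep the naming convention in one place.
const userRoom    = (userId)    => `user:${userId}`;
const projectRoom = (projectId) => `project:${projectId}`;

// Count how many sockets (for example browser tabs) a user currently has open.
async function countUserConnections(io, userId) {
    const sockets = await io.in(userRoom(userId)).fetchSockets();
    return sockets.length;
}

// List the rooms a socket has joined, excluding its default room (named after its id).
function joinedRooms(socket) {
    return [...socket.rooms].filter((room) => room !== socket.id);
}
```

A count of zero from countUserConnections means no socket is in the room, which is the same condition under which Socket.io discards the room.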

Step 3 — Authentication Middleware Runs Before Any Event

The io.use(fn) middleware runs for every new connection before the connection event fires. Rejecting the connection in middleware (by calling next(new Error(...))) prevents the socket from ever entering the connection handler. This is the correct place to verify the JWT — not inside each event handler — so unauthenticated connections are rejected at the door.
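
The accept/reject flow can be isolated in a small factory, which also makes it easy to exercise without a running server. This is a sketch, not the section's exact middleware: createAuthMiddleware and the injected verifyToken function are hypothetical names, and the error codes are made up for illustration. Attaching a data property to the error is a documented Socket.io feature; the client receives it as err.data in its connect_error handler.

```javascript
// Build an io.use() middleware from an injected token verifier.
// `verifyToken` is hypothetical: it resolves to the user for a valid token
// and throws otherwise (for example, a wrapper around jwt.verify plus a DB lookup).
function createAuthMiddleware(verifyToken) {
    return async (socket, next) => {
        const token = socket.handshake.auth?.token;
        if (!token) {
            const err = new Error('Authentication required');
            err.data = { code: 'NO_TOKEN' };      // delivered to the client's connect_error
            return next(err);
        }
        try {
            socket.user = await verifyToken(token);
            next();                               // no argument: accept the connection
        } catch {
            const err = new Error('Invalid or expired token');
            err.data = { code: 'BAD_TOKEN' };
            next(err);                            // reject: the connection event never fires
        }
    };
}

// Hypothetical registration: io.use(createAuthMiddleware(myVerifier));
```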

Step 4 — Emitting From Controllers Bridges HTTP and WebSocket

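When a task is updated via the REST API (a PUT request from one user's browser), other clients watching that task need to know. By storing a socket emitter (a thin wrapper around the io instance) in app.locals and emitting after each mutation, the HTTP controller acts as a bridge between the REST API and the WebSocket layer. The requesting Angular client receives the HTTP response confirming the update, and every socket in the targeted room receives the WebSocket event. In the controllers above that room is the user's personal room, so the event reaches all of that user's tabs and devices; calling notifyProject instead would reach everyone subscribed to the project room.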
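
One way to notify everyone watching a project without echoing the event back to the tab that made the request is sketched below. The function name and the x-socket-id request header are assumptions, not part of this section's code; the sketch relies on the Socket.io v4 except() modifier and on the fact that every socket automatically joins a room named after its own id.

```javascript
// Emit a project-level event to the room, skipping the requester's own socket.
// Assumptions: rooms are named `project:<id>`, and the client sends its socket id
// in a hypothetical `x-socket-id` request header.
function notifyProjectExceptSender(io, projectId, senderSocketId, event, data) {
    let target = io.to(`project:${projectId}`);
    if (senderSocketId) {
        // Excluding the room named after the socket id excludes exactly that socket.
        target = target.except(senderSocketId);
    }
    target.emit(event, data);
}

// Hypothetical use inside an Express controller:
// notifyProjectExceptSender(io, task.projectId, req.get('x-socket-id'), 'task:updated', { task });
```

Skipping the sender avoids applying the same change twice in the requesting tab, once from the HTTP response and once from the socket event.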

Step 5 — Redis Adapter Synchronises Multi-Instance Broadcasts

Without a Redis adapter, io.to('user:alice').emit() only reaches clients connected to the current Node.js instance. With the Redis adapter, Socket.io publishes the event to a Redis pub/sub channel. All other instances subscribe to the channel and forward the event to their local clients. The result is that broadcasts reach all connected clients regardless of which server instance they are on.
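
The fan-out idea can be illustrated with a toy model. This is not the Redis adapter and does not use Redis: an in-process EventEmitter stands in for the pub/sub channel, and ToyInstance is a hypothetical class standing in for one Node.js server. The real adapter differs in detail, but the shape is the same: publish once, and let each instance deliver to its own local clients.

```javascript
const { EventEmitter } = require('node:events');

// Toy model only: `channel` plays the role of a Redis pub/sub channel.
class ToyInstance {
    constructor(name, channel) {
        this.name = name;
        this.channel = channel;
        this.localClients = [];   // callbacks standing in for locally connected sockets
        // Each instance subscribes to the shared channel and delivers to its own clients.
        channel.on('broadcast', ({ event, data }) => {
            for (const deliver of this.localClients) deliver(event, data);
        });
    }

    // Register a local client (a function that receives event name and data).
    connect(deliver) {
        this.localClients.push(deliver);
    }

    // Publish once to the channel; every subscribed instance forwards locally.
    emit(event, data) {
        this.channel.emit('broadcast', { event, data });
    }
}
```

With two ToyInstance objects sharing one channel, a client connected to the second instance receives events emitted on the first, which is the behaviour the adapter provides across real server processes.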

Common Mistakes

Mistake 1 — Not authenticating socket connections

❌ Wrong — anyone can connect and receive events:

io.on('connection', socket => {
    socket.join('user:' + socket.handshake.query.userId);  // client supplies their own userId!
    // Attacker supplies any userId to receive other users' events
});

✅ Correct — authenticate in io.use() middleware before connection:

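io.use((socket, next) => {
    try {
        const token = socket.handshake.auth?.token;
        const user  = jwt.verify(token, process.env.JWT_SECRET);  // throws if missing, invalid, or expired
        socket.user = user;  // server-assigned, not client-supplied
        next();
    } catch (err) {
        next(new Error('Invalid or expired token'));  // reject the connection
    }
});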

Mistake 2 — Memory leaks from missing event listener cleanup in Angular

❌ Wrong — listener accumulates on every component init:

ngOnInit() {
    this.socket.on('task:updated', data => this.tasks = [...]);
    // Component destroyed and re-created 10 times = 10 listeners
}

✅ Correct — use takeUntil and destroy$ in OnDestroy:

ngOnInit() {
    this.socket.on<Task>('task:updated').pipe(takeUntil(this.destroy$)).subscribe(...);
}
ngOnDestroy() { this.destroy$.next(); this.destroy$.complete(); }
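
Outside of RxJS, the same cleanup rule applies to raw listeners: remove exactly the handler that was registered. A minimal sketch follows; the helper name listen is an assumption, and it uses only socket.on and socket.off, which the Socket.io client provides.

```javascript
// Register a listener and return a function that removes only that listener.
function listen(socket, event, handler) {
    socket.on(event, handler);
    // Passing the handler to off() leaves other listeners for the same event intact;
    // calling off(event) with no handler would remove all of them.
    return () => socket.off(event, handler);
}
```

Calling the returned function from ngOnDestroy (or any teardown hook) keeps the listener count stable however many times the component is re-created.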

Mistake 3 — Using io.emit() in multi-instance deployment without Redis adapter

❌ Wrong — only clients on the current instance receive the broadcast:

io.emit('announcement', data);   // only reaches clients on THIS server instance

✅ Correct — configure Redis adapter for cluster-wide broadcasts:

io.adapter(createAdapter(pubClient, subClient));
io.emit('announcement', data);   // reaches ALL instances via Redis pub/sub

Quick Reference

| Task | Server Code |
|---|---|
| Create server | const io = new Server(httpServer, { cors: {...} }) |
| Auth middleware | io.use((socket, next) => { /* verify JWT */ next() }) |
| On connection | io.on('connection', socket => { ... }) |
| Join room | socket.join('roomName') |
| Emit to user | io.to('user:id').emit('event', data) |
| Emit to all | io.emit('event', data) |
| Broadcast | socket.broadcast.emit('event', data) |
| Listen to event | socket.on('eventName', (data) => { ... }) |
| On disconnect | socket.on('disconnect', reason => { ... }) |

🧠 Test Yourself

You have 3 Node.js instances. User A is on instance 1 and User B is on instance 3. When User A updates a shared task, what must be configured for User B to receive the WebSocket notification?