Real-time features — live task updates when a teammate completes something, instant notifications when a new comment is added, and live presence indicators — require a persistent bidirectional connection between the server and client. Socket.io is the standard WebSocket library for Node.js that provides automatic fallback for older browsers, room-based broadcasting, and reconnection logic. This lesson builds a complete real-time collaboration layer on top of the MEAN Stack task manager, with Express/Socket.io on the server and Angular’s NgZone-aware Socket.io client wrapper.
Socket.io Event Architecture
| Event | Direction | Payload | Use Case |
|---|---|---|---|
| task:created | Server → clients in workspace | { task: Task } | Show new task in all open task lists |
| task:updated | Server → clients in workspace | { taskId, changes } | Update task card in real-time |
| task:deleted | Server → clients in workspace | { taskId } | Remove task from all lists |
| user:typing | Client → server → other clients | { userId, taskId } | “Alice is editing this task” |
| workspace:join | Client → server | { workspaceId } | Subscribe to workspace events |
| notification | Server → specific client | { type, message, data } | Personal notifications |
| online_users | Server → clients in workspace | { users: OnlineUser[] } | Live presence indicator |
Note: Authenticate the client by passing the JWT in socket.auth = { token } during connection.
Tip: Wrap socket event handlers in NgZone.run(() => { ... }) to ensure Angular’s change detection is notified when socket events update component state. Socket.io events fire outside Angular’s zone, meaning signal updates and template bindings triggered by socket events will not re-render unless you enter the zone. A simple wrapper service that calls ngZone.run() on every incoming event handles this transparently.
Warning: Always protect the socket server with an io.use() middleware that reads and verifies the JWT from socket.handshake.auth.token, attaches the decoded user to socket.data.user, and calls next(new Error('Unauthorized')) if verification fails. Unauthenticated socket connections can broadcast events to your user rooms.
Complete Real-Time Implementation
// ── Express + Socket.io server setup ─────────────────────────────────────
// server.js
const http = require('http');
const { Server } = require('socket.io');
const jwt = require('jsonwebtoken');
const app = require('./app');
// Express and Socket.io share a single HTTP server (and therefore one port).
const httpServer = http.createServer(app);

// ALLOWED_ORIGINS is a comma-separated list. Trim each entry and drop empties
// so a value such as "a.com, b.com" matches exactly, and an unset variable
// yields an empty allow-list instead of a TypeError at startup.
const allowedOrigins = (process.env.ALLOWED_ORIGINS ?? '')
  .split(',')
  .map((origin) => origin.trim())
  .filter(Boolean);

const io = new Server(httpServer, {
  cors: {
    origin: allowedOrigins,
    credentials: true,
  },
  pingTimeout: 60000, // ms to wait for a pong before the connection is considered closed
  pingInterval: 25000, // ms between server pings
});
// ── Socket.io authentication middleware ───────────────────────────────────
/**
 * Handshake middleware: rejects the connection unless the client supplied a
 * JWT in `socket.handshake.auth.token` that verifies against JWT_SECRET.
 * On success the decoded payload is stored on `socket.data.user`.
 */
io.use((socket, next) => {
  const token = socket.handshake.auth?.token;
  if (!token) {
    next(new Error('Authentication error: no token'));
    return;
  }

  // Only jwt.verify is guarded: keeping next() outside the try means an
  // exception thrown further down the chain is not misreported as an invalid
  // token, and next() is never invoked twice.
  let decoded;
  try {
    decoded = jwt.verify(token, process.env.JWT_SECRET);
  } catch {
    next(new Error('Authentication error: invalid token'));
    return;
  }

  socket.data.user = decoded;
  next();
});
// ── Connection handler ────────────────────────────────────────────────────
/**
 * Per-connection wiring. Runs after the auth middleware, so
 * `socket.data.user` holds the decoded JWT payload.
 */
io.on('connection', (socket) => {
  const userId = socket.data.user.sub;
  console.log(`User ${userId} connected: ${socket.id}`);

  // Auto-join user's personal room (for direct notifications)
  socket.join(`user:${userId}`);

  // Event payloads are client-controlled; accept only non-empty string ids so
  // a missing or malformed payload is ignored instead of throwing.
  const isValidId = (value) => typeof value === 'string' && value.length > 0;

  // Push the current presence list to everyone in the workspace room.
  const broadcastOnlineUsers = (workspaceId) => {
    const users = getOnlineUsersInWorkspace(io, workspaceId);
    io.to(`workspace:${workspaceId}`).emit('online_users', { users });
  };

  // Join a workspace room
  socket.on('workspace:join', (payload) => {
    const workspaceId = payload?.workspaceId;
    if (!isValidId(workspaceId)) return;

    // NOTE(review): nothing here checks that this user belongs to the
    // workspace — any authenticated socket can join any room. Confirm whether
    // a membership lookup is required before joining.
    socket.join(`workspace:${workspaceId}`);
    socket.data.workspaceId = workspaceId;
    broadcastOnlineUsers(workspaceId);
  });

  socket.on('workspace:leave', (payload) => {
    const workspaceId = payload?.workspaceId;
    if (!isValidId(workspaceId)) return;

    socket.leave(`workspace:${workspaceId}`);
    // Forget the workspace so later typing events are not relayed to a room
    // this socket has left.
    if (socket.data.workspaceId === workspaceId) {
      socket.data.workspaceId = undefined;
    }
    // Remaining members need a refreshed presence list.
    broadcastOnlineUsers(workspaceId);
  });

  socket.on('user:typing', (payload) => {
    const taskId = payload?.taskId;
    const workspaceId = socket.data.workspaceId;
    if (!workspaceId || !isValidId(taskId)) return;

    // socket.to(...) relays to the room without echoing back to the sender.
    socket.to(`workspace:${workspaceId}`).emit('user:typing', {
      userId,
      taskId,
      userName: socket.data.user.email,
    });
  });

  socket.on('disconnect', () => {
    const workspaceId = socket.data.workspaceId;
    if (workspaceId) {
      broadcastOnlineUsers(workspaceId);
    }
  });
});
// ── Broadcast helpers used by task controllers ────────────────────────────
/**
 * Announce a newly created task to every socket in the workspace room.
 * @param {string} workspaceId - Workspace whose room receives the event.
 * @param {object} task - The created task, sent as `{ task }`.
 */
function broadcastTaskCreated(workspaceId, task) {
  const room = `workspace:${workspaceId}`;
  const payload = { task };
  io.to(room).emit('task:created', payload);
}
/**
 * Announce a task change to every socket in the workspace room.
 * @param {string} workspaceId - Workspace whose room receives the event.
 * @param {string} taskId - Id of the task that changed.
 * @param {object} changes - Changed fields, sent as `{ taskId, changes }`.
 */
function broadcastTaskUpdated(workspaceId, taskId, changes) {
  const room = `workspace:${workspaceId}`;
  const payload = { taskId, changes };
  io.to(room).emit('task:updated', payload);
}
/**
 * Announce a task removal to every socket in the workspace room.
 * @param {string} workspaceId - Workspace whose room receives the event.
 * @param {string} taskId - Id of the removed task, sent as `{ taskId }`.
 */
function broadcastTaskDeleted(workspaceId, taskId) {
  const room = `workspace:${workspaceId}`;
  const payload = { taskId };
  io.to(room).emit('task:deleted', payload);
}
/**
 * Deliver a notification to one user's personal room (`user:<userId>`).
 * @param {string} userId - Recipient user id.
 * @param {object} notification - Payload emitted unchanged on `notification`.
 */
function sendNotification(userId, notification) {
  const personalRoom = `user:${userId}`;
  io.to(personalRoom).emit('notification', notification);
}
/**
 * List the distinct users that currently have a socket in a workspace room.
 *
 * @param {object} io - Socket.io server; read via `io.sockets.adapter.rooms`
 *   and `io.sockets.sockets`.
 * @param {string} workspaceId - Workspace whose room is inspected.
 * @returns {{ id: string, email: string }[]} One entry per user, in the order
 *   their first socket appears in the room; empty when the room does not exist.
 */
function getOnlineUsersInWorkspace(io, workspaceId) {
  const socketIds = io.sockets.adapter.rooms.get(`workspace:${workspaceId}`);
  if (!socketIds) return [];

  // Keyed by user id so a user with several sockets (e.g. multiple tabs)
  // is reported once.
  const usersById = new Map();
  for (const socketId of socketIds) {
    // Skip ids whose socket can no longer be looked up, rather than emitting
    // an entry with undefined fields.
    const user = io.sockets.sockets.get(socketId)?.data?.user;
    if (!user || usersById.has(user.sub)) continue;
    usersById.set(user.sub, { id: user.sub, email: user.email });
  }
  return [...usersById.values()];
}
// In task controller — emit after DB write:
exports.create = asyncHandler(async (req, res) => {
const task = await Task.create({ ...req.body, user: req.user.sub });
broadcastTaskCreated(req.user.workspaceId, task); // broadcast to all workspace members
created(res, task);
});
module.exports = { httpServer, io };
// ── Angular Socket.io service ─────────────────────────────────────────────
// npm install socket.io-client
import { Injectable, inject, NgZone, OnDestroy } from '@angular/core';
import { io, Socket } from 'socket.io-client';
import { Observable } from 'rxjs';
import { AuthStore } from '../stores/auth.store';
import { environment } from '../../../environments/environment';
/**
 * Thin wrapper around a single Socket.io client connection.
 *
 * Incoming events are re-entered into Angular's zone before being delivered
 * to subscribers. Call `connect()` before `on()`: subscribing while no socket
 * exists attaches nothing and the observable never emits.
 */
@Injectable({ providedIn: 'root' })
export class SocketService implements OnDestroy {
  private ngZone = inject(NgZone);
  private authStore = inject(AuthStore);
  private socket?: Socket;

  /** Open the connection, reusing the existing socket if one was already created. */
  connect(): void {
    if (this.socket) {
      // Reuse the same socket so listeners registered through on() survive,
      // and no second connection is opened while the first is still
      // connecting or after a manual disconnect.
      if (!this.socket.connected) this.socket.connect();
      return;
    }

    const socket = io(environment.wsUrl, {
      // Function form: the token is read at each connection attempt, so a
      // reconnect picks up the current access token.
      auth: (cb) => cb({ token: this.authStore.accessToken() }),
      transports: ['websocket', 'polling'],
      reconnection: true,
      reconnectionDelay: 1000,
      reconnectionAttempts: 5,
    });

    socket.on('connect', () =>
      console.log('Socket connected:', socket.id));
    socket.on('connect_error', err =>
      console.error('Socket error:', err.message));
    socket.on('disconnect', reason =>
      console.warn('Socket disconnected:', reason));

    this.socket = socket;
  }

  /** Ask the server to add this socket to the workspace room. */
  joinWorkspace(workspaceId: string): void {
    this.socket?.emit('workspace:join', { workspaceId });
  }

  /** Ask the server to remove this socket from the workspace room. */
  leaveWorkspace(workspaceId: string): void {
    this.socket?.emit('workspace:leave', { workspaceId });
  }

  /** Tell the server the current user is editing the given task. */
  emitTyping(taskId: string): void {
    this.socket?.emit('user:typing', { taskId });
  }

  /**
   * Generic observable wrapper — runs callback in Angular zone.
   *
   * @param event Socket.io event name to listen for.
   * @returns Observable emitting each payload received for `event`.
   *   Unsubscribing removes only this subscription's listener.
   */
  on<T>(event: string): Observable<T> {
    return new Observable<T>(subscriber => {
      const socket = this.socket;
      const handler = (data: T) => {
        this.ngZone.run(() => subscriber.next(data));
      };
      socket?.on(event, handler);

      // Pass the handler to off(): calling off(event) alone would strip every
      // listener for the event, including other subscribers'.
      return () => {
        socket?.off(event, handler);
      };
    });
  }

  disconnect(): void { this.socket?.disconnect(); }

  ngOnDestroy(): void { this.disconnect(); }
}
// ── Task store: integrate real-time updates ───────────────────────────────
/**
 * Signal-backed task list kept in sync with server-side task events.
 */
@Injectable({ providedIn: 'root' })
export class TaskStore {
  private taskService = inject(TaskService);
  private socketService = inject(SocketService);
  private _tasks = signal<Task[]>([]);
  readonly tasks = this._tasks.asReadonly();

  // Live socket subscriptions, tracked by hand. takeUntilDestroyed() cannot be
  // used here: called without a DestroyRef it requires an injection context,
  // and connectRealTime() runs from component lifecycle hooks, not from one.
  private realTimeSubscriptions: Array<{ unsubscribe(): void }> = [];

  /**
   * Connect the socket, join the workspace room and start applying
   * task:created / task:updated / task:deleted events to the local list.
   * Calling it again replaces the previous listeners instead of stacking them.
   */
  connectRealTime(workspaceId: string): void {
    this.stopListening();
    this.socketService.connect();
    this.socketService.joinWorkspace(workspaceId);

    // Listen for real-time events and update local state
    this.realTimeSubscriptions = [
      this.socketService.on<{ task: Task }>('task:created')
        .subscribe(({ task }) => {
          // The server broadcasts with io.to(room), which also reaches the
          // client that created the task, so skip tasks already in the list.
          this._tasks.update(tasks =>
            tasks.some(t => t._id === task._id) ? tasks : [task, ...tasks]
          );
        }),

      this.socketService.on<{ taskId: string; changes: Partial<Task> }>('task:updated')
        .subscribe(({ taskId, changes }) => {
          this._tasks.update(tasks =>
            tasks.map(t => t._id === taskId ? { ...t, ...changes } : t)
          );
        }),

      this.socketService.on<{ taskId: string }>('task:deleted')
        .subscribe(({ taskId }) => {
          this._tasks.update(tasks => tasks.filter(t => t._id !== taskId));
        }),
    ];
  }

  /** Stop applying real-time events and leave the workspace room. */
  disconnectRealTime(workspaceId: string): void {
    this.stopListening();
    this.socketService.leaveWorkspace(workspaceId);
  }

  /** Unsubscribe from every active socket event stream. */
  private stopListening(): void {
    for (const subscription of this.realTimeSubscriptions) {
      subscription.unsubscribe();
    }
    this.realTimeSubscriptions = [];
  }
}
How It Works
Step 1 — Socket.io Upgrades HTTP to WebSocket
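By default, Socket.io starts with an HTTP long-polling connection, then upgrades to a WebSocket if the environment supports it. (The SocketService above lists 'websocket' first in its transports option, so that client attempts a WebSocket connection first and falls back to polling only if that fails.) Both transports use the same port as the Express server — the same Node.js HTTP server handles both. The upgrade is transparent to the application code. For environments where WebSockets are blocked (corporate firewalls), Socket.io falls back to long-polling automatically.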
Step 2 — Rooms Enable Targeted Broadcasting
Socket.io rooms are named groupings of socket connections. When a user joins a workspace, their socket is added to the room workspace:workspaceId. Broadcasting with io.to('workspace:abc').emit('task:created', data) sends to all sockets in that room — only the users who are currently viewing that workspace receive the event. Personal rooms (user:userId) enable sending to one specific user.
Step 3 — NgZone.run() Triggers Angular Change Detection
Socket.io events fire in a JavaScript event loop callback that runs outside Angular’s zone. When a socket event updates a signal, Angular does not know a change occurred and the template does not re-render. Wrapping the update in ngZone.run(() => signal.set(newValue)) enters Angular’s zone, which notifies the change detection system that an update occurred. The on<T>(event) wrapper in SocketService handles this automatically for all events.
Step 4 — JWT Authentication Prevents Unauthorised Room Access
The Socket.io io.use() middleware runs before any connection event handlers. It verifies the JWT from socket.handshake.auth.token — the same JWT the Angular HTTP interceptor attaches to API requests. If verification fails, next(new Error('Unauthorized')) prevents the connection from establishing. The authenticated user’s data is stored on socket.data.user and available in all event handlers.
Step 5 — Broadcast After DB Write Ensures Consistency
The server only broadcasts task:created after the task is successfully saved to MongoDB. This prevents ghost events — scenarios where a client receives a “task created” event but the database write subsequently fails. The broadcast happens in the same request handler, synchronously after the await Task.create() resolves. If the broadcast fails (client disconnected), the data is still in the database — the client will see it on next page load.
Common Mistakes
Mistake 1 — Not authenticating socket connections
❌ Wrong — any visitor can connect and receive task events:
io.on('connection', socket => {
socket.join('workspace:123'); // no auth — anyone can join!
});
✅ Correct — verify JWT in io.use() middleware:
io.use((socket, next) => {
try { socket.data.user = jwt.verify(socket.handshake.auth.token, JWT_SECRET); next(); }
catch { next(new Error('Unauthorized')); }
});
Mistake 2 — Socket events not triggering Angular change detection
❌ Wrong — signal updated outside zone, template does not re-render:
this.socket.on('task:created', (data) => {
this._tasks.update(tasks => [data.task, ...tasks]); // outside Angular zone!
// Template does not update — OnPush and Default both miss this
});
✅ Correct — enter zone before updating signals:
this.socket.on('task:created', (data) => {
this.ngZone.run(() => this._tasks.update(tasks => [data.task, ...tasks]));
});
Mistake 3 — Not disconnecting socket on component destroy
❌ Wrong — socket subscriptions continue after navigation away:
ngOnInit(): void {
this.store.connectRealTime(this.workspaceId);
// No cleanup — events fire on destroyed component
}
✅ Correct — leave workspace room on destroy:
ngOnDestroy(): void { this.store.disconnectRealTime(this.workspaceId); }
Quick Reference
| Task | Code |
|---|---|
| Create Socket.io server | const io = new Server(httpServer, { cors: { ... } }) |
| Auth middleware | io.use((socket, next) => { jwt.verify(socket.handshake.auth.token, ...) }) |
| Join room | socket.join('workspace:id') |
| Broadcast to room | io.to('workspace:id').emit('task:created', data) |
| Broadcast except sender | socket.to('workspace:id').emit('event', data) |
| Connect (Angular) | io(wsUrl, { auth: { token: accessToken() } }) |
| Listen (Angular) | this.on<T>('event').pipe(takeUntilDestroyed()).subscribe(...) |
| Zone-aware update | ngZone.run(() => signal.set(value)) |