Connection Manager — Broadcasting to Multiple Clients

A single WebSocket endpoint serving one client is straightforward — the challenge is serving many clients simultaneously. A real-time chat room, a live notification feed, or a collaborative post editor needs to broadcast messages to multiple connected clients, track who is connected, and clean up when clients disconnect. The ConnectionManager pattern — a class that maintains a registry of active connections — is the standard solution for managing multi-client WebSocket communication in FastAPI.

ConnectionManager Implementation

# app/websockets/manager.py
from fastapi import WebSocket
from typing import Dict, Set
import logging

logger = logging.getLogger(__name__)

class ConnectionManager:
    """
    Manages active WebSocket connections.
    Supports broadcast (all clients) and unicast (specific user).
    Not thread-safe: intended for use from a single asyncio event loop,
    where registry updates between awaits cannot interleave, so no locks
    are needed.
    """

    def __init__(self):
        # user_id → set of WebSocket connections (one user, multiple tabs)
        self._connections: Dict[int, Set[WebSocket]] = {}

    async def connect(self, websocket: WebSocket, user_id: int) -> None:
        await websocket.accept()
        if user_id not in self._connections:
            self._connections[user_id] = set()
        self._connections[user_id].add(websocket)
        logger.info(f"User {user_id} connected. Active users: {len(self._connections)}")

    def disconnect(self, websocket: WebSocket, user_id: int) -> None:
        if user_id in self._connections:
            self._connections[user_id].discard(websocket)
            if not self._connections[user_id]:
                del self._connections[user_id]
        logger.info(f"User {user_id} disconnected. Active users: {len(self._connections)}")

    async def send_to_user(self, user_id: int, message: dict) -> None:
        """Send a message to all connections of a specific user."""
        dead: Set[WebSocket] = set()
        # Iterate over a copy: the registry can change while a send is awaited
        for ws in self._connections.get(user_id, set()).copy():
            try:
                await ws.send_json(message)
            except Exception:
                dead.add(ws)
        # Clean up dead connections; disconnect() tolerates a user entry
        # that was already removed while the sends were in flight
        for ws in dead:
            self.disconnect(ws, user_id)

    async def broadcast(self, message: dict) -> None:
        """Send a message to ALL connected clients."""
        dead: Dict[int, Set[WebSocket]] = {}
        # Snapshot the registry: it can change while a send is awaited
        for user_id, websockets in list(self._connections.items()):
            for ws in websockets.copy():
                try:
                    await ws.send_json(message)
                except Exception:
                    dead.setdefault(user_id, set()).add(ws)
        # Clean up dead connections; disconnect() tolerates a user entry
        # that was already removed while the sends were in flight
        for user_id, ws_set in dead.items():
            for ws in ws_set:
                self.disconnect(ws, user_id)

    @property
    def active_user_count(self) -> int:
        return len(self._connections)

# Module-level singleton — shared across all requests in this process
manager = ConnectionManager()

Note: The ConnectionManager is an in-process singleton that stores connections in Python memory. This works for a single-process deployment. With multiple Uvicorn workers (--workers 4), each worker has its own ConnectionManager with its own set of connections, so a message broadcast in worker 1 does not reach clients connected to worker 2. To scale across multiple processes, relay messages between workers through Redis Pub/Sub (covered in Lesson 4).
Tip: The manager stores a set of WebSocket objects per user (not a single connection) to handle users with multiple open tabs or browser windows. All tabs belong to the same user_id, so a notification to the user reaches all their open tabs simultaneously. When a tab closes, its WebSocket is removed from the set — if it was the last tab, the user entry is deleted from the dict.
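The per-user set behaviour described in the tip can be sketched in isolation. This is a minimal illustration, not the manager itself: plain strings stand in for WebSocket objects, and the `add` and `remove` helpers are hypothetical names introduced only for this example.

```python
from typing import Dict, Set

# user_id -> set of connections; strings stand in for WebSocket objects
registry: Dict[int, Set[str]] = {}

def add(user_id: int, conn: str) -> None:
    """Register one more connection (tab) for a user."""
    registry.setdefault(user_id, set()).add(conn)

def remove(user_id: int, conn: str) -> None:
    """Drop one connection; delete the user entry when no tabs remain."""
    conns = registry.get(user_id)
    if conns is None:
        return
    conns.discard(conn)
    if not conns:
        del registry[user_id]

add(7, "tab-1")
add(7, "tab-2")
connections_with_two_tabs = len(registry[7])  # both tabs would receive a message

remove(7, "tab-1")
still_registered = 7 in registry              # one tab left, user entry kept

remove(7, "tab-2")                            # last tab closed, entry deleted
```

With two tabs open, a message addressed to user 7 would be sent to both connections in the set.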
Warning: Dead connections (clients that vanished without a proper close frame, such as a network drop or a killed browser) are only detected when a send attempt fails. The manager above removes them when a send raises, but a dead connection that is never sent to stays in the registry indefinitely. To bound this, add a heartbeat mechanism that sends periodic pings and removes connections that do not respond within a timeout.
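One possible shape for the bookkeeping side of a heartbeat is sketched below. It is an assumption-laden illustration rather than part of the manager above: `HeartbeatTracker`, `touch`, `forget`, and `stale` are hypothetical names, and the sketch only tracks when each connection was last heard from. Sending the pings and closing stale sockets is left to the application.

```python
import time
from typing import Dict, Hashable, List, Optional

class HeartbeatTracker:
    """Remembers when each connection last showed a sign of life (hypothetical helper)."""

    def __init__(self, timeout: float = 30.0) -> None:
        self.timeout = timeout
        self._last_seen: Dict[Hashable, float] = {}

    def touch(self, conn: Hashable, now: Optional[float] = None) -> None:
        """Record activity, e.g. when a pong or any other message arrives."""
        self._last_seen[conn] = time.monotonic() if now is None else now

    def forget(self, conn: Hashable) -> None:
        """Stop tracking a connection that has been closed."""
        self._last_seen.pop(conn, None)

    def stale(self, now: Optional[float] = None) -> List[Hashable]:
        """Return connections that have been silent for longer than the timeout."""
        current = time.monotonic() if now is None else now
        return [c for c, seen in self._last_seen.items() if current - seen > self.timeout]

# Explicit timestamps make the behaviour easy to follow
tracker = HeartbeatTracker(timeout=10.0)
tracker.touch("conn-a", now=0.0)
tracker.touch("conn-b", now=8.0)
stale_at_12 = tracker.stale(now=12.0)   # conn-a silent for 12s, conn-b for 4s
```

In an application, `touch` could be called whenever a message arrives, and a periodic background task could close and disconnect whatever `stale()` returns.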

Chat Room Endpoint Using ConnectionManager

from fastapi import WebSocket, WebSocketDisconnect, Query
from app.websockets.manager import manager
import jwt  # PyJWT
# `app` (the FastAPI instance) and `settings` come from the application's own modules

@app.websocket("/ws/chat")
async def chat_endpoint(
    websocket: WebSocket,
    token:     str = Query(...),
):
    """Global chat room — all connected users see all messages."""
    # Authenticate
    try:
        payload = jwt.decode(token, settings.secret_key, algorithms=["HS256"])
        user_id = int(payload["sub"])
        username = payload.get("name", f"User {user_id}")
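    except (jwt.InvalidTokenError, KeyError, ValueError):
        # Bad signature or expired token, missing "sub" claim, or non-integer "sub"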
        await websocket.close(code=1008, reason="Invalid token")
        return

    # Connect and announce
    await manager.connect(websocket, user_id)
    await manager.broadcast({
        "type":    "user_joined",
        "user_id": user_id,
        "name":    username,
        "count":   manager.active_user_count,
    })

    try:
        while True:
            data = await websocket.receive_json()

            if data.get("type") == "message":
                # Broadcast chat message to everyone
                await manager.broadcast({
                    "type":    "message",
                    "user_id": user_id,
                    "name":    username,
                    "body":    data.get("body", ""),
                })
            elif data.get("type") == "ping":
                await websocket.send_json({"type": "pong"})

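    except WebSocketDisconnect:
        pass  # normal client disconnect
    finally:
        # Runs on disconnect and on unexpected errors (e.g. malformed JSON),
        # so the connection never lingers in the registry
        manager.disconnect(websocket, user_id)
        await manager.broadcast({
            "type":    "user_left",
            "user_id": user_id,
            "name":    username,
            "count":   manager.active_user_count,
        })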
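The receive loop above calls data.get(...) directly, which assumes every client sends a JSON object with a string body. One hedged way to make that explicit is to move the validation into a small pure function that can be tested without a WebSocket. The function name `build_outgoing` and the `MAX_BODY_LENGTH` limit are assumptions made for this sketch, not part of the original endpoint.

```python
from typing import Optional

MAX_BODY_LENGTH = 2000  # hypothetical limit, chosen only for illustration

def build_outgoing(data: object, user_id: int, username: str) -> Optional[dict]:
    """Return the payload to broadcast, or None if the input should be ignored."""
    # receive_json() can yield any JSON value, not only an object
    if not isinstance(data, dict) or data.get("type") != "message":
        return None
    body = data.get("body")
    if not isinstance(body, str) or not body.strip():
        return None
    return {
        "type":    "message",
        "user_id": user_id,
        "name":    username,
        "body":    body.strip()[:MAX_BODY_LENGTH],
    }

accepted = build_outgoing({"type": "message", "body": "  hi  "}, 1, "Ann")
empty_body = build_outgoing({"type": "message", "body": ""}, 1, "Ann")
not_an_object = build_outgoing(["unexpected"], 1, "Ann")
```

Inside the loop, the endpoint could broadcast the result only when it is not None.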

Notification Push from HTTP Endpoints

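# Push a notification to a specific user from any HTTP route handler.
# app, Session, get_db, get_current_user, Post, and User come from the
# application's own modules.
from fastapi import Depends, HTTPException
from app.websockets.manager import manager

@app.post("/posts/{post_id}/like")
async def like_post(
    post_id:      int,
    db:           Session = Depends(get_db),
    current_user: User    = Depends(get_current_user),
):
    post = db.get(Post, post_id)
    if post is None:
        raise HTTPException(status_code=404, detail="Post not found")
    # ... toggle like logic ...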

    # Real-time notification to the post author
    if post.author_id != current_user.id:
        await manager.send_to_user(post.author_id, {
            "type":       "notification",
            "event":      "post_liked",
            "post_id":    post_id,
            "post_title": post.title,
            "liked_by":   current_user.name,
        })

    return {"liked": True}
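In the handler above, the HTTP response waits until the notification has been sent to every connection of the author. If that delay matters, one option is to schedule the send as a background task. The sketch below uses only asyncio and stand-in functions: `fire_and_forget`, `send_to_user`, and `handler` are hypothetical names for this example, and the stand-in send simply records what it was given.

```python
import asyncio
from typing import Any, Coroutine, List, Set, Tuple

_background: Set[asyncio.Task] = set()

def fire_and_forget(coro: Coroutine[Any, Any, Any]) -> asyncio.Task:
    """Schedule a coroutine without awaiting it, keeping a reference until it finishes."""
    task = asyncio.create_task(coro)
    _background.add(task)                        # a strong reference prevents early garbage collection
    task.add_done_callback(_background.discard)
    return task

delivered: List[Tuple[int, dict]] = []

async def send_to_user(user_id: int, message: dict) -> None:
    """Stand-in for manager.send_to_user with a simulated network delay."""
    await asyncio.sleep(0.01)
    delivered.append((user_id, message))

async def handler() -> dict:
    """Stand-in for the route: returns without waiting for the push to complete."""
    fire_and_forget(send_to_user(42, {"type": "notification", "event": "post_liked"}))
    return {"liked": True}

async def main() -> Tuple[dict, int]:
    response = await handler()
    delivered_at_return = len(delivered)         # the push has not run yet
    await asyncio.gather(*_background)           # let the scheduled send finish
    return response, delivered_at_return

response, delivered_at_return = asyncio.run(main())
```

The trade-off is that errors in the background send no longer surface in the request, so they need their own logging.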

Common Mistakes

Mistake 1 — Not removing dead connections after failed sends

❌ Wrong — the first dead connection aborts the broadcast and is never removed:

async def broadcast(self, msg):
    for sockets in self._connections.values():
        for ws in sockets:
            await ws.send_json(msg)   # raises if ws is dead; not caught, not removed

✅ Correct — catch send exceptions and remove dead connections.
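The corrected pattern can be exercised without a server. In this sketch, `FakeSocket` is a hypothetical stand-in for a WebSocket whose send fails when the client has gone away, and `broadcast` is a free function that mirrors the catch-and-remove logic rather than the manager's actual method.

```python
import asyncio
from typing import Dict, List, Set, Tuple

class FakeSocket:
    """Hypothetical stand-in for a WebSocket; alive=False simulates a dropped client."""

    def __init__(self, alive: bool = True) -> None:
        self.alive = alive
        self.received: List[dict] = []

    async def send_json(self, message: dict) -> None:
        if not self.alive:
            raise RuntimeError("connection closed")
        self.received.append(message)

async def broadcast(connections: Dict[int, Set[FakeSocket]], message: dict) -> None:
    dead: List[Tuple[int, FakeSocket]] = []
    # Snapshot the registry so it can be modified after the sends
    for user_id, sockets in list(connections.items()):
        for ws in list(sockets):
            try:
                await ws.send_json(message)
            except Exception:
                dead.append((user_id, ws))   # remember the failure, keep going
    for user_id, ws in dead:
        sockets = connections.get(user_id)
        if sockets is None:
            continue                         # user entry already removed
        sockets.discard(ws)
        if not sockets:
            del connections[user_id]

live, dropped = FakeSocket(), FakeSocket(alive=False)
registry = {1: {live}, 2: {dropped}}
asyncio.run(broadcast(registry, {"type": "ping"}))
```

The live client still receives the message even though another connection failed, and the failed connection's user entry is gone afterwards.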

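Mistake 2 — Calling broadcast from a sync function (the coroutine never runs)

❌ Wrong — calling async manager.broadcast from a sync route:

def sync_route():
    manager.broadcast(msg)   # creates a coroutine object that is never awaited; nothing is sent

✅ Correct — make the route async and await the call. FastAPI runs sync routes in a worker thread that has no running event loop, so asyncio.create_task() raises RuntimeError there; if the route must stay sync, hand the coroutine to the application's event loop with asyncio.run_coroutine_threadsafe().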
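The difference between building a coroutine and running it can be shown with plain asyncio. In this sketch `broadcast` is a stand-in for manager.broadcast and `sync_worker` is a hypothetical function that plays the role of a sync route running in a worker thread. Note that asyncio.create_task() needs a running event loop in the current thread, which a worker thread does not have, so the sketch uses asyncio.run_coroutine_threadsafe() with a reference to the main loop instead.

```python
import asyncio
from typing import List

sent: List[dict] = []  # stands in for messages delivered to clients

async def broadcast(message: dict) -> None:
    """Stand-in for manager.broadcast."""
    sent.append(message)

def sync_worker(loop: asyncio.AbstractEventLoop) -> None:
    """Runs in a plain thread, much like a sync route in a threadpool."""
    future = asyncio.run_coroutine_threadsafe(broadcast({"type": "hello"}), loop)
    future.result(timeout=5)  # block this thread until the loop has run the coroutine

async def main() -> None:
    loop = asyncio.get_running_loop()

    # Calling an async function without await only builds a coroutine object
    coro = broadcast({"type": "lost"})
    coro.close()  # closed explicitly to avoid a "never awaited" warning

    # Run the sync function in a worker thread while the loop stays free
    await loop.run_in_executor(None, sync_worker, loop)

asyncio.run(main())
```

Only the message scheduled through the event loop is delivered; the un-awaited call sends nothing.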

Mistake 3 — Creating a manager in each module (instances are not shared)

❌ Wrong — multiple manager instances with separate connection sets:

# router_a.py
manager = ConnectionManager()   # separate instance!
# router_b.py
manager = ConnectionManager()   # different instance — connections not shared

✅ Correct — create one module-level singleton (as in app/websockets/manager.py) and import it everywhere. Python caches imported modules, so every import of that module returns the same manager object.

Quick Reference

Operation              Code
Connect user           await manager.connect(websocket, user_id)
Disconnect user        manager.disconnect(websocket, user_id)
Broadcast to all       await manager.broadcast({"type": "...", ...})
Send to one user       await manager.send_to_user(user_id, {...})
Active users           manager.active_user_count
Push from HTTP route   await manager.send_to_user(target_id, msg)

🧠 Test Yourself

User A has two browser tabs open, both connected to the notification WebSocket. When a message is sent to user A’s user_id, how many WebSocket connections receive it?