A single WebSocket endpoint serving one client is straightforward — the challenge is serving many clients simultaneously. A real-time chat room, a live notification feed, or a collaborative post editor needs to broadcast messages to multiple connected clients, track who is connected, and clean up when clients disconnect. The ConnectionManager pattern — a class that maintains a registry of active connections — is the standard solution for managing multi-client WebSocket communication in FastAPI.
ConnectionManager Implementation
# app/websockets/manager.py
from fastapi import WebSocket
from typing import Dict, Set
import logging
logger = logging.getLogger(__name__)
class ConnectionManager:
"""
Manages active WebSocket connections.
Supports broadcast (all clients) and unicast (specific user).
Thread-safe for single-process asyncio (no locks needed since
asyncio is single-threaded).
"""
def __init__(self):
# user_id → set of WebSocket connections (one user, multiple tabs)
self._connections: Dict[int, Set[WebSocket]] = {}
async def connect(self, websocket: WebSocket, user_id: int) -> None:
await websocket.accept()
if user_id not in self._connections:
self._connections[user_id] = set()
self._connections[user_id].add(websocket)
logger.info(f"User {user_id} connected. Active users: {len(self._connections)}")
def disconnect(self, websocket: WebSocket, user_id: int) -> None:
if user_id in self._connections:
self._connections[user_id].discard(websocket)
if not self._connections[user_id]:
del self._connections[user_id]
logger.info(f"User {user_id} disconnected. Active users: {len(self._connections)}")
async def send_to_user(self, user_id: int, message: dict) -> None:
"""Send a message to all connections of a specific user."""
if user_id not in self._connections:
return
dead: Set[WebSocket] = set()
for ws in self._connections[user_id].copy():
try:
await ws.send_json(message)
except Exception:
dead.add(ws)
# Clean up dead connections
for ws in dead:
self._connections[user_id].discard(ws)
if not self._connections[user_id]:
del self._connections[user_id]
async def broadcast(self, message: dict) -> None:
"""Send a message to ALL connected clients."""
dead: Dict[int, Set[WebSocket]] = {}
for user_id, websockets in self._connections.copy().items():
for ws in websockets.copy():
try:
await ws.send_json(message)
except Exception:
if user_id not in dead:
dead[user_id] = set()
dead[user_id].add(ws)
for user_id, ws_set in dead.items():
for ws in ws_set:
self._connections[user_id].discard(ws)
if not self._connections[user_id]:
del self._connections[user_id]
@property
def active_user_count(self) -> int:
return len(self._connections)
# Module-level singleton — shared across all requests in this process
manager = ConnectionManager()
ConnectionManager is an in-process singleton — it stores connections in Python memory. This works for a single-process deployment. With multiple Uvicorn workers (--workers 4), each worker has its own ConnectionManager with its own set of connections. A message broadcast in worker 1 does not reach clients connected to worker 2. To scale across multiple processes, replace the in-memory store with Redis Pub/Sub (covered in Lesson 4).Chat Room Endpoint Using ConnectionManager
from fastapi import WebSocket, WebSocketDisconnect, Query
from app.websockets.manager import manager
import jwt
@app.websocket("/ws/chat")
async def chat_endpoint(
websocket: WebSocket,
token: str = Query(...),
):
"""Global chat room — all connected users see all messages."""
# Authenticate
try:
payload = jwt.decode(token, settings.secret_key, algorithms=["HS256"])
user_id = int(payload["sub"])
username = payload.get("name", f"User {user_id}")
except jwt.InvalidTokenError:
await websocket.close(code=1008, reason="Invalid token")
return
# Connect and announce
await manager.connect(websocket, user_id)
await manager.broadcast({
"type": "user_joined",
"user_id": user_id,
"name": username,
"count": manager.active_user_count,
})
try:
while True:
data = await websocket.receive_json()
if data.get("type") == "message":
# Broadcast chat message to everyone
await manager.broadcast({
"type": "message",
"user_id": user_id,
"name": username,
"body": data.get("body", ""),
})
elif data.get("type") == "ping":
await websocket.send_json({"type": "pong"})
except WebSocketDisconnect:
manager.disconnect(websocket, user_id)
await manager.broadcast({
"type": "user_left",
"user_id": user_id,
"name": username,
"count": manager.active_user_count,
})
Notification Push from HTTP Endpoints
# Push a notification to a specific user from any HTTP route handler
@app.post("/posts/{post_id}/like")
async def like_post(
post_id: int,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
post = db.get(Post, post_id)
# ... toggle like logic ...
# Real-time notification to the post author
if post.author_id != current_user.id:
await manager.send_to_user(post.author_id, {
"type": "notification",
"event": "post_liked",
"post_id": post_id,
"post_title": post.title,
"liked_by": current_user.name,
})
return {"liked": True}
Common Mistakes
Mistake 1 — Not removing dead connections after failed sends
❌ Wrong — dead connections accumulate in the set:
async def broadcast(self, msg):
for ws in self._connections:
await ws.send_json(msg) # exception if ws is dead — not caught!
✅ Correct — catch send exceptions and remove dead connections.
Mistake 2 — Broadcasting from a sync function (blocking event loop)
❌ Wrong — calling async manager.broadcast from a sync route:
def sync_route():
manager.broadcast(msg) # manager.broadcast is async — not called correctly!
✅ Correct — make the route async, or use asyncio.create_task() from a sync context.
Mistake 3 — Module-level manager not shared (new instance per import)
❌ Wrong — multiple manager instances with separate connection sets:
# router_a.py
manager = ConnectionManager() # separate instance!
# router_b.py
manager = ConnectionManager() # different instance — connections not shared
✅ Correct — one module-level singleton imported by all modules.
Quick Reference
| Operation | Code |
|---|---|
| Connect user | await manager.connect(websocket, user_id) |
| Disconnect user | manager.disconnect(websocket, user_id) |
| Broadcast to all | await manager.broadcast({"type": "...", ...}) |
| Send to one user | await manager.send_to_user(user_id, {...}) |
| Active users | manager.active_user_count |
| Push from HTTP route | await manager.send_to_user(target_id, msg) |