Most real-time features are scoped to specific resources — live comment counts for a specific post, a chat room for a specific channel, the edit status of a specific document. Rather than broadcasting all events to all connected clients (which does not scale and leaks data), organise connections into rooms keyed by resource ID. Clients subscribe to a room when they open the relevant page and unsubscribe when they leave. To scale rooms across multiple server processes (multiple Uvicorn workers), use Redis Pub/Sub as a message bus — each process subscribes to the relevant Redis channels and forwards messages to its local WebSocket connections.
Room-Based ConnectionManager
# app/websockets/room_manager.py
from fastapi import WebSocket
from collections import defaultdict
from typing import Dict, Set
import logging
logger = logging.getLogger(__name__)
class RoomManager:
    """
    Manages WebSocket connections organised into rooms.

    Each room is keyed by a string (e.g. "post:42", "user:5", "channel:general")
    and holds the (user_id, WebSocket) pairs registered through join().
    A room exists only while it has members: it is created on the first join
    and removed as soon as the last member leaves or is pruned as dead.
    """

    def __init__(self):
        # room_key → set of (user_id, WebSocket) tuples
        self._rooms: Dict[str, Set[tuple]] = defaultdict(set)

    async def join(self, room: str, user_id: int, ws: "WebSocket") -> None:
        """Accept the WebSocket handshake and register the connection in `room`."""
        await ws.accept()
        self._rooms[room].add((user_id, ws))
        logger.debug(f"User {user_id} joined room {room}. Members: {self.room_size(room)}")

    def leave(self, room: str, user_id: int, ws: "WebSocket") -> None:
        """Remove the connection from `room`; a no-op if it is not a member."""
        # .get() avoids the defaultdict creating a room just to delete it again.
        members = self._rooms.get(room)
        if members is None:
            return
        members.discard((user_id, ws))
        if not members:
            del self._rooms[room]

    async def broadcast_to_room(self, room: str, message: dict) -> None:
        """
        Send a message to all clients in a specific room.

        Delivery is best-effort: any exception raised by a connection's
        send_json() marks that connection as dead and it is removed from the
        room. Broadcasting to an unknown room does nothing.
        """
        dead = set()
        # Iterate over a snapshot — members can join or leave while we await.
        for uid, ws in list(self._rooms.get(room, ())):
            try:
                await ws.send_json(message)
            except Exception:
                dead.add((uid, ws))
        if not dead:
            return
        # Look the room up again without creating it: it may have been
        # deleted by leave() while a send was in flight.
        members = self._rooms.get(room)
        if members is None:
            return
        members.difference_update(dead)
        if not members:
            # Do not leave an empty room behind once every member is gone.
            del self._rooms[room]

    def room_size(self, room: str) -> int:
        """Return the number of connections currently registered in `room`."""
        return len(self._rooms.get(room, ()))
# Module-level instance; other modules import this object rather than creating their own.
room_manager = RoomManager()
Note: Room keys follow a "type:id" pattern such as "post:42", "user:5", or "channel:general" — the resource type and ID. This naming convention prevents collisions between different resource types using the same numeric ID. A client viewing post 42 joins "post:42"; a client viewing user profile 42 joins "user:42". The two rooms are independent despite sharing the same ID number.
Tip: When a new comment is created via POST /posts/42/comments, the HTTP handler can call await room_manager.broadcast_to_room("post:42", {...}) to notify all clients currently viewing that post. This pattern connects your REST API to your real-time layer without requiring clients to actively request updates.
Room Endpoint with Authentication
from fastapi import WebSocket, WebSocketDisconnect, Query
from app.websockets.room_manager import room_manager
import jwt
@app.websocket("/ws/posts/{post_id}/live")
async def post_live_feed(
    websocket: WebSocket,
    post_id: int,
    token: str = Query(...),
):
    """
    Clients connect here to receive live updates for a specific post:
    - New comments
    - Like count changes
    - View count updates

    The JWT passed as the `token` query parameter is verified before the
    connection is registered; an invalid token closes the socket with
    code 1008. On join the client receives a "room_joined" message and the
    room is told "viewer_joined"; when the connection ends for any reason
    the client is removed and the room is told "viewer_left".
    """
    try:
        payload = jwt.decode(token, settings.secret_key, algorithms=["HS256"])
        user_id = int(payload["sub"])
    except (jwt.InvalidTokenError, KeyError, TypeError, ValueError):
        # A correctly signed token with a missing or non-numeric "sub" claim
        # is rejected the same way as a bad signature instead of crashing.
        await websocket.close(code=1008, reason="Invalid token")
        return

    room = f"post:{post_id}"
    await room_manager.join(room, user_id, websocket)
    try:
        # Send current state on connect
        await websocket.send_json({
            "type": "room_joined",
            "post_id": post_id,
            "viewers": room_manager.room_size(room),
        })
        # Let clients already in the room update their viewer count.
        await room_manager.broadcast_to_room(room, {
            "type": "viewer_joined",
            "viewers": room_manager.room_size(room),
        })
        while True:
            # Mostly listening — clients can send presence info
            data = await websocket.receive_json()
            # Ignore well-formed JSON that is not an object (list, number, ...).
            if isinstance(data, dict) and data.get("type") == "ping":
                await websocket.send_json({"type": "pong"})
    except WebSocketDisconnect:
        # Normal client disconnect; cleanup happens in `finally`.
        pass
    finally:
        # Runs on disconnect AND on any other error (e.g. malformed JSON),
        # so a failed connection is never left registered in the room.
        room_manager.leave(room, user_id, websocket)
        await room_manager.broadcast_to_room(room, {
            "type": "viewer_left",
            "viewers": room_manager.room_size(room),
        })
# In the REST comment creation handler:
@app.post("/posts/{post_id}/comments")
async def create_comment(post_id: int, comment: CommentCreate, ...):
new_comment = save_comment(...)
# Push real-time update to all clients viewing this post
await room_manager.broadcast_to_room(f"post:{post_id}", {
"type": "new_comment",
"comment": CommentResponse.model_validate(new_comment).model_dump(),
})
return new_comment
Scaling with Redis Pub/Sub
import redis.asyncio as aioredis
import asyncio
import json
# Redis Pub/Sub bridge — runs in each Uvicorn worker process
# One async client per process, used both to subscribe (redis_listener)
# and to publish (publish_to_room).
redis = aioredis.from_url(settings.redis_url)
async def redis_listener(channel_pattern: str):
    """
    Subscribe to Redis channel and forward messages to local WebSocket rooms.
    Each worker runs this listener task on startup.

    `channel_pattern` is a Redis pattern such as "ws:post:*". For every
    matching message the "ws:" prefix is stripped from the channel name to
    get the room key, the payload is parsed as JSON, and the result is
    broadcast to that room's local connections. A failure while handling one
    message is logged and the listener keeps running.
    """
    # `async with` releases the pub/sub connection when the task is cancelled
    # (e.g. on shutdown) or the listen loop fails.
    async with redis.pubsub() as pubsub:
        await pubsub.psubscribe(channel_pattern)  # e.g. "ws:post:*"
        async for message in pubsub.listen():
            if message["type"] != "pmessage":
                continue
            try:
                channel = message["channel"]  # "ws:post:42"
                # bytes by default; already str if the client decodes responses
                if isinstance(channel, bytes):
                    channel = channel.decode()
                room = channel.removeprefix("ws:")  # "post:42"
                data = json.loads(message["data"])
                await room_manager.broadcast_to_room(room, data)
            except Exception as e:
                # logger.exception keeps the traceback, not just the message.
                logger.exception("Redis listener error: %s", e)
# Publish from any HTTP handler or any worker:
async def publish_to_room(room: str, message: dict) -> None:
    """Serialise `message` as JSON and publish it on the room's "ws:<room>" Redis channel."""
    channel = f"ws:{room}"
    payload = json.dumps(message)
    await redis.publish(channel, payload)
Common Mistakes
Mistake 1 — Using in-process rooms with multiple workers
❌ Wrong — rooms not shared across workers:
uvicorn app.main:app --workers 4
# Worker 1: client A in "post:42"
# Worker 3: new comment broadcast
# → Only Worker 3's local clients receive the broadcast
✅ Correct — use Redis Pub/Sub to bridge across worker processes.
Mistake 2 — Not broadcasting viewer count on join/leave
❌ Wrong — clients never know how many others are viewing:
await room_manager.join(room, user_id, ws)
# No announcement — existing clients don't know someone joined
✅ Correct — broadcast viewer_joined and viewer_left events.
Mistake 3 — Large message payloads to all room members
❌ Wrong — sending full post HTML to 500 concurrent viewers on every edit:
await room_manager.broadcast_to_room(room, {"content": full_html}) # MBs × 500!
✅ Correct — send only the changed delta or a “refresh” signal, not full content.
Quick Reference
| Operation | Code |
|---|---|
| Join room | await room_manager.join("post:42", user_id, ws) |
| Leave room | room_manager.leave("post:42", user_id, ws) |
| Broadcast to room | await room_manager.broadcast_to_room("post:42", msg) |
| Room key convention | "type:id" e.g. "post:42", "user:5" |
| Scale with Redis | Publish to "ws:room-key"; workers subscribe and forward |