Authenticated WebSocket Rooms — Per-User and Per-Resource Channels

Most real-time features are scoped to specific resources — live comment counts for a specific post, a chat room for a specific channel, the edit status of a specific document. Rather than broadcasting all events to all connected clients (which does not scale and leaks data), organise connections into rooms keyed by resource ID. Clients subscribe to a room when they open the relevant page and unsubscribe when they leave. To scale rooms across multiple server processes (multiple Uvicorn workers), use Redis Pub/Sub as a message bus — each process subscribes to the relevant Redis channels and forwards messages to its local WebSocket connections.

Room-Based ConnectionManager

# app/websockets/room_manager.py
from fastapi import WebSocket
from collections import defaultdict
from typing import Dict, Set
import logging

logger = logging.getLogger(__name__)

class RoomManager:
    """
    Manages WebSocket connections organised into rooms.
    Each room is keyed by a string (e.g. "post:42", "user:5", "channel:general").
    """

    def __init__(self):
        # room_key → set of (user_id, WebSocket) tuples
        self._rooms: Dict[str, Set[tuple]] = defaultdict(set)

    async def join(self, room: str, user_id: int, ws: WebSocket) -> None:
        await ws.accept()
        self._rooms[room].add((user_id, ws))
        logger.debug(f"User {user_id} joined room {room}. Members: {self.room_size(room)}")

    def leave(self, room: str, user_id: int, ws: WebSocket) -> None:
        self._rooms[room].discard((user_id, ws))
        if not self._rooms[room]:
            del self._rooms[room]

    async def broadcast_to_room(self, room: str, message: dict) -> None:
        """Send a message to all clients in a specific room."""
        dead = set()
        for (uid, ws) in list(self._rooms.get(room, set())):
            try:
                await ws.send_json(message)
            except Exception:
                dead.add((uid, ws))
        for item in dead:
            self._rooms[room].discard(item)

    def room_size(self, room: str) -> int:
        return len(self._rooms.get(room, set()))

room_manager = RoomManager()
Note: Room keys are strings with a convention like "post:42", "user:5", or "channel:general" — the resource type and ID. This naming convention prevents collisions between different resource types using the same numeric ID. A client viewing post 42 joins "post:42"; a client viewing user profile 42 joins "user:42". The two rooms are independent despite sharing the same ID number.
Tip: Combine rooms with HTTP endpoints to send room-specific updates from regular REST handlers. When a new comment is posted to post 42 via POST /posts/42/comments, the HTTP handler can call await room_manager.broadcast_to_room("post:42", {...}) to notify all clients currently viewing that post. This pattern connects your REST API to your real-time layer without requiring clients to actively request updates.
Warning: The in-process room manager only works within a single process. With multiple Uvicorn workers, a client on worker 1 viewing post 42 is in worker 1’s room; a client on worker 2 viewing post 42 is in worker 2’s room. When the HTTP handler on worker 3 broadcasts to room “post:42”, only worker 3’s local room receives the message — the other workers’ clients miss the update. Redis Pub/Sub solves this by letting all workers subscribe to and publish on the same channel.

Room Endpoint with Authentication

from fastapi import WebSocket, WebSocketDisconnect, Query
from app.websockets.room_manager import room_manager
import jwt

@app.websocket("/ws/posts/{post_id}/live")
async def post_live_feed(
    websocket: WebSocket,
    post_id:   int,
    token:     str = Query(...),
):
    """
    Clients connect here to receive live updates for a specific post:
    - New comments
    - Like count changes
    - View count updates
    """
    try:
        payload = jwt.decode(token, settings.secret_key, algorithms=["HS256"])
        user_id = int(payload["sub"])
    except jwt.InvalidTokenError:
        await websocket.close(code=1008, reason="Invalid token")
        return

    room = f"post:{post_id}"
    await room_manager.join(room, user_id, websocket)

    try:
        # Send current state on connect
        await websocket.send_json({
            "type":    "room_joined",
            "post_id": post_id,
            "viewers": room_manager.room_size(room),
        })

        while True:
            # Mostly listening — clients can send presence info
            data = await websocket.receive_json()
            if data.get("type") == "ping":
                await websocket.send_json({"type": "pong"})

    except WebSocketDisconnect:
        room_manager.leave(room, user_id, websocket)
        await room_manager.broadcast_to_room(room, {
            "type":    "viewer_left",
            "viewers": room_manager.room_size(room),
        })

# In the REST comment creation handler:
@app.post("/posts/{post_id}/comments")
async def create_comment(post_id: int, comment: CommentCreate, ...):
    new_comment = save_comment(...)
    # Push real-time update to all clients viewing this post
    await room_manager.broadcast_to_room(f"post:{post_id}", {
        "type":    "new_comment",
        "comment": CommentResponse.model_validate(new_comment).model_dump(),
    })
    return new_comment

Scaling with Redis Pub/Sub

import redis.asyncio as aioredis
import asyncio
import json

# Redis Pub/Sub bridge — runs in each Uvicorn worker process
redis = aioredis.from_url(settings.redis_url)

async def redis_listener(channel_pattern: str):
    """
    Subscribe to Redis channel and forward messages to local WebSocket rooms.
    Each worker runs this listener task on startup.
    """
    pubsub = redis.pubsub()
    await pubsub.psubscribe(channel_pattern)   # e.g. "ws:post:*"

    async for message in pubsub.listen():
        if message["type"] != "pmessage":
            continue
        try:
            channel = message["channel"].decode()    # "ws:post:42"
            room    = channel.removeprefix("ws:")    # "post:42"
            data    = json.loads(message["data"])
            await room_manager.broadcast_to_room(room, data)
        except Exception as e:
            logger.error(f"Redis listener error: {e}")

# Publish from any HTTP handler or any worker:
async def publish_to_room(room: str, message: dict) -> None:
    """Publish a message to all workers listening for this room."""
    await redis.publish(f"ws:{room}", json.dumps(message))

Common Mistakes

Mistake 1 — Using in-process rooms with multiple workers

❌ Wrong — rooms not shared across workers:

uvicorn app.main:app --workers 4
# Worker 1: client A in "post:42"
# Worker 3: new comment broadcast
# → Only Worker 3's local clients receive the broadcast

✅ Correct — use Redis Pub/Sub to bridge across worker processes.

Mistake 2 — Not broadcasting viewer count on join/leave

❌ Wrong — clients never know how many others are viewing:

await room_manager.join(room, user_id, ws)
# No announcement — existing clients don't know someone joined

✅ Correct — broadcast viewer_joined and viewer_left events.

Mistake 3 — Large message payloads to all room members

❌ Wrong — sending full post HTML to 500 concurrent viewers on every edit:

await room_manager.broadcast_to_room(room, {"content": full_html})   # MBs × 500!

✅ Correct — send only the changed delta or a “refresh” signal, not full content.

Quick Reference

Operation Code
Join room await room_manager.join("post:42", user_id, ws)
Leave room room_manager.leave("post:42", user_id, ws)
Broadcast to room await room_manager.broadcast_to_room("post:42", msg)
Room key convention "type:id" e.g. "post:42", "user:5"
Scale with Redis Publish to "ws:room-key"; workers subscribe and forward

🧠 Test Yourself

You have 4 Uvicorn workers. 10 clients are connected to room “post:42” — 3 on worker 1, 4 on worker 2, 3 on worker 3. A new comment is created in an HTTP handler on worker 4. Using the in-process room manager, how many WebSocket clients receive the real-time notification?