FastAPI WebSocket Endpoint — Accept, Send and Receive

FastAPI’s WebSocket support comes from Starlette, so the framework itself needs no extra package, although the ASGI server must include a WebSocket implementation (for example, the websockets library that uvicorn[standard] installs). A WebSocket endpoint is a route decorated with @app.websocket("/path") whose handler receives a WebSocket object. The handler calls await websocket.accept() to complete the handshake, then loops, reading messages from and sending messages to the client. The loop ends when the client disconnects, which raises WebSocketDisconnect on the next receive. Authentication is handled by reading a token from the query parameters or from the HTTP headers sent with the upgrade request.

Basic WebSocket Endpoint

from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FastAPI()

@app.websocket("/ws/echo")
async def echo_endpoint(websocket: WebSocket):
    """Echo any received message back to the sender."""
    await websocket.accept()   # complete the HTTP → WebSocket upgrade

    try:
        while True:
            # receive_text suspends this coroutine until a message arrives;
            # on disconnect it raises WebSocketDisconnect
            message = await websocket.receive_text()
            await websocket.send_text(f"Echo: {message}")

    except WebSocketDisconnect as e:
        print(f"Client disconnected with code {e.code}")

# ── JSON messages ─────────────────────────────────────────────────────────────
@app.websocket("/ws/json")
async def json_endpoint(websocket: WebSocket):
    await websocket.accept()
    try:
        while True:
            # receive_json: parses the message as JSON
            data = await websocket.receive_json()
            # send_json: serialises and sends a Python dict/list as JSON
            await websocket.send_json({"received": data, "status": "ok"})
    except WebSocketDisconnect:
        pass
Note: FastAPI’s WebSocket object provides several receive methods: receive_text() returns a str, receive_bytes() returns bytes, receive_json() parses JSON and returns a Python object, and the low-level receive() returns a dict with a type key. Use receive_json() for structured messages (your app’s protocol), receive_text() for plain text, and receive_bytes() for binary data (images, audio). Similarly: send_text(), send_bytes(), send_json().
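The low-level receive() is the one method whose return value needs unpacking. The following is a minimal sketch of that unpacking, assuming the ASGI message shapes: a websocket.receive message carrying either a text or a bytes key, and a websocket.disconnect message carrying a code. The helper name classify_message is hypothetical and not part of FastAPI.

```python
from typing import Any, Dict, Tuple


def classify_message(message: Dict[str, Any]) -> Tuple[str, Any]:
    """Map a raw dict from websocket.receive() to a (kind, payload) pair."""
    if message["type"] == "websocket.disconnect":
        # The low-level receive() hands back the disconnect as a message;
        # the receive_text/bytes/json wrappers turn it into an exception.
        return "disconnect", message.get("code")
    if message["type"] == "websocket.receive":
        if message.get("text") is not None:
            return "text", message["text"]
        if message.get("bytes") is not None:
            return "bytes", message["bytes"]
    return "unknown", message
```

Inside a handler this would be called as classify_message(await websocket.receive()), which lets one endpoint accept both text and binary frames.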
Tip: Use heartbeats to detect stale connections, that is, connections broken without a proper close frame (a mobile device going offline, a network interruption). A common policy is to send a ping every 30 seconds and close the connection if no pong arrives within 10 seconds. The ASGI interface that FastAPI's WebSocket wraps has no ping message type, so websocket.send({"type": "websocket.ping"}) is not valid; protocol-level ping/pong frames are handled by the server (Uvicorn, for example, exposes --ws-ping-interval and --ws-ping-timeout). Inside a handler, implement application-level heartbeats as ordinary JSON messages such as {"type": "ping"}.
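An application-level heartbeat could be sketched as below. This is an illustration under assumptions, not a FastAPI feature: the heartbeat function and the pong_received event are hypothetical names, the websocket argument is any object with async send_json() and close() methods (FastAPI's WebSocket has both), and close code 1011 is an arbitrary choice.

```python
import asyncio


async def heartbeat(
    websocket,
    pong_received: asyncio.Event,
    interval: float = 30.0,
    timeout: float = 10.0,
) -> None:
    """Ping periodically; close the connection if no pong arrives in time."""
    while True:
        await asyncio.sleep(interval)
        pong_received.clear()
        await websocket.send_json({"type": "ping"})
        try:
            # The receive loop is expected to call pong_received.set()
            # whenever it reads a {"type": "pong"} message.
            await asyncio.wait_for(pong_received.wait(), timeout)
        except asyncio.TimeoutError:
            await websocket.close(code=1011)  # assumed close code for a dead peer
            return
```

A handler would start this with asyncio.create_task(heartbeat(websocket, pong_received)) right after accept(), set the event from its receive loop when a pong arrives, and cancel the task in a finally block when the connection ends.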
Warning: WebSocket connections are long-lived, so a handler must not run indefinitely without noticing a disconnect. WebSocketDisconnect is raised when the client disconnects, but only at the point where the handler tries to receive or send. If the handler is doing other work (sleeping, waiting on a queue), the disconnect goes undetected until the next receive or send attempt. In long-running loops, wrap waits in asyncio.wait_for() with a timeout, or keep a receive call pending alongside the other work, so that a dead connection is noticed promptly.
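One way to notice a disconnect while waiting on a queue is to keep a receive call pending and race it against the queue. The sketch below assumes a hypothetical push_until_disconnect helper and a websocket object with async receive_text() and send_text() methods; incoming client messages are simply discarded here.

```python
import asyncio


async def push_until_disconnect(websocket, queue: asyncio.Queue) -> None:
    """Forward queue items to the client until the pending receive fails."""
    receiver = asyncio.create_task(websocket.receive_text())
    getter = asyncio.create_task(queue.get())
    try:
        while True:
            done, _ = await asyncio.wait(
                {receiver, getter}, return_when=asyncio.FIRST_COMPLETED
            )
            if receiver in done:
                # result() re-raises the disconnect exception, if there is one
                receiver.result()
                receiver = asyncio.create_task(websocket.receive_text())
            if getter in done:
                await websocket.send_text(getter.result())
                getter = asyncio.create_task(queue.get())
    finally:
        # Leave no task pending once the connection is gone
        receiver.cancel()
        getter.cancel()
```

With FastAPI's WebSocket, the exception that escapes is WebSocketDisconnect, so the caller can wrap this coroutine in the usual try/except.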

Authenticated WebSocket Endpoint

from fastapi import Query, WebSocket, WebSocketDisconnect
import jwt  # PyJWT

# `app` is the FastAPI instance created earlier; `settings` is assumed to be
# the application's configuration object, defined elsewhere.

@app.websocket("/ws/notifications")
async def notifications_ws(
    websocket: WebSocket,
    token: str = Query(...),   # client sends token as ?token=... query param
):
    """
    Authenticated WebSocket. Client connects with:
    ws://localhost:8000/ws/notifications?token=eyJhbGc...
    """
    # Authenticate before accepting the connection
    try:
        payload  = jwt.decode(token, settings.secret_key, algorithms=["HS256"])
        user_id  = int(payload["sub"])
    except (jwt.InvalidTokenError, KeyError, ValueError):
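        # Closing before accept() rejects the handshake: the server answers
        # with HTTP 403 and the connection is never upgraded, so the client
        # does not receive the 1008 close code itself.
        await websocket.close(code=1008, reason="Invalid token")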
        return

    # Only accept after successful authentication
    await websocket.accept()

    try:
        while True:
            # Wait for any message (or disconnect)
            # For notification push, we mainly send — receive is just for keepalive
            data = await websocket.receive_json()
            if data.get("type") == "ping":
                await websocket.send_json({"type": "pong"})

    except WebSocketDisconnect:
        print(f"User {user_id} disconnected")
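The endpoint above reads the token from the query string only. A sketch of supporting both locations named in the introduction (query parameter or upgrade-request header) follows; extract_token is a hypothetical helper, and the Bearer scheme in an Authorization header is an assumption about how the client sends the token.

```python
from typing import Mapping, Optional


def extract_token(
    query_params: Mapping[str, str], headers: Mapping[str, str]
) -> Optional[str]:
    """Return the token from ?token=... or an 'Authorization: Bearer ...' header."""
    token = query_params.get("token")
    if token:
        return token
    # Header names are looked up in lowercase, as in the ASGI scope
    scheme, _, credentials = headers.get("authorization", "").partition(" ")
    if scheme.lower() == "bearer" and credentials.strip():
        return credentials.strip()
    return None
```

In a handler this would be called as extract_token(websocket.query_params, websocket.headers) before accept(). The header route mainly suits non-browser clients, because the browser WebSocket API does not let scripts set custom request headers.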

Using Dependencies in WebSocket Endpoints

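from fastapi import Query, WebSocket, WebSocketDisconnect
import jwt  # PyJWT

# SessionLocal (a SQLAlchemy sessionmaker), the Comment model, and settings
# are assumed to be defined elsewhere in the application.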

# Note: Depends() works in WebSocket endpoints but the dependency lifecycle
# is different — the session stays open for the entire WebSocket connection,
# not just a single request.

# Better pattern for WebSockets: create DB sessions manually per message
@app.websocket("/ws/live-comments/{post_id}")
async def live_comments(
    websocket: WebSocket,
    post_id:   int,
    token:     str = Query(...),
):
    # Authenticate
    try:
        payload = jwt.decode(token, settings.secret_key, algorithms=["HS256"])
        user_id = int(payload["sub"])
    except (jwt.InvalidTokenError, KeyError, ValueError):
        # A missing or non-integer "sub" claim is rejected like a bad signature
        await websocket.close(code=1008)
        return

    await websocket.accept()

    try:
        while True:
            data = await websocket.receive_json()

            if data.get("type") == "new_comment":
                # Create a new session for this specific DB operation
                with SessionLocal() as db:
                    comment = Comment(
                        post_id   = post_id,
                        author_id = user_id,
                        body      = data["body"],
                    )
                    db.add(comment)
                    db.commit()
                    db.refresh(comment)

                # Send confirmation back
                await websocket.send_json({
                    "type":       "comment_created",
                    "comment_id": comment.id,
                })

    except WebSocketDisconnect:
        pass
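The handler above uses a synchronous session inside an async function, so each commit holds up the event loop while it runs, and a message without a "body" key raises KeyError and ends the connection. One possible way to handle both is sketched below. The names extract_comment_body, handle_new_comment and save_comment are hypothetical, and the 2000-character limit is an arbitrary assumption; save_comment stands for a blocking function that opens its own session, commits, and returns the new comment's id.

```python
import asyncio
from typing import Any, Callable, Dict, Optional


def extract_comment_body(data: Any, max_length: int = 2000) -> Optional[str]:
    """Return the trimmed comment body, or None if the message is malformed."""
    if not isinstance(data, dict):
        return None
    body = data.get("body")
    if not isinstance(body, str):
        return None
    body = body.strip()
    if not body or len(body) > max_length:
        return None
    return body


async def handle_new_comment(
    data: Any, save_comment: Callable[[str], int]
) -> Dict[str, Any]:
    """Validate one message, then run the blocking DB write in a worker thread."""
    body = extract_comment_body(data)
    if body is None:
        return {"type": "error", "detail": "body must be a non-empty string"}
    # asyncio.to_thread (Python 3.9+) keeps the event loop free during the write
    comment_id = await asyncio.to_thread(save_comment, body)
    return {"type": "comment_created", "comment_id": comment_id}
```

The message loop would then send the returned dict with await websocket.send_json(...), so a malformed message produces an error reply instead of closing the connection.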

Common Mistakes

Mistake 1 — Accepting connection before authenticating

❌ Wrong — 101 upgrade sent before auth check:

await websocket.accept()   # connection accepted!
token = await websocket.receive_text()
if not valid_token(token):
    await websocket.close()   # too late — connection was already accepted

✅ Correct — authenticate first, accept only after:

if not valid_token(token_query_param):
    # Closing before accept() rejects the handshake (the client sees HTTP 403)
    await websocket.close(code=1008, reason="Unauthorized")
    return
await websocket.accept()   # ✓ only after auth passes

Mistake 2 — Not catching WebSocketDisconnect (endpoint crashes)

❌ Wrong — unhandled disconnect raises and logs an error:

async def ws_handler(ws: WebSocket):
    await ws.accept()
    while True:
        msg = await ws.receive_text()   # raises WebSocketDisconnect on close
        # No except: the exception propagates out of the handler and the
        # server logs a traceback (there is no HTTP 500 on a WebSocket)

✅ Correct — always wrap the message loop in try/except WebSocketDisconnect.

Mistake 3 — Using a FastAPI DB session across long WebSocket lifetimes

❌ Wrong — one session for the entire multi-minute connection:

async def ws_handler(ws: WebSocket, db: Session = Depends(get_db)):
    # db is one session for the entire connection: stale data, long-held transactions!
    ...

✅ Correct — create per-operation sessions within the WebSocket handler loop.

Quick Reference

Operation              Code
Accept connection      await websocket.accept()
Receive text           msg = await websocket.receive_text()
Receive JSON           data = await websocket.receive_json()
Send text              await websocket.send_text("message")
Send JSON              await websocket.send_json({"key": "value"})
Close with code        await websocket.close(code=1000)
Handle disconnect      except WebSocketDisconnect as e: ...
Auth via query param   token: str = Query(...)

🧠 Test Yourself

A client connects to /ws/chat?token=invalid_token. Your endpoint calls await websocket.accept() first, then validates the token, then calls await websocket.close(code=1008). What is wrong with this flow?