Server-Sent Events — One-Way Real-Time Streams

Server-Sent Events (SSE) stream data from server to client over a regular HTTP connection. Unlike WebSockets, which require a protocol upgrade, SSE is plain HTTP with the text/event-stream content type: the browser keeps the connection open and processes events as they arrive. The browser's EventSource API reconnects automatically if the connection drops. In FastAPI, SSE is implemented by returning a StreamingResponse whose generator yields lines in the SSE format. SSE is ideal for one-way server-push scenarios such as live activity feeds, export progress, and log tailing, where the client only needs to receive, not send.

Basic SSE Endpoint

from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
import asyncio
import json
import time

app = FastAPI()

async def event_stream(request: Request):
    """
    Generator that yields SSE-formatted events.
    Each event is one or more lines ending with a blank line.
    """
    while True:
        # Check if client disconnected
        if await request.is_disconnected():
            break

        # Format SSE event:
        # data: {json}
        # (blank line terminates the event)
        data = json.dumps({"time": time.time(), "status": "alive"})
        yield f"data: {data}\n\n"   # double newline = event boundary

        await asyncio.sleep(5)   # send every 5 seconds

@app.get("/events/heartbeat")
async def heartbeat_sse(request: Request):
    return StreamingResponse(
        event_stream(request),
        media_type = "text/event-stream",
        headers    = {
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",    # disable Nginx buffering
            "Connection": "keep-alive",
        },
    )
Note: The SSE wire format is simple: each event is one or more lines of the form field: value\n, terminated by a blank line (\n\n). The most common field is data: followed by a JSON string. Other supported fields: event: (custom event name, defaults to “message”), id: (event ID for reconnect resumption), and retry: (milliseconds before the browser retries the connection). The browser’s EventSource API parses these fields automatically.
Tip: Set X-Accel-Buffering: no to disable Nginx’s response buffering for SSE responses. Without this header, Nginx buffers the streamed response and the client does not receive events until the buffer fills up or the connection closes — defeating the purpose of streaming. Similarly, set Cache-Control: no-cache to prevent caching of the event stream by CDNs or browser caches.
Warning: Always check await request.is_disconnected() in SSE generator loops. When a client closes the browser tab or navigates away, the SSE connection drops. Without the disconnect check, the server continues generating events indefinitely, holding server resources and the database connections used for each event query. This is a resource leak that becomes noticeable under load when many clients come and go.
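
The wire format described in the note above can be wrapped in a small helper so that generators do not hand-build strings. This is a minimal sketch: the name format_sse and its parameters are hypothetical, not part of FastAPI or Starlette. It assumes payloads are either strings or JSON-serializable objects, and it splits multi-line payloads into one data: line per row, as the format requires.

```python
import json
from typing import Any, Optional


def format_sse(
    data: Any,
    event: Optional[str] = None,
    event_id: Optional[str] = None,
    retry_ms: Optional[int] = None,
) -> str:
    """Serialize one event in the text/event-stream wire format (hypothetical helper)."""
    lines = []
    if event is not None:
        lines.append(f"event: {event}")
    if event_id is not None:
        lines.append(f"id: {event_id}")
    if retry_ms is not None:
        lines.append(f"retry: {retry_ms}")

    # Strings are sent as-is; anything else is JSON-encoded.
    payload = data if isinstance(data, str) else json.dumps(data)

    # A newline inside the payload would end the field early,
    # so each row gets its own "data:" line.
    for row in payload.split("\n"):
        lines.append(f"data: {row}")

    # The trailing blank line marks the event boundary.
    return "\n".join(lines) + "\n\n"
```

In the heartbeat generator, `yield format_sse({"time": time.time(), "status": "alive"})` could stand in for the hand-written f-string.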

Authenticated SSE with Live Data

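from datetime import datetime

from fastapi import HTTPException, Query
from sqlalchemy import select
import jwt

# SessionLocal, Notification, and settings are assumed to come from the
# application's own database, model, and config modules.

def fetch_new_notifications(user_id: int, since: datetime):
    """Blocking query for this user's notifications created after `since`."""
    with SessionLocal() as db:
        return db.scalars(
            select(Notification)
            .where(
                Notification.user_id == user_id,
                Notification.created_at > since,
            )
            .order_by(Notification.created_at.asc())
        ).all()

async def sse_activity_feed(request: Request, user_id: int):
    """
    Stream real-time activity events for an already-authenticated user.
    """
    # Starting cursor. This must follow the same timezone convention as
    # Notification.created_at (naive local time is assumed here).
    last_seen = datetime.now()

    while True:
        if await request.is_disconnected():
            break

        # Poll for new notifications. The synchronous query runs in a worker
        # thread so it does not block the event loop for other clients.
        new_notifs = await asyncio.to_thread(
            fetch_new_notifications, user_id, last_seen
        )

        for notif in new_notifs:
            event_data = {
                "id":      notif.id,
                "type":    notif.event_type,
                "message": notif.message,
                "time":    notif.created_at.isoformat(),
            }
            # SSE with named event type and ID, sent as one complete event
            yield (
                "event: notification\n"
                f"id: {notif.id}\n"
                f"data: {json.dumps(event_data)}\n\n"
            )

        # Advance the cursor to the newest row delivered, so rows created
        # between the query and the next poll are not skipped.
        if new_notifs:
            last_seen = new_notifs[-1].created_at

        await asyncio.sleep(2)   # poll every 2 seconds

@app.get("/events/activity")
async def activity_sse(request: Request, token: str = Query(...)):
    """
    Token provided as query param: /events/activity?token=...
    """
    # Authenticate before the stream starts. A 401 here makes EventSource
    # fail once; an error event inside a 200 stream would instead be followed
    # by an automatic reconnect with the same bad token.
    try:
        payload = jwt.decode(token, settings.secret_key, algorithms=["HS256"])
        user_id = int(payload["sub"])
    except (jwt.InvalidTokenError, KeyError, TypeError, ValueError):
        raise HTTPException(status_code=401, detail="Unauthorized")

    return StreamingResponse(
        sse_activity_feed(request, user_id),
        media_type="text/event-stream",
        headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
    )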
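
The id: lines emitted by the activity feed are what make resumption possible: when EventSource reconnects, it sends the last ID it received in a Last-Event-ID request header. The sketch below shows one way the server side might use that value. Both function names are hypothetical, and it assumes event IDs are increasing integers such as notif.id; in the endpoint, the header value could be read with `request.headers.get("last-event-id")`.

```python
from typing import Dict, List, Optional


def parse_last_event_id(header_value: Optional[str]) -> Optional[int]:
    """Return the integer ID from a Last-Event-ID header, or None if absent or invalid."""
    if header_value is None:
        return None
    try:
        return int(header_value.strip())
    except ValueError:
        return None


def events_after(events: List[Dict], last_id: Optional[int]) -> List[Dict]:
    """Keep only events newer than last_id; with no ID, keep everything."""
    if last_id is None:
        return list(events)
    return [event for event in events if event["id"] > last_id]
```

With a database-backed feed, the same idea would translate into filtering on `Notification.id > last_id` for the first query after a reconnect, instead of starting from the current time.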

SSE on the Client (Browser JavaScript)

// Browser EventSource API — connects and auto-reconnects
const token  = localStorage.getItem('access_token');
const events = new EventSource(`/events/activity?token=${token}`);

// Default handler for "message" events (no named event type)
events.onmessage = (e) => {
    const data = JSON.parse(e.data);
    console.log('Message:', data);
};

// Named event handler — matches "event: notification" in SSE stream
events.addEventListener('notification', (e) => {
    const notif = JSON.parse(e.data);
    showNotificationToast(notif.message);
});

// Error handler (also fires on disconnect, before reconnect attempt)
events.onerror = (e) => {
    console.log('SSE error/disconnect — browser will retry automatically');
};

// Close the connection when done
function cleanup() {
    events.close();
}
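
To make clear what EventSource does with the stream before those handlers fire, here is a simplified parser written in Python. It is a sketch for illustration, not a complete implementation of the specification: the name parse_sse is hypothetical, the retry: field is ignored, and the input is assumed to be an iterable of already-decoded text lines.

```python
from typing import Dict, Iterable, Iterator, List


def parse_sse(lines: Iterable[str]) -> Iterator[Dict[str, str]]:
    """Yield one dict per dispatched event from text/event-stream lines."""
    event_type = "message"          # default when no "event:" field is present
    data_rows: List[str] = []
    last_id = ""                    # persists across events, like lastEventId

    for raw in lines:
        line = raw.rstrip("\r\n")

        if line == "":
            # Blank line: dispatch the buffered event, if it carried any data.
            if data_rows:
                yield {"event": event_type, "data": "\n".join(data_rows), "id": last_id}
            event_type, data_rows = "message", []
            continue

        if line.startswith(":"):
            continue                # comment line, often used as a keep-alive

        field, _, value = line.partition(":")
        if value.startswith(" "):
            value = value[1:]       # a single leading space is stripped

        if field == "event":
            event_type = value
        elif field == "data":
            data_rows.append(value)
        elif field == "id":
            last_id = value
        # Other fields (including retry) are ignored in this sketch.
```

The "event" key corresponds to the name passed to addEventListener, and "data" to e.data in the handlers above.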

SSE vs WebSocket Comparison

| Aspect | SSE (text/event-stream) | WebSocket (ws://) |
| --- | --- | --- |
| Direction | Server → Client only | Bidirectional |
| Protocol | HTTP/HTTPS | WebSocket (upgrade) |
| Browser reconnect | Automatic (built-in) | Must implement manually |
| Proxy/CDN support | Full (standard HTTP) | Requires proxy config |
| Implementation complexity | Low (StreamingResponse) | Medium (connection manager) |
| Authentication | Query param or cookie | Query param during upgrade |
| Best use cases | Notifications, feeds, progress | Chat, gaming, collaboration |

Common Mistakes

Mistake 1 — Not checking is_disconnected() (resource leak)

❌ Wrong — infinite loop after client disconnects:

async def stream():
    while True:
        yield f"data: {get_data()}\n\n"
        await asyncio.sleep(1)   # runs forever even after client left!

✅ Correct — always check disconnect:

async def stream(request: Request):
    while True:
        if await request.is_disconnected(): break
        yield f"data: {get_data()}\n\n"
        await asyncio.sleep(1)   # ✓

Mistake 2 — Missing X-Accel-Buffering: no header (events delayed by Nginx)

❌ Wrong — events batch up in Nginx buffer, delayed delivery:

return StreamingResponse(stream(), media_type="text/event-stream")
# Behind Nginx: events may not arrive until buffer fills!

✅ Correct — add X-Accel-Buffering: no to headers.

Mistake 3 — Creating a new DB session per event (too frequent)

❌ Wrong — one DB connection per second per connected client:

while True:
    with SessionLocal() as db:
        data = db.query(...)   # opens DB connection every iteration
    yield event; await asyncio.sleep(1)   # 1 DB conn/sec × N clients!

✅ Correct — batch queries, poll every 2–5 seconds, or use Redis Pub/Sub to avoid polling entirely.
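
The push-based alternative can be sketched without Redis to show the shape of the idea. The Broadcaster class below is a hypothetical in-process stand-in for Redis Pub/Sub, not the Redis API: each connected client gets an asyncio.Queue, and whatever code creates a notification calls publish. It only works within a single server process; with several workers, a shared broker such as Redis would be needed, as suggested above.

```python
import asyncio
from typing import Any, Dict, Set


class Broadcaster:
    """Hypothetical in-process fan-out: one queue per connected client."""

    def __init__(self) -> None:
        self._queues: Dict[int, Set[asyncio.Queue]] = {}

    def subscribe(self, user_id: int) -> asyncio.Queue:
        """Register a new listener for user_id and return its queue."""
        queue: asyncio.Queue = asyncio.Queue()
        self._queues.setdefault(user_id, set()).add(queue)
        return queue

    def unsubscribe(self, user_id: int, queue: asyncio.Queue) -> None:
        """Remove a listener, for example when the client disconnects."""
        subscribers = self._queues.get(user_id, set())
        subscribers.discard(queue)
        if not subscribers:
            self._queues.pop(user_id, None)

    def publish(self, user_id: int, payload: Any) -> int:
        """Deliver payload to every listener of user_id; return how many got it."""
        subscribers = self._queues.get(user_id, ())
        for queue in subscribers:
            queue.put_nowait(payload)
        return len(subscribers)


async def demo():
    hub = Broadcaster()
    queue = hub.subscribe(1)
    hub.publish(1, {"message": "hello"})
    hub.publish(2, {"message": "other user"})   # no listener, not delivered
    received = [await queue.get()]
    hub.unsubscribe(1, queue)
    remaining = hub.publish(1, {"message": "too late"})
    return received, remaining
```

An SSE generator built on this would `await queue.get()` instead of sleeping and querying, so an idle client costs no database queries at all, and would call unsubscribe in a `finally` block when the stream ends.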

Quick Reference

| Task | Code |
| --- | --- |
| SSE response | StreamingResponse(generator, media_type="text/event-stream") |
| Event format | f"data: {json_string}\n\n" |
| Named event | f"event: notification\ndata: {json}\n\n" |
| Event with ID | f"id: {notif_id}\ndata: {json}\n\n" |
| Disconnect check | if await request.is_disconnected(): break |
| Disable Nginx buffer | headers={"X-Accel-Buffering": "no"} |
| Browser client | new EventSource("/events/stream?token=...") |

🧠 Test Yourself

Your SSE endpoint polls the database every 2 seconds. 500 users are simultaneously connected. How many database queries does your server make per minute?
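
One way to work this out, assuming each poll issues exactly one query per connected client (as in the activity feed example): every client polls 60 / 2 = 30 times per minute, so 500 clients produce 500 × 30 queries. The helper name below is hypothetical.

```python
def queries_per_minute(clients: int, poll_interval_s: float, queries_per_poll: int = 1) -> float:
    """Estimate database load for a polling SSE endpoint."""
    polls_per_minute = 60 / poll_interval_s
    return clients * polls_per_minute * queries_per_poll


print(queries_per_minute(500, 2))   # 15000.0
```

That is 15,000 queries per minute, or 250 per second, most of which return nothing when the feed is quiet.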