Token Refresh and Revocation — Secure Session Management

When a user’s access token expires (typically after 15–30 minutes), the client should silently exchange its refresh token for a new access token without prompting the user to log in again. This token refresh flow maintains long-running sessions while keeping access tokens short-lived (limiting the damage if one is compromised). Implementing this correctly requires: validating the refresh token’s signature and database record, marking the used token as consumed, issuing a new pair of tokens (rotating the refresh token), and detecting reuse attacks where a previously-used token is presented again.

Token Refresh Endpoint

from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session
from sqlalchemy import select
import jwt

router = APIRouter(prefix="/auth", tags=["Auth"])

class RefreshRequest(BaseModel):
    refresh_token: str

@router.post("/refresh", response_model=TokenResponse)
def refresh_tokens(data: RefreshRequest, db: Session = Depends(get_db)):
    """Exchange a valid refresh token for new access + refresh tokens."""
    # Step 1: Verify JWT signature and expiry
    try:
        payload = jwt.decode(
            data.refresh_token,
            settings.secret_key,
            algorithms=[settings.algorithm],
        )
    except jwt.ExpiredSignatureError:
        raise HTTPException(401, "Refresh token has expired — please log in again")
    except jwt.InvalidTokenError:
        raise HTTPException(401, "Invalid refresh token")

    if payload.get("type") != "refresh":
        raise HTTPException(401, "Not a refresh token")

    jti       = payload["jti"]
    user_id   = int(payload["sub"])
    family_id = payload["family"]

    # Step 2: Look up token in database
    db_token = db.scalars(
        select(RefreshToken).where(RefreshToken.jti == jti)
    ).first()

    if not db_token:
        raise HTTPException(401, "Refresh token not found")

    # Step 3: Detect reuse attack — token already used
    if db_token.used_at is not None:
        # This token was already consumed — potential theft!
        # Revoke the entire family (both attacker and legitimate user are logged out)
        db.execute(
            update(RefreshToken)
            .where(RefreshToken.family_id == family_id)
            .values(revoked_at=datetime.now(timezone.utc))
        )
        db.flush()
        raise HTTPException(
            status_code = 401,
            detail      = "Refresh token already used — possible token theft detected. "
                          "Please log in again.",
        )

    if db_token.revoked_at is not None:
        raise HTTPException(401, "Refresh token has been revoked")

    if db_token.expires_at < datetime.now(timezone.utc):
        raise HTTPException(401, "Refresh token has expired")

    # Step 4: Mark current token as used (cannot be used again)
    db_token.used_at = datetime.now(timezone.utc)

    # Step 5: Issue new tokens (rotate refresh token, same family)
    new_access  = create_access_token(user_id, db_token.user.role)
    new_refresh_str, new_jti = create_refresh_token(user_id, family_id=family_id)

    new_db_token = RefreshToken(
        jti        = new_jti,
        family_id  = family_id,   # same family chain
        user_id    = user_id,
        expires_at = datetime.now(timezone.utc) + timedelta(days=7),
    )
    db.add(new_db_token)
    db.flush()

    return TokenResponse(
        access_token  = new_access,
        refresh_token = new_refresh_str,
        token_type    = "bearer",
        expires_in    = settings.access_token_expire_minutes * 60,
    )
Note: The token rotation approach — issuing a new refresh token on every use and marking the old one as consumed — means refresh tokens are single-use. This is more secure than allowing the same refresh token to be used indefinitely. The family_id groups a chain of rotated tokens so that detecting a reuse of any token in the family can trigger revocation of the entire family (and therefore all sessions derived from that login).
Tip: Clean up expired refresh tokens regularly to prevent the refresh_tokens table from growing indefinitely. Run a scheduled job (using FastAPI’s background tasks or a cron job): DELETE FROM refresh_tokens WHERE expires_at < NOW() - INTERVAL '30 days' OR revoked_at IS NOT NULL. Expired tokens can never be used again, so keeping them serves no security purpose after a grace period for audit logging.
Warning: The reuse-attack response (revoking the entire token family) is deliberately aggressive — it logs out the legitimate user too. This is the correct trade-off: if someone is using a refresh token that should have been consumed, either there was a theft or a client bug. Both cases warrant forcing re-authentication. Inform the user with a clear message (“For your security, please log in again”) rather than a generic 401 to minimise support confusion.

Logout Endpoint

class LogoutRequest(BaseModel):
    refresh_token: str

@router.post("/logout", status_code=204)
def logout(data: LogoutRequest, db: Session = Depends(get_db),
           current_user: User = Depends(get_current_user)):
    """Revoke the provided refresh token (and optionally all user tokens)."""
    try:
        payload = jwt.decode(
            data.refresh_token, settings.secret_key,
            algorithms=[settings.algorithm]
        )
        jti = payload.get("jti")
        if jti:
            db.execute(
                update(RefreshToken)
                .where(RefreshToken.jti == jti,
                       RefreshToken.user_id == current_user.id)
                .values(revoked_at=datetime.now(timezone.utc))
            )
            db.flush()
    except jwt.InvalidTokenError:
        pass   # invalid refresh token on logout — silently ignore

@router.post("/logout-all", status_code=204)
def logout_all_devices(db: Session = Depends(get_db),
                       current_user: User = Depends(get_current_user)):
    """Revoke ALL refresh tokens for the current user (sign out all devices)."""
    db.execute(
        update(RefreshToken)
        .where(RefreshToken.user_id == current_user.id,
               RefreshToken.revoked_at.is_(None))
        .values(revoked_at=datetime.now(timezone.utc))
    )
    db.flush()

Common Mistakes

Mistake 1 — Not marking refresh token as used after issuing new tokens

❌ Wrong — same refresh token usable multiple times:

new_access = create_access_token(user_id, role)
new_refresh, jti = create_refresh_token(user_id)
# Old token never marked as used — can be replayed!

✅ Correct — mark old token as used before issuing new ones.

Mistake 2 — Not revoking the whole family on reuse detection

❌ Wrong — only revoke the specific reused token:

if db_token.used_at:
    db_token.revoked_at = datetime.now()   # revokes only one token
    # Attacker may still hold the newest token in the chain!

✅ Correct — revoke all tokens in the family to invalidate the entire session chain.

Mistake 3 — Accepting expired refresh tokens

❌ Wrong — checking database record but not JWT expiry:

# Skipping jwt.decode entirely and only checking database record
db_token = db.get(RefreshToken, jti)
if db_token and db_token.revoked_at is None:
    issue_new_tokens()   # token could be expired JWT!

✅ Correct — always decode and verify JWT signature and expiry first.

Quick Reference — Token Refresh Flow

Step Action
1 Verify JWT signature and exp (jwt.decode)
2 Check type == "refresh"
3 Look up jti in refresh_tokens table
4 If used_at is set → revoke family → 401
5 If revoked_at is set → 401
6 Mark current token as used (used_at = now())
7 Issue new access + refresh tokens (same family)
8 Store new refresh token in DB

🧠 Test Yourself

An attacker steals user B’s refresh token (RT1) before B uses it. B later refreshes and gets RT2, marking RT1 as used. The attacker then presents RT1. What should happen?