When a user’s access token expires (typically after 15–30 minutes), the client should silently exchange its refresh token for a new access token without prompting the user to log in again. This token refresh flow maintains long-running sessions while keeping access tokens short-lived (limiting the damage if one is compromised). Implementing this correctly requires: validating the refresh token’s signature and database record, marking the used token as consumed, issuing a new pair of tokens (rotating the refresh token), and detecting reuse attacks where a previously-used token is presented again.
Token Refresh Endpoint
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session
from sqlalchemy import select
import jwt
router = APIRouter(prefix="/auth", tags=["Auth"])
class RefreshRequest(BaseModel):
refresh_token: str
@router.post("/refresh", response_model=TokenResponse)
def refresh_tokens(data: RefreshRequest, db: Session = Depends(get_db)):
"""Exchange a valid refresh token for new access + refresh tokens."""
# Step 1: Verify JWT signature and expiry
try:
payload = jwt.decode(
data.refresh_token,
settings.secret_key,
algorithms=[settings.algorithm],
)
except jwt.ExpiredSignatureError:
raise HTTPException(401, "Refresh token has expired — please log in again")
except jwt.InvalidTokenError:
raise HTTPException(401, "Invalid refresh token")
if payload.get("type") != "refresh":
raise HTTPException(401, "Not a refresh token")
jti = payload["jti"]
user_id = int(payload["sub"])
family_id = payload["family"]
# Step 2: Look up token in database
db_token = db.scalars(
select(RefreshToken).where(RefreshToken.jti == jti)
).first()
if not db_token:
raise HTTPException(401, "Refresh token not found")
# Step 3: Detect reuse attack — token already used
if db_token.used_at is not None:
# This token was already consumed — potential theft!
# Revoke the entire family (both attacker and legitimate user are logged out)
db.execute(
update(RefreshToken)
.where(RefreshToken.family_id == family_id)
.values(revoked_at=datetime.now(timezone.utc))
)
db.flush()
raise HTTPException(
status_code = 401,
detail = "Refresh token already used — possible token theft detected. "
"Please log in again.",
)
if db_token.revoked_at is not None:
raise HTTPException(401, "Refresh token has been revoked")
if db_token.expires_at < datetime.now(timezone.utc):
raise HTTPException(401, "Refresh token has expired")
# Step 4: Mark current token as used (cannot be used again)
db_token.used_at = datetime.now(timezone.utc)
# Step 5: Issue new tokens (rotate refresh token, same family)
new_access = create_access_token(user_id, db_token.user.role)
new_refresh_str, new_jti = create_refresh_token(user_id, family_id=family_id)
new_db_token = RefreshToken(
jti = new_jti,
family_id = family_id, # same family chain
user_id = user_id,
expires_at = datetime.now(timezone.utc) + timedelta(days=7),
)
db.add(new_db_token)
db.flush()
return TokenResponse(
access_token = new_access,
refresh_token = new_refresh_str,
token_type = "bearer",
expires_in = settings.access_token_expire_minutes * 60,
)
family_id groups a chain of rotated tokens so that detecting a reuse of any token in the family can trigger revocation of the entire family (and therefore all sessions derived from that login).refresh_tokens table from growing indefinitely. Run a scheduled job (using FastAPI’s background tasks or a cron job): DELETE FROM refresh_tokens WHERE expires_at < NOW() - INTERVAL '30 days' OR revoked_at IS NOT NULL. Expired tokens can never be used again, so keeping them serves no security purpose after a grace period for audit logging.Logout Endpoint
class LogoutRequest(BaseModel):
refresh_token: str
@router.post("/logout", status_code=204)
def logout(data: LogoutRequest, db: Session = Depends(get_db),
current_user: User = Depends(get_current_user)):
"""Revoke the provided refresh token (and optionally all user tokens)."""
try:
payload = jwt.decode(
data.refresh_token, settings.secret_key,
algorithms=[settings.algorithm]
)
jti = payload.get("jti")
if jti:
db.execute(
update(RefreshToken)
.where(RefreshToken.jti == jti,
RefreshToken.user_id == current_user.id)
.values(revoked_at=datetime.now(timezone.utc))
)
db.flush()
except jwt.InvalidTokenError:
pass # invalid refresh token on logout — silently ignore
@router.post("/logout-all", status_code=204)
def logout_all_devices(db: Session = Depends(get_db),
current_user: User = Depends(get_current_user)):
"""Revoke ALL refresh tokens for the current user (sign out all devices)."""
db.execute(
update(RefreshToken)
.where(RefreshToken.user_id == current_user.id,
RefreshToken.revoked_at.is_(None))
.values(revoked_at=datetime.now(timezone.utc))
)
db.flush()
Common Mistakes
Mistake 1 — Not marking refresh token as used after issuing new tokens
❌ Wrong — same refresh token usable multiple times:
new_access = create_access_token(user_id, role)
new_refresh, jti = create_refresh_token(user_id)
# Old token never marked as used — can be replayed!
✅ Correct — mark old token as used before issuing new ones.
Mistake 2 — Not revoking the whole family on reuse detection
❌ Wrong — only revoke the specific reused token:
if db_token.used_at:
db_token.revoked_at = datetime.now() # revokes only one token
# Attacker may still hold the newest token in the chain!
✅ Correct — revoke all tokens in the family to invalidate the entire session chain.
Mistake 3 — Accepting expired refresh tokens
❌ Wrong — checking database record but not JWT expiry:
# Skipping jwt.decode entirely and only checking database record
db_token = db.get(RefreshToken, jti)
if db_token and db_token.revoked_at is None:
issue_new_tokens() # token could be expired JWT!
✅ Correct — always decode and verify JWT signature and expiry first.
Quick Reference — Token Refresh Flow
| Step | Action |
|---|---|
| 1 | Verify JWT signature and exp (jwt.decode) |
| 2 | Check type == "refresh" |
| 3 | Look up jti in refresh_tokens table |
| 4 | If used_at is set → revoke family → 401 |
| 5 | If revoked_at is set → 401 |
| 6 | Mark current token as used (used_at = now()) |
| 7 | Issue new access + refresh tokens (same family) |
| 8 | Store new refresh token in DB |