Login and Token Issuance — Access and Refresh Tokens

The login endpoint authenticates credentials and issues tokens. A common JWT setup uses two tokens: a short-lived access token (15–30 minutes) sent with every API request, and a long-lived refresh token (7–30 days) used to obtain new access tokens. The access token is stateless: the server verifies its signature and expiry without a database lookup. The refresh token is recorded in the database, which makes revocation possible. When the access token expires, the client silently exchanges the refresh token for a new access token, keeping the session alive without another login.

Token Creation Functions

# app/auth/tokens.py
import jwt
import uuid
from datetime import datetime, timezone
from app.config import settings

def create_access_token(user_id: int, role: str) -> str:
    """Create a short-lived access token."""
    now = datetime.now(timezone.utc)
    payload = {
        "sub":  str(user_id),
        "role": role,
        "type": "access",
        "iat":  int(now.timestamp()),
        "exp":  int(now.timestamp()) + settings.access_token_expire_minutes * 60,
        "jti":  str(uuid.uuid4()),
    }
    return jwt.encode(payload, settings.secret_key, algorithm=settings.algorithm)

def create_refresh_token(user_id: int, family_id: str | None = None) -> tuple[str, str]:
    """
    Create a long-lived refresh token.
    family_id groups related tokens for rotation attack detection.
    Returns (token_string, jti) — store jti in the database.
    """
    now      = datetime.now(timezone.utc)
    jti      = str(uuid.uuid4())
    family   = family_id or str(uuid.uuid4())   # new family if not provided

    payload = {
        "sub":    str(user_id),
        "type":   "refresh",
        "family": family,
        "iat":    int(now.timestamp()),
        "exp":    int(now.timestamp()) + settings.refresh_token_expire_days * 86400,
        "jti":    jti,
    }
    token = jwt.encode(payload, settings.secret_key, algorithm=settings.algorithm)
    return token, jti

Note: A token family groups a chain of refresh tokens. When a user logs in, a new family is created. Each refresh issues a new refresh token in the same family and marks the old one as used. If an attacker steals a refresh token and presents it after it has been rotated, the server sees an already-used token within a known family and revokes the entire family. This invalidates both the attacker's token and the legitimate user's newest token, forcing the user to log in again: a safe, recoverable response to a detected attack.
Tip: Store refresh tokens in the database as a RefreshToken model with columns: jti (UUID, unique), user_id, family_id, used_at (timestamp, null = not yet used), revoked_at (null = active), expires_at. The used_at field is the key to reuse detection — if a refresh token is presented and used_at is not null, someone is attempting to reuse a consumed token (rotation attack).
Warning: Never store access tokens in the database. Access tokens are designed to be stateless — storing them defeats their purpose and adds database load. Only refresh tokens need database storage (for revocation). The trade-off: you cannot immediately revoke an individual access token before its expiry. This is acceptable with short access token lifetimes (15 minutes). For applications requiring immediate revocation (account compromise response), maintain a Redis-based access token blacklist.

RefreshToken Model

# app/models/refresh_token.py
from sqlalchemy import DateTime, ForeignKey, String
from sqlalchemy.orm import Mapped, mapped_column, relationship
from sqlalchemy.sql import func
from datetime import datetime, timezone
from app.database import Base

class RefreshToken(Base):
    __tablename__ = "refresh_tokens"

    id:           Mapped[int]            = mapped_column(primary_key=True)
    jti:          Mapped[str]            = mapped_column(String(36), unique=True, index=True)
    family_id:    Mapped[str]            = mapped_column(String(36), index=True)
    user_id:      Mapped[int]            = mapped_column(
                      ForeignKey("users.id", ondelete="CASCADE"), index=True)
    # Timezone-aware columns, so stored values compare cleanly with
    # datetime.now(timezone.utc) as used by the login endpoint
    used_at:      Mapped[datetime | None]= mapped_column(
                      DateTime(timezone=True), nullable=True, default=None)
    revoked_at:   Mapped[datetime | None]= mapped_column(
                      DateTime(timezone=True), nullable=True, default=None)
    expires_at:   Mapped[datetime]       = mapped_column(DateTime(timezone=True))
    created_at:   Mapped[datetime]       = mapped_column(
                      DateTime(timezone=True), server_default=func.now())

    user: Mapped["User"] = relationship()

    @property
    def is_valid(self) -> bool:
        # Use an aware UTC timestamp; a naive datetime.now() cannot be compared
        # with an aware expires_at. Assumes the backend returns aware datetimes
        # (e.g. PostgreSQL); SQLite returns naive values and needs normalising.
        now = datetime.now(timezone.utc)
        return (
            self.revoked_at is None and
            self.used_at    is None and
            self.expires_at  > now
        )

Login Endpoint

import uuid
from datetime import datetime, timezone, timedelta

from fastapi import APIRouter, Depends, HTTPException, status
from pydantic import BaseModel
from sqlalchemy import select
from sqlalchemy.orm import Session

from app.config import settings
from app.dependencies import get_db
from app.models.user import User
from app.models.refresh_token import RefreshToken
from app.auth.password import verify_password
from app.auth.tokens import create_access_token, create_refresh_token

router = APIRouter()

class LoginRequest(BaseModel):
    email:    str
    password: str

class TokenResponse(BaseModel):
    access_token:  str
    refresh_token: str
    token_type:    str = "bearer"
    expires_in:    int   # seconds until access token expires

@router.post("/login", response_model=TokenResponse)
def login(data: LoginRequest, db: Session = Depends(get_db)):
    # Look up user
    user = db.scalars(
        select(User).where(User.email == data.email.lower())
    ).first()

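    # Verify credentials: always run verify_password, even if the user is not found
    # (prevents timing attacks that reveal which emails are registered).
    # The value below is a placeholder: replace it with a real, well-formed bcrypt
    # hash generated once from a random password, since a malformed hash may make
    # verify_password raise or return early.
    dummy_hash = "$2b$12$dummyhash.dummyhash.dummyhash.dummyhash.dummyhashdummy"
    password_hash = user.password_hash if user else dummy_hash

    # Evaluate verify_password before testing `user`: writing
    # `not user or not verify_password(...)` would short-circuit and skip
    # the hash check for unknown emails.
    password_ok = verify_password(data.password, password_hash)
    if not user or not password_ok: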
        raise HTTPException(
            status_code = status.HTTP_401_UNAUTHORIZED,
            detail      = "Invalid email or password",
            headers     = {"WWW-Authenticate": "Bearer"},
        )

    if not user.is_active:
        raise HTTPException(403, "Account is deactivated")

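    # Issue tokens. Generate the family ID once so the value embedded in the
    # refresh token matches the value stored in the database.
    family_id = str(uuid.uuid4())   # new family for new login
    access_token = create_access_token(user.id, user.role)
    refresh_token_str, jti = create_refresh_token(user.id, family_id=family_id)

    # Store refresh token in database
    refresh_token = RefreshToken(
        jti        = jti,
        family_id  = family_id,
        user_id    = user.id,
        # Keep in sync with the "exp" claim inside the token
        expires_at = datetime.now(timezone.utc)
                     + timedelta(days=settings.refresh_token_expire_days),
    )
    db.add(refresh_token)
    db.flush()   # assumes get_db commits after the request; otherwise call db.commit()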

    return TokenResponse(
        access_token  = access_token,
        refresh_token = refresh_token_str,
        token_type    = "bearer",
        expires_in    = settings.access_token_expire_minutes * 60,
    )

Common Mistakes

Mistake 1 — Short-circuiting verify_password when user is not found

❌ Wrong — timing attack reveals which emails are registered:

if not user:
    raise HTTPException(401, "Invalid credentials")
# Returns almost instantly → reveals that the user does not exist!
# vs. a response after ~200ms (bcrypt verify time) → user exists but password is wrong

✅ Correct — always run bcrypt verify, even for non-existent users.
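
A minimal sketch of the corrected pattern follows. It is illustrative only: PBKDF2 from the standard library stands in for bcrypt so the example runs without extra packages, and hash_password, check_credentials, and the dict-based user store are hypothetical names.

```python
import hashlib
import hmac
import os


def hash_password(password: str, salt: bytes) -> bytes:
    """Deliberately slow hash; PBKDF2 stands in for bcrypt here."""
    return hashlib.pbkdf2_hmac("sha256", password.encode(), salt, 100_000)


# Computed once at import time: a genuine hash of a random password,
# verified against whenever the email is unknown.
DUMMY_SALT = os.urandom(16)
DUMMY_HASH = hash_password(os.urandom(16).hex(), DUMMY_SALT)


def check_credentials(users: dict[str, tuple[bytes, bytes]],
                      email: str, password: str) -> bool:
    """users maps lower-cased email -> (salt, password_hash)."""
    record = users.get(email.lower())
    salt, stored = record if record else (DUMMY_SALT, DUMMY_HASH)

    # Run the expensive hash on every path, then combine the results.
    # Writing `record is None or not verify(...)` would short-circuit
    # and skip the hash for unknown emails.
    password_ok = hmac.compare_digest(hash_password(password, salt), stored)
    return record is not None and password_ok
```

Both the "unknown email" and "wrong password" paths now pay for one full hash computation, so response time no longer separates them.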

Mistake 2 — Issuing tokens with no expiry

❌ Wrong — token valid forever:

payload = {"sub": str(user_id)}   # no "exp" claim!

✅ Correct — always include “exp” in every token.

Mistake 3 — Using the same token for both access and refresh

❌ Wrong: one long-lived token that is expected to be both stateless and revocable:

# Impossible: stateless (no DB check) AND revocable (requires DB check)
token = create_token(user_id, expires_in_days=30)   # 30-day access token!

✅ Correct — short access token (stateless) + long refresh token (database-backed).

Quick Reference

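Token Type    | Lifetime      | Storage                | Revocable?
--------------|---------------|------------------------|----------------------------------------
Access token  | 15–30 minutes | Client memory / header | Not directly (wait for expiry)
Refresh token | 7–30 days     | Client + database      | Yes (set revoked_at or delete the row)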

🧠 Test Yourself

Why should you always call verify_password() even when a user with the given email is not found in the database?