Advanced FastAPI Interview Questions and Answers

⚡ Advanced FastAPI Interview Questions

This lesson targets mid-to-senior backend roles. Topics include Dependency Injection, JWT authentication, SQLAlchemy integration, middleware, WebSockets, testing, caching, rate limiting, and API versioning. These questions separate developers who build FastAPI APIs from those who architect them.

Questions & Answers

01 What is Dependency Injection in FastAPI? How does it work?

DI

FastAPI’s dependency injection system is one of its most powerful features. Dependencies are callables (functions or classes) that FastAPI calls automatically before your route handler, injecting their return values. A dependency can declare dependencies of its own, forming a dependency tree.

from fastapi import FastAPI, Depends, HTTPException, Query

app = FastAPI()

# Simple dependency: reusable query parameter parsing
def common_pagination(skip: int = 0, limit: int = Query(20, le=100)):
    return {"skip": skip, "limit": limit}

@app.get("/users")
async def list_users(pagination: dict = Depends(common_pagination)):
    return db.get_users(**pagination)

@app.get("/orders")
async def list_orders(pagination: dict = Depends(common_pagination)):
    return db.get_orders(**pagination)  # reused across endpoints

# Class-based dependency (maintains state, cleaner for complex deps)
class DatabaseDep:
    def __init__(self, db_url: str = settings.DATABASE_URL):
        self.session = create_session(db_url)

    def __call__(self) -> Session:
        return self.session

get_db = DatabaseDep()

@app.get("/products")
async def list_products(db: Session = Depends(get_db)):
    return db.query(Product).all()

# Dependency chains (nested dependencies)
async def get_current_user(token: str = Depends(oauth2_scheme)):
    user = verify_token(token)
    if not user:
        raise HTTPException(401, "Invalid token")
    return user

async def get_active_user(user: User = Depends(get_current_user)):
    if not user.is_active:
        raise HTTPException(400, "Inactive user")
    return user

@app.get("/me")
async def read_me(user: User = Depends(get_active_user)):
    return user

02 How do you implement JWT authentication in FastAPI?

Auth

pip install python-jose[cryptography] passlib[bcrypt]
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import JWTError, jwt
from passlib.context import CryptContext
from datetime import datetime, timedelta, timezone

SECRET_KEY = "your-secret-key"  # placeholder: load from the environment in real deployments
ALGORITHM  = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30

pwd_context   = CryptContext(schemes=["bcrypt"])
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/token")
app = FastAPI()

# Token creation
def create_access_token(data: dict, expires_delta: timedelta | None = None):
    to_encode = data.copy()
    # datetime.utcnow() is deprecated since Python 3.12; use a timezone-aware UTC datetime
    expire = datetime.now(timezone.utc) + (
        expires_delta or timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES))
    to_encode.update({"exp": expire})
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)

# Auth dependency: extracts and verifies the token
async def get_current_user(token: str = Depends(oauth2_scheme)):
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"}
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if not username:
            raise credentials_exception
    except JWTError:
        raise credentials_exception
    user = get_user(username)
    if not user:
        raise credentials_exception
    return user

# Login endpoint: returns a token
@app.post("/auth/token")
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
    user = authenticate_user(form_data.username, form_data.password)
    if not user:
        raise HTTPException(status_code=401, detail="Incorrect credentials")
    token = create_access_token({"sub": user.username},
                                 timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES))
    return {"access_token": token, "token_type": "bearer"}

# Protected endpoint
@app.get("/users/me")
async def read_me(current_user: User = Depends(get_current_user)):
    return current_user
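
It can help in an interview to explain what `jwt.encode` and `jwt.decode` do under the hood for HS256. The following is a minimal standard-library sketch of the signing and verification steps, not a replacement for a maintained JWT library. The helper names (`sign_hs256`, `verify_hs256`) and the demo secret are hypothetical, and the sketch deliberately skips header validation and other checks a real library performs.

```python
import base64
import hashlib
import hmac
import json
import time


def _b64url(raw: bytes) -> str:
    # JWTs use URL-safe base64 without padding
    return base64.urlsafe_b64encode(raw).rstrip(b"=").decode("ascii")


def _b64url_decode(text: str) -> bytes:
    # Restore the padding that was stripped during encoding
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def sign_hs256(payload: dict, secret: str) -> str:
    header = {"alg": "HS256", "typ": "JWT"}
    signing_input = ".".join(
        _b64url(json.dumps(part, separators=(",", ":")).encode())
        for part in (header, payload)
    )
    signature = hmac.new(secret.encode(), signing_input.encode(), hashlib.sha256).digest()
    return f"{signing_input}.{_b64url(signature)}"


def verify_hs256(token: str, secret: str) -> dict:
    parts = token.split(".")
    if len(parts) != 3:
        raise ValueError("malformed token")
    header_b64, payload_b64, signature_b64 = parts
    expected = hmac.new(
        secret.encode(), f"{header_b64}.{payload_b64}".encode(), hashlib.sha256
    ).digest()
    # Constant-time comparison avoids leaking how many bytes matched
    if not hmac.compare_digest(expected, _b64url_decode(signature_b64)):
        raise ValueError("bad signature")
    payload = json.loads(_b64url_decode(payload_b64))
    if payload.get("exp", float("inf")) < time.time():
        raise ValueError("token expired")
    return payload


token = sign_hs256({"sub": "alice", "exp": int(time.time()) + 60}, "demo-secret")
claims = verify_hs256(token, "demo-secret")
```

The payload is only encoded, not encrypted: anyone holding the token can read the claims, and the signature only proves they were not altered.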

03 How do you integrate SQLAlchemy with FastAPI? What is the async session pattern?

Database

pip install sqlalchemy asyncpg
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
from fastapi import FastAPI, Depends, HTTPException
from contextlib import asynccontextmanager

DATABASE_URL = "postgresql+asyncpg://user:pass@localhost/mydb"
engine = create_async_engine(DATABASE_URL, echo=False, pool_size=10)
AsyncSessionLocal = async_sessionmaker(engine, expire_on_commit=False)

class Base(DeclarativeBase):
    pass

class User(Base):
    __tablename__ = "users"
    id:    Mapped[int]  = mapped_column(primary_key=True)
    name:  Mapped[str]
    email: Mapped[str]  = mapped_column(unique=True)

# Database session dependency
async def get_db():
    async with AsyncSessionLocal() as session:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise

# Lifespan: create tables on startup, dispose of the engine on shutdown
@asynccontextmanager
async def lifespan(app: FastAPI):
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)
    yield
    await engine.dispose()

app = FastAPI(lifespan=lifespan)

# CRUD endpoints using the async session
from sqlalchemy import select

@app.get("/users/{user_id}")
async def get_user(user_id: int, db: AsyncSession = Depends(get_db)):
    result = await db.execute(select(User).where(User.id == user_id))
    user = result.scalar_one_or_none()
    if not user:
        raise HTTPException(404, "User not found")
    return user

@app.post("/users", status_code=201)
async def create_user(user_in: UserCreate, db: AsyncSession = Depends(get_db)):
    user = User(**user_in.model_dump())
    db.add(user)
    await db.flush()   # get ID without committing
    await db.refresh(user)
    return user

04 What is middleware in FastAPI? How do you write custom middleware?

Middleware

Middleware processes every request before it reaches a route handler and every response before it is sent to the client. FastAPI uses Starlette’s middleware system.

from fastapi import FastAPI, Request
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.responses import Response
import time, uuid

app = FastAPI()

# Option 1: BaseHTTPMiddleware (simpler, slight overhead)
class RequestIDMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next):
        request_id = str(uuid.uuid4())
        request.state.request_id = request_id

        start_time = time.perf_counter()
        response = await call_next(request)    # call the next handler
        process_time = time.perf_counter() - start_time

        response.headers["X-Request-ID"]    = request_id
        response.headers["X-Process-Time"]  = f"{process_time:.4f}s"
        return response

app.add_middleware(RequestIDMiddleware)

# Option 2: Pure ASGI middleware (more performant, lower-level)
class AuthMiddleware:
    def __init__(self, app):
        self.app = app

    async def __call__(self, scope, receive, send):
        if scope["type"] == "http":
            request = Request(scope, receive)
            token = request.headers.get("authorization")
            # validate token ...
        await self.app(scope, receive, send)

app.add_middleware(AuthMiddleware)

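# Order matters: each add_middleware() call wraps the existing stack,
# so the middleware added LAST is the outermost and sees the request first
from fastapi.middleware.cors import CORSMiddleware
from fastapi.middleware.gzip import GZipMiddleware
app.add_middleware(CORSMiddleware, allow_origins=["*"])  # innermost: sees the request last
app.add_middleware(GZipMiddleware, minimum_size=1000)    # middle layer
app.add_middleware(RequestIDMiddleware)                  # outermost: sees the request first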
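
The ordering rule is easier to remember with a toy model. The sketch below uses no FastAPI at all and assumes only that each registration wraps the stack built so far; `make_middleware` and `build_stack` are hypothetical helpers written for this illustration.

```python
calls = []


def make_middleware(name):
    """Return a wrapper that logs before and after calling the inner app."""
    def wrap(inner):
        def wrapped(request):
            calls.append(f"{name}: request")
            response = inner(request)
            calls.append(f"{name}: response")
            return response
        return wrapped
    return wrap


def endpoint(request):
    calls.append("endpoint")
    return "ok"


def build_stack(inner, registered):
    # Each registration wraps everything added before it,
    # so the last one registered ends up outermost
    for wrap in registered:
        inner = wrap(inner)
    return inner


registered = [
    make_middleware("cors"),
    make_middleware("gzip"),
    make_middleware("request_id"),
]
stack = build_stack(endpoint, registered)
result = stack("GET /")
```

After the call, `calls` shows `request_id` handling the request first and the response last, with `cors` closest to the endpoint.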

05 How do you test FastAPI applications?

Testing

FastAPI provides TestClient (built on httpx) for synchronous tests; for async tests, use httpx's AsyncClient with ASGITransport. Neither requires a running server.

pip install httpx pytest pytest-asyncio
# main.py
from fastapi import FastAPI
app = FastAPI()

@app.get("/items/{item_id}")
async def get_item(item_id: int):
    return {"id": item_id, "name": "Widget"}

# test_main.py
from fastapi.testclient import TestClient
from main import app

client = TestClient(app)

def test_get_item():
    response = client.get("/items/42")
    assert response.status_code == 200
    assert response.json() == {"id": 42, "name": "Widget"}

def test_get_item_invalid():
    response = client.get("/items/not-an-int")
    assert response.status_code == 422  # validation error

# Async testing with pytest-asyncio
import pytest
from httpx import AsyncClient, ASGITransport

@pytest.mark.asyncio
async def test_create_user_async():
    async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as ac:
        response = await ac.post("/users", json={"name": "Alice", "email": "a@b.com"})
    assert response.status_code == 201
    assert response.json()["name"] == "Alice"

# Override dependencies in tests
def override_get_db():
    yield test_db_session

app.dependency_overrides[get_db] = override_get_db
# Override auth for testing protected routes
app.dependency_overrides[get_current_user] = lambda: User(id=1, name="Test")
# Clear overrides afterwards so they do not leak into other tests
app.dependency_overrides.clear()

06 How do you implement WebSockets in FastAPI?

WebSockets

FastAPI supports WebSockets natively via Starlette. A WebSocket connection stays open and allows bidirectional, real-time communication.

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import Any

app = FastAPI()

# Simple WebSocket echo server
@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: int):
    await websocket.accept()
    try:
        while True:
            data = await websocket.receive_text()
            await websocket.send_text(f"Client {client_id} sent: {data}")
    except WebSocketDisconnect:
        print(f"Client {client_id} disconnected")

# Connection manager for broadcasting (chat room example)
class ConnectionManager:
    def __init__(self):
        self.active_connections: dict[str, list[WebSocket]] = {}

    async def connect(self, room: str, websocket: WebSocket):
        await websocket.accept()
        self.active_connections.setdefault(room, []).append(websocket)

    def disconnect(self, room: str, websocket: WebSocket):
        if room in self.active_connections:
            self.active_connections[room].remove(websocket)

    async def broadcast(self, room: str, message: str):
        for ws in self.active_connections.get(room, []):
            await ws.send_text(message)

    async def send_json(self, websocket: WebSocket, data: Any):
        await websocket.send_json(data)

manager = ConnectionManager()

@app.websocket("/chat/{room_id}")
async def chat_room(websocket: WebSocket, room_id: str):
    await manager.connect(room_id, websocket)
    try:
        while True:
            msg = await websocket.receive_text()
            await manager.broadcast(room_id, f"Room {room_id}: {msg}")
    except WebSocketDisconnect:
        manager.disconnect(room_id, websocket)

07 How do you implement caching in FastAPI?

Performance

pip install redis[asyncio] fastapi-cache2
from fastapi import FastAPI, Depends, Request, Response
from fastapi_cache import FastAPICache
from fastapi_cache.backends.redis import RedisBackend
from fastapi_cache.decorator import cache
from redis import asyncio as aioredis
from contextlib import asynccontextmanager

@asynccontextmanager
async def lifespan(app: FastAPI):
    redis = aioredis.from_url("redis://localhost")
    FastAPICache.init(RedisBackend(redis), prefix="myapp-cache:")
    yield

app = FastAPI(lifespan=lifespan)

# Cache a route for 60 seconds
@app.get("/products")
@cache(expire=60)
async def list_products():
    return await db.fetch_all("SELECT * FROM products")

# Cache with a custom key builder (per-user cache)
def user_key_builder(func, namespace, request: Request, *args, **kwargs):
    return f"{namespace}:{request.state.user_id}:{request.url.path}"

@app.get("/dashboard")
@cache(expire=300, key_builder=user_key_builder)
async def get_dashboard(current_user = Depends(get_current_user)):
    return await build_dashboard(current_user.id)

# Manual Redis caching (more control)
import json
@app.get("/stats")
async def get_stats(redis: aioredis.Redis = Depends(get_redis)):
    cached = await redis.get("stats")
    if cached:
        return json.loads(cached)
    data = await compute_expensive_stats()
    await redis.setex("stats", 60, json.dumps(data))  # cache 60s
    return data

# HTTP cache headers (browser/CDN caching)
@app.get("/public-data")
async def public_data(response: Response):
    response.headers["Cache-Control"] = "public, max-age=300"
    return {"data": "..."}
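
The manual Redis example above follows the cache-aside pattern: read the cache, fall back to the expensive computation on a miss, then store the result with a TTL. The sketch below shows the same flow with an in-memory stand-in so it runs without Redis. `TTLCache` and its injectable clock are hypothetical, written only for this illustration.

```python
import json
import time


class TTLCache:
    """Tiny in-memory stand-in for the Redis get/setex calls used above."""

    def __init__(self, clock=time.monotonic):
        self._store = {}
        self._clock = clock

    def get(self, key):
        entry = self._store.get(key)
        if entry is None:
            return None
        value, expires_at = entry
        if self._clock() >= expires_at:
            del self._store[key]      # expired: behave like a miss
            return None
        return value

    def setex(self, key, ttl_seconds, value):
        self._store[key] = (value, self._clock() + ttl_seconds)


compute_calls = {"count": 0}


def compute_expensive_stats():
    compute_calls["count"] += 1
    return {"users": 42}


def get_stats(cache):
    cached = cache.get("stats")
    if cached is not None:
        return json.loads(cached)                 # cache hit
    data = compute_expensive_stats()              # cache miss: do the work
    cache.setex("stats", 60, json.dumps(data))    # store for 60 seconds
    return data


now = [0.0]                       # fake clock so expiry can be simulated
cache = TTLCache(clock=lambda: now[0])
first = get_stats(cache)          # miss, computes
second = get_stats(cache)         # hit, no recompute
now[0] = 61.0
third = get_stats(cache)          # entry expired, computes again
```

Checking `cached is not None` rather than truthiness matters when an empty result is itself a valid cached value.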

08 How do you implement rate limiting in FastAPI?

Security

pip install slowapi
from fastapi import FastAPI, Depends, Request
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
from slowapi.middleware import SlowAPIMiddleware

limiter = Limiter(key_func=get_remote_address)  # limit per IP
app = FastAPI()
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
app.add_middleware(SlowAPIMiddleware)

# Per-endpoint rate limits
@app.get("/items")
@limiter.limit("100/minute")           # 100 requests per minute
async def list_items(request: Request):
    return {"items": [...]}

@app.post("/auth/login")
@limiter.limit("5/minute")             # strict: 5 login attempts/minute
async def login(request: Request, form_data: LoginForm):
    return authenticate(form_data)

@app.get("/search")
@limiter.limit("30/minute;300/hour")  # multiple limits
async def search(request: Request, q: str):
    return search_db(q)

# Custom key function (limit per authenticated user, not IP)
# request.state.user must be set BEFORE the limiter runs (for example in an
# auth middleware); assigning it inside the endpoint body is too late
def get_user_id(request: Request) -> str:
    user = getattr(request.state, "user", None)
    return str(user.id) if user else get_remote_address(request)

user_limiter = Limiter(key_func=get_user_id)

@app.get("/api/data")
@user_limiter.limit("1000/day")
async def get_data(request: Request, user = Depends(get_current_user)):
    return fetch_data()
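
To explain what a limit string such as "5/minute" means, a fixed-window counter is the simplest model. The following is an illustrative in-memory sketch, not slowapi's implementation: the `FixedWindowLimiter` class is hypothetical, it never evicts old windows, and it would not work across multiple processes.

```python
import time
from collections import defaultdict


class FixedWindowLimiter:
    def __init__(self, limit: int, window_seconds: float, clock=time.monotonic):
        self.limit = limit
        self.window = window_seconds
        self.clock = clock
        self._counts = defaultdict(int)   # (key, window index) -> hits so far

    def allow(self, key: str) -> bool:
        # All requests in the same window share one counter
        window_index = int(self.clock() // self.window)
        bucket = (key, window_index)
        if self._counts[bucket] >= self.limit:
            return False                  # over the limit: caller should answer 429
        self._counts[bucket] += 1
        return True


now = [0.0]                               # fake clock for a deterministic demo
limiter = FixedWindowLimiter(limit=2, window_seconds=60, clock=lambda: now[0])

same_ip = [limiter.allow("1.2.3.4") for _ in range(3)]
other_ip = limiter.allow("5.6.7.8")       # separate key, separate counter
now[0] = 61.0                             # next window starts
after_reset = limiter.allow("1.2.3.4")
```

Fixed windows allow a burst of up to twice the limit across a window boundary, which is why production limiters often offer sliding-window or token-bucket strategies as well.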

09 How do you handle API versioning in FastAPI?

Architecture

from fastapi import FastAPI
from fastapi.routing import APIRouter

# Approach 1: URL path versioning (most common)
app = FastAPI()

v1_router = APIRouter(prefix="/api/v1")
v2_router = APIRouter(prefix="/api/v2")

@v1_router.get("/users")
async def list_users_v1():
    return [{"id": 1, "name": "Alice"}]   # old format

@v2_router.get("/users")
async def list_users_v2():
    return {"data": [{"id": 1, "name": "Alice"}], "meta": {"total": 1}}  # new format

app.include_router(v1_router)
app.include_router(v2_router)

# Approach 2: Separate FastAPI apps mounted at sub-paths
from fastapi import FastAPI
app_v1 = FastAPI(title="API v1", version="1.0")
app_v2 = FastAPI(title="API v2", version="2.0")

# ... define routes on app_v1 and app_v2 ...

main = FastAPI()
main.mount("/v1", app_v1)
main.mount("/v2", app_v2)

# Approach 3: Header versioning
from fastapi import Header
@app.get("/users")
async def list_users(api_version: str = Header(default="1")):
    if api_version == "2":
        return v2_response()
    return v1_response()

# Approach 4: Query parameter
@app.get("/users")
async def list_users(version: str = "1"):
    return v1_response() if version == "1" else v2_response()

10 What are yield dependencies in FastAPI? When do you use them?

DI

Yield dependencies support setup and teardown: everything before yield runs before the route handler, the yielded value is injected, and everything after yield runs as cleanup once the handler has finished. Whether that cleanup runs before or after the response is sent depends on the FastAPI version.

import httpx
from fastapi import FastAPI, Depends
from sqlalchemy.ext.asyncio import AsyncSession

app = FastAPI()

# Database session with automatic cleanup
async def get_db():
    async with AsyncSessionLocal() as session:
        try:
            yield session          # inject this into the route handler
            await session.commit() # commit after successful response
        except Exception:
            await session.rollback()
            raise
        # session is closed automatically when the context manager exits

@app.post("/users")
async def create_user(user: UserCreate, db: AsyncSession = Depends(get_db)):
    new_user = User(**user.model_dump())
    db.add(new_user)
    return new_user  # commit happens automatically after this returns

# Resource cleanup: file handle, HTTP client, etc.
async def get_http_client():
    async with httpx.AsyncClient() as client:
        yield client               # inject to route handler
    # client is closed after response (even if an exception occurred)

@app.get("/external-data")
async def fetch_external(client: httpx.AsyncClient = Depends(get_http_client)):
    response = await client.get("https://api.example.com/data")
    return response.json()

# Nested yield dependencies
async def get_transaction(db: AsyncSession = Depends(get_db)):
    async with db.begin():
        yield db    # all DB operations in route handler are wrapped in a transaction
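
A yield dependency behaves much like a context manager, so the setup and teardown order can be demonstrated with the standard library alone. In this sketch `run` is a hypothetical stand-in for the framework calling a handler, and `events` exists only to record the order of execution.

```python
from contextlib import contextmanager

events = []


@contextmanager
def get_resource():
    events.append("setup")            # runs before the handler
    try:
        yield "resource"              # value injected into the handler
    finally:
        events.append("teardown")     # runs afterwards, even if the handler raised


def run(handler):
    """Hypothetical stand-in for the framework invoking a route handler."""
    events.clear()
    try:
        with get_resource() as resource:
            handler(resource)
    except RuntimeError:
        events.append("error propagated")
    return list(events)


def ok_handler(resource):
    events.append(f"handler got {resource}")


def failing_handler(resource):
    raise RuntimeError("boom")


ok_run = run(ok_handler)
failed_run = run(failing_handler)
```

The `try`/`finally` around `yield` is what guarantees teardown on failure; without it, code after `yield` is skipped when the handler raises.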

11 How do you implement role-based access control (RBAC) in FastAPI?

Auth

from fastapi import FastAPI, Depends, HTTPException, status
from enum import Enum
from functools import lru_cache

class Role(str, Enum):
    USER    = "user"
    MANAGER = "manager"
    ADMIN   = "admin"

# Permission checker factory
def require_roles(*roles: Role):
    """Returns a dependency that checks if the current user has the required role."""
    async def role_checker(current_user: User = Depends(get_current_user)):
        if current_user.role not in roles:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail=f"Requires one of: {[r.value for r in roles]}"
            )
        return current_user
    return role_checker

# Apply to routes
@app.get("/admin/users", dependencies=[Depends(require_roles(Role.ADMIN))])
async def list_all_users():
    return db.get_all_users()

@app.delete("/users/{id}")
async def delete_user(
    id: int,
    current_user: User = Depends(require_roles(Role.ADMIN, Role.MANAGER))
):
    db.delete_user(id)
    return {"deleted": id}

# Permission-based (fine-grained)
class Permission(str, Enum):
    READ_USERS   = "read:users"
    WRITE_USERS  = "write:users"
    DELETE_USERS = "delete:users"
    DELETE_POSTS = "delete:posts"

def require_permission(permission: Permission):
    async def checker(user: User = Depends(get_current_user)):
        if permission not in user.permissions:
            # .value gives "delete:posts" regardless of how the enum formats itself
            raise HTTPException(403, f"Missing permission: {permission.value}")
        return user
    return checker

@app.delete("/posts/{id}")
async def delete_post(
    id: int,
    user: User = Depends(require_permission(Permission.DELETE_POSTS))
):
    return db.delete_post(id)

12 How do you handle database migrations with Alembic in a FastAPI project?

Database

pip install alembic
alembic init alembic   # creates alembic/ directory and alembic.ini
# alembic/env.py: configure to use your SQLAlchemy models
from app.database import Base
from app.models import *   # import all models so Alembic can detect them

target_metadata = Base.metadata

# alembic.ini
# sqlalchemy.url = postgresql+psycopg2://user:pass@localhost/mydb

# Generate a migration from your model changes
alembic revision --autogenerate -m "add users table"
# Alembic compares models to current DB schema and generates a migration file

# alembic/versions/abc123_add_users_table.py (generated)
from alembic import op
import sqlalchemy as sa

def upgrade():
    op.create_table(
        "users",
        sa.Column("id",    sa.Integer(), nullable=False),
        sa.Column("name",  sa.String(),  nullable=False),
        sa.Column("email", sa.String(),  nullable=False, unique=True),
        sa.PrimaryKeyConstraint("id")
    )

def downgrade():
    op.drop_table("users")

# Apply migrations
alembic upgrade head        # run all pending migrations
alembic upgrade +1          # run next migration only
alembic downgrade -1        # roll back one migration
alembic current             # show current migration
alembic history             # show migration history

# Run migrations on startup (in lifespan or a startup script)
import subprocess
subprocess.run(["alembic", "upgrade", "head"], check=True)

13 What are Server-Sent Events (SSE) in FastAPI?

Realtime

SSE is a one-way, server-to-client streaming protocol over HTTP, useful for live updates, notifications, and progress streaming without WebSocket complexity.

from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
from datetime import datetime
import asyncio, json

app = FastAPI()

async def event_generator(request: Request):
    """Async generator that yields SSE-formatted events."""
    counter = 0
    while True:
        if await request.is_disconnected():
            break
        counter += 1
        # SSE format: data: ...\n\n (double newline terminates an event)
        # json.dumps produces valid JSON that the client can pass to JSON.parse
        payload = json.dumps({"counter": counter, "time": datetime.now().isoformat()})
        yield f"data: {payload}\n\n"
        await asyncio.sleep(1)

@app.get("/events")
async def sse_events(request: Request):
    return StreamingResponse(
        event_generator(request),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection":    "keep-alive",
            "X-Accel-Buffering": "no"   # disable Nginx buffering
        }
    )

# Named events and IDs (proper SSE format)
async def notifications_generator(user_id: int, request: Request):
    event_id = 0
    while not await request.is_disconnected():
        notifications = await get_new_notifications(user_id)
        for n in notifications:
            event_id += 1   # one ID per event, so Last-Event-ID identifies a single notification
            yield f"id: {event_id}\n"
            yield "event: notification\n"
            yield f"data: {n.model_dump_json()}\n\n"
        await asyncio.sleep(5)

# Client JS: const es = new EventSource('/events');
#            es.addEventListener('notification', e => console.log(JSON.parse(e.data)));
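
Building the wire format by hand is error-prone, so a small formatter is a common refactor. The helper below is a sketch using only the standard library; `format_sse` is a hypothetical name, and the field names (`id`, `event`, `retry`, `data`) are the ones defined by the SSE format.

```python
import json


def format_sse(data, event=None, event_id=None, retry_ms=None) -> str:
    """Serialise one Server-Sent Event; a blank line terminates the event."""
    lines = []
    if event_id is not None:
        lines.append(f"id: {event_id}")
    if event is not None:
        lines.append(f"event: {event}")
    if retry_ms is not None:
        lines.append(f"retry: {retry_ms}")
    payload = json.dumps(data)
    # Every line of the payload needs its own "data:" field
    for line in payload.splitlines():
        lines.append(f"data: {line}")
    return "\n".join(lines) + "\n\n"


message = format_sse({"counter": 1}, event="tick", event_id=7)
plain = format_sse("hello")
```

A generator can then `yield format_sse(...)` once per event instead of yielding each field separately.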

14 How do you implement pagination in FastAPI?

API Design

from fastapi import FastAPI, Depends, Query
from pydantic import BaseModel
from sqlalchemy import func, select
from sqlalchemy.ext.asyncio import AsyncSession
from typing import TypeVar, Generic

T = TypeVar("T")

class Page(BaseModel, Generic[T]):
    """Generic paginated response."""
    data:        list[T]
    total:       int
    page:        int
    size:        int
    total_pages: int
    has_next:    bool
    has_prev:    bool

class PaginationParams:
    def __init__(
        self,
        page: int  = Query(1, ge=1, description="Page number"),
        size: int  = Query(20, ge=1, le=100, description="Items per page")
    ):
        self.page   = page
        self.size   = size
        self.offset = (page - 1) * size

# Reusable dependency
def paginate(params: PaginationParams = Depends()):
    return params

@app.get("/users", response_model=Page[UserOut])
async def list_users(
    db:   AsyncSession    = Depends(get_db),
    p:    PaginationParams = Depends(paginate)
):
    total = await db.scalar(select(func.count()).select_from(User))
    users = await db.scalars(select(User).offset(p.offset).limit(p.size))
    return Page(
        data        = list(users),
        total       = total,
        page        = p.page,
        size        = p.size,
        total_pages = (total + p.size - 1) // p.size,
        has_next    = p.page * p.size < total,
        has_prev    = p.page > 1
    )

# Cursor-based pagination (for large datasets: avoids the expensive COUNT)
@app.get("/feed")
async def get_feed(
    after_id: int | None = None,
    limit: int = Query(20, ge=1, le=100),
    db: AsyncSession = Depends(get_db),
):
    query = select(Post).order_by(Post.id.desc()).limit(limit + 1)  # fetch one extra row
    if after_id is not None:
        query = query.where(Post.id < after_id)
    posts = list(await db.scalars(query))
    has_more = len(posts) > limit   # the extra row signals that another page exists
    return {"data": posts[:limit], "next_cursor": posts[limit - 1].id if has_more else None}
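
The cursor logic is easy to get subtly wrong, so it is worth tracing on a small in-memory dataset. This sketch mirrors the "fetch one extra row" idea without a database; the `Post` dataclass and the `POSTS` list are hypothetical stand-ins for the ORM model and table.

```python
from dataclasses import dataclass


@dataclass
class Post:
    id: int
    title: str


POSTS = [Post(id=i, title=f"post {i}") for i in range(1, 8)]   # ids 1..7


def get_feed(after_id=None, limit=3):
    rows = sorted(POSTS, key=lambda p: p.id, reverse=True)     # newest first
    if after_id is not None:
        rows = [p for p in rows if p.id < after_id]            # strictly older than cursor
    rows = rows[: limit + 1]                                   # one extra row as a probe
    has_more = len(rows) > limit
    page = rows[:limit]
    next_cursor = page[-1].id if has_more else None
    return {"data": page, "next_cursor": next_cursor}


first = get_feed()
second = get_feed(after_id=first["next_cursor"])
third = get_feed(after_id=second["next_cursor"])
```

The client passes `next_cursor` back as `after_id` and stops when it receives `None`; no total count is ever computed.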

15 How do you validate environment variables and application settings in FastAPI?

Config

FastAPI projects typically use pydantic-settings to load configuration from environment variables and validate it against type annotations.

pip install pydantic-settings
from pydantic_settings import BaseSettings, SettingsConfigDict
from pydantic import field_validator
from functools import lru_cache
from typing import Literal

class Settings(BaseSettings):
    # Server
    app_name:    str = "My FastAPI App"
    debug:       bool = False
    environment: Literal["development", "staging", "production"] = "development"
    port:        int = 8000

    # Database
    database_url: str
    db_pool_size: int = 5

    # Auth
    secret_key:              str
    access_token_expire_min: int = 30
    algorithm:               str = "HS256"

    # External services
    redis_url: str = "redis://localhost:6379"
    smtp_host: str = "localhost"

    model_config = SettingsConfigDict(
        env_file=".env",              # read from .env file
        env_file_encoding="utf-8",
        case_sensitive=False          # DATABASE_URL and database_url are the same
    )

    @field_validator("secret_key")
    @classmethod
    def validate_secret_key(cls, v: str) -> str:
        if len(v) < 32:
            raise ValueError("SECRET_KEY must be at least 32 characters")
        return v

# Cached singleton: reads .env once
@lru_cache
def get_settings() -> Settings:
    return Settings()

settings = get_settings()

# Use as a dependency (enables override in tests)
from fastapi import Depends
@app.get("/info")
async def app_info(settings: Settings = Depends(get_settings)):
    return {"app": settings.app_name, "env": settings.environment}

16 How do you structure a production FastAPI project?

Architecture

my_api/
  app/
    api/                    # HTTP layer only
      v1/
        routers/
          users.py          # APIRouter with routes
          products.py
          orders.py
        __init__.py         # mounts all routers
    core/                   # cross-cutting concerns
      config.py             # pydantic-settings
      security.py           # JWT, password hashing
      exceptions.py         # custom exception classes + handlers
      logging.py            # structured logging setup
    db/                     # database layer
      session.py            # engine, AsyncSessionLocal
      base.py               # DeclarativeBase
      migrations/           # alembic
    models/                 # SQLAlchemy ORM models
      user.py
      product.py
    schemas/                # Pydantic request/response models
      user.py               # UserCreate, UserOut, UserUpdate
      product.py
    services/               # business logic (no HTTP awareness)
      user_service.py       # get_user, create_user, update_user
      product_service.py
    repositories/           # data access layer (optional)
      user_repo.py
    middleware/
      request_id.py
      timing.py
    main.py                 # FastAPI app + lifespan
  tests/
    conftest.py             # fixtures, test DB, app overrides
    test_users.py
    test_products.py
  .env
  .env.example
  Dockerfile
  docker-compose.yml
  pyproject.toml

Key separation: Routers handle HTTP (request → response). Services hold business logic (no FastAPI imports, pure Python). Repositories handle data access. This makes services testable without HTTP, and without a database when the repository is replaced by a fake.
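
As a rough illustration of that separation, the sketch below tests a service against an in-memory repository with no FastAPI or database involved. All names here (`UserService`, `InMemoryUserRepo`, `EmailTaken`) are hypothetical and chosen only for the example.

```python
from dataclasses import dataclass
from typing import Optional, Protocol


@dataclass
class User:
    id: int
    email: str


class UserRepo(Protocol):
    def get_by_email(self, email: str) -> Optional[User]: ...
    def add(self, email: str) -> User: ...


class EmailTaken(Exception):
    pass


class UserService:
    """Business rules only: no HTTP types, no SQL."""

    def __init__(self, repo: UserRepo):
        self.repo = repo

    def register(self, email: str) -> User:
        email = email.strip().lower()             # normalise before the uniqueness check
        if self.repo.get_by_email(email) is not None:
            raise EmailTaken(email)
        return self.repo.add(email)


class InMemoryUserRepo:
    """Fake repository used in place of the database in tests."""

    def __init__(self):
        self._users = {}

    def get_by_email(self, email):
        return self._users.get(email)

    def add(self, email):
        user = User(id=len(self._users) + 1, email=email)
        self._users[email] = user
        return user


service = UserService(InMemoryUserRepo())
created = service.register("  Alice@Example.com ")
```

A router would catch `EmailTaken` and translate it into an HTTP 409, keeping status codes out of the service layer.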

17 How do you implement OAuth2 with scopes in FastAPI?

Auth

from fastapi import FastAPI, Depends, Security, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, SecurityScopes
from jose import JWTError, jwt

# Define scopes and their descriptions
app = FastAPI()
oauth2_scheme = OAuth2PasswordBearer(
    tokenUrl="/auth/token",
    scopes={
        "users:read":   "Read user information",
        "users:write":  "Create and update users",
        "admin":        "Full admin access"
    }
)

# Auth dependency that validates required scopes
async def get_current_user(
    security_scopes: SecurityScopes,     # injected with required scopes
    token: str = Depends(oauth2_scheme)
):
    authenticate_value = f'Bearer scope="{security_scopes.scope_str}"' \
        if security_scopes.scopes else "Bearer"

    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": authenticate_value}
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        token_scopes = payload.get("scopes", [])
        username:  str = payload.get("sub")
        if not username:
            raise credentials_exception
    except JWTError:
        raise credentials_exception

    # Check all required scopes are present in the token
    for scope in security_scopes.scopes:
        if scope not in token_scopes:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail=f"Not enough permissions, requires scope: {scope}"
            )
    return get_user(username)

# Protect routes with specific scopes
@app.get("/users/me")
async def read_me(user = Security(get_current_user, scopes=["users:read"])):
    return user

@app.post("/users")
async def create_user(user: UserCreate,
                      _    = Security(get_current_user, scopes=["users:write"])):
    return db.create_user(user)

18 What is OpenAPI schema customisation in FastAPI?

Docs

from fastapi import FastAPI
from fastapi.openapi.utils import get_openapi

app = FastAPI()

# Custom OpenAPI schema
def custom_openapi():
    if app.openapi_schema:
        return app.openapi_schema
    openapi_schema = get_openapi(
        title="My Awesome API",
        version="3.0.0",
        description="API for doing awesome things",
        routes=app.routes,
        tags=[
            {"name": "users",    "description": "User management operations"},
            {"name": "products", "description": "Product catalogue"}
        ]
    )
    # Add a security scheme without discarding any that FastAPI generated
    components = openapi_schema.setdefault("components", {})
    components.setdefault("securitySchemes", {})["BearerAuth"] = {
        "type": "http",
        "scheme": "bearer",
        "bearerFormat": "JWT"
    }
    app.openapi_schema = openapi_schema
    return app.openapi_schema

app.openapi = custom_openapi

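# Customise docs UI. Pass these options where the app is first created:
# re-assigning app afterwards discards the custom_openapi hook set above.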
app = FastAPI(
    docs_url="/api/docs",
    redoc_url="/api/redoc",
    swagger_ui_parameters={
        "defaultModelsExpandDepth": -1,   # hide schemas section by default
        "persistAuthorization": True       # keep auth token between refreshes
    }
)

# Exclude endpoints from docs
@app.get("/internal/debug", include_in_schema=False)
async def internal_debug():
    return {"debug": True}

19 How do you handle async database queries efficiently in FastAPI?

Database

import asyncio
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, func, and_
from sqlalchemy.orm import selectinload, joinedload

# Avoid N+1 queries: use eager loading
# ❌ N+1: one extra query per order
@app.get("/orders")
async def list_orders_bad(db: AsyncSession = Depends(get_db)):
    orders = (await db.scalars(select(Order))).all()
    for order in orders:
        # Lazy load per order: N extra queries with a sync Session;
        # with AsyncSession, implicit lazy loading raises MissingGreenlet instead
        _ = order.customer
    return orders

# ✅ Eager load with selectinload (separate IN queries, no JOIN cartesian product)
@app.get("/orders")
async def list_orders_good(db: AsyncSession = Depends(get_db)):
    result = await db.scalars(
        select(Order)
        .options(selectinload(Order.customer))   # one query for all customers
        .options(selectinload(Order.items).selectinload(OrderItem.product))
    )
    return result.all()

# ✅ joinedload (single JOIN query; good for many-to-one)
result = await db.scalars(
    select(Order).options(joinedload(Order.customer))
)

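# Concurrent async queries
# An AsyncSession is not safe for concurrent use, so asyncio.gather() over two
# queries on the same session fails. Give each query its own session.
# Assumes `import asyncio` and that `async_session` is the app's async_sessionmaker.
async def get_dashboard_data(user_id: int):
    async def fetch_orders():
        async with async_session() as db:
            return (await db.scalars(select(Order).where(Order.user_id == user_id))).all()

    async def fetch_stats():
        async with async_session() as db:
            stmt = select(func.count(), func.sum(Order.total)).where(Order.user_id == user_id)
            return (await db.execute(stmt)).one()

    orders, stats = await asyncio.gather(fetch_orders(), fetch_stats())
    return {"orders": orders, "stats": {"count": stats[0], "total": stats[1]}}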
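The query counts behind the N+1 comments above can be checked without an ORM. The following is a minimal sketch using the standard-library sqlite3 module, not SQLAlchemy itself: the table layout, the sample rows, and the `CountingConnection` helper are all made up for illustration. `load_batched` mimics what `selectinload` does conceptually (one query for the parents, one `IN` query for the related rows).

```python
import sqlite3


class CountingConnection:
    """Hypothetical wrapper that counts every statement sent to the database."""

    def __init__(self, conn: sqlite3.Connection):
        self._conn = conn
        self.queries = 0

    def execute(self, sql, params=()):
        self.queries += 1
        return self._conn.execute(sql, params)


def make_db() -> CountingConnection:
    """Build an in-memory database with toy customers and orders."""
    conn = sqlite3.connect(":memory:")
    conn.executescript("""
        CREATE TABLE customers (id INTEGER PRIMARY KEY, name TEXT);
        CREATE TABLE orders (id INTEGER PRIMARY KEY, customer_id INTEGER);
        INSERT INTO customers VALUES (1, 'Ada'), (2, 'Linus'), (3, 'Grace');
        INSERT INTO orders VALUES (1, 1), (2, 2), (3, 3), (4, 1), (5, 2), (6, 3);
    """)
    return CountingConnection(conn)


def load_n_plus_one(db: CountingConnection):
    """One query for the orders, then one customer query per order."""
    orders = db.execute("SELECT id, customer_id FROM orders ORDER BY id").fetchall()
    result = []
    for order_id, customer_id in orders:
        row = db.execute("SELECT name FROM customers WHERE id = ?", (customer_id,)).fetchone()
        result.append((order_id, row[0]))
    return result


def load_batched(db: CountingConnection):
    """One query for the orders, one IN query for all referenced customers."""
    orders = db.execute("SELECT id, customer_id FROM orders ORDER BY id").fetchall()
    ids = sorted({customer_id for _, customer_id in orders})
    placeholders = ",".join("?" * len(ids))
    rows = db.execute(
        f"SELECT id, name FROM customers WHERE id IN ({placeholders})", ids
    ).fetchall()
    names = dict(rows)  # customer id -> name
    return [(order_id, names[customer_id]) for order_id, customer_id in orders]


def run(loader):
    """Run a loader against a fresh database; return its rows and query count."""
    db = make_db()
    return loader(db), db.queries


if __name__ == "__main__":
    for loader in (load_n_plus_one, load_batched):
        rows, queries = run(loader)
        print(f"{loader.__name__}: {queries} queries for {len(rows)} orders")
```

Both loaders return the same rows; only the number of round trips differs. The naive version grows with the number of orders, while the batched version stays at two queries regardless of how many orders are returned.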

20 How do you implement logging in a FastAPI application?

Observability

# Install first: pip install structlog
import logging, structlog, time, uuid
from fastapi import FastAPI, Request
from starlette.middleware.base import BaseHTTPMiddleware

# Configure structlog (structured JSON logging)
structlog.configure(
    processors=[
        structlog.contextvars.merge_contextvars,
        structlog.processors.add_log_level,
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.JSONRenderer()   # JSON output in production
    ],
    wrapper_class=structlog.make_filtering_bound_logger(logging.INFO),
)
logger = structlog.get_logger()

app = FastAPI()

# Request logging middleware
class LoggingMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request: Request, call_next):
        request_id = str(uuid.uuid4())
        structlog.contextvars.clear_contextvars()
        structlog.contextvars.bind_contextvars(
            request_id = request_id,
            method     = request.method,
            path       = request.url.path,
            # request.client can be None (for example in some test setups)
            client_ip  = request.client.host if request.client else None
        )
        start = time.perf_counter()
        try:
            response = await call_next(request)
            logger.info("request_completed",
                        status_code=response.status_code,
                        duration_ms=round((time.perf_counter()-start)*1000, 2))
            response.headers["X-Request-ID"] = request_id
            return response
        except Exception as e:
            logger.error("request_failed", error=str(e))
            raise

app.add_middleware(LoggingMiddleware)

# Use structured logging in route handlers
@app.post("/users")
async def create_user(user: UserCreate):
    logger.info("creating_user", email=user.email)
    new_user = await user_service.create(user)
    logger.info("user_created", user_id=new_user.id)
    return new_user
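The middleware above relies on context variables so that every log line emitted while handling a request carries that request's ID, even when several requests interleave on one event loop. The sketch below shows the same idea with only the standard library (`contextvars` plus `logging`), without structlog. The names `request_id_var`, `RequestIdFilter`, `JsonFormatter`, and `handle_request` are illustrative, and `handle_request` merely simulates a request rather than being a real ASGI handler.

```python
import asyncio
import contextvars
import io
import json
import logging
import uuid

# Holds the ID of the request currently being handled in this task.
request_id_var = contextvars.ContextVar("request_id", default="-")


class RequestIdFilter(logging.Filter):
    """Copy the current request ID onto every log record."""

    def filter(self, record: logging.LogRecord) -> bool:
        record.request_id = request_id_var.get()
        return True


class JsonFormatter(logging.Formatter):
    """Render each record as one JSON object per line."""

    def format(self, record: logging.LogRecord) -> str:
        return json.dumps({
            "level": record.levelname.lower(),
            "event": record.getMessage(),
            "request_id": record.request_id,
        })


def build_logger(stream) -> logging.Logger:
    logger = logging.getLogger("request_demo")
    logger.handlers.clear()
    logger.setLevel(logging.INFO)
    logger.propagate = False
    handler = logging.StreamHandler(stream)
    handler.addFilter(RequestIdFilter())
    handler.setFormatter(JsonFormatter())
    logger.addHandler(handler)
    return logger


async def handle_request(logger: logging.Logger, name: str) -> str:
    """Simulated request: bind an ID, log twice, then restore the context."""
    request_id = str(uuid.uuid4())
    token = request_id_var.set(request_id)
    try:
        logger.info("started %s", name)
        await asyncio.sleep(0)  # yield control so the two requests interleave
        logger.info("finished %s", name)
    finally:
        request_id_var.reset(token)
    return request_id


async def main():
    stream = io.StringIO()
    logger = build_logger(stream)
    # gather() runs each coroutine in its own task, and each task gets its own
    # copy of the context, so the IDs do not leak between requests.
    ids = await asyncio.gather(handle_request(logger, "a"), handle_request(logger, "b"))
    lines = [json.loads(line) for line in stream.getvalue().splitlines()]
    return ids, lines


if __name__ == "__main__":
    _, log_lines = asyncio.run(main())
    for line in log_lines:
        print(line)
```

Route handlers never pass the request ID around explicitly; the filter reads it from the context at log time, which is the same property `structlog.contextvars.merge_contextvars` provides in the structlog setup.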

21 How do you implement health check endpoints in FastAPI?

Ops

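import time

import redis.asyncio as aioredis
from fastapi import FastAPI, Depends
from fastapi.responses import JSONResponse
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession

app = FastAPI()
start_time = time.time()  # process start, used by /health/info for uptime

# Liveness: is the process running? (Kubernetes restarts the container if this fails)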
@app.get("/health/live", tags=["health"])
async def liveness():
    return {"status": "alive"}

# Readiness: can it accept traffic? (Kubernetes stops routing to the pod if this fails)
@app.get("/health/ready", tags=["health"])
async def readiness(db: AsyncSession = Depends(get_db)):
    checks = {}
    overall = "ready"

    # Check database
    try:
        await db.execute(text("SELECT 1"))
        checks["database"] = "ok"
    except Exception as e:
        checks["database"] = f"error: {e}"
        overall = "not_ready"

    # Check Redis
    try:
        redis_client = aioredis.from_url(settings.redis_url)
        await redis_client.ping()
        checks["redis"] = "ok"
        await redis_client.close()
    except Exception as e:
        checks["redis"] = f"error: {e}"
        overall = "not_ready"

    status_code = 200 if overall == "ready" else 503
    return JSONResponse(
        status_code=status_code,
        content={"status": overall, "checks": checks, "version": settings.version}
    )

# Detailed health info (internal only; hidden from the docs, but still reachable unless access is restricted)
@app.get("/health/info", include_in_schema=False)
async def health_info():
    import psutil
    return {
        "cpu_percent":  psutil.cpu_percent(),
        "memory_mb":    psutil.Process().memory_info().rss / 1024 / 1024,
        "uptime_secs":  time.time() - start_time
    }
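A readiness probe that checks its dependencies one after another can hang for as long as the slowest dependency does. One possible refinement, sketched here with only asyncio, is to run the checks concurrently and give each a timeout. The helper names (`run_check`, `readiness_report`) and the three fake checks are hypothetical stand-ins; in a real app each check would wrap a call such as the `SELECT 1` or Redis `ping()` shown above.

```python
import asyncio
from typing import Awaitable, Callable, Dict, Tuple

Check = Callable[[], Awaitable[None]]


async def run_check(name: str, check: Check, timeout: float) -> Tuple[str, str]:
    """Run one check and translate its outcome into a short status string."""
    try:
        await asyncio.wait_for(check(), timeout=timeout)
        return name, "ok"
    except asyncio.TimeoutError:
        return name, "timeout"
    except Exception as exc:  # any other failure marks the dependency as down
        return name, f"error: {exc}"


async def readiness_report(checks: Dict[str, Check], timeout: float = 1.0):
    """Run all checks concurrently; return (HTTP status code, response body)."""
    results = await asyncio.gather(
        *(run_check(name, check, timeout) for name, check in checks.items())
    )
    outcome = dict(results)
    ready = all(status == "ok" for status in outcome.values())
    body = {"status": "ready" if ready else "not_ready", "checks": outcome}
    return (200 if ready else 503), body


# Hypothetical checks standing in for real database / Redis / upstream calls.
async def fake_db_ok() -> None:
    await asyncio.sleep(0.01)


async def fake_redis_down() -> None:
    raise ConnectionError("connection refused")


async def fake_slow_service() -> None:
    await asyncio.sleep(5)


if __name__ == "__main__":
    status_code, body = asyncio.run(
        readiness_report(
            {"database": fake_db_ok, "redis": fake_redis_down, "slow": fake_slow_service},
            timeout=0.05,
        )
    )
    print(status_code, body)
```

Inside the `/health/ready` handler, the returned pair could be passed straight to `JSONResponse(status_code=..., content=...)`. Keeping the timeout well below the probe's own timeout lets the endpoint answer with a 503 rather than leaving the orchestrator to time out.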

📝 Knowledge Check

Test your understanding of advanced FastAPI patterns and production techniques.

🧠 Quiz Question 1 of 5

What is the main advantage of using FastAPI’s Dependency Injection system over manually calling functions?





🧠 Quiz Question 2 of 5

What does a yield dependency in FastAPI do with the code after the yield statement?





🧠 Quiz Question 3 of 5

How do you override a FastAPI dependency in tests to replace it with a mock?





🧠 Quiz Question 4 of 5

What does BaseHTTPMiddleware’s dispatch method receive as its second argument, and what must it do with it?





🧠 Quiz Question 5 of 5

Why is pydantic-settings used for configuration in FastAPI projects instead of reading os.environ directly?