⚡ Advanced FastAPI Interview Questions
This lesson targets mid-to-senior backend roles. Topics include Dependency Injection, JWT authentication, SQLAlchemy integration, middleware, WebSockets, testing, caching, rate limiting, and API versioning. These questions separate developers who build FastAPI APIs from those who architect them.
Questions & Answers
01 What is Dependency Injection in FastAPI? How does it work? ►
DI FastAPI’s dependency injection system is one of its most powerful features. Dependencies are callables (functions or classes) that FastAPI invokes automatically before your route handler, injecting their return values. Dependencies can themselves declare dependencies — forming a dependency tree.
from typing import Optional

from fastapi import Depends, FastAPI, HTTPException, Query
app = FastAPI()
# Simple dependency — reusable query parameter parsing
def common_pagination(skip: int = 0, limit: int = Query(20, le=100)):
return {"skip": skip, "limit": limit}
@app.get("/users")
async def list_users(pagination: dict = Depends(common_pagination)):
return db.get_users(**pagination)
@app.get("/orders")
async def list_orders(pagination: dict = Depends(common_pagination)):
return db.get_orders(**pagination) # reused across endpoints
# Class-based dependency (maintains state, cleaner for complex deps)
class DatabaseDep:
def __init__(self, db_url: str = settings.DATABASE_URL):
self.session = create_session(db_url)
def __call__(self) -> Session:
return self.session
get_db = DatabaseDep()
@app.get("/products")
async def list_products(db: Session = Depends(get_db)):
return db.query(Product).all()
# Dependency chains (nested dependencies)
async def get_current_user(token: str = Depends(oauth2_scheme)):
user = verify_token(token)
if not user:
raise HTTPException(401, "Invalid token")
return user
async def get_active_user(user: User = Depends(get_current_user)):
if not user.is_active:
raise HTTPException(400, "Inactive user")
return user
@app.get("/me")
async def read_me(user: User = Depends(get_active_user)):
return user
02 How do you implement JWT authentication in FastAPI? ►
Auth
pip install python-jose[cryptography] passlib[bcrypt]
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import JWTError, jwt
from passlib.context import CryptContext
from datetime import datetime, timedelta
SECRET_KEY = "your-secret-key"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
pwd_context = CryptContext(schemes=["bcrypt"])
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/auth/token")
app = FastAPI()
# Token creation
def create_access_token(data: dict, expires_delta: timedelta | None = None):
    """Return a signed JWT containing ``data`` plus an ``exp`` claim.

    Args:
        data: Claims to embed (e.g. ``{"sub": username}``). Copied, never mutated.
        expires_delta: Token lifetime. When omitted, ACCESS_TOKEN_EXPIRE_MINUTES
            is used so the fallback agrees with the configured lifetime.

    Returns:
        The token encoded with SECRET_KEY using ALGORITHM.
    """
    # Local import: the snippet's import block only brings in datetime/timedelta.
    from datetime import timezone

    # Compare against None explicitly so an explicit timedelta(0) is honoured
    # instead of being silently replaced by the fallback.
    if expires_delta is None:
        expires_delta = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    # datetime.utcnow() is deprecated and returns a naive value; use an
    # explicit UTC-aware timestamp instead.
    expire = datetime.now(timezone.utc) + expires_delta
    to_encode = {**data, "exp": expire}
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
# Auth dependency — extracts and verifies token
async def get_current_user(token: str = Depends(oauth2_scheme)):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"}
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if not username:
raise credentials_exception
except JWTError:
raise credentials_exception
user = get_user(username)
if not user:
raise credentials_exception
return user
# Login endpoint — returns token
@app.post("/auth/token")
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
user = authenticate_user(form_data.username, form_data.password)
if not user:
raise HTTPException(status_code=401, detail="Incorrect credentials")
token = create_access_token({"sub": user.username},
timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES))
return {"access_token": token, "token_type": "bearer"}
# Protected endpoint
@app.get("/users/me")
async def read_me(current_user: User = Depends(get_current_user)):
return current_user
03 How do you integrate SQLAlchemy with FastAPI? What is the async session pattern? ►
Database
pip install sqlalchemy asyncpg
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
from fastapi import FastAPI, Depends
from contextlib import asynccontextmanager
DATABASE_URL = "postgresql+asyncpg://user:pass@localhost/mydb"
engine = create_async_engine(DATABASE_URL, echo=False, pool_size=10)
AsyncSessionLocal = async_sessionmaker(engine, expire_on_commit=False)
class Base(DeclarativeBase):
pass
class User(Base):
__tablename__ = "users"
id: Mapped[int] = mapped_column(primary_key=True)
name: Mapped[str]
email: Mapped[str] = mapped_column(unique=True)
# Database session dependency
async def get_db():
async with AsyncSessionLocal() as session:
try:
yield session
await session.commit()
except Exception:
await session.rollback()
raise
# Lifespan: create tables on startup, dispose of the engine on shutdown
@asynccontextmanager
async def lifespan(app: FastAPI):
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
yield
await engine.dispose()
app = FastAPI(lifespan=lifespan)
# CRUD endpoints using the async session
from sqlalchemy import select
@app.get("/users/{user_id}")
async def get_user(user_id: int, db: AsyncSession = Depends(get_db)):
result = await db.execute(select(User).where(User.id == user_id))
user = result.scalar_one_or_none()
if not user:
raise HTTPException(404, "User not found")
return user
@app.post("/users", status_code=201)
async def create_user(user_in: UserCreate, db: AsyncSession = Depends(get_db)):
user = User(**user_in.model_dump())
db.add(user)
await db.flush() # get ID without committing
await db.refresh(user)
return user
04 What is middleware in FastAPI? How do you write custom middleware? ►
Middleware Middleware processes every request before it reaches a route handler and every response before it reaches the client. FastAPI uses Starlette’s middleware system.
from fastapi import FastAPI, Request
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.responses import Response
import time, uuid
app = FastAPI()
# Option 1: BaseHTTPMiddleware (simpler, slight overhead)
class RequestIDMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request: Request, call_next):
request_id = str(uuid.uuid4())
request.state.request_id = request_id
start_time = time.perf_counter()
response = await call_next(request) # call the next handler
process_time = time.perf_counter() - start_time
response.headers["X-Request-ID"] = request_id
response.headers["X-Process-Time"] = f"{process_time:.4f}s"
return response
app.add_middleware(RequestIDMiddleware)
# Option 2: Pure ASGI middleware (more performant, lower-level)
class AuthMiddleware:
def __init__(self, app):
self.app = app
async def __call__(self, scope, receive, send):
if scope["type"] == "http":
request = Request(scope, receive)
token = request.headers.get("authorization")
# validate token ...
await self.app(scope, receive, send)
app.add_middleware(AuthMiddleware)
# Order matters: each add_middleware() call wraps everything registered before
# it, so middleware runs in REVERSE registration order on the way in — the one
# added last is outermost and sees the request first.
app.add_middleware(CORSMiddleware, allow_origins=["*"])  # innermost: sees the request last
app.add_middleware(GZipMiddleware, minimum_size=1000)  # sees the request second
app.add_middleware(RequestIDMiddleware)  # outermost: sees the request first
05 How do you test FastAPI applications? ►
Testing FastAPI provides a TestClient (built on httpx) for synchronous tests; for async tests, use httpx’s AsyncClient with ASGITransport. In both cases no server needs to start.
pip install httpx pytest pytest-asyncio
# main.py
from fastapi import FastAPI
app = FastAPI()
@app.get("/items/{item_id}")
async def get_item(item_id: int):
return {"id": item_id, "name": "Widget"}
# test_main.py
from fastapi.testclient import TestClient
import pytest
client = TestClient(app)
def test_get_item():
response = client.get("/items/42")
assert response.status_code == 200
assert response.json() == {"id": 42, "name": "Widget"}
def test_get_item_invalid():
response = client.get("/items/not-an-int")
assert response.status_code == 422 # validation error
# Async testing with pytest-asyncio
import pytest
from httpx import AsyncClient, ASGITransport
@pytest.mark.asyncio
async def test_create_user_async():
async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as ac:
response = await ac.post("/users", json={"name": "Alice", "email": "a@b.com"})
assert response.status_code == 201
assert response.json()["name"] == "Alice"
# Override dependencies in tests
from fastapi import Depends
def override_get_db():
yield test_db_session
app.dependency_overrides[get_db] = override_get_db
# Override auth for testing protected routes
app.dependency_overrides[get_current_user] = lambda: User(id=1, name="Test")
06 How do you implement WebSockets in FastAPI? ►
WebSockets FastAPI supports WebSockets natively via Starlette. WebSocket connections persist and allow bidirectional real-time communication.
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import Any
app = FastAPI()
# Simple WebSocket echo server
@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: int):
await websocket.accept()
try:
while True:
data = await websocket.receive_text()
await websocket.send_text(f"Client {client_id} sent: {data}")
except WebSocketDisconnect:
print(f"Client {client_id} disconnected")
# Connection manager for broadcasting (chat room example)
class ConnectionManager:
    """Track open WebSocket connections per room and fan messages out to them."""

    def __init__(self):
        # room name -> sockets currently connected to that room
        self.active_connections: dict[str, list[WebSocket]] = {}

    async def connect(self, room: str, websocket: WebSocket):
        """Accept the handshake and register ``websocket`` under ``room``."""
        await websocket.accept()
        self.active_connections.setdefault(room, []).append(websocket)

    def disconnect(self, room: str, websocket: WebSocket):
        """Forget ``websocket``; a no-op if the room or the socket is unknown."""
        sockets = self.active_connections.get(room)
        if sockets is None:
            return
        if websocket in sockets:
            sockets.remove(websocket)
        if not sockets:
            # Drop empty rooms so the mapping does not grow without bound.
            del self.active_connections[room]

    async def broadcast(self, room: str, message: str):
        """Send ``message`` to every socket registered in ``room``."""
        # Iterate over a snapshot: each send awaits, and a disconnect() that
        # runs meanwhile would otherwise mutate the list being iterated.
        for ws in list(self.active_connections.get(room, [])):
            await ws.send_text(message)

    async def send_json(self, websocket: WebSocket, data: Any):
        """Send ``data`` as JSON to a single socket."""
        await websocket.send_json(data)
manager = ConnectionManager()
@app.websocket("/chat/{room_id}")
async def chat_room(websocket: WebSocket, room_id: str):
await manager.connect(room_id, websocket)
try:
while True:
msg = await websocket.receive_text()
await manager.broadcast(room_id, f"Room {room_id}: {msg}")
except WebSocketDisconnect:
manager.disconnect(room_id, websocket)
07 How do you implement caching in FastAPI? ►
Performance
pip install redis[asyncio] fastapi-cache2
from fastapi import FastAPI, Request, Response
from fastapi_cache import FastAPICache
from fastapi_cache.backends.redis import RedisBackend
from fastapi_cache.decorator import cache
from redis import asyncio as aioredis
from contextlib import asynccontextmanager
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Initialise the Redis-backed response cache on startup; close Redis on shutdown."""
    redis = aioredis.from_url("redis://localhost")
    FastAPICache.init(RedisBackend(redis), prefix="myapp-cache:")
    try:
        yield
    finally:
        # Release the connection pool when the application stops.
        await redis.close()
app = FastAPI(lifespan=lifespan)
# Cache a route for 60 seconds
@app.get("/products")
@cache(expire=60)
async def list_products():
return await db.fetch_all("SELECT * FROM products")
# Cache with a custom key builder (per-user cache)
def user_key_builder(func, namespace, request: Request, *args, **kwargs):
return f"{namespace}:{request.state.user_id}:{request.url.path}"
@app.get("/dashboard")
@cache(expire=300, key_builder=user_key_builder)
async def get_dashboard(current_user = Depends(get_current_user)):
return await build_dashboard(current_user.id)
# Manual Redis caching (more control)
import json
@app.get("/stats")
async def get_stats(redis: aioredis.Redis = Depends(get_redis)):
cached = await redis.get("stats")
if cached:
return json.loads(cached)
data = await compute_expensive_stats()
await redis.setex("stats", 60, json.dumps(data)) # cache 60s
return data
# HTTP cache headers (browser/CDN caching)
@app.get("/public-data")
async def public_data(response: Response):
response.headers["Cache-Control"] = "public, max-age=300"
return {"data": "..."}
08 How do you implement rate limiting in FastAPI? ►
Security
pip install slowapi
from fastapi import FastAPI, Request
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
from slowapi.middleware import SlowAPIMiddleware
limiter = Limiter(key_func=get_remote_address) # limit per IP
app = FastAPI()
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
app.add_middleware(SlowAPIMiddleware)
# Per-endpoint rate limits
@app.get("/items")
@limiter.limit("100/minute") # 100 requests per minute
async def list_items(request: Request):
return {"items": [...]}
@app.post("/auth/login")
@limiter.limit("5/minute") # strict: 5 login attempts/minute
async def login(request: Request, form_data: LoginForm):
return authenticate(form_data)
@app.get("/search")
@limiter.limit("30/minute;300/hour") # multiple limits
async def search(request: Request, q: str):
return search_db(q)
# Custom key function (limit per authenticated user, not IP)
def get_user_id(request: Request) -> str:
    """Rate-limit key: the authenticated user's id, or the client IP as a fallback."""
    user = getattr(request.state, "user", None)
    return str(user.id) if user is not None else get_remote_address(request)


user_limiter = Limiter(key_func=get_user_id)


async def _attach_current_user(request: Request, user=Depends(get_current_user)):
    """Resolve the current user and expose it on ``request.state`` for the limiter."""
    request.state.user = user
    return user


@app.get("/api/data")
@user_limiter.limit("1000/day")
async def get_data(request: Request, user=Depends(_attach_current_user)):
    # The limiter computes its key before this body executes, so the user has to
    # be on request.state already. Dependencies are resolved before the decorated
    # function is called; assigning request.state.user inside the handler came
    # too late and every request was keyed by IP.
    return fetch_data()
09 How do you handle API versioning in FastAPI? ►
Architecture
from fastapi import FastAPI
from fastapi.routing import APIRouter
# Approach 1: URL path versioning (most common)
app = FastAPI()
v1_router = APIRouter(prefix="/api/v1")
v2_router = APIRouter(prefix="/api/v2")
@v1_router.get("/users")
async def list_users_v1():
return [{"id": 1, "name": "Alice"}] # old format
@v2_router.get("/users")
async def list_users_v2():
return {"data": [{"id": 1, "name": "Alice"}], "meta": {"total": 1}} # new format
app.include_router(v1_router)
app.include_router(v2_router)
# Approach 2: Separate FastAPI apps mounted at sub-paths
from fastapi import FastAPI
app_v1 = FastAPI(title="API v1", version="1.0")
app_v2 = FastAPI(title="API v2", version="2.0")
# ... define routes on app_v1 and app_v2 ...
main = FastAPI()
main.mount("/v1", app_v1)
main.mount("/v2", app_v2)
# Approach 3: Header versioning
from fastapi import Header
@app.get("/users")
async def list_users(api_version: str = Header(default="1")):
if api_version == "2":
return v2_response()
return v1_response()
# Approach 4: Query parameter
@app.get("/users")
async def list_users(version: str = "1"):
return v1_response() if version == "1" else v2_response()
10 What are yield dependencies in FastAPI? When do you use them? ►
DI Yield dependencies allow setup and teardown code — everything before yield runs before the route handler; the yielded value is injected; everything after yield runs after the response is sent (cleanup).
from fastapi import FastAPI, Depends
from sqlalchemy.ext.asyncio import AsyncSession
app = FastAPI()
# Database session with automatic cleanup
async def get_db():
async with AsyncSessionLocal() as session:
try:
yield session # inject this into the route handler
await session.commit() # commit after successful response
except Exception:
await session.rollback()
raise
# session is closed automatically when the context manager exits
@app.post("/users")
async def create_user(user: UserCreate, db: AsyncSession = Depends(get_db)):
    """Insert a user; get_db commits once the handler finishes without error."""
    new_user = User(**user.model_dump())
    db.add(new_user)
    # Flush so the INSERT is issued now and the generated primary key is
    # populated on the returned object; the commit is still left to get_db.
    await db.flush()
    return new_user
# Resource cleanup — file handle, HTTP client, etc.
async def get_http_client():
async with httpx.AsyncClient() as client:
yield client # inject to route handler
# client is closed after response (even if an exception occurred)
@app.get("/external-data")
async def fetch_external(client: httpx.AsyncClient = Depends(get_http_client)):
response = await client.get("https://api.example.com/data")
return response.json()
# Nested yield dependencies
async def get_transaction(db: AsyncSession = Depends(get_db)):
async with db.begin():
yield db # all DB operations in route handler are wrapped in a transaction
11 How do you implement role-based access control (RBAC) in FastAPI? ►
Auth
from fastapi import FastAPI, Depends, HTTPException, status
from enum import Enum
from functools import lru_cache
class Role(str, Enum):
USER = "user"
MANAGER = "manager"
ADMIN = "admin"
# Permission checker factory
def require_roles(*roles: Role):
    """Build a dependency that admits only users whose role is one of ``roles``.

    The returned coroutine resolves the current user through get_current_user,
    responds with 403 when that user's role is not listed, and otherwise hands
    the user object back to the route.
    """

    async def role_checker(current_user: User = Depends(get_current_user)):
        if current_user.role in roles:
            return current_user
        allowed = [r.value for r in roles]
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail=f"Requires one of: {allowed}",
        )

    return role_checker
# Apply to routes
@app.get("/admin/users", dependencies=[Depends(require_roles(Role.ADMIN))])
async def list_all_users():
return db.get_all_users()
@app.delete("/users/{id}")
async def delete_user(
id: int,
current_user: User = Depends(require_roles(Role.ADMIN, Role.MANAGER))
):
db.delete_user(id)
return {"deleted": id}
# Permission-based (fine-grained)
class Permission(str, Enum):
READ_USERS = "read:users"
WRITE_USERS = "write:users"
DELETE_USERS = "delete:users"
def require_permission(permission: Permission):
async def checker(user: User = Depends(get_current_user)):
if permission not in user.permissions:
raise HTTPException(403, f"Missing permission: {permission}")
return user
return checker
@app.delete("/posts/{id}")
async def delete_post(
id: int,
user: User = Depends(require_permission(Permission.DELETE_USERS))
):
return db.delete_post(id)
12 How do you handle database migrations with Alembic in a FastAPI project? ►
Database
pip install alembic
alembic init alembic # creates alembic/ directory and alembic.ini
# alembic/env.py — configure to use your SQLAlchemy models
from app.database import Base
from app.models import * # import all models so Alembic can detect them
target_metadata = Base.metadata
# alembic.ini
# sqlalchemy.url = postgresql+psycopg2://user:pass@localhost/mydb
# Generate a migration from your model changes
alembic revision --autogenerate -m "add users table"
# Alembic compares models to current DB schema and generates a migration file
# alembic/versions/abc123_add_users_table.py (generated)
def upgrade():
op.create_table(
"users",
sa.Column("id", sa.Integer(), nullable=False),
sa.Column("name", sa.String(), nullable=False),
sa.Column("email", sa.String(), nullable=False, unique=True),
sa.PrimaryKeyConstraint("id")
)
def downgrade():
op.drop_table("users")
# Apply migrations
alembic upgrade head # run all pending migrations
alembic upgrade +1 # run next migration only
alembic downgrade -1 # roll back one migration
alembic current # show current migration
alembic history # show migration history
# Run migrations on startup (in lifespan or a startup script)
import subprocess
subprocess.run(["alembic", "upgrade", "head"], check=True)
13 What is Server-Sent Events (SSE) in FastAPI? ►
Realtime SSE is a one-way, server-to-client streaming protocol over HTTP — useful for live updates, notifications, and progress streaming without WebSocket complexity.
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
import asyncio
app = FastAPI()
async def event_generator(request: Request):
    """Yield one SSE ``data:`` event per second until the client disconnects.

    Each payload is a JSON object with ``counter`` and ``time`` keys.
    """
    import json  # local import: this snippet's import block does not include json

    counter = 0
    while not await request.is_disconnected():
        counter += 1
        # json.dumps emits valid, double-quoted JSON; the hand-built payload used
        # single quotes, which JSON.parse() on the client rejects.
        payload = json.dumps({"counter": counter, "time": datetime.now().isoformat()})
        # SSE format: data: ...\n\n (the blank line terminates an event)
        yield f"data: {payload}\n\n"
        await asyncio.sleep(1)
@app.get("/events")
async def sse_events(request: Request):
return StreamingResponse(
event_generator(request),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no" # disable Nginx buffering
}
)
# Named events and IDs (proper SSE format)
async def notifications_generator(user_id: int, request: Request):
    """Stream a user's new notifications as named SSE events, polling every 5 s.

    Each event carries its own increasing ``id``, the event name
    ``notification`` and the notification serialised as JSON.
    """
    event_id = 0
    while not await request.is_disconnected():
        notifications = await get_new_notifications(user_id)
        for n in notifications:
            # Advance the id per event, not per poll: ids are only useful for
            # resuming a stream when every event has a distinct one.
            event_id += 1
            yield (
                f"id: {event_id}\n"
                "event: notification\n"
                f"data: {n.model_dump_json()}\n\n"
            )
        await asyncio.sleep(5)
# Client JS: const es = new EventSource('/events');
# es.addEventListener('notification', e => console.log(JSON.parse(e.data)));
14 How do you implement pagination in FastAPI? ►
API Design
from fastapi import FastAPI, Depends, Query
from pydantic import BaseModel
from typing import TypeVar, Generic
T = TypeVar("T")
class Page(BaseModel, Generic[T]):
"""Generic paginated response."""
data: list[T]
total: int
page: int
size: int
total_pages: int
has_next: bool
has_prev: bool
class PaginationParams:
def __init__(
self,
page: int = Query(1, ge=1, description="Page number"),
size: int = Query(20, ge=1, le=100, description="Items per page")
):
self.page = page
self.size = size
self.offset = (page - 1) * size
# Reusable dependency
def paginate(params: PaginationParams = Depends()):
return params
@app.get("/users", response_model=Page[UserOut])
async def list_users(
db: AsyncSession = Depends(get_db),
p: PaginationParams = Depends(paginate)
):
total = await db.scalar(select(func.count()).select_from(User))
users = await db.scalars(select(User).offset(p.offset).limit(p.size))
return Page(
data = list(users),
total = total,
page = p.page,
size = p.size,
total_pages = (total + p.size - 1) // p.size,
has_next = p.page * p.size < total,
has_prev = p.page > 1
)
# Cursor-based pagination (for large datasets — avoids expensive COUNT)
@app.get("/feed")
async def get_feed(
    after_id: int | None = None,
    limit: int = Query(20, ge=1, le=100),
    db: AsyncSession = Depends(get_db),
):
    """Return one page of posts, newest first, using cursor (keyset) pagination.

    Args:
        after_id: Cursor from the previous page; only posts with a smaller id
            are returned. Omit it for the first page.
        limit: Page size, 1-100. The lower bound keeps ``limit=0`` from
            producing a bogus cursor.
        db: Request-scoped session, injected like in the other endpoints
            (the handler previously relied on an undefined global ``db``).

    Returns:
        ``{"data": [...], "next_cursor": <id of the last returned post or None>}``.
    """
    # Fetch one row beyond the page size to learn whether another page exists.
    query = select(Post).order_by(Post.id.desc()).limit(limit + 1)
    if after_id is not None:  # 0 is a valid cursor, so don't rely on truthiness
        query = query.where(Post.id < after_id)
    posts = list(await db.scalars(query))
    has_more = len(posts) > limit
    page = posts[:limit]
    return {"data": page, "next_cursor": page[-1].id if has_more else None}
15 How do you validate environment variables and application settings in FastAPI? ►
Config FastAPI projects typically use pydantic-settings to validate configuration from environment variables with type safety.
pip install pydantic-settings
from pydantic_settings import BaseSettings, SettingsConfigDict
from pydantic import AnyHttpUrl, field_validator
from functools import lru_cache
from typing import Literal
class Settings(BaseSettings):
# Server
app_name: str = "My FastAPI App"
debug: bool = False
environment: Literal["development", "staging", "production"] = "development"
port: int = 8000
# Database
database_url: str
db_pool_size: int = 5
# Auth
secret_key: str
access_token_expire_min: int = 30
algorithm: str = "HS256"
# External services
redis_url: str = "redis://localhost:6379"
smtp_host: str = "localhost"
model_config = SettingsConfigDict(
env_file=".env", # read from .env file
env_file_encoding="utf-8",
case_sensitive=False # DATABASE_URL and database_url are the same
)
@field_validator("secret_key")
@classmethod
def validate_secret_key(cls, v: str) -> str:
if len(v) < 32:
raise ValueError("SECRET_KEY must be at least 32 characters")
return v
# Cached singleton — reads .env once
@lru_cache
def get_settings() -> Settings:
return Settings()
settings = get_settings()
# Use as a dependency (enables override in tests)
from fastapi import Depends
@app.get("/info")
async def app_info(settings: Settings = Depends(get_settings)):
return {"app": settings.app_name, "env": settings.environment}
16 How do you structure a production FastAPI project? ►
Architecture
my_api/
app/
api/ # HTTP layer only
v1/
routers/
users.py # APIRouter with routes
products.py
orders.py
__init__.py # mounts all routers
core/ # cross-cutting concerns
config.py # pydantic-settings
security.py # JWT, password hashing
exceptions.py # custom exception classes + handlers
logging.py # structured logging setup
db/ # database layer
session.py # engine, AsyncSessionLocal
base.py # DeclarativeBase
migrations/ # alembic
models/ # SQLAlchemy ORM models
user.py
product.py
schemas/ # Pydantic request/response models
user.py # UserCreate, UserOut, UserUpdate
product.py
services/ # business logic (no HTTP awareness)
user_service.py # get_user, create_user, update_user
product_service.py
repositories/ # data access layer (optional)
user_repo.py
middleware/
request_id.py
timing.py
main.py # FastAPI app + lifespan
tests/
conftest.py # fixtures, test DB, app overrides
test_users.py
test_products.py
.env
.env.example
Dockerfile
docker-compose.yml
pyproject.toml
Key separation: Routers handle HTTP (request → response). Services hold business logic (no FastAPI imports — pure Python). Repositories handle data access. This makes services fully testable without HTTP or DB setup.
17 How do you implement OAuth2 with scopes in FastAPI? ►
Auth
from fastapi import FastAPI, Depends, Security, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, SecurityScopes
from jose import JWTError, jwt
# Define scopes and their descriptions
app = FastAPI()
oauth2_scheme = OAuth2PasswordBearer(
tokenUrl="/auth/token",
scopes={
"users:read": "Read user information",
"users:write": "Create and update users",
"admin": "Full admin access"
}
)
# Auth dependency that validates required scopes
async def get_current_user(
    security_scopes: SecurityScopes,  # injected with the scopes the route requires
    token: str = Depends(oauth2_scheme),
):
    """Authenticate the bearer token and enforce the scopes the route requires.

    Raises:
        HTTPException 401: the token is invalid, has no ``sub`` claim, or names
            a user that get_user() cannot find.
        HTTPException 403: the token lacks one of the required scopes.

    Returns:
        The user object returned by get_user().
    """
    if security_scopes.scopes:
        authenticate_value = f'Bearer scope="{security_scopes.scope_str}"'
    else:
        authenticate_value = "Bearer"
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": authenticate_value},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    except JWTError:
        raise credentials_exception
    username = payload.get("sub")
    if not username:
        raise credentials_exception
    token_scopes = payload.get("scopes", [])
    # Check all required scopes are present in the token
    for scope in security_scopes.scopes:
        if scope not in token_scopes:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail=f"Not enough permissions — requires scope: {scope}",
            )
    user = get_user(username)
    # A well-formed token for a user that no longer exists must not
    # authenticate as None.
    if not user:
        raise credentials_exception
    return user
# Protect routes with specific scopes
@app.get("/users/me")
async def read_me(user = Security(get_current_user, scopes=["users:read"])):
return user
@app.post("/users")
async def create_user(user: UserCreate,
_ = Security(get_current_user, scopes=["users:write"])):
return db.create_user(user)
18 What is OpenAPI schema customisation in FastAPI? ►
Docs
from fastapi import FastAPI
from fastapi.openapi.utils import get_openapi
app = FastAPI()
# Custom OpenAPI schema
def custom_openapi():
    """Build the customised OpenAPI schema once and cache it on the app.

    Returns:
        The schema dict; later calls return the cached ``app.openapi_schema``.
    """
    if app.openapi_schema:
        return app.openapi_schema
    openapi_schema = get_openapi(
        title="My Awesome API",
        version="3.0.0",
        description="API for doing awesome things",
        routes=app.routes,
        tags=[
            {"name": "users", "description": "User management operations"},
            {"name": "products", "description": "Product catalogue"},
        ],
    )
    # Add security scheme. get_openapi() omits "components" when the app declares
    # no models, so create the section on demand instead of indexing into it.
    openapi_schema.setdefault("components", {})["securitySchemes"] = {
        "BearerAuth": {
            "type": "http",
            "scheme": "bearer",
            "bearerFormat": "JWT",
        }
    }
    app.openapi_schema = openapi_schema
    return app.openapi_schema
app.openapi = custom_openapi
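# Customise docs UI — pass these arguments to the FastAPI() call that creates the
# app; re-creating `app` here discards the custom OpenAPI hook installed above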
app = FastAPI(
docs_url="/api/docs",
redoc_url="/api/redoc",
swagger_ui_parameters={
"defaultModelsExpandDepth": -1, # hide schemas section by default
"persistAuthorization": True # keep auth token between refreshes
}
)
# Exclude endpoints from docs
@app.get("/internal/debug", include_in_schema=False)
async def internal_debug():
return {"debug": True}
19 How do you handle async database queries efficiently in FastAPI? ►
Database
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, func, and_
from sqlalchemy.orm import selectinload, joinedload
# Avoid N+1 queries — use eager loading
# ❌ N+1 — queries DB once per order
@app.get("/orders")
async def list_orders_bad(db: AsyncSession = Depends(get_db)):
orders = (await db.scalars(select(Order))).all()
for order in orders:
_ = order.customer # triggers N separate queries!
return orders
# ✅ Eager load with selectinload (multiple queries, no JOIN cartesian)
@app.get("/orders")
async def list_orders_good(db: AsyncSession = Depends(get_db)):
result = await db.scalars(
select(Order)
.options(selectinload(Order.customer)) # one query for all customers
.options(selectinload(Order.items).selectinload(OrderItem.product))
)
return result.all()
# ✅ joinedload (single JOIN query — good for many-to-one)
result = await db.scalars(
select(Order).options(joinedload(Order.customer))
)
# Several queries on one session: await them one after another
async def get_dashboard_data(user_id: int, db: AsyncSession):
    """Return a user's orders together with their count and total value.

    The two queries are awaited sequentially. An AsyncSession must not be used
    from concurrent tasks, so handing both coroutines to asyncio.gather() on
    the same session is not supported. For real parallelism, give each task
    its own session.

    Returns:
        ``{"orders": [...], "stats": <row of (count, sum of totals)>}``.
    """
    orders = await db.scalars(select(Order).where(Order.user_id == user_id))
    stats = await db.execute(
        select(func.count(), func.sum(Order.total)).where(Order.user_id == user_id)
    )
    return {"orders": list(orders), "stats": stats.one()}
20 How do you implement logging in a FastAPI application? ►
Observability
pip install structlog
import logging, structlog, time, uuid
from fastapi import FastAPI, Request
from starlette.middleware.base import BaseHTTPMiddleware
# Configure structlog (structured JSON logging)
structlog.configure(
processors=[
structlog.contextvars.merge_contextvars,
structlog.processors.add_log_level,
structlog.processors.TimeStamper(fmt="iso"),
structlog.processors.JSONRenderer() # JSON output in production
],
wrapper_class=structlog.make_filtering_bound_logger(logging.INFO),
)
logger = structlog.get_logger()
app = FastAPI()
# Request logging middleware
class LoggingMiddleware(BaseHTTPMiddleware):
    """Bind per-request context for structlog and log each request's outcome."""

    async def dispatch(self, request: Request, call_next):
        """Log completion (status, duration) or failure of the wrapped request.

        A fresh request id is bound to the log context and echoed back in the
        ``X-Request-ID`` response header. Exceptions are logged and re-raised.
        """
        request_id = str(uuid.uuid4())
        # Start from a clean context so values bound for an earlier request
        # cannot leak into this one.
        structlog.contextvars.clear_contextvars()
        structlog.contextvars.bind_contextvars(
            request_id=request_id,
            method=request.method,
            path=request.url.path,
            # request.client is None when the peer address is unavailable.
            client_ip=request.client.host if request.client else None,
        )
        start = time.perf_counter()
        try:
            response = await call_next(request)
        except Exception as e:
            logger.error("request_failed", error=str(e))
            raise
        logger.info(
            "request_completed",
            status_code=response.status_code,
            duration_ms=round((time.perf_counter() - start) * 1000, 2),
        )
        response.headers["X-Request-ID"] = request_id
        return response
app.add_middleware(LoggingMiddleware)
# Use structured logging in route handlers
@app.post("/users")
async def create_user(user: UserCreate):
logger.info("creating_user", email=user.email)
new_user = await user_service.create(user)
logger.info("user_created", user_id=new_user.id)
return new_user
21 How do you implement health check endpoints in FastAPI? ►
Ops
from fastapi import FastAPI, Depends
from sqlalchemy.ext.asyncio import AsyncSession
import redis.asyncio as aioredis
app = FastAPI()
# Liveness — is the process running? (Kubernetes restarts if this fails)
@app.get("/health/live", tags=["health"])
async def liveness():
return {"status": "alive"}
# Readiness — can it accept traffic? (K8s stops routing if this fails)
@app.get("/health/ready", tags=["health"])
async def readiness(db: AsyncSession = Depends(get_db)):
    """Report whether the database and Redis are reachable.

    Returns:
        A JSON response with status 200 when every check passes and 503
        otherwise. The body lists the result of each check.
    """
    # Local imports: neither name is imported anywhere in this snippet.
    import contextlib

    from fastapi.responses import JSONResponse
    from sqlalchemy import text

    checks = {}
    overall = "ready"

    # Check database
    try:
        await db.execute(text("SELECT 1"))
        checks["database"] = "ok"
    except Exception as e:
        checks["database"] = f"error: {e}"
        overall = "not_ready"

    # Check Redis
    redis_client = None
    try:
        redis_client = aioredis.from_url(settings.redis_url)
        await redis_client.ping()
        checks["redis"] = "ok"
    except Exception as e:
        checks["redis"] = f"error: {e}"
        overall = "not_ready"
    finally:
        # Close the client even when ping() failed; otherwise every failed
        # probe leaves a connection pool behind. A failing close must not
        # change the probe result.
        if redis_client is not None:
            with contextlib.suppress(Exception):
                await redis_client.close()

    status_code = 200 if overall == "ready" else 503
    return JSONResponse(
        status_code=status_code,
        content={"status": overall, "checks": checks, "version": settings.version},
    )
# Detailed health info (internal only — not exposed to public)
@app.get("/health/info", include_in_schema=False)
async def health_info():
import psutil
return {
"cpu_percent": psutil.cpu_percent(),
"memory_mb": psutil.Process().memory_info().rss / 1024 / 1024,
"uptime_secs": time.time() - start_time
}
📝 Knowledge Check
Test your understanding of advanced FastAPI patterns and production techniques.