⚡ Expert FastAPI Interview Questions
This lesson targets senior engineers and architects. Topics include the ASGI internals, performance benchmarking, Starlette internals, OpenTelemetry, distributed tracing, custom OpenAPI extensions, GraphQL integration, gRPC, event-driven architecture with Kafka, and production deployment patterns. These questions reveal whether you understand FastAPI deeply or just write Python routes.
Questions & Answers
01 What is ASGI and how does FastAPI use it? ►
Internals ASGI (Asynchronous Server Gateway Interface) is the Python standard for async web applications โ the successor to WSGI. It defines a calling convention between async web servers (Uvicorn, Hypercorn, Daphne) and async Python frameworks (FastAPI, Django Channels, Starlette).
ASGI interface:
# An ASGI application is a callable with this signature:
async def app(scope: dict, receive: callable, send: callable) -> None:
# scope: connection info (type, path, headers, method, ...)
# receive: async callable to receive events from the client (request body chunks)
# send: async callable to send events to the client (response start, body chunks)
...
# FastAPI IS an ASGI application โ it can be passed directly to uvicorn
# uvicorn main:app โ uvicorn calls app(scope, receive, send) for each request
# The ASGI lifecycle:
# HTTP: scope["type"] = "http"
# WebSocket: scope["type"] = "websocket"
# Lifespan: scope["type"] = "lifespan" (startup/shutdown events)
# FastAPI wraps Starlette, which implements the ASGI interface:
# Request comes in โ Starlette router โ FastAPI route matching โ
# dependency injection โ pydantic validation โ route handler โ
# response serialisation โ back through middleware โ ASGI send
# Running with different ASGI servers:
# Uvicorn: uvicorn main:app (most popular, uses uvloop)
# Hypercorn: hypercorn main:app (supports HTTP/2, HTTP/3)
# Granian: granian --interface asgi main:app (Rust-based, fastest)
02 How does FastAPI’s dependency injection handle concurrency? What is the dependency cache? ►
DI FastAPI’s DI system caches dependency results within a single request โ if the same dependency appears multiple times in the dependency tree for one request, it is resolved only once.
async def get_db():
async with AsyncSessionLocal() as session:
yield session
# If both dependencies use get_db, it's resolved ONCE per request
async def get_current_user(db: AsyncSession = Depends(get_db)):
...
async def get_permissions(db: AsyncSession = Depends(get_db)):
...
@app.get("/dashboard")
async def dashboard(
user: User = Depends(get_current_user), # uses get_db
permissions: list[str] = Depends(get_permissions) # uses same get_db session
):
# get_db() was called ONCE, not twice โ same session shared
...
# Disable caching for a specific dependency (call it fresh every time)
@app.get("/fresh")
async def endpoint(
data: Data = Depends(get_data, use_cache=False) # always re-run
):
...
# Global dependencies โ applied to all routes
app = FastAPI(dependencies=[Depends(verify_api_key)])
# Or apply to a router
router = APIRouter(dependencies=[Depends(require_auth)])
03 How do you implement OpenTelemetry distributed tracing in FastAPI? ►
Observability
pip install opentelemetry-sdk opentelemetry-instrumentation-fastapi \
opentelemetry-instrumentation-sqlalchemy opentelemetry-exporter-otlp
# instrumentation.py โ must be imported BEFORE app creation
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor
from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor
def setup_tracing(app):
provider = TracerProvider()
provider.add_span_processor(
BatchSpanProcessor(OTLPSpanExporter(endpoint="http://otel-collector:4317"))
)
trace.set_tracer_provider(provider)
# Auto-instrument FastAPI (wraps every request in a span)
FastAPIInstrumentor.instrument_app(app)
SQLAlchemyInstrumentor().instrument(engine=engine)
HTTPXClientInstrumentor().instrument()
# main.py
from fastapi import FastAPI
from opentelemetry import trace
app = FastAPI()
setup_tracing(app)
tracer = trace.get_tracer(__name__)
@app.post("/orders")
async def create_order(order: OrderCreate):
with tracer.start_as_current_span("validate-inventory") as span:
span.set_attribute("order.items", len(order.items))
available = await check_inventory(order.items)
span.set_attribute("inventory.available", available)
with tracer.start_as_current_span("create-order"):
result = await order_service.create(order)
return result
04 How do you integrate GraphQL with FastAPI using Strawberry? ►
GraphQL
pip install strawberry-graphql[fastapi]
import strawberry
from strawberry.fastapi import GraphQLRouter
from fastapi import FastAPI, Depends
# Define GraphQL types using Python dataclasses
@strawberry.type
class User:
id: int
name: str
email: str
@strawberry.type
class Query:
@strawberry.field
async def user(self, id: int, info: strawberry.types.Info) -> User | None:
db = info.context["db"]
return await db.get_user(id)
@strawberry.field
async def users(self, info: strawberry.types.Info) -> list[User]:
db = info.context["db"]
return await db.list_users()
@strawberry.type
class Mutation:
@strawberry.mutation
async def create_user(self, name: str, email: str, info: strawberry.types.Info) -> User:
db = info.context["db"]
return await db.create_user(name=name, email=email)
@strawberry.type
class Subscription:
@strawberry.subscription
async def user_created(self, info: strawberry.types.Info):
async for user in info.context["pubsub"].subscribe("user_created"):
yield user
schema = strawberry.Schema(query=Query, mutation=Mutation, subscription=Subscription)
# Mount GraphQL router on FastAPI
async def get_context(db: AsyncSession = Depends(get_db)):
return {"db": db}
graphql_app = GraphQLRouter(schema, context_getter=get_context)
app = FastAPI()
app.include_router(graphql_app, prefix="/graphql")
# Visit /graphql for GraphiQL IDE, POST /graphql for queries
05 How do you implement event-driven architecture with Kafka and FastAPI? ►
Events
pip install aiokafka
from aiokafka import AIOKafkaProducer, AIOKafkaConsumer
from fastapi import FastAPI
from contextlib import asynccontextmanager
import json, asyncio
@asynccontextmanager
async def lifespan(app: FastAPI):
# Kafka producer โ started at app startup
app.state.kafka_producer = AIOKafkaProducer(
bootstrap_servers="kafka:9092",
value_serializer=lambda v: json.dumps(v).encode()
)
await app.state.kafka_producer.start()
# Consumer โ runs as a background task
consumer_task = asyncio.create_task(consume_events())
app.state.consumer_task = consumer_task
yield
# Shutdown
await app.state.kafka_producer.stop()
consumer_task.cancel()
app = FastAPI(lifespan=lifespan)
# Publish events from route handlers
@app.post("/orders", status_code=201)
async def create_order(order: OrderCreate, request: Request):
new_order = await order_service.create(order)
# Publish domain event โ other services react to this
await request.app.state.kafka_producer.send(
topic="orders",
key=str(new_order.id).encode(),
value={"event": "order_created", "order_id": new_order.id, "data": order.model_dump()}
)
return new_order
# Consume events (e.g., listen for payment confirmations)
async def consume_events():
consumer = AIOKafkaConsumer(
"payments",
bootstrap_servers="kafka:9092",
group_id="order-service",
value_deserializer=lambda v: json.loads(v.decode())
)
await consumer.start()
try:
async for msg in consumer:
event = msg.value
if event["event"] == "payment_confirmed":
await order_service.mark_paid(event["order_id"])
finally:
await consumer.stop()
06 What is the Repository pattern in FastAPI? How does it improve testability? ►
Architecture The Repository pattern separates data access logic from business logic. Services depend on repository interfaces โ enabling swapping the database implementation in tests without changing business logic.
from abc import ABC, abstractmethod
from typing import Protocol
# Abstract interface (protocol)
class UserRepository(Protocol):
async def get_by_id(self, user_id: int) -> User | None: ...
async def get_by_email(self, email: str) -> User | None: ...
async def create(self, data: UserCreate) -> User: ...
async def update(self, user_id: int, data: UserUpdate) -> User: ...
async def delete(self, user_id: int) -> None: ...
# Concrete implementation โ SQLAlchemy
class SQLUserRepository:
def __init__(self, db: AsyncSession):
self.db = db
async def get_by_id(self, user_id: int) -> User | None:
result = await self.db.execute(select(UserModel).where(UserModel.id == user_id))
return result.scalar_one_or_none()
async def create(self, data: UserCreate) -> User:
user = UserModel(**data.model_dump())
self.db.add(user)
await self.db.flush()
return user
# Business logic โ depends on the protocol, not the implementation
class UserService:
def __init__(self, repo: UserRepository):
self.repo = repo
async def register(self, data: UserCreate) -> User:
existing = await self.repo.get_by_email(data.email)
if existing:
raise ValueError(f"Email {data.email} already registered")
data.password = hash_password(data.password)
return await self.repo.create(data)
# Dependency injection wiring
def get_user_repo(db: AsyncSession = Depends(get_db)) -> UserRepository:
return SQLUserRepository(db)
def get_user_service(repo: UserRepository = Depends(get_user_repo)) -> UserService:
return UserService(repo)
# Test โ inject a mock repository
class InMemoryUserRepository:
def __init__(self): self.users = {}
async def get_by_email(self, email): return self.users.get(email)
async def create(self, data): u = User(**data.model_dump()); self.users[u.email] = u; return u
# Tests use InMemoryUserRepository โ no database needed
07 How do you build and deploy FastAPI with Docker? ►
Deployment
# Dockerfile (multi-stage build for smaller production image)
FROM python:3.12-slim AS builder
WORKDIR /app
RUN pip install uv
COPY pyproject.toml uv.lock ./
RUN uv sync --frozen --no-dev # install only production deps
FROM python:3.12-slim AS runtime
WORKDIR /app
# Non-root user (security best practice)
RUN groupadd -r appuser && useradd -r -g appuser appuser
COPY --from=builder /app/.venv /app/.venv
COPY ./app ./app
ENV PATH="/app/.venv/bin:$PATH"
ENV PYTHONPATH="/app"
USER appuser
EXPOSE 8000
# Use exec form โ receives signals properly
CMD ["uvicorn", "app.main:app",
"--host", "0.0.0.0",
"--port", "8000",
"--workers", "1"] # 1 worker per container; scale with K8s replicas
# docker-compose.yml (development)
services:
api:
build: .
ports: ["8000:8000"]
volumes: ["./app:/app/app"] # hot reload in dev
environment:
DATABASE_URL: postgresql+asyncpg://postgres:pass@db/mydb
REDIS_URL: redis://redis:6379
command: uvicorn app.main:app --host 0.0.0.0 --port 8000 --reload
depends_on: [db, redis]
db:
image: postgres:16-alpine
environment: {POSTGRES_PASSWORD: pass, POSTGRES_DB: mydb}
volumes: [postgres_data:/var/lib/postgresql/data]
redis:
image: redis:7-alpine
08 What is Celery integration with FastAPI? ►
Tasks Celery provides a distributed task queue for CPU-intensive, long-running, or scheduled jobs that should not run in the FastAPI event loop. FastAPI sends tasks to Celery; Celery workers execute them independently.
pip install celery redis
# celery_app.py
from celery import Celery
celery = Celery(
"worker",
broker="redis://localhost:6379/0",
backend="redis://localhost:6379/1",
include=["app.tasks"]
)
celery.conf.update(task_serializer="json", result_serializer="json")
# app/tasks.py
from celery_app import celery
import time
@celery.task(bind=True, max_retries=3)
def process_video(self, video_id: int, options: dict):
try:
# Long-running CPU-intensive work
result = encode_video(video_id, **options)
return {"video_id": video_id, "output": result}
except Exception as exc:
raise self.retry(exc=exc, countdown=60)
# FastAPI route โ dispatches task, returns immediately
@app.post("/videos/{video_id}/process", status_code=202)
async def start_processing(video_id: int, options: ProcessOptions):
task = process_video.apply_async(args=[video_id, options.model_dump()])
return {"task_id": task.id, "status": "processing"}
# Check task status
@app.get("/tasks/{task_id}")
async def get_task_status(task_id: str):
task = celery.AsyncResult(task_id)
return {
"task_id": task_id,
"status": task.status,
"result": task.result if task.ready() else None
}
# Start Celery worker
# celery -A celery_app worker --loglevel=info --concurrency=4
09 How do you implement idempotency for POST endpoints in FastAPI? ►
Reliability Idempotency keys prevent duplicate processing when clients retry failed requests โ critical for payments and order creation.
from fastapi import FastAPI, Header, Request, Response
from typing import Optional
import redis.asyncio as aioredis, json
app = FastAPI()
@app.middleware("http")
async def idempotency_middleware(request: Request, call_next):
# Only apply to POST, PUT, PATCH
if request.method not in ("POST", "PUT", "PATCH"):
return await call_next(request)
idempotency_key = request.headers.get("Idempotency-Key")
if not idempotency_key:
return await call_next(request)
redis = request.app.state.redis
cache_key = f"idempotency:{idempotency_key}"
# Check if we've seen this key before
cached = await redis.get(cache_key)
if cached:
data = json.loads(cached)
return Response(
content=json.dumps(data["body"]),
status_code=data["status_code"],
media_type="application/json",
headers={"X-Idempotent-Replayed": "true"}
)
# Process the request
response = await call_next(request)
# Cache the response for 24 hours (only on success)
if 200 <= response.status_code < 300:
body = b""
async for chunk in response.body_iterator:
body += chunk
await redis.setex(cache_key, 86400, json.dumps({
"status_code": response.status_code,
"body": json.loads(body)
}))
return Response(content=body, status_code=response.status_code,
media_type="application/json")
return response
# Client sends: POST /payments Idempotency-Key: uuid4-here
# Retry with same key โ gets same response, charge not duplicated
10 What is the difference between Uvicorn, Gunicorn, and Gunicorn+Uvicorn workers? ►
Deployment
- Uvicorn standalone โ single-process ASGI server. Uses a single event loop. Limited to one CPU core. Best for development (
--reload) or containerised deployments where horizontal scaling is handled externally (Kubernetes). - Gunicorn โ mature WSGI process manager (pre-fork model). Not natively ASGI-compatible. Cannot run FastAPI directly without a worker class.
- Gunicorn + UvicornWorker โ Gunicorn manages multiple Uvicorn worker processes. Each worker has its own event loop. Combines Gunicorn’s process management (restart on crash, graceful shutdown) with Uvicorn’s async performance. Suitable for traditional VM deployments.
# Gunicorn with Uvicorn workers (multi-core VM deployment)
pip install gunicorn
gunicorn app.main:app \
--workers 4 \ # typically 2x or 4x CPU count
--worker-class uvicorn.workers.UvicornWorker \
--bind 0.0.0.0:8000 \
--timeout 120 \
--keep-alive 5 \
--max-requests 1000 \ # restart worker after N requests (prevent memory bloat)
--max-requests-jitter 100 \
--access-logfile - \
--error-logfile -
# In containers (Docker + K8s): prefer single uvicorn process per container
# Scale by increasing replica count โ simpler, more cloud-native
uvicorn app.main:app --host 0.0.0.0 --port 8000 --workers 1
# Newer alternative: Granian (Rust-based, outperforms Gunicorn+Uvicorn)
pip install granian
granian --interface asgi app.main:app --host 0.0.0.0 --port 8000 --workers 4
11 How do you implement a multi-tenant API with FastAPI? ►
Architecture
from fastapi import FastAPI, Depends, Request, HTTPException
from pydantic import BaseModel
app = FastAPI()
class Tenant(BaseModel):
id: int
slug: str
plan: str
db_schema: str # separate schema per tenant
# Tenant resolution dependency โ from subdomain, header, or JWT claim
async def get_current_tenant(
request: Request,
x_tenant_id: str | None = Header(None)
) -> Tenant:
# Method 1: header-based
if x_tenant_id:
tenant = await tenant_service.get_by_id(x_tenant_id)
if not tenant:
raise HTTPException(404, "Tenant not found")
return tenant
# Method 2: subdomain-based (acme.myapp.com)
host = request.headers.get("host", "")
subdomain = host.split(".")[0]
tenant = await tenant_service.get_by_slug(subdomain)
if not tenant:
raise HTTPException(404, "Tenant not found")
return tenant
# DB session with tenant schema
async def get_tenant_db(
tenant: Tenant = Depends(get_current_tenant),
db: AsyncSession = Depends(get_db)
) -> AsyncSession:
# Set the search_path to the tenant's schema
await db.execute(text(f"SET search_path TO {tenant.db_schema}, public"))
return db
@app.get("/orders")
async def list_orders(
tenant: Tenant = Depends(get_current_tenant),
db: AsyncSession = Depends(get_tenant_db)
):
# Query is automatically scoped to tenant's schema
return await db.scalars(select(Order))
# Rate limiting per tenant plan
def check_tenant_rate_limit(tenant: Tenant = Depends(get_current_tenant)):
limits = {"free": "100/day", "pro": "10000/day", "enterprise": "unlimited"}
# Apply limit based on tenant plan
return tenant
12 How do you implement custom Pydantic validators and computed fields? ►
Pydantic
from pydantic import BaseModel, field_validator, model_validator, computed_field
from pydantic import AnyHttpUrl, EmailStr, Field
from typing import Annotated
# Custom type with validation
PositiveDecimal = Annotated[float, Field(gt=0, decimal_places=2)]
class OrderItem(BaseModel):
product_id: int
quantity: int = Field(..., ge=1, le=1000)
unit_price: PositiveDecimal
# Computed field โ calculated from other fields, included in serialisation
@computed_field
@property
def subtotal(self) -> float:
return round(self.quantity * self.unit_price, 2)
class Order(BaseModel):
items: list[OrderItem] = Field(..., min_length=1)
discount_pct: float = Field(0, ge=0, le=100)
delivery_method: str
@field_validator("delivery_method")
@classmethod
def validate_delivery(cls, v: str) -> str:
allowed = {"standard", "express", "next-day"}
if v not in allowed:
raise ValueError(f"delivery_method must be one of: {allowed}")
return v
@model_validator(mode="after")
def validate_minimum_order(self) -> "Order":
total = sum(item.subtotal for item in self.items)
if self.delivery_method == "next-day" and total < 50:
raise ValueError("Next-day delivery requires order total >= ยฃ50")
return self
@computed_field
@property
def total_before_discount(self) -> float:
return round(sum(i.subtotal for i in self.items), 2)
@computed_field
@property
def total(self) -> float:
return round(self.total_before_discount * (1 - self.discount_pct / 100), 2)
13 What are advanced Starlette features used by FastAPI? ►
Starlette FastAPI is built on Starlette. Understanding Starlette gives you access to lower-level features that FastAPI exposes or that you can use directly.
from starlette.applications import Starlette
from starlette.routing import Route, Mount
from starlette.staticfiles import StaticFiles
from starlette.requests import Request
from starlette.responses import JSONResponse
# Mount static files
app.mount("/static", StaticFiles(directory="static"), name="static")
app.mount("/uploads", StaticFiles(directory="uploads"), name="uploads")
# Starlette routing (lower-level than FastAPI routes)
async def homepage(request: Request):
return JSONResponse({"status": "ok"})
# Mount another ASGI app at a sub-path
from fastapi import FastAPI
admin_app = FastAPI()
main_app = FastAPI()
main_app.mount("/admin", admin_app) # admin FastAPI app at /admin
# Starlette middleware (more performant than BaseHTTPMiddleware)
from starlette.middleware import Middleware
from starlette.middleware.trustedhost import TrustedHostMiddleware
from starlette.middleware.httpsredirect import HTTPSRedirectMiddleware
from starlette.middleware.sessions import SessionMiddleware
app.add_middleware(TrustedHostMiddleware, allowed_hosts=["myapp.com", "*.myapp.com"])
app.add_middleware(SessionMiddleware, secret_key="secret") # cookie-based sessions
# Starlette test client (sync) โ used by FastAPI TestClient
from starlette.testclient import TestClient
client = TestClient(app)
# Background tasks via Starlette (same as FastAPI BackgroundTasks)
from starlette.background import BackgroundTask
response = JSONResponse({"result": "ok"})
response.background = BackgroundTask(send_email, to="user@example.com")
14 How do you implement circuit breakers in FastAPI? ►
Resilience
pip install circuitbreaker
from circuitbreaker import circuit, CircuitBreakerError
from fastapi import FastAPI, HTTPException, Depends
import httpx
app = FastAPI()
# Wrap external service calls with circuit breaker
@circuit(failure_threshold=5, recovery_timeout=30, expected_exception=httpx.RequestError)
async def call_payment_service(payload: dict) -> dict:
async with httpx.AsyncClient(timeout=5.0) as client:
response = await client.post("http://payment-service/charge", json=payload)
response.raise_for_status()
return response.json()
@app.post("/checkout")
async def checkout(order: OrderCreate):
try:
payment_result = await call_payment_service(order.payment.model_dump())
return {"order": order, "payment": payment_result}
except CircuitBreakerError:
# Circuit is OPEN โ payment service is down, fail fast
raise HTTPException(
status_code=503,
detail="Payment service temporarily unavailable. Please try again later."
)
except httpx.HTTPStatusError as e:
raise HTTPException(status_code=e.response.status_code, detail=str(e))
# Async circuit breaker with custom logic
from dataclasses import dataclass, field
from datetime import datetime, timedelta
@dataclass
class AsyncCircuitBreaker:
threshold: int = 5
timeout: int = 30
failures: int = 0
last_failure: datetime | None = None
state: str = "closed" # closed, open, half-open
def is_open(self) -> bool:
if self.state == "open":
if self.last_failure and datetime.now() - self.last_failure > timedelta(seconds=self.timeout):
self.state = "half-open"
return False
return True
return False
def record_failure(self):
self.failures += 1
self.last_failure = datetime.now()
if self.failures >= self.threshold:
self.state = "open"
def record_success(self):
self.failures = 0
self.state = "closed"
15 What are the security best practices for a production FastAPI application? ►
Security
- HTTPS always โ terminate TLS at the load balancer or Nginx. Set
HTTPSRedirectMiddlewareto enforce. - Helmet-equivalent headers โ use
securelibrary or manually setX-Frame-Options,X-Content-Type-Options,Strict-Transport-Security,Content-Security-Policy. - Rate limiting โ on all endpoints; stricter on auth (
slowapi). Use Redis store for multi-process deployments. - Input validation โ Pydantic + Field validators prevent injection; never use raw SQL string formatting.
- SQLAlchemy parameterised queries โ always use ORM or
text()with:paramsyntax; never f-string SQL. - Secrets management โ use pydantic-settings; load secrets from AWS Secrets Manager or Vault in production. Never commit
.env. - JWT best practices โ short-lived access tokens (15 min), longer-lived refresh tokens in
HttpOnlycookies. Validateiss,aud,expclaims. - CORS restriction โ never
allow_origins=["*"]in production withallow_credentials=True. - Dependency scanning โ run
pip-audit/ Snyk in CI/CD pipelines. - Error responses โ never expose stack traces or internal details in production error messages. Log them server-side only.
from fastapi.middleware.httpsredirect import HTTPSRedirectMiddleware
from secure import Secure # pip install secure
app.add_middleware(HTTPSRedirectMiddleware)
secure_headers = Secure()
@app.middleware("http")
async def set_secure_headers(request, call_next):
response = await call_next(request)
secure_headers.framework.fastapi(response)
return response
16 How do you profile and optimise FastAPI application performance? ►
Performance
# Load testing with locust or wrk
pip install locust
# locustfile.py
from locust import HttpUser, task, between
class APIUser(HttpUser):
wait_time = between(0.1, 0.5)
@task(3)
def list_products(self):
self.client.get("/products?page=1&size=20")
@task(1)
def create_order(self):
self.client.post("/orders", json={"items": [{"product_id": 1, "qty": 2}]})
# Run: locust -f locustfile.py --host http://localhost:8000
# Profiling with pyinstrument (async-aware)
pip install pyinstrument
from pyinstrument import Profiler
@app.middleware("http")
async def profile_middleware(request: Request, call_next):
if request.query_params.get("profile"): # only when ?profile=true
profiler = Profiler(async_mode="enabled")
with profiler:
response = await call_next(request)
return HTMLResponse(profiler.output_html())
return await call_next(request)
# Key optimisations:
# 1. Async everywhere โ no sync DB calls in async routes
# 2. Connection pooling โ asyncpg pool_size=10 per worker
# 3. N+1 queries โ use selectinload/joinedload (see advanced lesson)
# 4. Redis caching โ cache expensive aggregations and lookups
# 5. Response streaming โ for large payloads, use StreamingResponse
# 6. Pydantic model_config โ use model_config = ConfigDict(frozen=True) for
# immutable models (marginal speedup on serialisation)
# 7. orjson โ faster JSON serialisation
pip install orjson
from fastapi.responses import ORJSONResponse
app = FastAPI(default_response_class=ORJSONResponse)
17 How do you implement event sourcing patterns with FastAPI? ►
Patterns Event sourcing stores the full history of state changes as an immutable event log โ rather than only the current state. Current state is reconstructed by replaying events.
from pydantic import BaseModel
from datetime import datetime
from enum import Enum
from typing import Any
class EventType(str, Enum):
ORDER_CREATED = "order_created"
ITEM_ADDED = "item_added"
PAYMENT_RECEIVED = "payment_received"
ORDER_SHIPPED = "order_shipped"
ORDER_CANCELLED = "order_cancelled"
class DomainEvent(BaseModel):
event_id: str # UUID
event_type: EventType
aggregate_id: int # the entity this event belongs to
payload: dict[str, Any]
occurred_at: datetime = datetime.utcnow()
version: int
# Event store (append-only)
class EventStore:
async def append(self, event: DomainEvent, db: AsyncSession):
row = EventModel(
event_id = event.event_id,
event_type = event.event_type,
aggregate_id = event.aggregate_id,
payload = event.payload,
version = event.version,
occurred_at = event.occurred_at
)
db.add(row)
# Optimistic concurrency: raise if version already exists
await db.flush()
async def get_events(self, aggregate_id: int, db: AsyncSession) -> list[DomainEvent]:
rows = await db.scalars(
select(EventModel)
.where(EventModel.aggregate_id == aggregate_id)
.order_by(EventModel.version)
)
return [DomainEvent.model_validate(r.__dict__) for r in rows]
# Aggregate โ reconstructs state from events
class OrderAggregate:
def __init__(self, order_id: int):
self.id = order_id
self.items = []
self.status = "pending"
self.version = 0
def apply(self, event: DomainEvent):
if event.event_type == EventType.ITEM_ADDED:
self.items.append(event.payload["item"])
elif event.event_type == EventType.PAYMENT_RECEIVED:
self.status = "paid"
elif event.event_type == EventType.ORDER_SHIPPED:
self.status = "shipped"
self.version = event.version
18 How does FastAPI handle request validation errors? How do you customise the 422 response? ►
Validation When request data fails Pydantic validation, FastAPI automatically returns HTTP 422 Unprocessable Entity with a detailed error list. You can customise this response globally.
from fastapi import FastAPI, Request
from fastapi.exceptions import RequestValidationError
from fastapi.responses import JSONResponse
from pydantic import ValidationError
app = FastAPI()
# Default 422 response from FastAPI:
# { "detail": [{ "type": "int_parsing", "loc": ["body", "age"],
# "msg": "Input should be a valid integer",
# "input": "abc", "url": "..." }] }
# Custom 422 handler โ simplified, client-friendly format
@app.exception_handler(RequestValidationError)
async def validation_exception_handler(request: Request, exc: RequestValidationError):
errors = []
for error in exc.errors():
field = " -> ".join(str(loc) for loc in error["loc"] if loc != "body")
errors.append({
"field": field,
"message": error["msg"],
"type": error["type"]
})
return JSONResponse(
status_code=422,
content={
"error": "VALIDATION_ERROR",
"message": "Request data validation failed",
"errors": errors
}
)
# Validate within a route and return 400 (business rule violation vs schema error)
@app.post("/orders")
async def create_order(order: OrderCreate):
if not await inventory_service.has_stock(order.items):
raise HTTPException(
status_code=400,
detail={
"error": "INSUFFICIENT_STOCK",
"message": "One or more items are out of stock",
"items": [i.product_id for i in order.items if not i.in_stock]
}
)
19 How do you implement a plugin or extension system in FastAPI? ►
Architecture FastAPI’s lifespan, dependency injection, and middleware system make it easy to build pluggable, modular applications where features can be enabled or disabled via configuration.
from fastapi import FastAPI
from contextlib import asynccontextmanager
from typing import Protocol
# Plugin interface (Protocol)
class FastAPIPlugin(Protocol):
async def setup(self, app: FastAPI) -> None: …
async def teardown(self) -> None: …
# Plugin implementations
class RedisPlugin:
def __init__(self, url: str):
self.url = url
self.client = None
async def setup(self, app: FastAPI):
self.client = await aioredis.from_url(self.url)
app.state.redis = self.client
async def teardown(self):
if self.client:
await self.client.close()
class MetricsPlugin:
async def setup(self, app: FastAPI):
from prometheus_client import make_asgi_app
metrics_app = make_asgi_app()
app.mount(“/metrics”, metrics_app)
async def teardown(self): pass
# Plugin registry
class PluginRegistry:
def __init__(self):
self._plugins: list[FastAPIPlugin] = []
def register(self, plugin: FastAPIPlugin) -> “PluginRegistry”:
self._plugins.append(plugin)
return self
def build_lifespan(self):
plugins = self._plugins
@asynccontextmanager
async def lifespan(app: FastAPI):
for plugin in plugins:
await plugin.setup(app)
yield
for plugin in reversed(plugins):
await plugin.teardown()
return lifespan
registry = PluginRegistry()
registry.register(RedisPlugin(settings.redis_url))
registry.register(MetricsPlugin())
app = FastAPI(lifespan=registry.build_lifespan())
20 What is the difference between Annotated dependencies and plain Depends in FastAPI? ►
DI Annotated (Python 3.9+) with Depends allows you to define dependency defaults in type annotations rather than in function signatures โ enabling reusable dependency types that work like type aliases.
from typing import Annotated
from fastapi import Depends, FastAPI, Header, HTTPException
app = FastAPI()
# Traditional Depends โ dependency in function signature
@app.get(“/items”)
async def list_items(db: AsyncSession = Depends(get_db)):
…
# Annotated Depends โ dependency in the type annotation
DBSession = Annotated[AsyncSession, Depends(get_db)]
CurrentUser = Annotated[User, Depends(get_current_user)]
AdminUser = Annotated[User, Depends(require_roles(Role.ADMIN))]
# Now route signatures are cleaner and the type carries the dependency
@app.get(“/items”)
async def list_items(db: DBSession): # same as Depends(get_db)
…
@app.get(“/admin/users”)
async def list_all_users(
db: DBSession,
user: AdminUser # automatically verifies admin role
):
return await user_service.get_all(db)
# Compose annotated types
ActiveUser = Annotated[User, Depends(get_active_user)]
PaginationDep = Annotated[PaginationParams, Depends(common_pagination)]
@app.get(“/orders”)
async def list_orders(user: ActiveUser, pagination: PaginationDep, db: DBSession):
return await order_service.list_by_user(user.id, db, **pagination.model_dump())
21 How do you implement zero-downtime deployments for FastAPI? ►
Ops
- Graceful shutdown โ Uvicorn handles
SIGTERMgracefully: stops accepting new connections, completes in-flight requests, then exits. Set--timeout-graceful-shutdown 30. - Readiness probe โ Kubernetes only sends traffic to pods that return 200 from
/health/ready. New pods don’t receive traffic until fully initialised (DB connected, caches warmed). - Rolling updates โ Kubernetes gradually replaces old pods. Old pods keep running until new ones are ready and traffic is shifted.
- Database migrations โ run
alembic upgrade headas an init container before the new app pods start. Migrations must be backward compatible (additive only โ don’t remove columns until all pods are on the new version).
# Kubernetes deployment with health probes
# deployment.yaml
spec:
strategy:
type: RollingUpdate
rollingUpdate:
maxSurge: 1 # start 1 new pod before terminating old
maxUnavailable: 0 # never have fewer than desired replicas
containers:
- name: api
image: myapp:v2.0
readinessProbe:
httpGet:
path: /health/ready
port: 8000
initialDelaySeconds: 10
periodSeconds: 5
failureThreshold: 3
livenessProbe:
httpGet:
path: /health/live
port: 8000
initialDelaySeconds: 30
periodSeconds: 10
lifecycle:
preStop:
exec:
command: ["sleep", "5"] # drain existing connections before SIGTERM
📝 Knowledge Check
These questions mirror real senior-level FastAPI architecture and systems design interview scenarios.