Expert FastAPI Interview Questions and Answers

📋 Table of Contents
  1. Questions & Answers
  2. 📝 Knowledge Check

⚡ Expert FastAPI Interview Questions

This lesson targets senior engineers and architects. Topics include ASGI internals, performance benchmarking, Starlette internals, OpenTelemetry distributed tracing, custom OpenAPI extensions, GraphQL integration, gRPC, event-driven architecture with Kafka, and production deployment patterns. These questions reveal whether you understand FastAPI deeply or just write Python routes.

Questions & Answers

01 What is ASGI and how does FastAPI use it?

Internals ASGI (Asynchronous Server Gateway Interface) is the Python standard for async web applications and the successor to WSGI. It defines a calling convention between async web servers (Uvicorn, Hypercorn, Daphne) and async Python frameworks (FastAPI, Django Channels, Starlette).

ASGI interface:

# An ASGI application is a callable with this signature:
async def app(scope: dict, receive: callable, send: callable) -> None:
    # scope: connection info (type, path, headers, method, ...)
    # receive: async callable to receive events from the client (request body chunks)
    # send: async callable to send events to the client (response start, body chunks)
    ...

# FastAPI IS an ASGI application - it can be passed directly to uvicorn
# uvicorn main:app  ->  uvicorn calls app(scope, receive, send) for each request

# The ASGI lifecycle:
# HTTP: scope["type"] = "http"
# WebSocket: scope["type"] = "websocket"
# Lifespan: scope["type"] = "lifespan" (startup/shutdown events)

# FastAPI wraps Starlette, which implements the ASGI interface:
# Request comes in -> Starlette router -> FastAPI route matching ->
#   dependency injection -> pydantic validation -> route handler ->
#   response serialisation -> back through middleware -> ASGI send

# Running with different ASGI servers:
# Uvicorn:   uvicorn main:app (most popular, uses uvloop)
# Hypercorn: hypercorn main:app (supports HTTP/2, HTTP/3)
# Granian:   granian --interface asgi main:app (Rust-based; often fastest in benchmarks)
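To make the scope/receive/send contract concrete, here is a minimal ASGI app driven by hand with stub receive/send callables, exactly the way a server like Uvicorn would call it. No server is required; this is a stdlib-only sketch.

```python
import asyncio
import json

# A minimal ASGI app: inspects the scope, then emits the two response events
async def app(scope: dict, receive, send) -> None:
    assert scope["type"] == "http"
    body = json.dumps({"path": scope["path"]}).encode()
    await send({
        "type": "http.response.start",
        "status": 200,
        "headers": [(b"content-type", b"application/json")],
    })
    await send({"type": "http.response.body", "body": body})

# Drive the app the way an ASGI server would
async def call(path: str) -> list[dict]:
    sent = []

    async def receive():
        # A single empty request body chunk
        return {"type": "http.request", "body": b"", "more_body": False}

    async def send(event):
        sent.append(event)

    await app({"type": "http", "path": path, "method": "GET"}, receive, send)
    return sent

events = asyncio.run(call("/ping"))
# events[0] is the http.response.start event, events[1] carries the body
```

Capturing the `send` events like this is also how ASGI middleware works: it sits between the server and the app and can rewrite events in flight.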

02 How does FastAPI’s dependency injection handle concurrency? What is the dependency cache?

DI FastAPI’s DI system caches dependency results within a single request: if the same dependency appears multiple times in the dependency tree for one request, it is resolved only once.

async def get_db():
    async with AsyncSessionLocal() as session:
        yield session

# If both dependencies use get_db, it's resolved ONCE per request
async def get_current_user(db: AsyncSession = Depends(get_db)):
    ...

async def get_permissions(db: AsyncSession = Depends(get_db)):
    ...

@app.get("/dashboard")
async def dashboard(
    user:        User        = Depends(get_current_user),  # uses get_db
    permissions: list[str]  = Depends(get_permissions)    # uses same get_db session
):
    # get_db() was called ONCE, not twice - same session shared
    ...

# Disable caching for a specific dependency (call it fresh every time)
@app.get("/fresh")
async def endpoint(
    data: Data = Depends(get_data, use_cache=False)   # always re-run
):
    ...

# Global dependencies - applied to all routes
app = FastAPI(dependencies=[Depends(verify_api_key)])

# Or apply to a router
router = APIRouter(dependencies=[Depends(require_auth)])
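The per-request cache behaviour can be modelled in plain Python: key a cache dict on the dependency callable and resolve each one at most once per request. This is a simplified sketch of what FastAPI does internally, not its actual implementation; all names are illustrative.

```python
call_count = 0

def get_db():
    # Stand-in for opening a DB session; counts how often it actually runs
    global call_count
    call_count += 1
    return {"session": call_count}

def solve(dep, cache: dict, use_cache: bool = True):
    # Resolve a dependency, reusing the per-request cache when allowed
    if use_cache and dep in cache:
        return cache[dep]
    result = dep()
    cache[dep] = result
    return result

# One request: two sub-dependencies both ask for get_db, it runs once
request_cache: dict = {}
a = solve(get_db, request_cache)                    # first use: get_db runs
b = solve(get_db, request_cache)                    # second use: served from cache
c = solve(get_db, request_cache, use_cache=False)   # use_cache=False forces a fresh call
```

A new request starts with an empty cache dict, which is why the sharing never leaks across requests.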

03 How do you implement OpenTelemetry distributed tracing in FastAPI?

Observability

pip install opentelemetry-sdk opentelemetry-instrumentation-fastapi \
            opentelemetry-instrumentation-sqlalchemy \
            opentelemetry-instrumentation-httpx opentelemetry-exporter-otlp
# instrumentation.py - run setup_tracing() before the app serves its first request
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor
from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor

def setup_tracing(app):
    provider = TracerProvider()
    provider.add_span_processor(
        BatchSpanProcessor(OTLPSpanExporter(endpoint="http://otel-collector:4317"))
    )
    trace.set_tracer_provider(provider)

    # Auto-instrument FastAPI (wraps every request in a span)
    FastAPIInstrumentor.instrument_app(app)
    SQLAlchemyInstrumentor().instrument(engine=engine)  # engine: your SQLAlchemy engine
    HTTPXClientInstrumentor().instrument()

# main.py
from fastapi import FastAPI
from opentelemetry import trace

app = FastAPI()
setup_tracing(app)

tracer = trace.get_tracer(__name__)

@app.post("/orders")
async def create_order(order: OrderCreate):
    with tracer.start_as_current_span("validate-inventory") as span:
        span.set_attribute("order.items", len(order.items))
        available = await check_inventory(order.items)
        span.set_attribute("inventory.available", available)

    with tracer.start_as_current_span("create-order"):
        result = await order_service.create(order)
    return result

04 How do you integrate GraphQL with FastAPI using Strawberry?

GraphQL

pip install strawberry-graphql[fastapi]
import strawberry
from strawberry.fastapi import GraphQLRouter
from fastapi import FastAPI, Depends

# Define GraphQL types using Python dataclasses
@strawberry.type
class User:
    id:    int
    name:  str
    email: str

@strawberry.type
class Query:
    @strawberry.field
    async def user(self, id: int, info: strawberry.types.Info) -> User | None:
        db = info.context["db"]
        return await db.get_user(id)

    @strawberry.field
    async def users(self, info: strawberry.types.Info) -> list[User]:
        db = info.context["db"]
        return await db.list_users()

@strawberry.type
class Mutation:
    @strawberry.mutation
    async def create_user(self, name: str, email: str, info: strawberry.types.Info) -> User:
        db = info.context["db"]
        return await db.create_user(name=name, email=email)

@strawberry.type
class Subscription:
    @strawberry.subscription
    async def user_created(self, info: strawberry.types.Info):
        async for user in info.context["pubsub"].subscribe("user_created"):
            yield user

schema = strawberry.Schema(query=Query, mutation=Mutation, subscription=Subscription)

# Mount GraphQL router on FastAPI
async def get_context(db: AsyncSession = Depends(get_db)):
    return {"db": db}

graphql_app = GraphQLRouter(schema, context_getter=get_context)
app = FastAPI()
app.include_router(graphql_app, prefix="/graphql")

# Visit /graphql for GraphiQL IDE, POST /graphql for queries

05 How do you implement event-driven architecture with Kafka and FastAPI?

Events

pip install aiokafka
from aiokafka import AIOKafkaProducer, AIOKafkaConsumer
from fastapi import FastAPI, Request
from contextlib import asynccontextmanager
import asyncio
import json

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Kafka producer - started at app startup
    app.state.kafka_producer = AIOKafkaProducer(
        bootstrap_servers="kafka:9092",
        value_serializer=lambda v: json.dumps(v).encode()
    )
    await app.state.kafka_producer.start()

    # Consumer - runs as a background task
    consumer_task = asyncio.create_task(consume_events())
    app.state.consumer_task = consumer_task

    yield

    # Shutdown
    await app.state.kafka_producer.stop()
    consumer_task.cancel()

app = FastAPI(lifespan=lifespan)

# Publish events from route handlers
@app.post("/orders", status_code=201)
async def create_order(order: OrderCreate, request: Request):
    new_order = await order_service.create(order)
    # Publish domain event - other services react to this
    await request.app.state.kafka_producer.send(
        topic="orders",
        key=str(new_order.id).encode(),
        value={"event": "order_created", "order_id": new_order.id, "data": order.model_dump()}
    )
    return new_order

# Consume events (e.g., listen for payment confirmations)
async def consume_events():
    consumer = AIOKafkaConsumer(
        "payments",
        bootstrap_servers="kafka:9092",
        group_id="order-service",
        value_deserializer=lambda v: json.loads(v.decode())
    )
    await consumer.start()
    try:
        async for msg in consumer:
            event = msg.value
            if event["event"] == "payment_confirmed":
                await order_service.mark_paid(event["order_id"])
    finally:
        await consumer.stop()
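The wire format in the producer and consumer above is plain JSON. A small encode/decode pair makes the envelope contract explicit and testable without a broker; the field names follow the example above, and the helpers are illustrative.

```python
import json

def encode_event(event: str, order_id: int, data: dict) -> bytes:
    # Same envelope the producer sends: {"event": ..., "order_id": ..., "data": ...}
    return json.dumps({"event": event, "order_id": order_id, "data": data}).encode()

def decode_event(raw: bytes) -> dict:
    # Mirrors the consumer's value_deserializer
    return json.loads(raw.decode())

# Round-trip an order_created event the way Kafka would carry it
msg = encode_event("order_created", 42, {"items": [1, 2]})
evt = decode_event(msg)
```

Pinning the envelope down in one place like this helps when several services consume the same topic: the schema lives in code, not in tribal knowledge.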

06 What is the Repository pattern in FastAPI? How does it improve testability?

Architecture The Repository pattern separates data access logic from business logic. Services depend on a repository interface, so tests can swap in a different storage implementation without changing the business logic.

from typing import Protocol

# Abstract interface (protocol)
class UserRepository(Protocol):
    async def get_by_id(self, user_id: int) -> User | None: ...
    async def get_by_email(self, email: str) -> User | None: ...
    async def create(self, data: UserCreate) -> User: ...
    async def update(self, user_id: int, data: UserUpdate) -> User: ...
    async def delete(self, user_id: int) -> None: ...

# Concrete implementation - SQLAlchemy
class SQLUserRepository:
    def __init__(self, db: AsyncSession):
        self.db = db

    async def get_by_id(self, user_id: int) -> User | None:
        result = await self.db.execute(select(UserModel).where(UserModel.id == user_id))
        return result.scalar_one_or_none()

    async def create(self, data: UserCreate) -> User:
        user = UserModel(**data.model_dump())
        self.db.add(user)
        await self.db.flush()
        return user

# Business logic - depends on the protocol, not the implementation
class UserService:
    def __init__(self, repo: UserRepository):
        self.repo = repo

    async def register(self, data: UserCreate) -> User:
        existing = await self.repo.get_by_email(data.email)
        if existing:
            raise ValueError(f"Email {data.email} already registered")
        data.password = hash_password(data.password)
        return await self.repo.create(data)

# Dependency injection wiring
def get_user_repo(db: AsyncSession = Depends(get_db)) -> UserRepository:
    return SQLUserRepository(db)

def get_user_service(repo: UserRepository = Depends(get_user_repo)) -> UserService:
    return UserService(repo)

# Test - inject an in-memory fake repository
class InMemoryUserRepository:
    def __init__(self):
        self.users = {}

    async def get_by_email(self, email):
        return self.users.get(email)

    async def create(self, data):
        user = User(**data.model_dump())
        self.users[user.email] = user
        return user

# Tests use InMemoryUserRepository - no database needed

07 How do you build and deploy FastAPI with Docker?

Deployment

# Dockerfile (multi-stage build for smaller production image)
FROM python:3.12-slim AS builder
WORKDIR /app
RUN pip install uv
COPY pyproject.toml uv.lock ./
RUN uv sync --frozen --no-dev     # install only production deps

FROM python:3.12-slim AS runtime
WORKDIR /app

# Non-root user (security best practice)
RUN groupadd -r appuser && useradd -r -g appuser appuser

COPY --from=builder /app/.venv /app/.venv
COPY ./app ./app

ENV PATH="/app/.venv/bin:$PATH"
ENV PYTHONPATH="/app"

USER appuser
EXPOSE 8000

# Use exec form - receives signals properly
# 1 worker per container; scale with K8s replicas
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "1"]
# docker-compose.yml (development)
services:
  api:
    build: .
    ports: ["8000:8000"]
    volumes: ["./app:/app/app"]   # hot reload in dev
    environment:
      DATABASE_URL: postgresql+asyncpg://postgres:pass@db/mydb
      REDIS_URL: redis://redis:6379
    command: uvicorn app.main:app --host 0.0.0.0 --port 8000 --reload
    depends_on: [db, redis]

  db:
    image: postgres:16-alpine
    environment: {POSTGRES_PASSWORD: pass, POSTGRES_DB: mydb}
    volumes: [postgres_data:/var/lib/postgresql/data]

  redis:
    image: redis:7-alpine

08 What is Celery integration with FastAPI?

Tasks Celery provides a distributed task queue for CPU-intensive, long-running, or scheduled jobs that should not run in the FastAPI event loop. FastAPI sends tasks to Celery; Celery workers execute them independently.

pip install celery redis
# celery_app.py
from celery import Celery

celery = Celery(
    "worker",
    broker="redis://localhost:6379/0",
    backend="redis://localhost:6379/1",
    include=["app.tasks"]
)
celery.conf.update(task_serializer="json", result_serializer="json")

# app/tasks.py
from celery_app import celery
import time

@celery.task(bind=True, max_retries=3)
def process_video(self, video_id: int, options: dict):
    try:
        # Long-running CPU-intensive work
        result = encode_video(video_id, **options)
        return {"video_id": video_id, "output": result}
    except Exception as exc:
        raise self.retry(exc=exc, countdown=60)

# FastAPI route - dispatches the task and returns immediately
@app.post("/videos/{video_id}/process", status_code=202)
async def start_processing(video_id: int, options: ProcessOptions):
    task = process_video.apply_async(args=[video_id, options.model_dump()])
    return {"task_id": task.id, "status": "processing"}

# Check task status
@app.get("/tasks/{task_id}")
async def get_task_status(task_id: str):
    task = celery.AsyncResult(task_id)
    return {
        "task_id": task_id,
        "status":  task.status,
        "result":  task.result if task.ready() else None
    }

# Start Celery worker
# celery -A celery_app worker --loglevel=info --concurrency=4

09 How do you implement idempotency for POST endpoints in FastAPI?

Reliability Idempotency keys prevent duplicate processing when clients retry failed requests, which is critical for payments and order creation.

from fastapi import FastAPI, Header, Request, Response
from typing import Optional
import redis.asyncio as aioredis, json

app = FastAPI()

@app.middleware("http")
async def idempotency_middleware(request: Request, call_next):
    # Only apply to POST, PUT, PATCH
    if request.method not in ("POST", "PUT", "PATCH"):
        return await call_next(request)

    idempotency_key = request.headers.get("Idempotency-Key")
    if not idempotency_key:
        return await call_next(request)

    redis = request.app.state.redis
    cache_key = f"idempotency:{idempotency_key}"

    # Check if we've seen this key before
    cached = await redis.get(cache_key)
    if cached:
        data = json.loads(cached)
        return Response(
            content=json.dumps(data["body"]),
            status_code=data["status_code"],
            media_type="application/json",
            headers={"X-Idempotent-Replayed": "true"}
        )

    # Process the request
    response = await call_next(request)

    # Cache the response for 24 hours (only on success)
    if 200 <= response.status_code < 300:
        body = b""
        async for chunk in response.body_iterator:
            body += chunk
        await redis.setex(cache_key, 86400, json.dumps({
            "status_code": response.status_code,
            "body": json.loads(body)
        }))
        return Response(content=body, status_code=response.status_code,
                       media_type="application/json")
    return response

# Client sends: POST /payments  Idempotency-Key: uuid4-here
# Retry with the same key -> same response returned, charge not duplicated
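The middleware's core contract - the first call computes, retries replay the stored result - can be modelled with a plain dict standing in for Redis. This is a sketch of the semantics only (TTL and concurrency handling omitted); names are illustrative.

```python
import json

store: dict[str, str] = {}   # stands in for Redis
charges = 0

def charge_card(amount: int) -> dict:
    # Side-effectful operation we must not repeat
    global charges
    charges += 1
    return {"charged": amount, "attempt": charges}

def handle_payment(idempotency_key: str, amount: int) -> tuple[dict, bool]:
    # Returns (response, replayed); replayed mirrors the X-Idempotent-Replayed header
    cache_key = f"idempotency:{idempotency_key}"
    if cache_key in store:
        return json.loads(store[cache_key]), True
    response = charge_card(amount)
    store[cache_key] = json.dumps(response)
    return response, False

first, replayed1 = handle_payment("abc-123", 50)
second, replayed2 = handle_payment("abc-123", 50)   # client retry, same key
```

In production the check-then-set should be atomic (e.g. Redis `SET NX`) so two concurrent retries cannot both miss the cache.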

10 What is the difference between Uvicorn, Gunicorn, and Gunicorn+Uvicorn workers?

Deployment

  • Uvicorn standalone - single-process ASGI server with a single event loop, limited to one CPU core. Best for development (--reload) or containerised deployments where horizontal scaling is handled externally (Kubernetes).
  • Gunicorn - mature WSGI process manager (pre-fork model). Not natively ASGI-compatible; it cannot run FastAPI directly without a worker class.
  • Gunicorn + UvicornWorker - Gunicorn manages multiple Uvicorn worker processes, each with its own event loop. Combines Gunicorn’s process management (restart on crash, graceful shutdown) with Uvicorn’s async performance. Suitable for traditional VM deployments.
# Gunicorn with Uvicorn workers (multi-core VM deployment)
pip install gunicorn

# --workers: a common starting point is (2 x CPU cores) + 1
# --max-requests: restart a worker after N requests (prevents memory bloat)
gunicorn app.main:app \
    --workers 4 \
    --worker-class uvicorn.workers.UvicornWorker \
    --bind 0.0.0.0:8000 \
    --timeout 120 \
    --keep-alive 5 \
    --max-requests 1000 \
    --max-requests-jitter 100 \
    --access-logfile - \
    --error-logfile -

# In containers (Docker + K8s): prefer a single uvicorn process per container
# Scale by increasing replica count - simpler, more cloud-native
uvicorn app.main:app --host 0.0.0.0 --port 8000 --workers 1

# Newer alternative: Granian (Rust-based; often ahead of Gunicorn+Uvicorn in benchmarks)
pip install granian
granian --interface asgi app.main:app --host 0.0.0.0 --port 8000 --workers 4
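The worker-count rule of thumb above can be made explicit in a tiny helper; the `(2 x cores) + 1` formula is a common starting point to tune under load, not a law, and the function name here is illustrative.

```python
import os

def suggested_workers(in_container: bool = False) -> int:
    # In containers: one process, scale with replicas instead
    if in_container:
        return 1
    cores = os.cpu_count() or 1
    # Common pre-fork heuristic: (2 x CPU cores) + 1
    return 2 * cores + 1

print(suggested_workers())                    # e.g. 9 on a 4-core VM
print(suggested_workers(in_container=True))   # always 1
```

Whatever number you start with, verify it with a load test (see the profiling question below); IO-bound async apps often need fewer processes than CPU-bound WSGI apps.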

11 How do you implement a multi-tenant API with FastAPI?

Architecture

from fastapi import FastAPI, Depends, Request, HTTPException, Header
from pydantic import BaseModel

app = FastAPI()

class Tenant(BaseModel):
    id:        int
    slug:      str
    plan:      str
    db_schema: str  # separate schema per tenant

# Tenant resolution dependency - from subdomain, header, or JWT claim
async def get_current_tenant(
    request: Request,
    x_tenant_id: str | None = Header(None)
) -> Tenant:
    # Method 1: header-based
    if x_tenant_id:
        tenant = await tenant_service.get_by_id(x_tenant_id)
        if not tenant:
            raise HTTPException(404, "Tenant not found")
        return tenant

    # Method 2: subdomain-based (acme.myapp.com)
    host = request.headers.get("host", "")
    subdomain = host.split(".")[0]
    tenant = await tenant_service.get_by_slug(subdomain)
    if not tenant:
        raise HTTPException(404, "Tenant not found")
    return tenant

# DB session with tenant schema
async def get_tenant_db(
    tenant: Tenant = Depends(get_current_tenant),
    db:     AsyncSession = Depends(get_db)
) -> AsyncSession:
    # Set the search_path to the tenant's schema.
    # Safe only because db_schema comes from our own tenant table, never from user input.
    await db.execute(text(f"SET search_path TO {tenant.db_schema}, public"))
    return db

@app.get("/orders")
async def list_orders(
    tenant: Tenant     = Depends(get_current_tenant),
    db:     AsyncSession = Depends(get_tenant_db)
):
    # Query is automatically scoped to tenant's schema
    return await db.scalars(select(Order))

# Rate limiting per tenant plan
def check_tenant_rate_limit(tenant: Tenant = Depends(get_current_tenant)):
    limits = {"free": "100/day", "pro": "10000/day", "enterprise": "unlimited"}
    # Apply limit based on tenant plan
    return tenant

12 How do you implement custom Pydantic validators and computed fields?

Pydantic

from pydantic import BaseModel, Field, field_validator, model_validator, computed_field
from typing import Annotated

# Custom constrained type (for exact money arithmetic, prefer decimal.Decimal,
# which also supports the decimal_places constraint)
PositiveDecimal = Annotated[float, Field(gt=0)]

class OrderItem(BaseModel):
    product_id: int
    quantity:   int   = Field(..., ge=1, le=1000)
    unit_price: PositiveDecimal

    # Computed field - calculated from other fields, included in serialisation
    @computed_field
    @property
    def subtotal(self) -> float:
        return round(self.quantity * self.unit_price, 2)

class Order(BaseModel):
    items:           list[OrderItem] = Field(..., min_length=1)
    discount_pct:    float = Field(0, ge=0, le=100)
    delivery_method: str

    @field_validator("delivery_method")
    @classmethod
    def validate_delivery(cls, v: str) -> str:
        allowed = {"standard", "express", "next-day"}
        if v not in allowed:
            raise ValueError(f"delivery_method must be one of: {allowed}")
        return v

    @model_validator(mode="after")
    def validate_minimum_order(self) -> "Order":
        total = sum(item.subtotal for item in self.items)
        if self.delivery_method == "next-day" and total < 50:
            raise ValueError("Next-day delivery requires order total >= £50")
        return self

    @computed_field
    @property
    def total_before_discount(self) -> float:
        return round(sum(i.subtotal for i in self.items), 2)

    @computed_field
    @property
    def total(self) -> float:
        return round(self.total_before_discount * (1 - self.discount_pct / 100), 2)

13 What are advanced Starlette features used by FastAPI?

Starlette FastAPI is built on Starlette. Understanding Starlette gives you access to lower-level features that FastAPI exposes or that you can use directly.

from starlette.applications import Starlette
from starlette.routing import Route, Mount
from starlette.staticfiles import StaticFiles
from starlette.requests import Request
from starlette.responses import JSONResponse

# Mount static files
app.mount("/static", StaticFiles(directory="static"), name="static")
app.mount("/uploads", StaticFiles(directory="uploads"), name="uploads")

# Starlette routing (lower-level than FastAPI routes)
async def homepage(request: Request):
    return JSONResponse({"status": "ok"})

# Mount another ASGI app at a sub-path
from fastapi import FastAPI
admin_app = FastAPI()
main_app  = FastAPI()
main_app.mount("/admin", admin_app)  # admin FastAPI app at /admin

# Starlette middleware (more performant than BaseHTTPMiddleware)
from starlette.middleware import Middleware
from starlette.middleware.trustedhost import TrustedHostMiddleware
from starlette.middleware.httpsredirect import HTTPSRedirectMiddleware
from starlette.middleware.sessions import SessionMiddleware

app.add_middleware(TrustedHostMiddleware, allowed_hosts=["myapp.com", "*.myapp.com"])
app.add_middleware(SessionMiddleware, secret_key="secret")  # cookie-based sessions

# Starlette test client (sync) - used by FastAPI TestClient
from starlette.testclient import TestClient
client = TestClient(app)

# Background tasks via Starlette (same as FastAPI BackgroundTasks)
from starlette.background import BackgroundTask
response = JSONResponse({"result": "ok"})
response.background = BackgroundTask(send_email, to="user@example.com")

14 How do you implement circuit breakers in FastAPI?

Resilience

pip install circuitbreaker
from circuitbreaker import circuit, CircuitBreakerError
from fastapi import FastAPI, HTTPException, Depends
import httpx

app = FastAPI()

# Wrap external service calls with circuit breaker
@circuit(failure_threshold=5, recovery_timeout=30, expected_exception=httpx.RequestError)
async def call_payment_service(payload: dict) -> dict:
    async with httpx.AsyncClient(timeout=5.0) as client:
        response = await client.post("http://payment-service/charge", json=payload)
        response.raise_for_status()
        return response.json()

@app.post("/checkout")
async def checkout(order: OrderCreate):
    try:
        payment_result = await call_payment_service(order.payment.model_dump())
        return {"order": order, "payment": payment_result}
    except CircuitBreakerError:
        # Circuit is OPEN - payment service is down, fail fast
        raise HTTPException(
            status_code=503,
            detail="Payment service temporarily unavailable. Please try again later."
        )
    except httpx.HTTPStatusError as e:
        raise HTTPException(status_code=e.response.status_code, detail=str(e))

# Async circuit breaker with custom logic
from dataclasses import dataclass, field
from datetime import datetime, timedelta

@dataclass
class AsyncCircuitBreaker:
    threshold:   int = 5
    timeout:     int = 30
    failures:    int = 0
    last_failure: datetime | None = None
    state:       str = "closed"   # closed, open, half-open

    def is_open(self) -> bool:
        if self.state == "open":
            if self.last_failure and datetime.now() - self.last_failure > timedelta(seconds=self.timeout):
                self.state = "half-open"
                return False
            return True
        return False

    def record_failure(self):
        self.failures += 1
        self.last_failure = datetime.now()
        if self.failures >= self.threshold:
            self.state = "open"

    def record_success(self):
        self.failures = 0
        self.state = "closed"

15 What are the security best practices for a production FastAPI application?

Security

  • HTTPS always - terminate TLS at the load balancer or Nginx; add HTTPSRedirectMiddleware to enforce it.
  • Helmet-equivalent headers - use the secure library or manually set X-Frame-Options, X-Content-Type-Options, Strict-Transport-Security, Content-Security-Policy.
  • Rate limiting - on all endpoints, stricter on auth (slowapi). Use a Redis store for multi-process deployments.
  • Input validation - Pydantic + Field validators prevent injection; never build SQL with raw string formatting.
  • SQLAlchemy parameterised queries - always use the ORM or text() with :param syntax; never f-string SQL.
  • Secrets management - use pydantic-settings; load secrets from AWS Secrets Manager or Vault in production. Never commit .env.
  • JWT best practices - short-lived access tokens (15 min), longer-lived refresh tokens in HttpOnly cookies. Validate iss, aud, exp claims.
  • CORS restriction - never combine allow_origins=["*"] with allow_credentials=True in production.
  • Dependency scanning - run pip-audit / Snyk in CI/CD pipelines.
  • Error responses - never expose stack traces or internal details in production error messages; log them server-side only.
from fastapi.middleware.httpsredirect import HTTPSRedirectMiddleware
from secure import Secure  # pip install secure

app.add_middleware(HTTPSRedirectMiddleware)
secure_headers = Secure()

@app.middleware("http")
async def set_secure_headers(request, call_next):
    response = await call_next(request)
    secure_headers.framework.fastapi(response)
    return response

16 How do you profile and optimise FastAPI application performance?

Performance

# Load testing with locust or wrk
pip install locust

# locustfile.py
from locust import HttpUser, task, between

class APIUser(HttpUser):
    wait_time = between(0.1, 0.5)

    @task(3)
    def list_products(self):
        self.client.get("/products?page=1&size=20")

    @task(1)
    def create_order(self):
        self.client.post("/orders", json={"items": [{"product_id": 1, "qty": 2}]})

# Run: locust -f locustfile.py --host http://localhost:8000
# Profiling with pyinstrument (async-aware)
pip install pyinstrument

from pyinstrument import Profiler
from fastapi import Request
from fastapi.responses import HTMLResponse

@app.middleware("http")
async def profile_middleware(request: Request, call_next):
    if request.query_params.get("profile"):  # only when ?profile=true
        profiler = Profiler(async_mode="enabled")
        with profiler:
            response = await call_next(request)
        return HTMLResponse(profiler.output_html())
    return await call_next(request)

# Key optimisations:
# 1. Async everywhere - no sync DB calls in async routes
# 2. Connection pooling - e.g. asyncpg pool_size=10 per worker
# 3. N+1 queries - use selectinload/joinedload (see advanced lesson)
# 4. Redis caching - cache expensive aggregations and lookups
# 5. Response streaming - for large payloads, use StreamingResponse
# 6. Pydantic model_config - use model_config = ConfigDict(frozen=True) for
#    immutable models (marginal speedup on serialisation)
# 7. orjson - faster JSON serialisation
pip install orjson
from fastapi.responses import ORJSONResponse
app = FastAPI(default_response_class=ORJSONResponse)

17 How do you implement event sourcing patterns with FastAPI?

Patterns Event sourcing stores the full history of state changes as an immutable event log rather than only the current state. Current state is reconstructed by replaying events.

from pydantic import BaseModel, Field
from datetime import datetime
from enum import Enum
from typing import Any

class EventType(str, Enum):
    ORDER_CREATED    = "order_created"
    ITEM_ADDED       = "item_added"
    PAYMENT_RECEIVED = "payment_received"
    ORDER_SHIPPED    = "order_shipped"
    ORDER_CANCELLED  = "order_cancelled"

class DomainEvent(BaseModel):
    event_id:     str          # UUID
    event_type:   EventType
    aggregate_id: int          # the entity this event belongs to
    payload:      dict[str, Any]
    occurred_at:  datetime = Field(default_factory=datetime.utcnow)  # fresh per event
    version:      int

# Event store (append-only)
class EventStore:
    async def append(self, event: DomainEvent, db: AsyncSession):
        row = EventModel(
            event_id     = event.event_id,
            event_type   = event.event_type,
            aggregate_id = event.aggregate_id,
            payload      = event.payload,
            version      = event.version,
            occurred_at  = event.occurred_at
        )
        db.add(row)
        # Optimistic concurrency: raise if version already exists
        await db.flush()

    async def get_events(self, aggregate_id: int, db: AsyncSession) -> list[DomainEvent]:
        rows = await db.scalars(
            select(EventModel)
            .where(EventModel.aggregate_id == aggregate_id)
            .order_by(EventModel.version)
        )
        return [DomainEvent.model_validate(r.__dict__) for r in rows]

# Aggregate โ€” reconstructs state from events
class OrderAggregate:
    def __init__(self, order_id: int):
        self.id    = order_id
        self.items = []
        self.status = "pending"
        self.version = 0

    def apply(self, event: DomainEvent):
        if event.event_type == EventType.ITEM_ADDED:
            self.items.append(event.payload["item"])
        elif event.event_type == EventType.PAYMENT_RECEIVED:
            self.status = "paid"
        elif event.event_type == EventType.ORDER_SHIPPED:
            self.status = "shipped"
        self.version = event.version
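Replaying events into an aggregate is pure function application, which makes it trivially testable. This standalone sketch uses `(version, event_type, payload)` tuples as a trimmed stand-in for the `DomainEvent` model and rebuilds an order's state from its log.

```python
# Each event: (version, event_type, payload) - a stand-in for DomainEvent
events = [
    (1, "order_created", {}),
    (2, "item_added", {"item": "book"}),
    (3, "item_added", {"item": "pen"}),
    (4, "payment_received", {}),
]

class OrderAggregate:
    def __init__(self, order_id: int):
        self.id = order_id
        self.items: list[str] = []
        self.status = "pending"
        self.version = 0

    def apply(self, version: int, event_type: str, payload: dict):
        # Each handler mutates state; the version tracks the last applied event
        if event_type == "item_added":
            self.items.append(payload["item"])
        elif event_type == "payment_received":
            self.status = "paid"
        elif event_type == "order_shipped":
            self.status = "shipped"
        self.version = version

order = OrderAggregate(1)
for version, event_type, payload in sorted(events):   # replay in version order
    order.apply(version, event_type, payload)
```

Because replay is deterministic, the same log always yields the same state; snapshots are just a cached replay up to some version.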

18 How does FastAPI handle request validation errors? How do you customise the 422 response?

Validation When request data fails Pydantic validation, FastAPI automatically returns HTTP 422 Unprocessable Entity with a detailed error list. You can customise this response globally.

from fastapi import FastAPI, Request, HTTPException
from fastapi.exceptions import RequestValidationError
from fastapi.responses import JSONResponse
from pydantic import ValidationError

app = FastAPI()

# Default 422 response from FastAPI:
# { "detail": [{ "type": "int_parsing", "loc": ["body", "age"],
#                "msg": "Input should be a valid integer",
#                "input": "abc", "url": "..." }] }

# Custom 422 handler โ€” simplified, client-friendly format
@app.exception_handler(RequestValidationError)
async def validation_exception_handler(request: Request, exc: RequestValidationError):
    errors = []
    for error in exc.errors():
        field = " -> ".join(str(loc) for loc in error["loc"] if loc != "body")
        errors.append({
            "field":   field,
            "message": error["msg"],
            "type":    error["type"]
        })
    return JSONResponse(
        status_code=422,
        content={
            "error":   "VALIDATION_ERROR",
            "message": "Request data validation failed",
            "errors":  errors
        }
    )

# Validate within a route and return 400 (business rule violation vs schema error)
@app.post("/orders")
async def create_order(order: OrderCreate):
    if not await inventory_service.has_stock(order.items):
        raise HTTPException(
            status_code=400,
            detail={
                "error":   "INSUFFICIENT_STOCK",
                "message": "One or more items are out of stock",
                "items":   [i.product_id for i in order.items if not i.in_stock]
            }
        )
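The loc-flattening done by the custom handler can be tested without a running app by feeding it error dicts shaped like Pydantic v2's `errors()` output. The sample error below is hand-written to match that shape; the helper mirrors the handler's transformation.

```python
def simplify_errors(raw_errors: list[dict]) -> list[dict]:
    # Same transformation as the exception handler: drop "body", join the path
    simplified = []
    for error in raw_errors:
        field = " -> ".join(str(loc) for loc in error["loc"] if loc != "body")
        simplified.append({
            "field":   field,
            "message": error["msg"],
            "type":    error["type"],
        })
    return simplified

# A hand-written sample matching Pydantic v2's error shape
sample = [{
    "type": "int_parsing",
    "loc": ("body", "items", 0, "quantity"),
    "msg": "Input should be a valid integer",
}]
out = simplify_errors(sample)
```

Note that `loc` includes list indices (the `0` here), so nested validation errors still point at the exact offending element.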

19 How do you implement a plugin or extension system in FastAPI?

Architecture FastAPI’s lifespan, dependency injection, and middleware system make it easy to build pluggable, modular applications where features can be enabled or disabled via configuration.

from fastapi import FastAPI
from contextlib import asynccontextmanager
from typing import Protocol

import aioredis  # aioredis 2.x: from_url returns a client synchronously

# Plugin interface (Protocol)
class FastAPIPlugin(Protocol):
    async def setup(self, app: FastAPI) -> None: ...
    async def teardown(self) -> None: ...

# Plugin implementations
class RedisPlugin:
    def __init__(self, url: str):
        self.url = url
        self.client = None

    async def setup(self, app: FastAPI):
        self.client = aioredis.from_url(self.url)
        app.state.redis = self.client

    async def teardown(self):
        if self.client:
            await self.client.close()

class MetricsPlugin:
    async def setup(self, app: FastAPI):
        from prometheus_client import make_asgi_app
        metrics_app = make_asgi_app()
        app.mount("/metrics", metrics_app)

    async def teardown(self):
        pass

# Plugin registry
class PluginRegistry:
    def __init__(self):
        self._plugins: list[FastAPIPlugin] = []

    def register(self, plugin: FastAPIPlugin) -> "PluginRegistry":
        self._plugins.append(plugin)
        return self

    def build_lifespan(self):
        plugins = self._plugins

        @asynccontextmanager
        async def lifespan(app: FastAPI):
            for plugin in plugins:
                await plugin.setup(app)
            yield
            for plugin in reversed(plugins):
                await plugin.teardown()

        return lifespan

registry = PluginRegistry()
registry.register(RedisPlugin(settings.redis_url))  # settings from app config
registry.register(MetricsPlugin())

app = FastAPI(lifespan=registry.build_lifespan())
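
The registry's key guarantee is ordering: plugins are set up in registration order and torn down in reverse, like nested context managers. A standalone sketch with toy tracing plugins (names are illustrative) makes this visible without Redis or Prometheus:

```python
import asyncio
from contextlib import asynccontextmanager

calls = []

class TracePlugin:
    # Toy plugin that records when it is set up and torn down
    def __init__(self, name):
        self.name = name
    async def setup(self, app):
        calls.append(f"setup:{self.name}")
    async def teardown(self):
        calls.append(f"teardown:{self.name}")

class PluginRegistry:
    def __init__(self):
        self._plugins = []
    def register(self, plugin):
        self._plugins.append(plugin)
        return self
    def build_lifespan(self):
        plugins = self._plugins
        @asynccontextmanager
        async def lifespan(app):
            for p in plugins:
                await p.setup(app)
            yield
            for p in reversed(plugins):   # teardown in reverse order
                await p.teardown()
        return lifespan

async def main():
    registry = PluginRegistry().register(TracePlugin("redis")).register(TracePlugin("metrics"))
    lifespan = registry.build_lifespan()
    async with lifespan(None):   # FastAPI would pass the app here
        pass

asyncio.run(main())
print(calls)  # ['setup:redis', 'setup:metrics', 'teardown:metrics', 'teardown:redis']
```

Reverse teardown matters when plugins depend on each other, e.g. a metrics plugin that flushes counters to Redis must be torn down before the Redis client closes.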

20 What is the difference between Annotated dependencies and plain Depends in FastAPI?

DI Annotated (Python 3.9+) combined with Depends lets you declare a dependency inside the type annotation rather than as a default value in the function signature — enabling reusable dependency types that work like type aliases.

from typing import Annotated
from fastapi import Depends, FastAPI, Header, HTTPException

app = FastAPI()

# Traditional Depends — dependency in function signature
@app.get("/items")
async def list_items(db: AsyncSession = Depends(get_db)):
    ...

# Annotated Depends — dependency in the type annotation
DBSession = Annotated[AsyncSession, Depends(get_db)]
CurrentUser = Annotated[User, Depends(get_current_user)]
AdminUser = Annotated[User, Depends(require_roles(Role.ADMIN))]

# Now route signatures are cleaner and the type carries the dependency
@app.get("/items")
async def list_items(db: DBSession):  # same as Depends(get_db)
    ...

@app.get("/admin/users")
async def list_all_users(
    db: DBSession,
    user: AdminUser  # automatically verifies admin role
):
    return await user_service.get_all(db)

# Compose annotated types
ActiveUser = Annotated[User, Depends(get_active_user)]
PaginationDep = Annotated[PaginationParams, Depends(common_pagination)]

@app.get("/orders")
async def list_orders(user: ActiveUser, pagination: PaginationDep, db: DBSession):
    return await order_service.list_by_user(user.id, db, **pagination.model_dump())

21 How do you implement zero-downtime deployments for FastAPI?

Ops

  • Graceful shutdown — Uvicorn handles SIGTERM gracefully: it stops accepting new connections, completes in-flight requests, then exits. Set --timeout-graceful-shutdown 30.
  • Readiness probe โ€” Kubernetes only sends traffic to pods that return 200 from /health/ready. New pods don’t receive traffic until fully initialised (DB connected, caches warmed).
  • Rolling updates โ€” Kubernetes gradually replaces old pods. Old pods keep running until new ones are ready and traffic is shifted.
  • Database migrations — run alembic upgrade head as an init container before the new app pods start. Migrations must be backward compatible (additive only — don’t remove columns until all pods are on the new version).
# Kubernetes deployment with health probes
# deployment.yaml
spec:
  strategy:
    type: RollingUpdate
    rollingUpdate:
      maxSurge:       1      # start 1 new pod before terminating old
      maxUnavailable: 0      # never have fewer than desired replicas
  containers:
  - name: api
    image: myapp:v2.0
    readinessProbe:
      httpGet:
        path: /health/ready
        port: 8000
      initialDelaySeconds: 10
      periodSeconds: 5
      failureThreshold: 3
    livenessProbe:
      httpGet:
        path: /health/live
        port: 8000
      initialDelaySeconds: 30
      periodSeconds: 10
    lifecycle:
      preStop:
        exec:
          command: ["sleep", "5"]  # drain existing connections before SIGTERM

📝 Knowledge Check

These questions mirror real senior-level FastAPI architecture and systems design interview scenarios.

🧠 Quiz Question 1 of 5

What is ASGI and why does FastAPI require an ASGI server like Uvicorn rather than a WSGI server like Gunicorn?





🧠 Quiz Question 2 of 5

When should you use Celery instead of FastAPI’s built-in BackgroundTasks?





🧠 Quiz Question 3 of 5

What does the dependency cache in FastAPI guarantee within a single request?





🧠 Quiz Question 4 of 5

In production, why is a single Uvicorn process per container preferred over Gunicorn with multiple Uvicorn workers?





🧠 Quiz Question 5 of 5

What does the Repository pattern achieve in a FastAPI application?





Tip: Expert FastAPI interviews focus on architecture decisions, not syntax. For ASGI, explain why async matters before describing Uvicorn. For the Repository pattern, explain the testability problem first. For zero-downtime deployments, explain what goes wrong without readiness probes. Frame every answer as: problem → solution → tradeoffs — that is what separates senior engineers.