FastAPI works with both synchronous (def) and asynchronous (async def) route handlers and dependencies. The distinction matters significantly for performance: calling a synchronous blocking function in an async def handler blocks the entire event loop, making the application unable to handle other requests while it waits. FastAPI handles this correctly when you use the right form — async def for truly async operations and plain def for synchronous ones (FastAPI runs sync functions in a thread pool). Understanding this execution model is the key to writing high-performance FastAPI applications.
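The effect of a blocking call on the event loop can be observed without FastAPI at all. The following is a minimal sketch that uses only asyncio; `count_ticks`, `blocking_handler`, `cooperative_handler`, and `measure` are hypothetical names introduced for illustration, not FastAPI APIs. A background task counts how often the loop gets to run it while a stand-in "handler" waits.

```python
import asyncio
import time


async def count_ticks(stop: asyncio.Event, interval: float = 0.01) -> int:
    """Count how many times the event loop gets around to running this task."""
    ticks = 0
    while not stop.is_set():
        await asyncio.sleep(interval)
        ticks += 1
    return ticks


async def blocking_handler(delay: float) -> None:
    # Hypothetical handler body: time.sleep() never yields to the event loop.
    time.sleep(delay)


async def cooperative_handler(delay: float) -> None:
    # Hypothetical handler body: awaiting hands control back to the loop.
    await asyncio.sleep(delay)


async def measure(handler, delay: float = 0.2) -> int:
    """Run one handler and report how many ticks the background task managed."""
    stop = asyncio.Event()
    ticker = asyncio.create_task(count_ticks(stop))
    await asyncio.sleep(0)  # let the ticker start before the handler runs
    await handler(delay)
    stop.set()
    return await ticker


if __name__ == "__main__":
    print("ticks while blocked:    ", asyncio.run(measure(blocking_handler)))
    print("ticks while cooperative:", asyncio.run(measure(cooperative_handler)))
```

With the blocking version the ticker barely advances, because nothing else can run until `time.sleep()` returns. In a real application, every other in-flight request is in the position of that ticker.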
async def vs def in FastAPI Route Handlers
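from fastapi import FastAPI, Depends
from sqlalchemy.orm import Session
import asyncio
import time

import httpx  # async HTTP client used in the async def example

# get_db (a session dependency) and Post (an ORM model) are assumed to be
# defined elsewhere in the application.
app = FastAPI()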
# ── async def route handler ────────────────────────────────────────────────────
# Use when: you make async I/O calls (await httpx, await async SQLAlchemy, etc.)
@app.get("/posts/{post_id}/async")
async def get_post_async(post_id: int):
    # ✓ Non-blocking: the event loop handles other requests while waiting
    async with httpx.AsyncClient() as client:
        response = await client.get(f"https://api.example.com/posts/{post_id}")
    return response.json()
# ── def route handler ─────────────────────────────────────────────────────────
# Use when: you use sync libraries (standard SQLAlchemy, psycopg2, etc.)
# FastAPI automatically runs def handlers in a thread pool
@app.get("/posts/{post_id}/sync")
def get_post_sync(post_id: int, db: Session = Depends(get_db)):
    # ✓ FastAPI runs this in a worker thread, so blocking I/O does not block the event loop
    post = db.query(Post).filter(Post.id == post_id).first()
    return post
# ── The WORST pattern: sync I/O inside async def ───────────────────────────────
@app.get("/posts/{post_id}/bad")
async def get_post_bad(post_id: int, db: Session = Depends(get_db)):
    # ✗ BLOCKS THE ENTIRE EVENT LOOP for the duration of the sleep and the DB query!
    time.sleep(0.1)  # all other requests wait while this sleeps
    post = db.query(Post).filter(Post.id == post_id).first()  # sync driver blocks!
    return post
When FastAPI receives a request for a plain def route handler, it runs the handler in a worker thread from Starlette’s thread pool (through run_in_threadpool, which is built on AnyIO’s to_thread.run_sync rather than asyncio.to_thread()). This means synchronous blocking code in a def handler does NOT block the event loop: it runs in a separate thread, and the event loop is free to handle other requests. This is why using standard synchronous SQLAlchemy (with a sync driver like psycopg2) inside a plain def handler is perfectly correct and common in FastAPI applications.

If your handler awaits an async library (httpx, asyncpg, async SQLAlchemy), use async def. If it uses a synchronous library (standard SQLAlchemy with psycopg2, files with open(), requests), use plain def. Putting sync I/O inside async def is the most common performance mistake in FastAPI applications.

CPU-bound operations in async def handlers block the event loop for their full duration, even without any I/O. A heavy computation like image resizing or PDF generation in an async def handler will prevent all other requests from being served until it completes. Use await asyncio.to_thread(cpu_function, args) to offload CPU-bound work to the thread pool, so the event loop can keep serving requests while the computation runs.

asyncio.to_thread(): Running Sync Code in Thread Pool
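To make the thread offloading concrete, here is a small sketch that uses only the standard library. `blocking_io`, `run_inline`, and `run_offloaded` are hypothetical helpers standing in for a sync driver call and two ways of invoking it; the timings are illustrative, not benchmarks.

```python
import asyncio
import threading
import time


def blocking_io(delay: float) -> int:
    """Stand-in for a sync driver call; returns the id of the thread it ran on."""
    time.sleep(delay)
    return threading.get_ident()


async def run_inline(n: int, delay: float) -> tuple[float, set[int]]:
    """Call the blocking function directly on the event loop thread."""
    start = time.perf_counter()
    thread_ids = {blocking_io(delay) for _ in range(n)}
    return time.perf_counter() - start, thread_ids


async def run_offloaded(n: int, delay: float) -> tuple[float, set[int]]:
    """Hand each call to asyncio.to_thread so the waits overlap."""
    start = time.perf_counter()
    results = await asyncio.gather(
        *(asyncio.to_thread(blocking_io, delay) for _ in range(n))
    )
    return time.perf_counter() - start, set(results)


if __name__ == "__main__":
    print("inline:   ", asyncio.run(run_inline(3, 0.1)))
    print("offloaded:", asyncio.run(run_offloaded(3, 0.1)))
```

The inline calls all run on the event loop's own thread and take roughly three times the delay. The offloaded calls run on worker threads and finish in roughly one delay, which is the behaviour a plain def handler gets automatically.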
import asyncio
from typing import Optional

# ── Run a sync function in a thread without blocking the event loop ───────────
def sync_db_query(post_id: int) -> Optional[dict]:
    """Synchronous database query using psycopg2."""
    import psycopg2
    conn = psycopg2.connect("postgresql://localhost/mydb")
    try:
        cursor = conn.cursor()
        cursor.execute("SELECT id, title FROM posts WHERE id = %s", (post_id,))
        row = cursor.fetchone()
    finally:
        conn.close()  # always release the connection, even if the query fails
    return {"id": row[0], "title": row[1]} if row else None

@app.get("/posts/{post_id}")
async def get_post(post_id: int):
    # Run sync function in thread pool; does NOT block event loop
    post = await asyncio.to_thread(sync_db_query, post_id)
    return post
# ── CPU-bound work in thread pool ─────────────────────────────────────────────
import io

from fastapi import Response, UploadFile
from PIL import Image

def resize_image(image_bytes: bytes, width: int, height: int) -> bytes:
    img = Image.open(io.BytesIO(image_bytes))
    resized = img.resize((width, height))
    buf = io.BytesIO()
    # JPEG has no alpha channel, so convert to RGB before saving
    resized.convert("RGB").save(buf, format="JPEG")
    return buf.getvalue()

@app.post("/images/resize")
async def resize(width: int, height: int, file: UploadFile):
    image_bytes = await file.read()
    # Offload CPU-intensive resize to thread pool
    result_bytes = await asyncio.to_thread(resize_image, image_bytes, width, height)
    return Response(content=result_bytes, media_type="image/jpeg")
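One way to check whether offloading actually helps is to measure event loop lag, meaning how late a short `asyncio.sleep()` wakes up. The sketch below is an assumption-laden illustration using only the standard library: `busy_work` is a pure-Python spin loop standing in for real CPU work, and `max_loop_lag`, `lag_while`, `inline_work`, and `offloaded_work` are hypothetical helper names.

```python
import asyncio
import time


def busy_work(duration: float) -> int:
    """CPU-bound stand-in: spin in pure Python for `duration` seconds."""
    deadline = time.perf_counter() + duration
    n = 0
    while time.perf_counter() < deadline:
        n += 1
    return n


async def max_loop_lag(stop: asyncio.Event, interval: float = 0.01) -> float:
    """Track the worst delay between a requested and an actual wake-up."""
    worst = 0.0
    while not stop.is_set():
        before = time.perf_counter()
        await asyncio.sleep(interval)
        worst = max(worst, time.perf_counter() - before - interval)
    return worst


async def lag_while(run_work) -> float:
    """Run `run_work()` and return the worst loop lag observed meanwhile."""
    stop = asyncio.Event()
    monitor = asyncio.create_task(max_loop_lag(stop))
    await asyncio.sleep(0)  # let the monitor start first
    await run_work()
    stop.set()
    return await monitor


async def inline_work() -> None:
    busy_work(0.2)  # runs on the event loop thread


async def offloaded_work() -> None:
    await asyncio.to_thread(busy_work, 0.2)  # runs on a worker thread


if __name__ == "__main__":
    print(f"inline lag:    {asyncio.run(lag_while(inline_work)):.3f}s")
    print(f"offloaded lag: {asyncio.run(lag_while(offloaded_work)):.3f}s")
```

Under CPython's global interpreter lock the worker thread and the event loop still take turns, so offloading pure-Python CPU work keeps the loop responsive but does not make the computation itself faster.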
Decision Tree: async def or def?
Is your route handler doing any I/O?
│
├── NO (pure computation, in-memory operations)
│   ├── Trivial work: either form is fine (async def skips the thread pool hop)
│   └── Heavy CPU work: use plain def, or async def + asyncio.to_thread()
│
└── YES: what kind of I/O?
    │
    ├── Async library (httpx, asyncpg, async SQLAlchemy, aiofiles)
    │   └── Use async def + await ✓
    │
    ├── Sync library (psycopg2, requests, standard SQLAlchemy)
    │   └── Use plain def; FastAPI handles the thread pool automatically ✓
    │       DO NOT use async def with sync I/O!
    │
    └── Mix of sync and async
        ├── Option A: use async def + asyncio.to_thread() for sync parts
        └── Option B: refactor to use async libraries throughout
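Option A for the mixed case can be sketched as follows, using only the standard library. `fetch_remote` and `load_local_stats` are hypothetical stand-ins for an async client call and a sync driver call, and `get_post_with_stats` plays the role of the async def handler.

```python
import asyncio
import time


async def fetch_remote(post_id: int) -> dict:
    """Hypothetical async I/O call (for example, an async HTTP client)."""
    await asyncio.sleep(0.05)
    return {"id": post_id, "title": f"Post {post_id}"}


def load_local_stats(post_id: int) -> dict:
    """Hypothetical sync I/O call (for example, a sync database driver)."""
    time.sleep(0.05)
    return {"views": post_id * 10}


async def get_post_with_stats(post_id: int) -> dict:
    # The async part is awaited directly; the sync part goes through
    # asyncio.to_thread so it cannot block the event loop.
    remote, stats = await asyncio.gather(
        fetch_remote(post_id),
        asyncio.to_thread(load_local_stats, post_id),
    )
    return {**remote, **stats}


if __name__ == "__main__":
    print(asyncio.run(get_post_with_stats(3)))
```

Because both calls are gathered, the async request and the threaded sync call overlap instead of running one after the other.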
Common FastAPI Async Pitfalls
# ── Pitfall 1: sync dependency in async handler ───────────────────────────────
# Standard SQLAlchemy Session (sync) inside async def blocks event loop!
@app.get("/posts")
async def get_posts(db: Session = Depends(get_db)):  # sync Session in async handler
    posts = db.query(Post).all()  # BLOCKS event loop!
    return posts
# Fix: use plain def OR switch to async SQLAlchemy

# ── Pitfall 2: time.sleep() in async handler ─────────────────────────────────
@app.get("/slow")
async def slow_endpoint():
    time.sleep(5)  # BLOCKS event loop for 5 seconds!
    return {"message": "done"}
# Fix: await asyncio.sleep(5), or better, fix the underlying slowness

# ── Pitfall 3: requests library in async handler ─────────────────────────────
@app.get("/proxy")
async def proxy():
    import requests
    r = requests.get("https://api.example.com/data")  # BLOCKS event loop!
    return r.json()
# Fix: use httpx.AsyncClient with await
# ── Pitfall 4: CPU-bound in async def ────────────────────────────────────────
@app.post("/process")
async def process_data(data: list[int]):
    # Calling sorted(data) inline is fine for small lists. For large lists or
    # heavy computation, offload to a thread so the event loop stays responsive:
    result = await asyncio.to_thread(sorted, data)  # offload to thread ✓
    return result
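Pitfalls like these can be hard to spot by reading code alone. One option, sketched here as a suggestion rather than something the examples above depend on, is asyncio's debug mode. It logs a warning on the "asyncio" logger whenever a single step on the loop runs longer than `loop.slow_callback_duration` (0.1 seconds by default). `ListHandler`, `find_slow_steps`, `slow_endpoint`, and `fast_endpoint` are hypothetical names for this illustration.

```python
import asyncio
import logging
import time


class ListHandler(logging.Handler):
    """Collect formatted log messages in a list for inspection."""

    def __init__(self) -> None:
        super().__init__()
        self.messages: list[str] = []

    def emit(self, record: logging.LogRecord) -> None:
        self.messages.append(record.getMessage())


async def slow_endpoint() -> dict:
    time.sleep(0.15)  # blocking call, longer than the 0.1 s default threshold
    return {"message": "done"}


async def fast_endpoint() -> dict:
    await asyncio.sleep(0.01)  # yields to the loop while waiting
    return {"message": "done"}


def find_slow_steps(coro_fn) -> list[str]:
    """Run coro_fn() with asyncio debug mode on and return slow-step warnings."""
    handler = ListHandler()
    logger = logging.getLogger("asyncio")
    previous_level = logger.level
    logger.setLevel(logging.WARNING)
    logger.addHandler(handler)
    try:
        asyncio.run(coro_fn(), debug=True)
    finally:
        logger.removeHandler(handler)
        logger.setLevel(previous_level)
    return [m for m in handler.messages if m.startswith("Executing")]


if __name__ == "__main__":
    print(find_slow_steps(slow_endpoint))
    print(find_slow_steps(fast_endpoint))
```

Debug mode adds overhead, so it is better suited to development and test runs than to production.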
Common Mistakes
Mistake 1 — Using standard SQLAlchemy sync session inside async def
❌ Wrong — psycopg2-backed SQLAlchemy blocks event loop in async handler:
@app.get("/users")
async def get_users(db: Session = Depends(get_db)):
    return db.query(User).all()  # sync psycopg2 call blocks event loop!
✅ Correct — use plain def (FastAPI runs it in thread pool):
@app.get("/users")
def get_users(db: Session = Depends(get_db)): # ✓ sync, thread pool
    return db.query(User).all()
Mistake 2 — Assuming async def is always faster than def
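❌ Wrong: switching a handler to async def for “performance” while it still does sync I/O:
# requires: import json
@app.get("/config")
async def get_config():
    # config.json is a placeholder path; open() and json.load() are blocking calls
    with open("config.json") as f:
        return json.load(f)  # blocks the event loop while the file is read
✅ Correct: keep plain def for handlers that use sync I/O:
@app.get("/config")
def get_config():
    with open("config.json") as f:
        return json.load(f)  # ✓ runs in the thread pool
A handler that does no I/O at all (for example, one that returns a static dict such as {"version": "1.0"}) can use either form. In that case async def skips the thread pool hop, so it is not the slower choice.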
Mistake 3 — Blocking with CPU work in async def without to_thread
❌ Wrong — heavy computation blocks all requests:
@app.post("/analyse")
async def analyse(text: str):
    result = run_nlp_model(text)  # takes 500ms and blocks the event loop!
    return result
✅ Correct — offload to thread:
@app.post("/analyse")
async def analyse(text: str):
    result = await asyncio.to_thread(run_nlp_model, text)  # ✓ non-blocking
    return result
Quick Reference — FastAPI Execution Model
| Handler Type | Runs In | Use For |
|---|---|---|
| async def handler() | Event loop directly | Async I/O (httpx, asyncpg, async SQLAlchemy) |
| def handler() | Thread pool (via Starlette) | Sync I/O (psycopg2, requests, standard SQLAlchemy) |
| asyncio.to_thread(fn) | Thread pool | Sync/CPU-bound work called from async context |
| Sync I/O in async def | Event loop (BLOCKS) | Never do this |