Async SQLAlchemy with the asyncpg driver lets database access cooperate with FastAPI’s async event loop instead of working around it. When you use synchronous SQLAlchemy with psycopg2 in a plain def route handler, FastAPI runs the handler in a thread pool. That works, but the worker thread is blocked for the duration of every database query. With async SQLAlchemy and asyncpg in an async def handler, the event loop is free to process other requests while a query waits on database I/O. For high-concurrency, I/O-bound applications, this can substantially increase throughput. The API is nearly identical to the sync version, with await added to database operations.
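To make the concurrency claim concrete, here is a minimal sketch that uses asyncio.sleep as a stand-in for a database round trip. No real database or driver is involved; fake_query, run_concurrently, and the 0.1 s latency are invented for illustration. Ten simulated queries awaited together should finish in roughly the time of one, because each await hands control back to the event loop.

```python
import asyncio
import time


async def fake_query(query_id: int, latency: float = 0.1) -> int:
    """Hypothetical stand-in for an awaited database call."""
    # While this coroutine is suspended, the event loop can run other tasks.
    await asyncio.sleep(latency)
    return query_id


async def run_concurrently(n: int = 10) -> tuple[list[int], float]:
    """Run n fake queries at once and report the wall-clock time taken."""
    start = time.perf_counter()
    results = await asyncio.gather(*(fake_query(i) for i in range(n)))
    return list(results), time.perf_counter() - start
```

Calling asyncio.run(run_concurrently()) returns the ten results together with an elapsed time that should sit far below the one second that ten sequential 0.1 s waits would take.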
Async Engine and Session Setup
pip install "sqlalchemy[asyncio]" asyncpg
# app/database_async.py
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
from sqlalchemy.orm import DeclarativeBase
from app.config import settings
# ── Async engine: use postgresql+asyncpg:// URL ───────────────────────────────
# Change: postgresql://user:pass@host/db
# → postgresql+asyncpg://user:pass@host/db
async_engine = create_async_engine(
    str(settings.database_url).replace("postgresql://", "postgresql+asyncpg://"),
    pool_size=settings.db_pool_size,
    max_overflow=10,
    pool_pre_ping=True,
    echo=settings.db_echo_sql,
)
# ── Async session factory ─────────────────────────────────────────────────────
AsyncSessionLocal = async_sessionmaker(
    bind=async_engine,
    class_=AsyncSession,
    autocommit=False,
    autoflush=False,
    expire_on_commit=False,  # important for async: don't expire after commit
)
class Base(DeclarativeBase):
    pass
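The str.replace call in the engine setup only rewrites URLs that begin with exactly postgresql://. A URL that already names a sync driver, or one that uses the postgres:// alias, would pass through unchanged. One possible way to handle those cases is a small helper like the sketch below; to_asyncpg_url is a hypothetical name, not part of SQLAlchemy, and the set of accepted schemes is an assumption about what a project might encounter.

```python
def to_asyncpg_url(url: str) -> str:
    """Return url with its scheme switched to the asyncpg dialect."""
    scheme, sep, rest = url.partition("://")
    if not sep:
        raise ValueError(f"not a database URL: {url!r}")
    # Accept the plain scheme, the "postgres" alias, or an explicit sync driver.
    if scheme in {"postgresql", "postgres", "postgresql+psycopg2"}:
        return f"postgresql+asyncpg://{rest}"
    if scheme == "postgresql+asyncpg":
        return url  # already async; leave untouched
    raise ValueError(f"unsupported scheme: {scheme!r}")
```

With such a helper, the first argument to create_async_engine could become to_asyncpg_url(str(settings.database_url)), and a misconfigured URL would fail at startup with a clear message rather than at the first query.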
expire_on_commit=False is important for async SQLAlchemy. By default, SQLAlchemy expires all object attributes after a commit and reloads them on next access. With an async session, that reload would be an implicit lazy load, and an AsyncSession cannot perform I/O implicitly: every database round trip has to go through an explicit await. Setting expire_on_commit=False keeps attribute values in memory after commit, preventing MissingGreenlet errors when you access object attributes after committing.
AsyncSession cannot be mixed with synchronous session usage, and vice versa. If you use async SQLAlchemy, all database operations in async routes must use await. Accessing an unloaded lazy relationship on an ORM object loaded through an AsyncSession triggers implicit I/O and raises a MissingGreenlet error. Always eager-load every relationship you need before the session is closed.
The Async get_db Dependency
# app/dependencies.py (async version)
from typing import AsyncGenerator
from sqlalchemy.ext.asyncio import AsyncSession
from app.database_async import AsyncSessionLocal
async def get_db() -> AsyncGenerator[AsyncSession, None]:
    """
    FastAPI async dependency: provides an AsyncSession per request.
    """
    async with AsyncSessionLocal() as session:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise
        # session.close() called automatically by async with on exit
# Usage in async route handler:
# @router.get("/posts")
# async def list_posts(db: AsyncSession = Depends(get_db)):
#     result = await db.execute(select(Post))
#     return result.scalars().all()
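The commit-on-success, rollback-on-error flow of the dependency can be traced without a database. The sketch below swaps AsyncSession for a hypothetical FakeSession that only records which methods were called, and drives the generator through contextlib.asynccontextmanager, which is roughly how a framework wraps a yield-based dependency around one handler call. FakeSession and simulate_request are invented for illustration, and get_db takes the session as an argument here purely to make the demo observable.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import AsyncGenerator


class FakeSession:
    """Hypothetical stand-in for AsyncSession that records what was called."""

    def __init__(self) -> None:
        self.calls: list[str] = []

    async def commit(self) -> None:
        self.calls.append("commit")

    async def rollback(self) -> None:
        self.calls.append("rollback")

    async def __aenter__(self) -> "FakeSession":
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        self.calls.append("close")  # returns None, so exceptions propagate


async def get_db(session: FakeSession) -> AsyncGenerator[FakeSession, None]:
    """Same shape as the dependency above, with the session injected."""
    async with session:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise


async def simulate_request(fail: bool) -> list[str]:
    """Run one simulated handler call and return the recorded session calls."""
    session = FakeSession()
    try:
        async with asynccontextmanager(get_db)(session):
            if fail:
                raise RuntimeError("handler failed")
    except RuntimeError:
        pass  # the error still propagates after rollback; swallowed for the demo
    return session.calls
```

A successful call records a commit followed by a close, while a failing handler records a rollback followed by a close, and the original exception is re-raised to the caller.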
Async Query Operations
from sqlalchemy import select, func
from sqlalchemy.orm import joinedload, selectinload
from sqlalchemy.ext.asyncio import AsyncSession
from app.models import Post  # assumed module path for the ORM models
# ── Fetch all (async) ─────────────────────────────────────────────────────────
async def get_published_posts(db: AsyncSession) -> list[Post]:
    result = await db.execute(
        select(Post)
        .options(joinedload(Post.author), selectinload(Post.tags))
        .where(Post.status == "published")
        .order_by(Post.created_at.desc())
        .limit(20)
    )
    return result.scalars().all()  # .scalars() unwraps Row tuples
# ── Fetch by primary key ──────────────────────────────────────────────────────
async def get_post(db: AsyncSession, post_id: int) -> Post | None:
    return await db.get(Post, post_id)
# ── Count ─────────────────────────────────────────────────────────────────────
async def count_posts(db: AsyncSession) -> int:
    result = await db.execute(
        select(func.count()).select_from(Post).where(Post.status == "published")
    )
    return result.scalar_one()
# ── Create (async) ────────────────────────────────────────────────────────────
async def create_post(db: AsyncSession, data: dict) -> Post:
    post = Post(**data)
    db.add(post)
    await db.flush()  # flush to get the generated id
    await db.refresh(post)
    return post
# ── Update (async) ────────────────────────────────────────────────────────────
async def update_post(db: AsyncSession, post_id: int, data: dict) -> Post | None:
    post = await db.get(Post, post_id)
    if post is None:
        return None
    for field, value in data.items():
        setattr(post, field, value)
    await db.flush()
    await db.refresh(post)
    return post
Async Eager Loading
from sqlalchemy import select
from sqlalchemy.orm import selectinload, joinedload
from sqlalchemy.ext.asyncio import AsyncSession
from app.models import Comment, Post, User  # assumed module path for the ORM models
# ── selectinload for async: the usual choice for collections ─────────────────
# selectinload loads a collection with one extra SELECT ... IN query;
# joinedload suits many-to-one relationships such as Post.author
async def get_post_detail(db: AsyncSession, post_id: int) -> Post | None:
    result = await db.execute(
        select(Post)
        .options(
            joinedload(Post.author),      # many-to-one: one extra JOIN
            selectinload(Post.tags),      # one extra IN query
            selectinload(Post.comments)
            .joinedload(Comment.author),  # comments + authors
        )
        .where(Post.id == post_id)
    )
    # unique() is required only when joinedload targets a collection;
    # it is harmless here, where joinedload is used on many-to-one relationships
    return result.unique().scalar_one_or_none()
# ── After flush, refresh to load server-generated defaults ───────────────────
async def create_user(db: AsyncSession, email: str, name: str) -> User:
    user = User(email=email, name=name)
    db.add(user)
    await db.flush()
    await db.refresh(user)  # loads id, created_at, etc.
    return user
Common Mistakes
Mistake 1 — Using synchronous session in async route handler
❌ Wrong — sync session blocks event loop in async handler:
@app.get("/posts")
async def list_posts(db: Session = Depends(sync_get_db)):
    posts = db.execute(select(Post)).scalars().all()  # blocks event loop!
✅ Correct — async session in async handler:
@app.get("/posts")
async def list_posts(db: AsyncSession = Depends(async_get_db)):
    result = await db.execute(select(Post))  # ✓ non-blocking
    return result.scalars().all()
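The effect of Mistake 1 can be observed without a database. In the sketch below, time.sleep stands in for a synchronous driver call, and a background ticker counts how often the event loop gets to run while the "handler" is busy. All names are hypothetical. asyncio.to_thread appears only as a contrast that moves the blocking call off the loop thread; the fix recommended in this section remains an AsyncSession.

```python
import asyncio
import time


def blocking_query(latency: float = 0.2) -> str:
    """Hypothetical stand-in for a synchronous driver call."""
    time.sleep(latency)
    return "rows"


async def count_ticks(stop: asyncio.Event) -> int:
    """Count how often the event loop gets to run while a handler is busy."""
    ticks = 0
    while not stop.is_set():
        await asyncio.sleep(0.01)
        ticks += 1
    return ticks


async def measure(offload: bool) -> int:
    """Return the number of ticks observed during one simulated handler call."""
    stop = asyncio.Event()
    ticker = asyncio.create_task(count_ticks(stop))
    await asyncio.sleep(0)  # give the ticker a chance to start
    if offload:
        await asyncio.to_thread(blocking_query)  # runs in a worker thread
    else:
        blocking_query()  # runs on the event loop thread and stalls it
    stop.set()
    return await ticker
```

Running asyncio.run(measure(offload=False)) should report almost no ticks, since nothing else could run during the sleep, while measure(offload=True) should report many.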
Mistake 2 — Forgetting await on db.execute()
❌ Wrong — returns coroutine, not result:
result = db.execute(select(Post)) # coroutine object, not result!
posts = result.scalars().all() # AttributeError on coroutine
✅ Correct:
result = await db.execute(select(Post)) # ✓
posts = result.scalars().all()
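The same failure can be reproduced with plain asyncio, which may help when diagnosing it. In this sketch, execute and FakeResult are hypothetical stand-ins for AsyncSession.execute and its result object. Calling the coroutine function without await produces a coroutine object that has none of the result's methods.

```python
import asyncio


class FakeResult:
    """Hypothetical stand-in for a query result."""

    def all(self) -> list[int]:
        return [1, 2, 3]


async def execute() -> FakeResult:
    """Hypothetical stand-in for AsyncSession.execute."""
    await asyncio.sleep(0)
    return FakeResult()


async def demo() -> tuple[bool, str, list[int]]:
    pending = execute()  # no await: this is a coroutine object, not a result
    is_coroutine = asyncio.iscoroutine(pending)
    try:
        pending.all()
        error = ""
    except AttributeError as exc:
        error = type(exc).__name__
    pending.close()  # avoid the "never awaited" RuntimeWarning
    rows = (await execute()).all()  # awaited: a real result object
    return is_coroutine, error, rows
```

Python also emits a "coroutine ... was never awaited" RuntimeWarning when such an object is discarded, which is often the first visible hint of a missing await.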
Mistake 3 — Accessing lazy relationship outside await (MissingGreenlet)
❌ Wrong — lazy relationship access on async session outside await:
post = await db.get(Post, 1)
print(post.author.name) # MissingGreenlet error — lazy load needs await!
✅ Correct — always eager-load relationships needed in response:
result = await db.execute(
    select(Post).options(joinedload(Post.author)).where(Post.id == 1)
)
post = result.unique().scalar_one()
print(post.author.name) # ✓ author already loaded
Quick Reference — Async vs Sync Comparison
| Operation | Sync | Async |
|---|---|---|
| Execute query | db.execute(stmt) | await db.execute(stmt) |
| Scalars | db.scalars(stmt).all() | (await db.execute(stmt)).scalars().all() |
| Get by PK | db.get(Model, pk) | await db.get(Model, pk) |
| Flush | db.flush() | await db.flush() |
| Refresh | db.refresh(obj) | await db.refresh(obj) |
| Commit | db.commit() | await db.commit() |
| Rollback | db.rollback() | await db.rollback() |