The regular with statement calls synchronous __enter__ and __exit__. For resources that require async operations to acquire and release — database connection pools, HTTP clients, file handles on async file systems — Python provides async with, which calls __aenter__ and __aexit__ with await. Similarly, async for iterates over async generators and async iterables that produce values asynchronously. These async variants of familiar constructs are used throughout FastAPI’s database session management, async HTTP clients, and streaming response generation.
async with — Async Context Managers
import asyncio
import httpx
import asyncpg
# ── httpx.AsyncClient — must use async with ──────────────────────────────────
async def fetch_user(user_id: int) -> dict:
    """Fetch one user record from the example API and return its JSON body.

    Entering the ``async with`` block awaits ``__aenter__`` (which builds
    the client); leaving it awaits ``__aexit__`` (which closes the client),
    even when the request raises.
    """
    url = f"https://api.example.com/users/{user_id}"
    async with httpx.AsyncClient() as client:
        response = await client.get(url)
        return response.json()
# ── asyncpg connection pool ──────────────────────────────────────────────────
async def get_users(pool: asyncpg.Pool) -> list:
    """Return every user row as a plain dict.

    ``pool.acquire()`` checks a connection out of the pool on entry and
    ``__aexit__`` releases it back automatically — even if the query fails.
    """
    async with pool.acquire() as conn:
        records = await conn.fetch("SELECT id, name FROM users")
        return [dict(record) for record in records]
# ── Building your own async context manager ──────────────────────────────────
from contextlib import asynccontextmanager
@asynccontextmanager
async def timer(label: str):
    """Async context manager that prints the wall-clock duration of its body.

    Everything before ``yield`` runs on entry (__aenter__); the ``finally``
    clause runs on exit (__aexit__), so the elapsed time is reported even
    when the body raises.
    """
    import time

    t0 = time.perf_counter()
    try:
        yield  # the caller's async-with body executes here
    finally:
        elapsed = time.perf_counter() - t0
        print(f"{label}: {elapsed:.3f}s")
async def main():
    """Demo: time a simulated 100 ms database query with timer()."""
    async with timer("database query"):
        await asyncio.sleep(0.1)  # stand-in for real query work
# prints: database query: 0.100s
# ── async with for transaction management ────────────────────────────────────
class AsyncTransaction:
    """Async context manager wrapping a DB transaction.

    Commits when the ``async with`` body completes normally; rolls back when
    it raises. ``__aexit__`` returns False, so any exception propagates to
    the caller after the rollback finishes.
    """

    def __init__(self, conn):
        self.conn = conn  # connection object providing .transaction()
        self.tx = None    # set on __aenter__

    async def __aenter__(self):
        self.tx = self.conn.transaction()
        await self.tx.start()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if exc_type is None:
            await self.tx.commit()
        else:
            await self.tx.rollback()
        return False  # never swallow the exception
Note: The
@asynccontextmanager decorator from contextlib works the same as @contextmanager but for async functions. Everything before yield runs on entry (__aenter__), everything after yield (including in finally) runs on exit (__aexit__). FastAPI’s lifespan parameter accepts an async context manager — you use @asynccontextmanager to define startup and shutdown hooks that can perform async operations like connecting to a database pool. Tip: FastAPI’s dependency injection works seamlessly with both sync and async generators. A dependency with
yield (sync) is run in a thread pool; a dependency with yield in an async def function runs on the event loop. For database sessions with async drivers (asyncpg, async SQLAlchemy), always use async def get_db(): async with ... as session: yield session. For sync drivers (psycopg2, standard SQLAlchemy), use a plain def get_db(): with ... as session: yield session. Warning: Never use a synchronous context manager (
with) for async resources. with httpx.AsyncClient() as client does not work — httpx.AsyncClient.__enter__ is not defined; only __aenter__ is. Similarly, never use async with for purely synchronous context managers that have no __aenter__. Always check the library documentation to know which type of context manager to use. async for — Async Generators and Iterators
import asyncio
# ── Async generator function ─────────────────────────────────────────────────
async def paginated_results(db, page_size: int = 100):
    """Yield database rows page by page asynchronously.

    Fetches ``page_size`` rows at a time and yields them one by one, so the
    caller never holds the full result set in memory. Stops when a fetch
    returns an empty page.
    """
    offset = 0
    while True:
        page = await db.fetch(
            f"SELECT * FROM posts ORDER BY id LIMIT {page_size} OFFSET {offset}"
        )
        if not page:
            return  # empty page means we are past the last row
        for record in page:
            yield record  # ``yield`` inside ``async def`` -> async generator
        offset += page_size
# ── Iterate with async for ───────────────────────────────────────────────────
async def process_all_posts(db):
    """Walk every post via the paginated async generator and process each."""
    async for item in paginated_results(db, page_size=500):
        await process_post(item)  # handler defined elsewhere
# ── FastAPI streaming with async generator ───────────────────────────────────
from fastapi.responses import StreamingResponse
import json
async def stream_posts_json(db):
    """Stream all posts as one JSON array, emitted chunk by chunk.

    Note: the output is a single JSON array ("[" elem "," elem ... "]"),
    NOT NDJSON / JSON lines — elements are comma-separated and wrapped in
    brackets, so the client must parse the whole payload as one document.
    """
    yield "["
    first = True
    async for post in paginated_results(db):
        if not first:
            yield ","  # separator before every element except the first
        yield json.dumps({"id": post["id"], "title": post["title"]})
        first = False
    yield "]"
@app.get("/posts/stream")
async def stream_all_posts(db=Depends(get_async_db)):
    """Expose the post stream as a chunked HTTP response."""
    body = stream_posts_json(db)
    return StreamingResponse(body, media_type="application/json")
# ── Implementing the async iteration protocol manually ───────────────────────
class AsyncNumberRange:
    """Async iterator yielding the integers from ``start`` to ``stop`` inclusive."""

    def __init__(self, start: int, stop: int):
        self.current = start
        self.stop = stop

    def __aiter__(self):
        # The instance is its own iterator.
        return self

    async def __anext__(self) -> int:
        if self.current > self.stop:
            raise StopAsyncIteration  # async analogue of StopIteration
        await asyncio.sleep(0)  # cooperatively yield to the event loop
        value, self.current = self.current, self.current + 1
        return value
async def main():
    """Demo: print 1 through 5 from the manual async iterator."""
    async for value in AsyncNumberRange(1, 5):
        print(value)  # 1, 2, 3, 4, 5
asyncio.Queue — Producer-Consumer Pattern
import asyncio
async def producer(queue: asyncio.Queue, items: list):
    """Push each item onto the queue, then a None sentinel to signal the end.

    ``queue.put`` suspends when the queue is at maxsize, giving natural
    back-pressure against slow consumers.
    """
    for item in items:
        await queue.put(item)
        print(f"Produced: {item}")
        await asyncio.sleep(0.1)  # simulate per-item work
    await queue.put(None)  # sentinel: no more items will arrive
async def consumer(queue: asyncio.Queue, name: str):
    """Process queue items until the None sentinel appears.

    The sentinel is put back before returning so sibling consumers sharing
    the queue also see it and shut down.
    """
    while True:
        item = await queue.get()
        if item is None:
            await queue.put(None)  # re-queue sentinel for the next consumer
            return
        print(f"Consumer {name} processing: {item}")
        await asyncio.sleep(0.2)  # simulate per-item processing
        queue.task_done()  # mark this unit of work complete
async def main():
    """Run one producer against two competing consumers on a bounded queue."""
    queue = asyncio.Queue(maxsize=5)  # producer blocks once 5 items wait
    # Producer and both consumers run concurrently until the sentinel
    # has been seen by every consumer.
    await asyncio.gather(
        producer(queue, range(10)),
        consumer(queue, "A"),
        consumer(queue, "B"),
    )

asyncio.run(main())
Common Mistakes
Mistake 1 — Using with instead of async with for async resources
❌ Wrong — regular with on an async context manager:
with httpx.AsyncClient() as client: # AttributeError: __enter__ not defined
...
✅ Correct:
async with httpx.AsyncClient() as client: # ✅ __aenter__ / __aexit__
...
Mistake 2 — Using for instead of async for with an async generator
❌ Wrong — regular for loop on async generator:
for post in paginated_results(db): # TypeError: 'async_generator' not iterable
...
✅ Correct:
async for post in paginated_results(db): # ✅
...
Mistake 3 — Forgetting @asynccontextmanager yield in try/finally
❌ Wrong — cleanup skipped on exception:
@asynccontextmanager
async def db_session():
session = await create_session()
yield session
await session.close() # NOT called if exception raised inside with block!
✅ Correct:
@asynccontextmanager
async def db_session():
session = await create_session()
try:
yield session
finally:
await session.close() # ✅ always called
Quick Reference
| Pattern | Code |
|---|---|
| Async context manager | async with resource as r: |
| Build async CM | @asynccontextmanager async def f(): yield |
| Class-based async CM | Implement __aenter__ and __aexit__ |
| Async generator | async def gen(): yield value |
| Iterate async gen | async for item in gen(): |
| Async iterator class | Implement __aiter__ and __anext__ |
| Producer-consumer | asyncio.Queue(maxsize=N) |
| FastAPI lifespan | @asynccontextmanager async def lifespan(app): |