Async Context Managers, Iterators and Queues

The regular with statement calls the synchronous __enter__ and __exit__ methods. For resources that must be acquired and released asynchronously (database connection pools, HTTP clients, file handles on async file systems), Python provides async with, which awaits __aenter__ and __aexit__. Similarly, async for iterates over async generators and other async iterables, awaiting each value as it is produced. These async variants of familiar constructs appear throughout FastAPI applications: in database session management, async HTTP clients, and streaming response generation.

async with: Async Context Managers

import asyncio
import httpx
import asyncpg

# ── httpx.AsyncClient: must use async with ──────────────────────────────────
async def fetch_user(user_id: int) -> dict:
    async with httpx.AsyncClient() as client:   # __aenter__ opens the client
        response = await client.get(f"https://api.example.com/users/{user_id}")
        return response.json()
    # __aexit__ closes the client, guaranteed even on exception

# ── asyncpg connection pool ─────────────────────────────────────────────────
async def get_users(pool: asyncpg.Pool) -> list:
    async with pool.acquire() as conn:   # acquire a connection from pool
        rows = await conn.fetch("SELECT id, name FROM users")
        return [dict(row) for row in rows]
    # connection automatically released back to pool

# ── Building your own async context manager ─────────────────────────────────
import time
from contextlib import asynccontextmanager

@asynccontextmanager
async def timer(label: str):
    start = time.perf_counter()
    try:
        yield   # code in the async with block runs here
    finally:
        elapsed = time.perf_counter() - start
        print(f"{label}: {elapsed:.3f}s")

async def main():
    async with timer("database query"):
        await asyncio.sleep(0.1)   # simulate a query

asyncio.run(main())
# prints something like: database query: 0.100s

# ── async with for transaction management ───────────────────────────────────
class AsyncTransaction:
    def __init__(self, conn):
        self.conn = conn
        self.tx   = None

    async def __aenter__(self):
        self.tx = self.conn.transaction()
        await self.tx.start()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if exc_type:
            await self.tx.rollback()   # the block raised: undo the work
        else:
            await self.tx.commit()     # the block finished normally
        return False                   # do not suppress the exception
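
The commit-or-rollback behaviour of a class-based async context manager can be checked without a database. The sketch below repeats the same class shape and drives it with FakeConnection and FakeTransaction, which are hypothetical test doubles invented for this example (they are not part of asyncpg) and only record which methods were called.

```python
import asyncio


class FakeTransaction:
    """Hypothetical stand-in that records calls instead of talking to a database."""

    def __init__(self, log: list[str]):
        self.log = log

    async def start(self):
        self.log.append("start")

    async def commit(self):
        self.log.append("commit")

    async def rollback(self):
        self.log.append("rollback")


class FakeConnection:
    """Hypothetical connection exposing only transaction()."""

    def __init__(self):
        self.log: list[str] = []

    def transaction(self) -> FakeTransaction:
        return FakeTransaction(self.log)


class AsyncTransaction:
    """Same shape as the class above: commit on success, roll back on error."""

    def __init__(self, conn):
        self.conn = conn
        self.tx = None

    async def __aenter__(self):
        self.tx = self.conn.transaction()
        await self.tx.start()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        if exc_type:
            await self.tx.rollback()
        else:
            await self.tx.commit()
        return False  # do not swallow the exception


async def run_demo() -> tuple[list[str], list[str]]:
    ok_conn, bad_conn = FakeConnection(), FakeConnection()

    async with AsyncTransaction(ok_conn):
        pass  # body succeeds, so __aexit__ commits

    try:
        async with AsyncTransaction(bad_conn):
            raise ValueError("simulated failure")
    except ValueError:
        pass  # the exception propagates because __aexit__ returned False

    return ok_conn.log, bad_conn.log


ok_log, bad_log = asyncio.run(run_demo())
print(ok_log)   # ['start', 'commit']
print(bad_log)  # ['start', 'rollback']
```

Returning False from __aexit__ is what lets the caller still see the ValueError after the rollback has run.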
Note: The @asynccontextmanager decorator from contextlib works the same as @contextmanager but for async functions. Everything before yield runs on entry (__aenter__), and everything after yield (including code in a finally block) runs on exit (__aexit__). FastAPI’s lifespan parameter accepts an async context manager: you use @asynccontextmanager to define startup and shutdown hooks that can perform async operations such as connecting to a database pool.
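
The entry and exit halves described in the note can be observed with the standard library alone. This is a minimal sketch: the lifespan_like function and the events list are illustrative names chosen for this example, not FastAPI's API, and asyncio.sleep(0) stands in for real async setup and teardown.

```python
import asyncio
from contextlib import asynccontextmanager

events: list[str] = []


@asynccontextmanager
async def lifespan_like(name: str):
    # Runs on entry (the __aenter__ half).
    events.append(f"startup:{name}")
    await asyncio.sleep(0)  # stands in for async setup such as opening a pool
    try:
        yield {"name": name}  # the value bound by "as"
    finally:
        # Runs on exit (the __aexit__ half), even if the body raised.
        await asyncio.sleep(0)  # stands in for async teardown
        events.append(f"shutdown:{name}")


async def run_demo() -> None:
    async with lifespan_like("app") as state:
        events.append(f"serving:{state['name']}")


asyncio.run(run_demo())
print(events)  # ['startup:app', 'serving:app', 'shutdown:app']
```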
Tip: FastAPI’s dependency injection works with both sync and async generators. A dependency with yield in a plain def function is run in a thread pool; a dependency with yield in an async def function runs on the event loop. For database sessions with async drivers (asyncpg, async SQLAlchemy), use async def get_db(): async with ... as session: yield session. For sync drivers (psycopg2, standard SQLAlchemy), use a plain def get_db(): with ... as session: yield session.
Warning: Never use a synchronous context manager (with) for async resources. with httpx.AsyncClient() as client does not work, because httpx.AsyncClient defines only __aenter__ and __aexit__, not __enter__ and __exit__. Likewise, never use async with for purely synchronous context managers that have no __aenter__. Check the library documentation to know which type of context manager to use.
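
The failure mode in the warning is easy to reproduce. The sketch below uses AsyncOnlyResource, a hypothetical class written for this example that defines only the async protocol, and shows that a plain with fails at runtime while async with works. The exact exception type depends on the Python version.

```python
import asyncio


class AsyncOnlyResource:
    """Hypothetical resource that defines only the async protocol."""

    def __init__(self):
        self.closed = False

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        self.closed = True
        return False


def try_sync_with() -> str:
    try:
        with AsyncOnlyResource():  # wrong: no __enter__ / __exit__
            pass
    except (TypeError, AttributeError) as exc:
        # TypeError on Python 3.11+, AttributeError on earlier versions.
        return type(exc).__name__
    return "no error"


async def use_async_with() -> bool:
    async with AsyncOnlyResource() as resource:
        pass
    return resource.closed


sync_result = try_sync_with()
async_result = asyncio.run(use_async_with())
print(sync_result)   # 'TypeError' or 'AttributeError', depending on Python version
print(async_result)  # True
```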

async for: Async Generators and Iterators

import asyncio

# ── Async generator function ────────────────────────────────────────────────
async def paginated_results(db, page_size: int = 100):
    """Yield database rows page by page asynchronously."""
    offset = 0
    while True:
        rows = await db.fetch(
            # Parameterized query (asyncpg-style placeholders) instead of string formatting
            "SELECT * FROM posts ORDER BY id LIMIT $1 OFFSET $2",
            page_size,
            offset,
        )
        if not rows:
            break
        for row in rows:
            yield row   # async generator: yield inside async def
        offset += page_size

# ── Iterate with async for ──────────────────────────────────────────────────
async def process_all_posts(db):
    async for post in paginated_results(db, page_size=500):
        await process_post(post)   # process_post is assumed to be defined elsewhere

# ── FastAPI streaming with async generator ──────────────────────────────────
import json

from fastapi import Depends, FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()   # get_async_db is assumed to be defined elsewhere in the application

async def stream_posts_json(db):
    """Stream posts as a single JSON array, emitted piece by piece."""
    yield "["
    first = True
    async for post in paginated_results(db):
        if not first:
            yield ","
        yield json.dumps({"id": post["id"], "title": post["title"]})
        first = False
    yield "]"

@app.get("/posts/stream")
async def stream_all_posts(db = Depends(get_async_db)):
    return StreamingResponse(
        stream_posts_json(db),
        media_type="application/json"
    )

# ── Implementing the async iteration protocol manually ──────────────────────
class AsyncNumberRange:
    def __init__(self, start: int, stop: int):
        self.current = start
        self.stop    = stop

    def __aiter__(self):
        return self

    async def __anext__(self) -> int:
        if self.current > self.stop:   # stop is inclusive here, unlike range()
            raise StopAsyncIteration   # async equivalent of StopIteration
        await asyncio.sleep(0)         # yield to event loop each iteration
        value = self.current
        self.current += 1
        return value

async def main():
    async for n in AsyncNumberRange(1, 5):
        print(n)   # 1, 2, 3, 4, 5
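
To make the protocol concrete, the sketch below consumes the same async iterator three ways: with async for, by calling __aiter__ and __anext__ by hand, and with an async comprehension. Countdown is a hypothetical class written for this example. The hand-written loop is a rough approximation of what async for does, not the interpreter's exact implementation.

```python
import asyncio


class Countdown:
    """Hypothetical async iterator that yields n, n-1, ..., 1."""

    def __init__(self, n: int):
        self.n = n

    def __aiter__(self):
        return self

    async def __anext__(self) -> int:
        if self.n <= 0:
            raise StopAsyncIteration
        await asyncio.sleep(0)  # give other tasks a chance to run
        value = self.n
        self.n -= 1
        return value


async def with_async_for() -> list[int]:
    result = []
    async for value in Countdown(3):
        result.append(value)
    return result


async def by_hand() -> list[int]:
    # Roughly what "async for" does: call __aiter__ once, then await
    # __anext__ until StopAsyncIteration is raised.
    result = []
    iterator = Countdown(3).__aiter__()
    while True:
        try:
            value = await iterator.__anext__()
        except StopAsyncIteration:
            break
        result.append(value)
    return result


async def with_comprehension() -> list[int]:
    # Async comprehensions use the same protocol.
    return [value async for value in Countdown(3)]


loop_result = asyncio.run(with_async_for())
manual_result = asyncio.run(by_hand())
comp_result = asyncio.run(with_comprehension())
print(loop_result, manual_result, comp_result)  # [3, 2, 1] [3, 2, 1] [3, 2, 1]
```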

asyncio.Queue: Producer-Consumer Pattern

import asyncio

async def producer(queue: asyncio.Queue, items: list):
    for item in items:
        await queue.put(item)   # add to queue
        print(f"Produced: {item}")
        await asyncio.sleep(0.1)   # simulate work
    await queue.put(None)   # sentinel: signals end of production

async def consumer(queue: asyncio.Queue, name: str):
    while True:
        item = await queue.get()   # wait for item
        if item is None:
            await queue.put(None)   # pass sentinel to next consumer
            break
        print(f"Consumer {name} processing: {item}")
        await asyncio.sleep(0.2)   # simulate processing
        queue.task_done()           # signal item processed

async def main():
    queue = asyncio.Queue(maxsize=5)   # buffer at most 5 items

    # Start producer and two consumers concurrently
    await asyncio.gather(
        producer(queue, list(range(10))),   # pass a list to match the items: list annotation
        consumer(queue, "A"),
        consumer(queue, "B"),
    )

asyncio.run(main())
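
The sentinel approach above is one way to shut consumers down. An alternative sketch, shown here for comparison, relies on queue.join() together with task_done() and then cancels the idle workers. The worker and run_pipeline names and the doubling step are illustrative choices for this example.

```python
import asyncio


async def worker(queue: asyncio.Queue, results: list[int]) -> None:
    while True:
        item = await queue.get()
        try:
            await asyncio.sleep(0)  # simulate processing
            results.append(item * 2)
        finally:
            queue.task_done()  # always mark the item as handled


async def run_pipeline(items: list[int], n_workers: int = 2) -> list[int]:
    queue: asyncio.Queue = asyncio.Queue(maxsize=5)
    results: list[int] = []
    workers = [asyncio.create_task(worker(queue, results)) for _ in range(n_workers)]

    for item in items:
        await queue.put(item)  # waits while the queue is full (backpressure)

    await queue.join()  # returns once task_done() has been called for every item

    for task in workers:
        task.cancel()  # workers are idle in queue.get(); stop them
    await asyncio.gather(*workers, return_exceptions=True)
    return results


results = asyncio.run(run_pipeline(list(range(10))))
print(sorted(results))  # [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
```

With this design no sentinel value is left behind in the queue, at the cost of having to cancel the workers explicitly.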

Common Mistakes

Mistake 1: Using with instead of async with for async resources

❌ Wrong: regular with on an async context manager:

with httpx.AsyncClient() as client:   # TypeError on Python 3.11+ (AttributeError: __enter__ on older versions)
    ...

✅ Correct:

async with httpx.AsyncClient() as client:   # ✓ __aenter__ / __aexit__
    ...

Mistake 2: Using for instead of async for with an async generator

❌ Wrong: regular for loop on an async generator:

for post in paginated_results(db):   # TypeError: 'async_generator' object is not iterable
    ...

✅ Correct:

async for post in paginated_results(db):   # ✓
    ...
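
A small runnable check of this mistake, using only the standard library. The ticks generator is a hypothetical example; the point is that a plain for fails immediately with TypeError, while async for (here in comprehension form) consumes the values.

```python
import asyncio


async def ticks(n: int):
    """Hypothetical async generator yielding 0..n-1."""
    for i in range(n):
        await asyncio.sleep(0)
        yield i


def try_plain_for() -> str:
    try:
        for _ in ticks(3):  # wrong: async generators have no __iter__
            pass
    except TypeError as exc:
        return type(exc).__name__
    return "no error"


async def collect() -> list[int]:
    return [i async for i in ticks(3)]


plain_for_result = try_plain_for()
collected = asyncio.run(collect())
print(plain_for_result)  # TypeError
print(collected)         # [0, 1, 2]
```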

Mistake 3: Forgetting to wrap the yield in try/finally inside @asynccontextmanager

❌ Wrong: cleanup skipped on exception:

@asynccontextmanager
async def db_session():
    session = await create_session()
    yield session
    await session.close()   # NOT called if an exception is raised inside the with block!

✅ Correct:

@asynccontextmanager
async def db_session():
    session = await create_session()
    try:
        yield session
    finally:
        await session.close()   # ✓ always called
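
The difference between the two versions can be demonstrated side by side. In the sketch below, FakeSession is a hypothetical stand-in that only tracks whether close() ran, and both context managers receive the session as an argument so the result can be inspected afterwards.

```python
import asyncio
from contextlib import asynccontextmanager


class FakeSession:
    """Hypothetical session that only tracks whether close() ran."""

    def __init__(self):
        self.closed = False

    async def close(self):
        self.closed = True


@asynccontextmanager
async def leaky_session(session: FakeSession):
    yield session
    await session.close()  # skipped when the body raises


@asynccontextmanager
async def safe_session(session: FakeSession):
    try:
        yield session
    finally:
        await session.close()  # runs on success and on error


async def fail_inside(cm_factory) -> bool:
    session = FakeSession()
    try:
        async with cm_factory(session):
            raise RuntimeError("simulated request failure")
    except RuntimeError:
        pass
    return session.closed


leaky_closed = asyncio.run(fail_inside(leaky_session))
safe_closed = asyncio.run(fail_inside(safe_session))
print(leaky_closed)  # False
print(safe_closed)   # True
```

The exception is re-raised inside the generator at the yield, so any code after a bare yield never executes.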

Quick Reference

Pattern | Code
Async context manager | async with resource as r:
Build async CM | @asynccontextmanager async def f(): yield
Class-based async CM | Implement __aenter__ and __aexit__
Async generator | async def gen(): yield value
Iterate async gen | async for item in gen():
Async iterator class | Implement __aiter__ and __anext__
Producer-consumer | asyncio.Queue(maxsize=N)
FastAPI lifespan | @asynccontextmanager async def lifespan(app):

🧠 Test Yourself

You write a FastAPI database session dependency: async def get_db(): session = await create_session(); yield session; await session.close(). A route raises an exception mid-request. What happens to the session?