Memory-Efficient Patterns for FastAPI

Understanding iterators and generators is most valuable when applied to real production problems. In FastAPI, three scenarios demand memory-efficient patterns: streaming large database query results (thousands of rows that would exhaust RAM if loaded at once), streaming large file downloads (CSV exports, ZIP files, PDF generation), and processing large uploaded files in chunks rather than loading them fully into memory. This lesson applies everything from the chapter to these concrete use cases, with complete code you can adapt directly into your FastAPI applications.

Streaming Database Results with SQLAlchemy

from sqlalchemy.orm import Session
from fastapi import FastAPI, Depends
from fastapi.responses import StreamingResponse
import csv, io

app = FastAPI()

# ── Problem: loading all rows at once uses too much memory ────────────────────
def bad_export(db: Session):
    posts = db.query(Post).all()   # loads ALL rows → potentially gigabytes!
    return [p.__dict__ for p in posts]

# ── Solution 1: yield_per (SQLAlchemy batch fetching) ─────────────────────────
def stream_posts_yield_per(db: Session, batch_size: int = 1000):
    """Fetch posts in batches of batch_size using SQLAlchemy yield_per."""
    query = db.query(Post).order_by(Post.id).yield_per(batch_size)
    for post in query:
        yield post

# ── Solution 2: manual pagination with offset/limit ───────────────────────────
def stream_posts_paginated(db: Session, page_size: int = 1000):
    """Fetch posts page by page, yielding each row."""
    offset = 0
    while True:
        batch = db.query(Post).order_by(Post.id).offset(offset).limit(page_size).all()
        if not batch:
            break
        yield from batch
        offset += page_size
        db.expire_all()   # release memory held by SQLAlchemy for previous batch

Note: SQLAlchemy's yield_per(n) instructs the underlying database driver to fetch rows in batches of n instead of all at once. With PostgreSQL and psycopg2, this uses server-side cursors: the database sends only n rows at a time, keeping memory usage proportional to the batch size rather than the total result set. This is the correct approach for exporting millions of rows. However, yield_per keeps the database transaction open for the duration of iteration, so make sure your session lifecycle handles this correctly.

Tip: When streaming large exports, call db.expire_all() after processing each batch to release the memory SQLAlchemy holds for the ORM objects. SQLAlchemy tracks every loaded object in the session's identity map; without expiring them, memory usage grows with every batch even though the previous batch's data is no longer needed. Alternatively, use db.expunge_all() to detach all objects from the session entirely.

Warning: FastAPI's StreamingResponse accepts any iterable or async iterable. When you pass a generator, FastAPI iterates it and sends each yielded chunk to the client as it is produced. However, once StreamingResponse starts sending the response, the HTTP status code and headers have already gone out; you cannot change them if an error occurs mid-stream. Always validate that the data exists and the operation is authorised before starting the stream.
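The offset/limit pattern in Solution 2 is not specific to SQLAlchemy. As a minimal sketch (the `paginate` helper and its `fetch` callable are illustrative names, not part of the lesson's code), any function that returns one page of results can be driven by the same generator shape:

```python
from typing import Callable, Iterator, List, TypeVar

T = TypeVar("T")

def paginate(fetch: Callable[[int, int], List[T]], page_size: int = 1000) -> Iterator[T]:
    """Drive any fetch(offset, limit) callable page by page, yielding rows
    lazily; iteration stops at the first empty page."""
    offset = 0
    while True:
        page = fetch(offset, page_size)
        if not page:
            break
        yield from page
        offset += page_size

# Simulate a 10-row table fetched 4 rows at a time
data = list(range(10))
rows = list(paginate(lambda offset, limit: data[offset:offset + limit], page_size=4))
```

The db_cursor_stream helper later in this lesson is this same shape, with `query.offset().limit().all()` as the fetch step.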

Streaming File Downloads

import csv
import io
from fastapi.responses import StreamingResponse

def generate_csv_export(db: Session):
    """Generator that yields CSV chunks for StreamingResponse."""
    # Yield the header
    header_buf = io.StringIO()
    writer = csv.writer(header_buf)
    writer.writerow(["id", "title", "author", "views", "created_at"])
    yield header_buf.getvalue()

    # Stream rows in batches of 500
    for post in stream_posts_yield_per(db, batch_size=500):
        row_buf = io.StringIO()
        writer = csv.writer(row_buf)
        writer.writerow([
            post.id,
            post.title,
            post.author.name if post.author else "",
            post.view_count,
            post.created_at.isoformat(),
        ])
        yield row_buf.getvalue()   # yield one row at a time

@app.get("/posts/export.csv")
async def export_posts_csv(db: Session = Depends(get_db)):
    return StreamingResponse(
        generate_csv_export(db),
        media_type="text/csv",
        headers={"Content-Disposition": 'attachment; filename="posts.csv"'},
    )
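One allocation detail: generate_csv_export creates a fresh StringIO and csv.writer for every row. A sketch of an alternative that reuses a single buffer by seeking and truncating between rows (the `csv_rows` name and the sample data are illustrative):

```python
import csv
import io
from typing import Iterable, Iterator, Sequence

def csv_rows(rows: Iterable[Sequence], header: Sequence) -> Iterator[str]:
    """Yield CSV text chunks, reusing one buffer instead of one per row."""
    buf = io.StringIO()
    writer = csv.writer(buf)
    writer.writerow(header)
    yield buf.getvalue()
    for row in rows:
        buf.seek(0)
        buf.truncate(0)        # reset the buffer in place
        writer.writerow(row)
        yield buf.getvalue()   # exactly one formatted row per chunk

chunks = list(csv_rows([(1, "a"), (2, "b")], header=("id", "title")))
```

Either version streams correctly; this one just avoids two small allocations per row, which adds up over a multi-million-row export.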

# ── Streaming a large file from disk ──────────────────────────────────────────
def file_chunks(path: str, chunk_size: int = 65536):
    """Yield a file's contents in chunks without loading it all into memory."""
    with open(path, "rb") as f:
        while True:
            chunk = f.read(chunk_size)
            if not chunk:
                break
            yield chunk

@app.get("/downloads/{filename}")
async def download_file(filename: str):
    path = UPLOAD_DIR / filename
    if not path.exists():
        raise HTTPException(404, "File not found")

    return StreamingResponse(
        file_chunks(str(path)),
        media_type="application/octet-stream",
        headers={"Content-Disposition": f'attachment; filename="{filename}"'},
    )
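A quick way to confirm that file_chunks really keeps memory flat is to measure peak allocations with tracemalloc while streaming a scratch file. This sketch repeats the chunked reader so it is self-contained; the 4MB file size and 1MB bound are illustrative:

```python
import os
import tempfile
import tracemalloc

def file_chunks(path: str, chunk_size: int = 65536):
    """Same chunked reader as above, repeated so this sketch stands alone."""
    with open(path, "rb") as f:
        while True:
            chunk = f.read(chunk_size)
            if not chunk:
                break
            yield chunk

# Write a 4MB scratch file, then stream it and record the allocation peak
with tempfile.NamedTemporaryFile(delete=False) as tmp:
    tmp.write(b"x" * (4 * 1024 * 1024))
    path = tmp.name

tracemalloc.start()
total = sum(len(c) for c in file_chunks(path, chunk_size=65536))
_, peak = tracemalloc.get_traced_memory()
tracemalloc.stop()
os.unlink(path)
```

Peak allocation stays near one or two 64KB chunks, far below the 4MB file size, which is exactly the O(chunk) behaviour the table below claims.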

Processing Large Uploaded Files in Chunks

from fastapi import UploadFile, File, HTTPException
import csv, io

MAX_FILE_SIZE = 50 * 1024 * 1024   # 50MB limit

@app.post("/users/import")
async def import_users_streamed(file: UploadFile = File(...)):
    if not file.filename.endswith(".csv"):
        raise HTTPException(400, "Only CSV files accepted")

    # Read in chunks so the size limit is enforced as the file arrives,
    # instead of buffering an oversized upload before rejecting it
    parts = []
    total_size = 0
    chunk_size = 1024 * 64   # 64KB chunks

    while True:
        chunk = await file.read(chunk_size)
        if not chunk:
            break
        total_size += len(chunk)
        if total_size > MAX_FILE_SIZE:
            raise HTTPException(413, f"File exceeds {MAX_FILE_SIZE // 1024 // 1024}MB limit")
        parts.append(chunk)   # collect chunks; one join avoids quadratic bytes +=

    # Process the collected content (peak memory here is still the full file size)
    text    = b"".join(parts).decode("utf-8")
    reader  = csv.DictReader(io.StringIO(text))
    results = {"imported": 0, "errors": []}

    for row_num, row in enumerate(reader, start=2):
        try:
            # validate and import row
            results["imported"] += 1
        except Exception as e:
            results["errors"].append({"row": row_num, "error": str(e)})

    return results
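The endpoint above still buffers the whole (size-capped) file before parsing. The parse itself can be made incremental too. A sketch (the `iter_csv_rows` helper is illustrative and assumes no newlines inside quoted fields): decode byte chunks with an incremental UTF-8 decoder, reassemble lines across chunk boundaries, and feed them to csv.DictReader, so only one chunk and one partial line are ever in memory:

```python
import codecs
import csv
from typing import Iterable, Iterator

def iter_csv_rows(chunks: Iterable[bytes]) -> Iterator[dict]:
    """Parse CSV rows from byte chunks without buffering the whole file."""
    decoder = codecs.getincrementaldecoder("utf-8")()

    def lines() -> Iterator[str]:
        pending = ""
        for chunk in chunks:
            pending += decoder.decode(chunk)
            *complete, pending = pending.split("\n")   # keep the partial tail
            yield from complete
        pending += decoder.decode(b"", final=True)
        if pending:
            yield pending

    yield from csv.DictReader(lines())

data = b"name,email\nada,ada@example.com\nalan,alan@example.com\n"
# 7-byte chunks deliberately split rows across chunk boundaries
rows = list(iter_csv_rows(data[i:i + 7] for i in range(0, len(data), 7)))
```

In the route above, the chunk source would be the `while chunk := await file.read(chunk_size)` loop; an async variant follows the same line-reassembly logic.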

Lazy Database Cursor Pattern

from contextlib import contextmanager

@contextmanager
def db_cursor_stream(query, batch_size: int = 500):
    """Context manager that yields a lazy iterator over database rows.

    A @contextmanager function must yield exactly once, so the batching
    loop lives in an inner generator and the context manager yields it.
    """
    def rows():
        offset = 0
        while True:
            batch = query.offset(offset).limit(batch_size).all()
            if not batch:
                break
            yield from batch
            offset += batch_size
    yield rows()

# Usage in a route
@app.get("/stats/tag-counts")
async def tag_statistics(db: Session = Depends(get_db)):
    from collections import Counter

    query = db.query(Post).filter(Post.published.is_(True))
    # Process 500 rows at a time โ€” never loads all posts at once
    tag_counter = Counter()

    with db_cursor_stream(query) as posts:
        for post in posts:
            tag_counter.update(post.tags)

    return {"top_tags": tag_counter.most_common(20)}

When to Use Each Approach

Scenario              | Approach                                | Memory
----------------------|-----------------------------------------|---------------
Export 100 rows       | db.query().all() is fine                | Constant
Export 10,000 rows    | yield_per(1000) with StreamingResponse  | O(batch)
Export 1M rows        | Server-side cursor + streaming CSV      | O(batch)
Download a 5MB file   | Read all at once is fine                | 5MB
Download a 500MB file | Stream in 64KB chunks                   | 64KB
Upload a 1MB CSV      | Read all + csv.DictReader               | ~1MB
Upload a 100MB CSV    | Read in chunks, validate row by row     | O(chunk)
Aggregate 1M rows     | Stream + in-memory Counter              | O(unique keys)
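The memory column above is easy to verify empirically. A sketch comparing peak allocations for a fully materialised list of 100,000 row dicts against a generator producing the same rows (sizes are illustrative):

```python
import tracemalloc

def rows_list(n: int) -> list:
    """Materialise every row at once, like db.query().all()."""
    return [{"id": i, "title": f"post {i}"} for i in range(n)]

def rows_gen(n: int):
    """Produce rows lazily; only one dict is alive at a time."""
    for i in range(n):
        yield {"id": i, "title": f"post {i}"}

tracemalloc.start()
count_list = sum(1 for _ in rows_list(100_000))   # whole list held in memory
_, peak_list = tracemalloc.get_traced_memory()
tracemalloc.reset_peak()
count_gen = sum(1 for _ in rows_gen(100_000))     # rows created and discarded
_, peak_gen = tracemalloc.get_traced_memory()
tracemalloc.stop()
```

The generator's peak is orders of magnitude below the list's, which is the whole argument for streaming: the work done is identical, only the number of rows held simultaneously changes.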

Common Mistakes

Mistake 1: Reading an entire large upload into memory

❌ Wrong: loads the entire 100MB file at once:

content = await file.read()   # 100MB file → 100MB in RAM

✅ Correct: read in chunks:

while chunk := await file.read(65536):   # 64KB at a time ✓
    process(chunk)

Mistake 2: Starting a stream before validating auth

❌ Wrong: sends 200 OK before the auth check:

return StreamingResponse(generate_data())   # 200 sent, then auth check fails!

✅ Correct: validate everything before returning StreamingResponse:

if not current_user.can_export:
    raise HTTPException(403)   # sent before stream starts ✓
return StreamingResponse(generate_data())

Mistake 3: Not expiring SQLAlchemy objects between batches

❌ Wrong: memory grows with every batch:

for batch in batches:
    yield from batch   # ORM objects accumulate in session identity map!

✅ Correct: expire between batches:

for batch in batches:
    yield from batch
    db.expire_all()   # ✓ release memory for processed objects

Quick Reference

Pattern              | Code
---------------------|--------------------------------------------------------
Stream DB rows       | query.yield_per(1000)
Stream CSV download  | StreamingResponse(csv_generator, media_type="text/csv")
Stream file chunks   | while chunk := f.read(65536): yield chunk
Chunk upload read    | await file.read(chunk_size) in a loop
Release batch memory | db.expire_all() after each batch
Content-Disposition  | 'attachment; filename="export.csv"'

🧠 Test Yourself

A FastAPI endpoint exports 500,000 database rows as a CSV download. Your current implementation calls db.query(Post).all() and builds a list before streaming. The server runs out of memory. What is the correct fix?