Understanding iterators and generators is most valuable when applied to real production problems. In FastAPI, three scenarios demand memory-efficient patterns: streaming large database query results (thousands of rows that would exhaust RAM if loaded at once), streaming large file downloads (CSV exports, ZIP files, PDF generation), and processing large uploaded files in chunks rather than loading them fully into memory. This lesson applies everything from the chapter to these concrete use cases, with complete code you can adapt directly into your FastAPI applications.
Streaming Database Results with SQLAlchemy
```python
from fastapi import FastAPI, Depends
from fastapi.responses import StreamingResponse
from sqlalchemy.orm import Session

app = FastAPI()

# ── Problem: loading all rows at once uses too much memory ───────────────────
def bad_export(db: Session):
    posts = db.query(Post).all()  # loads ALL rows → potentially gigabytes!
    return [p.__dict__ for p in posts]

# ── Solution 1: yield_per (SQLAlchemy batch fetching) ────────────────────────
def stream_posts_yield_per(db: Session, batch_size: int = 1000):
    """Fetch posts in batches of batch_size using SQLAlchemy yield_per."""
    query = db.query(Post).order_by(Post.id).yield_per(batch_size)
    for post in query:
        yield post

# ── Solution 2: manual pagination with offset/limit ──────────────────────────
def stream_posts_paginated(db: Session, page_size: int = 1000):
    """Fetch posts page by page, yielding each row."""
    offset = 0
    while True:
        batch = db.query(Post).order_by(Post.id).offset(offset).limit(page_size).all()
        if not batch:
            break
        yield from batch
        offset += page_size
        db.expire_all()  # release memory held by SQLAlchemy for previous batch
```
`yield_per(n)` is a hint to the underlying database driver to fetch rows in batches of n instead of all at once. With PostgreSQL and psycopg2, this uses server-side cursors: the database sends only n rows at a time, keeping memory usage proportional to the batch size rather than the total result set. This is the correct approach for exporting millions of rows. Be aware, however, that `yield_per` keeps the database transaction open for the duration of iteration, so make sure your session lifecycle handles this correctly.

In the pagination approach, call `db.expire_all()` after processing each batch to release the memory SQLAlchemy holds for the ORM objects. SQLAlchemy tracks every loaded object in the session's identity map, so without expiring them, memory usage grows with each batch even though the previous batch's data is no longer needed. Alternatively, use `db.expunge_all()` to detach all objects from the session entirely.

`StreamingResponse` accepts any iterable or async iterable. When you pass a generator, FastAPI iterates it and sends each yielded chunk to the client as it is produced. Once `StreamingResponse` starts sending, however, the HTTP status code and headers have already gone out over the wire; you cannot change them if an error occurs mid-stream. Always validate that the data exists and the operation is authorised before starting the stream.

Streaming File Downloads
```python
import csv
import io

from fastapi.responses import StreamingResponse

def generate_csv_export(db: Session):
    """Generator that yields CSV chunks for StreamingResponse."""
    # Yield the header row first
    header_buf = io.StringIO()
    writer = csv.writer(header_buf)
    writer.writerow(["id", "title", "author", "views", "created_at"])
    yield header_buf.getvalue()
    # Stream rows in batches of 500
    for post in stream_posts_yield_per(db, batch_size=500):
        row_buf = io.StringIO()
        writer = csv.writer(row_buf)
        writer.writerow([
            post.id,
            post.title,
            post.author.name if post.author else "",
            post.view_count,
            post.created_at.isoformat(),
        ])
        yield row_buf.getvalue()  # yield one row at a time

@app.get("/posts/export.csv")
async def export_posts_csv(db: Session = Depends(get_db)):
    return StreamingResponse(
        generate_csv_export(db),
        media_type="text/csv",
        headers={"Content-Disposition": 'attachment; filename="posts.csv"'},
    )
```
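As noted above, `StreamingResponse` also accepts async iterables. If your stack uses async database access, the same export can be written as an async generator. The sketch below shows the shape of that pattern; the plain row list and the names `csv_chunks` and `collect` are illustrative stand-ins, not part of FastAPI, so the example stays self-contained.

```python
import asyncio
import csv
import io
from typing import AsyncIterator

async def csv_chunks(rows) -> AsyncIterator[str]:
    """Async generator: yields the CSV header, then one line per row.

    `rows` stands in for a real async row source (e.g. an async session's
    streamed results); here it is a plain list for demonstration.
    """
    buf = io.StringIO()
    csv.writer(buf).writerow(["id", "title"])
    yield buf.getvalue()
    for row in rows:
        buf = io.StringIO()
        csv.writer(buf).writerow(row)
        # Yield control to the event loop between rows so one large export
        # does not starve other requests.
        await asyncio.sleep(0)
        yield buf.getvalue()

async def collect() -> str:
    # In FastAPI you would pass csv_chunks(...) straight to StreamingResponse;
    # here we just drain it to show what the client would receive.
    chunks = [c async for c in csv_chunks([(1, "first"), (2, "second")])]
    return "".join(chunks)

print(asyncio.run(collect()))
```

FastAPI drives async generators the same way it drives sync ones, but without blocking the event loop between chunks.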
```python
from pathlib import Path

from fastapi import HTTPException

UPLOAD_DIR = Path("uploads")  # adjust to wherever your files are stored

# ── Streaming a large file from disk ─────────────────────────────────────────
def file_chunks(path: str, chunk_size: int = 65536):
    """Yield a file's contents in chunks without loading it all into memory."""
    with open(path, "rb") as f:
        while True:
            chunk = f.read(chunk_size)
            if not chunk:
                break
            yield chunk

@app.get("/downloads/{filename}")
async def download_file(filename: str):
    # NOTE: in production, sanitize filename to prevent path traversal
    path = UPLOAD_DIR / filename
    if not path.exists():
        raise HTTPException(404, "File not found")
    return StreamingResponse(
        file_chunks(str(path)),
        media_type="application/octet-stream",
        headers={"Content-Disposition": f'attachment; filename="{filename}"'},
    )
```
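The chunked reader is easy to verify in isolation: concatenating the chunks must reproduce the file byte for byte, and no single chunk may exceed the chunk size. A quick self-contained check using a temporary file (restating `file_chunks` so the sketch runs on its own):

```python
import os
import tempfile

def file_chunks(path: str, chunk_size: int = 65536):
    """Same pattern as above: yield the file in fixed-size binary chunks."""
    with open(path, "rb") as f:
        while True:
            chunk = f.read(chunk_size)
            if not chunk:
                break
            yield chunk

# Write ~200KB of random data, then stream it back in 64KB chunks.
data = os.urandom(200_000)
with tempfile.NamedTemporaryFile(delete=False) as tmp:
    tmp.write(data)
    path = tmp.name

chunks = list(file_chunks(path, chunk_size=65536))
assert b"".join(chunks) == data               # lossless reassembly
assert all(len(c) <= 65536 for c in chunks)   # bounded memory per chunk
os.unlink(path)
print(len(chunks))  # 3 full 64KB chunks + 1 partial → 4
```

Only one chunk is ever resident in the generator at a time, which is exactly why the endpoint's memory stays at 64KB regardless of file size.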
Processing Large Uploaded Files in Chunks
```python
import csv
import io

from fastapi import File, HTTPException, UploadFile

MAX_FILE_SIZE = 50 * 1024 * 1024  # 50MB limit

@app.post("/users/import")
async def import_users_streamed(file: UploadFile = File(...)):
    if not file.filename.endswith(".csv"):
        raise HTTPException(400, "Only CSV files accepted")
    # Read in chunks so the size limit is enforced early. The content is
    # still buffered before parsing, but never beyond the 50MB cap.
    parts = []
    total_size = 0
    chunk_size = 1024 * 64  # 64KB chunks
    while True:
        chunk = await file.read(chunk_size)
        if not chunk:
            break
        total_size += len(chunk)
        if total_size > MAX_FILE_SIZE:
            raise HTTPException(413, f"File exceeds {MAX_FILE_SIZE // 1024 // 1024}MB limit")
        parts.append(chunk)  # joining once at the end avoids O(n²) concatenation
    # Process the collected content
    text = b"".join(parts).decode("utf-8")
    reader = csv.DictReader(io.StringIO(text))
    results = {"imported": 0, "errors": []}
    for row_num, row in enumerate(reader, start=2):  # row 1 is the header
        try:
            # validate and import the row here
            results["imported"] += 1
        except Exception as e:
            results["errors"].append({"row": row_num, "error": str(e)})
    return results
```
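If even the 50MB cap is more than you want to buffer, the CSV can be parsed incrementally: decode each chunk, split on newlines, and carry only the trailing partial line over to the next chunk. This is a minimal sketch of that approach; the list of byte chunks stands in for repeated `await file.read(chunk_size)` calls, and `parse_csv_chunks` is an illustrative helper, not a FastAPI API.

```python
import codecs
import csv
import io

def parse_csv_chunks(chunks):
    """Yield one dict per CSV row without ever buffering the whole file.

    `chunks` is any iterable of bytes. Caveat: this simple line splitter
    does not handle quoted fields that contain embedded newlines.
    """
    decoder = codecs.getincrementaldecoder("utf-8")()  # safe across chunk boundaries
    header = None
    remainder = ""
    for chunk in chunks:
        remainder += decoder.decode(chunk)
        lines = remainder.split("\n")
        remainder = lines.pop()  # the last piece may be an incomplete line
        for line in lines:
            if not line.strip():
                continue
            row = next(csv.reader(io.StringIO(line)))
            if header is None:
                header = row  # first complete line is the header
            else:
                yield dict(zip(header, row))
    if remainder.strip() and header is not None:  # final line, no trailing newline
        row = next(csv.reader(io.StringIO(remainder)))
        yield dict(zip(header, row))

data = b"name,email\nalice,a@example.com\nbob,b@example.com"
# Feed awkward 10-byte chunks to exercise the remainder handling.
rows = list(parse_csv_chunks(data[i:i + 10] for i in range(0, len(data), 10)))
print(rows)
```

The incremental UTF-8 decoder matters: a naive `chunk.decode("utf-8")` would raise if a multi-byte character happened to straddle a chunk boundary.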
Lazy Database Cursor Pattern
```python
from contextlib import contextmanager

@contextmanager
def db_cursor_stream(query, batch_size: int = 500):
    """Context manager that yields a lazy iterator over database rows.

    An @contextmanager function must yield exactly once, so we yield an
    inner generator rather than yielding the rows themselves.
    """
    def rows():
        offset = 0
        while True:
            batch = query.offset(offset).limit(batch_size).all()
            if not batch:
                break
            yield from batch
            offset += batch_size
    yield rows()

# Usage in a route
@app.get("/stats/tag-counts")
async def tag_statistics(db: Session = Depends(get_db)):
    from collections import Counter
    query = db.query(Post).filter(Post.published == True)
    # Process 500 rows at a time — never loads all posts at once
    tag_counter = Counter()
    with db_cursor_stream(query) as posts:
        for post in posts:
            tag_counter.update(post.tags)
    return {"top_tags": tag_counter.most_common(20)}
```
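Because the pattern depends only on the query's `offset().limit().all()` chain, it can be exercised without a database. This self-contained sketch restates the context manager and drives it with a stub; `FakeQuery` is purely illustrative, not a SQLAlchemy class.

```python
from contextlib import contextmanager

@contextmanager
def db_cursor_stream(query, batch_size: int = 500):
    """Yield a lazy row iterator; each batch is fetched only as consumed."""
    def rows():
        offset = 0
        while True:
            batch = query.offset(offset).limit(batch_size).all()
            if not batch:
                break
            yield from batch
            offset += batch_size
    yield rows()

class FakeQuery:
    """Stub mimicking the offset().limit().all() chain over a plain list."""
    def __init__(self, data, offset=0, limit=None):
        self._data, self._offset, self._limit = data, offset, limit
    def offset(self, n):
        return FakeQuery(self._data, n, self._limit)
    def limit(self, n):
        return FakeQuery(self._data, self._offset, n)
    def all(self):
        return self._data[self._offset:self._offset + self._limit]

query = FakeQuery(list(range(10)))
with db_cursor_stream(query, batch_size=3) as rows:
    collected = list(rows)
print(collected)  # [0, 1, 2, ..., 9], fetched in batches of 3
```

Swapping in a real SQLAlchemy query requires no code changes, since the stub implements the same three-method chain.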
When to Use Each Approach
| Scenario | Approach | Memory |
|---|---|---|
| Export 100 rows | db.query().all() (fine) | Constant |
| Export 10,000 rows | yield_per(1000) with StreamingResponse | O(batch) |
| Export 1M rows | Server-side cursor + streaming CSV | O(batch) |
| Download a 5MB file | Read all at once (fine) | 5MB |
| Download a 500MB file | Stream in 64KB chunks | 64KB |
| Upload a 1MB CSV | Read all + csv.DictReader | ~1MB |
| Upload a 100MB CSV | Read in chunks, validate row by row | O(chunk) |
| Aggregate 1M rows | Stream + in-memory Counter | O(unique keys) |
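The O(batch) and O(chunk) entries in the table come down to one fact: a generator's footprint is a small, fixed-size suspended frame, independent of how many rows it will eventually produce, while a list holds every element at once. You can observe this directly with `sys.getsizeof` (the dict rows here are a stand-in for ORM objects):

```python
import sys

def row_source(n):
    """Generator standing in for a streamed query: constant-size frame."""
    for i in range(n):
        yield {"id": i, "title": f"post {i}"}

materialised = list(row_source(10_000))  # all 10,000 dicts live at once
streamed = row_source(10_000)            # just a suspended frame

print(sys.getsizeof(materialised))  # grows with the number of rows
print(sys.getsizeof(streamed))      # small and constant, regardless of n
```

Note that `getsizeof` on the list measures only the list's pointer array, not the dicts it references, so the true gap in resident memory is far larger than these two numbers suggest.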
Common Mistakes
Mistake 1: Reading an entire large upload into memory

❌ Wrong: loads the entire 100MB file at once:

```python
content = await file.read()  # 100MB file → 100MB in RAM
```

✅ Correct: read in chunks:

```python
while chunk := await file.read(65536):  # 64KB at a time
    process(chunk)
```

Mistake 2: Starting a stream before validating auth

❌ Wrong: sends 200 OK before the auth check:

```python
return StreamingResponse(generate_data())  # 200 sent, then auth check fails!
```

✅ Correct: validate everything before returning StreamingResponse:

```python
if not current_user.can_export:
    raise HTTPException(403)  # sent before the stream starts
return StreamingResponse(generate_data())
```

Mistake 3: Not expiring SQLAlchemy objects between batches

❌ Wrong: memory grows with every batch:

```python
for batch in batches:
    yield from batch  # ORM objects accumulate in the session identity map!
```

✅ Correct: expire between batches:

```python
for batch in batches:
    yield from batch
    db.expire_all()  # release memory for processed objects
```
Quick Reference
| Pattern | Code |
|---|---|
| Stream DB rows | `query.yield_per(1000)` |
| Stream CSV download | `StreamingResponse(csv_generator, media_type="text/csv")` |
| Stream file chunks | `while chunk := f.read(65536): yield chunk` |
| Chunk upload read | `await file.read(chunk_size)` in a loop |
| Release batch memory | `db.expire_all()` after each batch |
| Content-Disposition | `'attachment; filename="export.csv"'` |