Data transformation — converting data from one shape to another — is one of the most common tasks in any FastAPI application. Query results arrive as flat lists of rows but need to be reshaped into nested JSON structures. External API responses arrive in camelCase but need to be stored in snake_case. Pagination metadata needs to be calculated and attached to response bodies. Aggregations that could be done in SQL sometimes need to happen in Python when the database query is already constructed. Knowing clean, readable, and testable patterns for these transformations keeps your route handlers and service functions concise.
Flattening and Reshaping
# -- Flatten a nested structure ------------------------------------------------
# Demo input: two nested objects; flattening prefixes each inner key with its
# parent key ("user" / "post") so the result is a single-level dict.
nested = {
    "user": {"id": 1, "name": "Alice"},
    "post": {"id": 10, "title": "Hello", "tags": ["python", "fastapi"]},
}
def flatten(d: dict, parent_key: str = "", sep: str = "_") -> dict:
    """Collapse a nested dict into a single level.

    Nested keys are joined with *sep*, so {"a": {"b": 1}} becomes
    {"a_b": 1}. Non-dict values (including lists) are copied as-is.
    """
    flat: dict = {}
    for key, value in d.items():
        full_key = f"{parent_key}{sep}{key}" if parent_key else key
        if isinstance(value, dict):
            # Recurse with the joined key as the new prefix.
            flat.update(flatten(value, full_key, sep))
        else:
            flat[full_key] = value
    return flat
# Flatten the demo structure above; inner keys get their parent as a prefix.
flat = flatten(nested)
# {"user_id": 1, "user_name": "Alice", "post_id": 10, "post_title": "Hello", "post_tags": [...]}
# -- Group a flat list by a key (reshape to nested) ----------------------------
comments = [
    {"post_id": 1, "author": "Alice", "text": "Great!"},
    {"post_id": 2, "author": "Bob", "text": "Thanks!"},
    {"post_id": 1, "author": "Charlie", "text": "Agreed!"},
]
from collections import defaultdict

# Bucket each comment under its post id, dropping the grouping key itself.
comments_by_post = defaultdict(list)
for comment in comments:
    entry = {"author": comment["author"], "text": comment["text"]}
    comments_by_post[comment["post_id"]].append(entry)
# {1: [{"author": "Alice", ...}, {"author": "Charlie", ...}], 2: [...]}
A chain of transformations reads inside-out when written as nested calls — result = transform_c(transform_b(transform_a(raw_data))) — and functools.reduce() applies this pattern programmatically to a list of transform functions. When collapsing a large sequence to a single number with sum() or len(), use a generator expression instead of a list comprehension — it avoids allocating 10,000 items in memory just to reduce them to a single number. This matters in FastAPI endpoints that process large query results before returning a summary.

Pagination Helper
from math import ceil
from pydantic import BaseModel
from typing import TypeVar, Generic, List
T = TypeVar("T")  # item type carried by a Page


class Page(BaseModel, Generic[T]):
    """Generic paginated response wrapper.

    Parameterize per endpoint (e.g. ``Page[PostOut]``) so clients receive
    the items plus navigation metadata in one envelope.
    """

    items: List[T]   # the current page of results
    total: int       # total matching rows across all pages
    page: int        # 1-based current page number
    page_size: int   # requested number of items per page
    pages: int       # total number of pages
    has_next: bool   # True when another page follows this one
    has_prev: bool   # True when page > 1
def paginate(items: list, page: int, page_size: int, total: int | None = None) -> Page:
    """Wrap a list (or pre-fetched slice) in a Page response object.

    Args:
        items: The full result set when ``total`` is None, or a single
            page the caller already sliced (e.g. via SQL OFFSET/LIMIT).
        page: 1-based page number.
        page_size: Items per page; a non-positive value yields 0 pages.
        total: Total row count when the caller sliced the items itself
            (typically from a COUNT query). When None, the total is
            derived from ``items`` and the slice is taken here.

    Returns:
        A populated Page with navigation metadata.
    """
    if total is None:
        # In-memory mode: we hold the full list, so count and slice it here.
        total = len(items)
        items = items[(page - 1) * page_size : page * page_size]
    pages = ceil(total / page_size) if page_size > 0 else 0
    # PEP 8: no spaces around '=' in keyword arguments.
    return Page(
        items=items,
        total=total,
        page=page,
        page_size=page_size,
        pages=pages,
        has_next=page < pages,
        has_prev=page > 1,
    )
# With database (total from COUNT query):
@app.get("/posts")
async def list_posts(page: int = 1, limit: int = 10, db = Depends(get_db)):
    """List published posts, paginated in SQL via OFFSET/LIMIT."""
    # `== True` is the SQLAlchemy column comparison, not a Python boolean
    # test — flake8 E712 is a false positive on query filters like this.
    query = db.query(Post).filter(Post.published == True)
    # Total comes from a COUNT query, so only one page of rows is fetched.
    total = query.count()
    posts = query.offset((page - 1) * limit).limit(limit).all()
    # Passing `total` tells paginate() the items are already sliced.
    return paginate(posts, page, limit, total)
In-Memory Aggregations
posts = [
    {"id": 1, "views": 150, "tags": ["python", "fastapi"], "author": "Alice"},
    {"id": 2, "views": 500, "tags": ["python"], "author": "Bob"},
    {"id": 3, "views": 75, "tags": ["fastapi", "react"], "author": "Alice"},
    {"id": 4, "views": 300, "tags": ["react"], "author": "Charlie"},
]

# Total views across every post.
total_views = sum(post["views"] for post in posts)  # 1025
# Mean views per post.
avg_views = total_views / len(posts)  # 256.25

# Two most-viewed posts, best first.
top_posts = sorted(posts, key=lambda post: post["views"], reverse=True)[:2]

# Views by author: accumulate into an int-valued defaultdict.
from collections import defaultdict
views_by_author = defaultdict(int)
for post in posts:
    views_by_author[post["author"]] += post["views"]
# {"Alice": 225, "Bob": 500, "Charlie": 300}

# Tag frequency: gather every tag, then count occurrences.
from collections import Counter
all_tags = []
for post in posts:
    all_tags.extend(post["tags"])
tag_counts = Counter(all_tags).most_common(3)
# [("python", 2), ("fastapi", 2), ("react", 2)]

# Posts per author.
posts_per_author = Counter(post["author"] for post in posts)
# Counter({"Alice": 2, "Bob": 1, "Charlie": 1})
The Pipeline Pattern
from typing import Callable, TypeVar
from functools import reduce
T = TypeVar("T")
def pipeline(*transforms: Callable) -> Callable:
    """Compose *transforms* left-to-right into a single callable."""
    def apply(data):
        # Thread the value through each transform in order.
        for transform in transforms:
            data = transform(data)
        return data
    return apply
# Define transformation steps as pure functions
def filter_published(posts: list) -> list:
    """Return only the posts whose "published" flag is truthy."""
    return list(filter(lambda post: post.get("published"), posts))
def sort_by_views(posts: list) -> list:
    """Return a new list ordered by view count, highest first."""
    def view_count(post):
        return post["views"]

    return sorted(posts, key=view_count, reverse=True)
def take(n: int) -> Callable:
    """Return a transform that keeps only the first *n* items."""
    def head(posts):
        return posts[:n]
    return head
def add_rank(posts: list) -> list:
    """Return copies of the posts with a 1-based "rank" key added."""
    ranked = []
    for rank, post in enumerate(posts, start=1):
        ranked.append({**post, "rank": rank})
    return ranked
# Compose the pipeline
# Steps run left-to-right: filter, then sort, then truncate, then rank.
top_5_published = pipeline(
    filter_published,
    sort_by_views,
    take(5),
    add_rank,
)
# Assumes `all_posts` is defined elsewhere in the application (not shown here).
result = top_5_published(all_posts)
# Each step transforms the list — clean, testable, reusable
# FastAPI use
@app.get("/posts/trending")
async def trending_posts(db = Depends(get_db)):
    """Return the top published posts, ranked by views via the pipeline."""
    raw_posts = db.query(Post).all()
    # NOTE(review): __dict__ on a SQLAlchemy model typically also carries
    # the _sa_instance_state entry — confirm the response serializer
    # tolerates or strips it before relying on this shortcut.
    return top_5_published([p.__dict__ for p in raw_posts])
Common Mistakes
Mistake 1 — Building large intermediate lists unnecessarily
❌ Wrong — builds a full list just to count:
published_views = [p["views"] for p in posts if p["published"]] # full list in memory
total = sum(published_views) # list not needed
✅ Correct — use a generator expression:
total = sum(p["views"] for p in posts if p["published"])  # no intermediate list ✅
Mistake 2 — Mutating input data in transformation functions
❌ Wrong — modifies the caller’s data:
def add_slug(posts: list) -> list:
    # Anti-example: writes the slug directly onto the caller's dicts.
    for post in posts:
        post["slug"] = post["title"].lower().replace(" ", "-")  # mutates original!
    return posts
✅ Correct — return new objects:
def add_slug(posts: list) -> list:
    """Return new post dicts, each with a URL-friendly "slug" added."""
    slugged = []
    for post in posts:
        slug = post["title"].lower().replace(" ", "-")
        slugged.append({**post, "slug": slug})  # copy; originals untouched
    return slugged
Mistake 3 — Doing in Python what should be done in SQL
❌ Wrong — loading all posts to count per author:
all_posts = db.query(Post).all() # fetches all rows
by_author = Counter(p.author_id for p in all_posts) # 1M posts = 1M rows fetched!
✅ Correct — aggregate in SQL:
from sqlalchemy import func
result = db.query(Post.author_id, func.count(Post.id)).group_by(Post.author_id).all()
by_author = dict(result)  # ✅ database does the counting
Quick Reference
| Task | Pattern |
|---|---|
| Group by key | defaultdict(list) + loop |
| Count occurrences | Counter(items).most_common(n) |
| Sum/max/min | sum(x["val"] for x in items) |
| Sort + slice | sorted(items, key=..., reverse=True)[:n] |
| Paginate | items[(page-1)*size : page*size] |
| Pipeline | functools.reduce(lambda d, f: f(d), transforms, data) |
| Non-mutating transform | [{**item, "new_key": val} for item in items] |
| Flatten nested | Recursive flatten function |