Data Transformation Patterns in Python

Data transformation, converting data from one shape to another, is one of the most common tasks in any FastAPI application. Query results arrive as flat lists of rows but need to be reshaped into nested JSON structures. External API responses arrive in camelCase but need to be stored in snake_case. Pagination metadata needs to be calculated and attached to response bodies. Aggregations that could be done in SQL sometimes need to happen in Python when the database query is already constructed. Knowing clean, readable, and testable patterns for these transformations keeps your route handlers and service functions concise.
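For instance, the camelCase-to-snake_case conversion mentioned above can be done with a small regex helper. This is a minimal sketch; the function names camel_to_snake and snake_keys are illustrative, not from any library:

```python
import re

def camel_to_snake(name: str) -> str:
    # Insert an underscore before each interior capital letter, then lowercase
    return re.sub(r"(?<!^)(?=[A-Z])", "_", name).lower()

def snake_keys(data: dict) -> dict:
    # Convert all top-level keys of an external API payload to snake_case
    return {camel_to_snake(k): v for k, v in data.items()}

converted = snake_keys({"userId": 1, "createdAt": "2024-01-01"})
# {"user_id": 1, "created_at": "2024-01-01"}
```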

Flattening and Reshaping

# ── Flatten a nested structure ─────────────────────────────────────────────
nested = {
    "user": {"id": 1, "name": "Alice"},
    "post": {"id": 10, "title": "Hello", "tags": ["python", "fastapi"]},
}

def flatten(d: dict, parent_key: str = "", sep: str = "_") -> dict:
    items = {}
    for k, v in d.items():
        key = f"{parent_key}{sep}{k}" if parent_key else k
        if isinstance(v, dict):
            items.update(flatten(v, key, sep))
        else:
            items[key] = v
    return items

flat = flatten(nested)
# {"user_id": 1, "user_name": "Alice", "post_id": 10, "post_title": "Hello", "post_tags": [...]}

# ── Group a flat list by a key (reshape to nested) ─────────────────────────
comments = [
    {"post_id": 1, "author": "Alice", "text": "Great!"},
    {"post_id": 2, "author": "Bob",   "text": "Thanks!"},
    {"post_id": 1, "author": "Charlie", "text": "Agreed!"},
]

from collections import defaultdict

comments_by_post = defaultdict(list)
for c in comments:
    comments_by_post[c["post_id"]].append(
        {"author": c["author"], "text": c["text"]}
    )
# {1: [{"author": "Alice", ...}, {"author": "Charlie", ...}], 2: [...]}

Note: The choice between doing data transformation in SQL (using JOINs, GROUP BY, aggregation functions) vs Python depends on data volume and query complexity. For small result sets (under a few thousand rows), Python transformation is fine and often easier to read and test. For large datasets, always push aggregation into SQL: the database is optimised for it, and network transfer of large raw result sets is expensive.

Tip: The pipeline pattern, a list of transformation functions applied in sequence, makes complex data processing readable and testable. Each step is a pure function that takes data and returns transformed data. You can test each step independently and compose them easily: result = transform_c(transform_b(transform_a(raw_data))). The functools.reduce() function applies this pattern programmatically to a list of transform functions.

Warning: Avoid building large intermediate lists when you only need to produce a final result. If you are transforming 10,000 rows just to pass them to sum() or len(), use a generator expression instead of a list comprehension: it avoids allocating 10,000 items in memory just to reduce them to a single number. This matters in FastAPI endpoints that process large query results before returning a summary.

Pagination Helper

from math import ceil
from pydantic import BaseModel
from typing import TypeVar, Generic, List

T = TypeVar("T")

class Page(BaseModel, Generic[T]):
    """Generic paginated response wrapper."""
    items:      List[T]
    total:      int
    page:       int
    page_size:  int
    pages:      int
    has_next:   bool
    has_prev:   bool

def paginate(items: list, page: int, page_size: int, total: int | None = None) -> Page:
    """Wrap a list (or a pre-sliced database page) into a Page response object."""
    if total is None:
        # In-memory mode: the full list was passed in, so slice it here.
        # With a database, the caller slices via OFFSET/LIMIT and passes total.
        total = len(items)
        items = items[(page - 1) * page_size : page * page_size]

    pages = ceil(total / page_size) if page_size > 0 else 0

    return Page(
        items     = items,
        total     = total,
        page      = page,
        page_size = page_size,
        pages     = pages,
        has_next  = page < pages,
        has_prev  = page > 1,
    )
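The slice arithmetic used by the in-memory branch can be sanity-checked on its own; the 95-item list here is just illustrative:

```python
from math import ceil

items = list(range(95))      # 95 items, 10 per page -> 10 pages
page, page_size = 3, 10

total = len(items)
page_items = items[(page - 1) * page_size : page * page_size]
pages = ceil(total / page_size)

# Page 3 holds items 20..29; 10 pages in total, with both neighbours present
assert page_items == list(range(20, 30))
assert (pages, page < pages, page > 1) == (10, True, True)
```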

# With database (total from COUNT query):
# Assumes `app`, `Depends`, `get_db`, and the `Post` model are defined elsewhere.
@app.get("/posts")
async def list_posts(page: int = 1, limit: int = 10, db = Depends(get_db)):
    query  = db.query(Post).filter(Post.published == True)
    total  = query.count()
    posts  = query.offset((page - 1) * limit).limit(limit).all()
    return paginate(posts, page, limit, total)

In-Memory Aggregations

posts = [
    {"id": 1, "views": 150, "tags": ["python", "fastapi"], "author": "Alice"},
    {"id": 2, "views": 500, "tags": ["python"],            "author": "Bob"},
    {"id": 3, "views": 75,  "tags": ["fastapi", "react"],  "author": "Alice"},
    {"id": 4, "views": 300, "tags": ["react"],             "author": "Charlie"},
]

# Total views
total_views = sum(p["views"] for p in posts)   # 1025

# Average views
avg_views = total_views / len(posts)   # 256.25

# Top 2 by views
top_posts = sorted(posts, key=lambda p: p["views"], reverse=True)[:2]

# Views by author
from collections import defaultdict
views_by_author = defaultdict(int)
for p in posts:
    views_by_author[p["author"]] += p["views"]
# {"Alice": 225, "Bob": 500, "Charlie": 300}

# Tag frequency
from collections import Counter
all_tags = [tag for post in posts for tag in post["tags"]]
tag_counts = Counter(all_tags).most_common(3)
# [("python", 2), ("fastapi", 2), ("react", 2)]

# Posts per author
posts_per_author = Counter(p["author"] for p in posts)
# Counter({"Alice": 2, "Bob": 1, "Charlie": 1})

The Pipeline Pattern

from typing import Callable
from functools import reduce

def pipeline(*transforms: Callable) -> Callable:
    """Return a function that applies transforms in sequence."""
    def apply(data):
        return reduce(lambda d, f: f(d), transforms, data)
    return apply

# Define transformation steps as pure functions
def filter_published(posts: list) -> list:
    return [p for p in posts if p.get("published")]

def sort_by_views(posts: list) -> list:
    return sorted(posts, key=lambda p: p["views"], reverse=True)

def take(n: int) -> Callable:
    return lambda posts: posts[:n]

def add_rank(posts: list) -> list:
    return [{**p, "rank": i + 1} for i, p in enumerate(posts)]

# Compose the pipeline
top_5_published = pipeline(
    filter_published,
    sort_by_views,
    take(5),
    add_rank,
)

result = top_5_published(all_posts)
# Each step transforms the list: clean, testable, reusable
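As a self-contained check of the pattern, the same pipeline logic can be exercised on a small hypothetical dataset (the sample posts below are made up for illustration):

```python
from functools import reduce

def pipeline(*transforms):
    """Return a function that applies transforms left to right."""
    return lambda data: reduce(lambda d, f: f(d), transforms, data)

# Hypothetical sample data
sample = [
    {"title": "A", "views": 10, "published": True},
    {"title": "B", "views": 99, "published": False},
    {"title": "C", "views": 50, "published": True},
]

top_ranked = pipeline(
    lambda posts: [p for p in posts if p["published"]],                   # filter
    lambda posts: sorted(posts, key=lambda p: p["views"], reverse=True),  # sort
    lambda posts: [{**p, "rank": i + 1} for i, p in enumerate(posts)],    # rank
)(sample)

# "B" is filtered out; "C" (50 views) ranks above "A" (10 views)
# top_ranked == [{"title": "C", ..., "rank": 1}, {"title": "A", ..., "rank": 2}]
```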

# FastAPI use
@app.get("/posts/trending")
async def trending_posts(db = Depends(get_db)):
    raw_posts = db.query(Post).all()
    # Note: __dict__ on ORM objects includes SQLAlchemy internals such as
    # _sa_instance_state; filter keys or serialise explicitly in production.
    return top_5_published([p.__dict__ for p in raw_posts])

Common Mistakes

Mistake 1 - Building large intermediate lists unnecessarily

❌ Wrong - builds full list just to count:

published_views = [p["views"] for p in posts if p["published"]]  # full list in memory
total = sum(published_views)   # list not needed

✅ Correct - use a generator expression:

total = sum(p["views"] for p in posts if p["published"])   # no intermediate list ✓

Mistake 2 - Mutating input data in transformation functions

❌ Wrong - modifies the caller’s data:

def add_slug(posts: list) -> list:
    for post in posts:
        post["slug"] = post["title"].lower().replace(" ", "-")   # mutates original!
    return posts

✅ Correct - return new objects:

def add_slug(posts: list) -> list:
    return [{**p, "slug": p["title"].lower().replace(" ", "-")} for p in posts]   # ✓

Mistake 3 - Doing in Python what should be done in SQL

❌ Wrong - loading all posts to count per author:

all_posts = db.query(Post).all()   # fetches all rows
by_author = Counter(p.author_id for p in all_posts)   # 1M posts = 1M rows fetched!

✅ Correct - aggregate in SQL:

from sqlalchemy import func
result = db.query(Post.author_id, func.count(Post.id)).group_by(Post.author_id).all()
by_author = dict(result)   # ✓ database does the counting

Quick Reference

Task                    Pattern
Group by key            defaultdict(list) + loop
Count occurrences       Counter(items).most_common(n)
Sum/max/min             sum(x["val"] for x in items)
Sort + slice            sorted(items, key=..., reverse=True)[:n]
Paginate                items[(page-1)*size : page*size]
Pipeline                functools.reduce(lambda d, f: f(d), transforms, data)
Non-mutating transform  [{**item, "new_key": val} for item in items]
Flatten nested          recursive flatten() function

🧠 Test Yourself

You need to find the total view count of all published posts in a list of 50,000 post dicts. Which approach is most memory-efficient?
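A generator expression passed straight to sum() is the memory-efficient choice here, since no 50,000-item intermediate list is ever built. A sketch with a small stand-in list:

```python
# Stand-in for the 50,000 post dicts
posts = [
    {"views": 100, "published": True},
    {"views": 40,  "published": False},
    {"views": 60,  "published": True},
]

# The generator is consumed lazily by sum(); no list is materialised
total_published_views = sum(p["views"] for p in posts if p["published"])
# total_published_views == 160
```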