Querying — select(), filter(), order_by() and Joins

SQLAlchemy 2.x uses a select()-based query API that is more explicit, composable, and type-safe than the legacy session.query() style. Understanding the query API — how to filter, sort, join, aggregate, and paginate — is the core skill for writing efficient FastAPI endpoints backed by SQLAlchemy. This lesson covers the query patterns you will use most often: single-object lookup, filtered lists, joins for related data, COUNT for pagination, and how to compose queries dynamically based on optional filter parameters.

select() — The Modern Query API

from sqlalchemy import select, func, and_, or_
from sqlalchemy.orm import Session, joinedload, selectinload

# ── Fetch all rows ─────────────────────────────────────────────────────────────
stmt = select(Post)
posts = session.scalars(stmt).all()   # list of Post ORM objects

# ── Fetch by primary key (fastest — uses identity map) ────────────────────────
post = session.get(Post, post_id)     # returns Post or None

# ── Filter with where() ───────────────────────────────────────────────────────
stmt = select(Post).where(Post.status == "published")
stmt = select(Post).where(Post.author_id == 5, Post.status == "published")
stmt = select(Post).where(
    and_(Post.author_id == 5, Post.status == "published")   # explicit AND
)
stmt = select(Post).where(
    or_(Post.status == "published", Post.status == "draft")  # OR
)

# ── Order and paginate ────────────────────────────────────────────────────────
stmt = (
    select(Post)
    .where(Post.status == "published")
    .order_by(Post.created_at.desc(), Post.id.desc())  # newest first, id as tiebreaker
    .offset((page - 1) * page_size)
    .limit(page_size)
)
posts = session.scalars(stmt).all()

# ── Count ─────────────────────────────────────────────────────────────────────
count_stmt = select(func.count()).select_from(Post).where(Post.status == "published")
total = session.scalar(count_stmt)   # returns int
Note: session.scalars(stmt).all() returns a list of the first column’s values — when you select a Model, this gives you a list of model instances. session.execute(stmt).all() returns a list of Row objects (tuples) — useful when selecting multiple columns or expressions. session.scalar(stmt) returns a single value (the first column of the first row) — useful for COUNT queries. Always use session.get(Model, pk) for primary key lookups — it checks the identity map first and only queries the database if the object is not already cached in the session.
Tip: Build queries dynamically for endpoints with optional filters by starting with the base query and conditionally adding WHERE clauses. This is cleaner than building SQL strings or using complex ternary expressions: if author_id: stmt = stmt.where(Post.author_id == author_id). Each .where() call returns a new query object (SQLAlchemy queries are immutable), so you can compose them without mutation concerns.
Warning: Avoid session.query(Post).all() (the legacy 1.x API) in new code. While it still works in SQLAlchemy 2.x (via compatibility mode), it is deprecated and will eventually be removed. Use session.scalars(select(Post)).all() for new code. The select() API is more composable, better typed, and works identically with both synchronous and async sessions.
from sqlalchemy import select
from sqlalchemy.orm import joinedload

# ── Join to filter on related table ──────────────────────────────────────────
# Find all published posts by admin users
stmt = (
    select(Post)
    .join(Post.author)                          # JOIN users ON posts.author_id = users.id
    .where(Post.status == "published")
    .where(User.role == "admin")
    .options(joinedload(Post.author))           # still need to load author for response
    .order_by(Post.created_at.desc())
)
posts = session.scalars(stmt).all()

# ── Outer join (keep posts even if author was deleted) ────────────────────────
stmt = (
    select(Post)
    .outerjoin(Post.author)
    .where(Post.status == "published")
)

# ── Filter on many-to-many related model ────────────────────────────────────
# Posts tagged with "python"
stmt = (
    select(Post)
    .join(Post.tags)                            # JOIN post_tags JOIN tags
    .where(Tag.name == "python")
    .where(Post.status == "published")
    .distinct()                                 # avoid duplicate posts with multiple tags
)

# ── Selecting specific columns (no full object load) ─────────────────────────
# For list views, avoid loading the full body text
stmt = (
    select(Post.id, Post.title, Post.slug, Post.created_at, User.name.label("author_name"))
    .join(Post.author)
    .where(Post.status == "published")
    .order_by(Post.created_at.desc())
    .limit(20)
)
rows = session.execute(stmt).all()   # list of Row namedtuples
for row in rows:
    print(row.id, row.title, row.author_name)

Dynamic Query Building

from sqlalchemy import select, or_
from sqlalchemy.orm import Session
from typing import Literal

def get_posts(
    db:          Session,
    page:        int = 1,
    page_size:   int = 10,
    author_id:   int | None = None,
    status:      str | None = None,
    search:      str | None = None,
    sort_by:     Literal["created_at", "view_count", "title"] = "created_at",
    order:       Literal["asc", "desc"] = "desc",
) -> tuple[list[Post], int]:
    """Build a paginated, filtered post query from optional parameters."""

    # Start with the base query
    base = select(Post).options(joinedload(Post.author), selectinload(Post.tags))

    # Apply filters conditionally
    if author_id is not None:
        base = base.where(Post.author_id == author_id)
    if status is not None:
        base = base.where(Post.status == status)
    if search:
        base = base.where(
            or_(
                Post.title.ilike(f"%{search}%"),
                Post.body.ilike(f"%{search}%"),
            )
        )

    # Count total matching rows (without pagination)
    count_stmt = select(func.count()).select_from(base.subquery())
    total = db.scalar(count_stmt)

    # Apply sort
    sort_col = {
        "created_at": Post.created_at,
        "view_count":  Post.view_count,
        "title":       Post.title,
    }[sort_by]
    if order == "desc":
        base = base.order_by(sort_col.desc(), Post.id.desc())
    else:
        base = base.order_by(sort_col.asc(), Post.id.asc())

    # Apply pagination
    base = base.offset((page - 1) * page_size).limit(page_size)

    posts = db.scalars(base).all()
    return posts, total

Common Mistakes

Mistake 1 — Using session.query() in new code

❌ Wrong — legacy 1.x API:

posts = db.query(Post).filter(Post.status == "published").all()

✅ Correct — modern 2.x select() API:

posts = db.scalars(select(Post).where(Post.status == "published")).all()   # ✓

❌ Wrong — duplicate posts if a post has multiple matching tags:

stmt = select(Post).join(Post.tags).where(Tag.name.in_(["python", "fastapi"]))
# Post tagged with both python AND fastapi appears twice!

✅ Correct — deduplicate with distinct():

stmt = select(Post).join(Post.tags).where(Tag.name.in_(["python", "fastapi"])).distinct()   # ✓

Mistake 3 — Loading full ORM objects when only a few columns are needed

❌ Wrong — loads body text (potentially megabytes) for a list view:

posts = db.scalars(select(Post).limit(20)).all()
# Returns full Post objects including body column

✅ Correct — select only needed columns:

rows = db.execute(select(Post.id, Post.title, Post.created_at).limit(20)).all()   # ✓

Quick Reference

Operation SQLAlchemy 2.x Code
Fetch all db.scalars(select(Post)).all()
Fetch by PK db.get(Post, pk)
Fetch first db.scalars(select(Post).where(...)).first()
Count db.scalar(select(func.count()).select_from(Post))
Filter select(Post).where(Post.status == "published")
Sort .order_by(Post.created_at.desc())
Paginate .offset((page-1)*size).limit(size)
Join + filter .join(Post.author).where(User.role == "admin")

🧠 Test Yourself

You need the total count of published posts by author_id=5, then the first 10 of them. What is the most efficient approach?