As a FastAPI application grows, placing all database logic directly in route handlers produces handlers that are long, hard to test, and impossible to reuse. The repository pattern extracts database access into dedicated functions or classes — each handler calls repository functions rather than SQLAlchemy directly. The service layer sits above repositories to orchestrate multi-step business operations. Together, these patterns keep handlers thin (validate → call service → return response), repositories focused (one responsibility: database access), and services testable (mock the repository in unit tests). The right level of layering depends on project complexity — a tiny API does not need all three layers.
The Repository Pattern
# app/repositories/post_repository.py
from sqlalchemy import select, func, update, delete
from sqlalchemy.orm import Session, joinedload, selectinload
from app.models.post import Post
def get_post_by_id(db: Session, post_id: int) -> Post | None:
    """Look up a post by primary key.

    No ``deleted_at`` filter is applied here, so a soft-deleted row can
    still be returned; callers that care must check it themselves.

    Args:
        db: Session used for the lookup.
        post_id: Primary key of the post.

    Returns:
        The matching ``Post``, or ``None`` if no row has that id.
    """
    found = db.get(Post, post_id)
    return found
def get_post_by_slug(db: Session, slug: str) -> Post | None:
    """Fetch the first non-deleted post whose slug equals ``slug``.

    Args:
        db: Session used for the query.
        slug: Exact slug to match.

    Returns:
        The matching ``Post``, or ``None`` when no live post has that slug.
    """
    not_deleted = Post.deleted_at.is_(None)
    query = select(Post).where(Post.slug == slug).where(not_deleted)
    return db.scalars(query).first()
def _like_contains(term: str) -> str:
    """Build a ``%term%`` LIKE pattern with backslash-escaped wildcards."""
    # Escape the escape character first so it is not doubled afterwards.
    escaped = (
        term.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
    )
    return f"%{escaped}%"


def list_published_posts(
    db: Session,
    page: int = 1,
    page_size: int = 10,
    author_id: int | None = None,
    search: str | None = None,
) -> tuple[list[Post], int]:
    """Return one page of published, non-deleted posts plus the total count.

    Args:
        db: Session used for both the count and the page query.
        page: 1-based page number; the offset is ``(page - 1) * page_size``,
            so callers should validate ``page >= 1``.
        page_size: Maximum number of posts returned.
        author_id: When not ``None``, restrict results to this author.
        search: When non-empty, keep posts whose title or body contains this
            text (case-insensitive). ``%`` and ``_`` are matched literally.

    Returns:
        ``(posts, total)`` where ``posts`` is the requested page, newest
        first, and ``total`` counts every post matching the filters.
    """
    filtered = select(Post).where(
        Post.status == "published", Post.deleted_at.is_(None)
    )
    # Compare against None explicitly so an id of 0 is still a valid filter.
    if author_id is not None:
        filtered = filtered.where(Post.author_id == author_id)
    if search:
        pattern = _like_contains(search)
        filtered = filtered.where(
            Post.title.ilike(pattern, escape="\\")
            | Post.body.ilike(pattern, escape="\\")
        )

    # Count from the bare filtered statement: eager-load options are only
    # useful for the page query below.
    total = db.scalar(select(func.count()).select_from(filtered.subquery())) or 0

    page_stmt = (
        filtered.options(joinedload(Post.author), selectinload(Post.tags))
        # Post.id breaks ties so pagination order is stable.
        .order_by(Post.created_at.desc(), Post.id.desc())
        .offset((page - 1) * page_size)
        .limit(page_size)
    )
    posts = list(db.scalars(page_stmt).all())
    return posts, total
def create_post(db: Session, data: dict) -> Post:
    """Insert a new post built from ``data`` without committing.

    Args:
        db: Session that owns the surrounding transaction.
        data: Keyword arguments passed straight to the ``Post`` constructor.

    Returns:
        The new ``Post`` after it has been flushed and refreshed.
    """
    new_post = Post(**data)
    db.add(new_post)
    # Flush sends the pending INSERT inside the open transaction; refresh
    # then reloads the instance's attributes from the database.
    db.flush()
    db.refresh(new_post)
    return new_post
def update_post(db: Session, post: Post, changes: dict) -> Post:
    """Apply ``changes`` to ``post`` attribute by attribute, without committing.

    Args:
        db: Session that owns the surrounding transaction.
        post: Instance to modify in place.
        changes: Mapping of attribute name to new value; every entry is
            assigned with ``setattr`` and no keys are filtered out.

    Returns:
        The same ``post`` object after it has been flushed and refreshed.
    """
    new_values = changes.items()
    for attr_name, new_value in new_values:
        setattr(post, attr_name, new_value)
    db.flush()
    db.refresh(post)
    return post
def soft_delete_post(db: Session, post: Post) -> Post:
    """Mark ``post`` as deleted by setting ``deleted_at``, without committing.

    Args:
        db: Session that owns the surrounding transaction.
        post: Instance to soft-delete.

    Returns:
        The same ``post`` object after the change has been flushed.
    """
    # Uses the module-level ``func`` import; ``func.now()`` is a SQL
    # expression, so the timestamp is produced by the database at flush time.
    post.deleted_at = func.now()
    db.flush()
    return post
Note: The repository functions call db.flush() rather than db.commit() — they write changes to the database within the current transaction but do not commit. The commit happens in the get_db dependency after the route handler returns successfully. This means multiple repository calls in one request share a single transaction: if the second call fails, the first is rolled back automatically. Never call db.commit() inside a repository function.

The Service Layer
# app/services/post_service.py
from sqlalchemy.orm import Session
from fastapi import HTTPException, status
from app.models.post import Post
from app.models.user import User
from app.repositories import post_repo, tag_repo
class PostService:
    """Business rules for posts, layered on top of the post repository.

    Every method works through the session passed to the constructor and
    reports failures as ``HTTPException``; nothing here commits.
    """

    def __init__(self, db: Session) -> None:
        self.db = db

    def get_or_404(self, post_id: int) -> Post:
        """Return the live post with ``post_id``.

        Raises:
            HTTPException: 404 when the post is missing or soft-deleted.
        """
        post = post_repo.get_post_by_id(self.db, post_id)
        if post is None or post.deleted_at is not None:
            raise HTTPException(status.HTTP_404_NOT_FOUND, "Post not found")
        return post

    def require_owner(self, post: Post, user: User) -> None:
        """Allow the action only for the post's author or an admin.

        Raises:
            HTTPException: 403 when ``user`` is neither.
        """
        if post.author_id != user.id and user.role != "admin":
            raise HTTPException(status.HTTP_403_FORBIDDEN, "Not your post")

    def create(self, data: dict, author: User) -> Post:
        """Create a post owned by ``author`` after checking slug uniqueness.

        ``data`` is not modified; the author id is added to a copy.

        Raises:
            HTTPException: 409 when a live post already uses the slug.
        """
        # NOTE(review): this check-then-insert is not atomic; a unique
        # constraint on the slug column is presumably the real guard.
        existing = post_repo.get_post_by_slug(self.db, data.get("slug", ""))
        if existing is not None:
            raise HTTPException(
                status.HTTP_409_CONFLICT,
                "A post with this slug already exists",
            )
        payload = {**data, "author_id": author.id}
        return post_repo.create_post(self.db, payload)

    def update(self, post_id: int, changes: dict, user: User) -> Post:
        """Apply ``changes`` to a post the user is allowed to edit.

        Raises:
            HTTPException: 404 when the post is missing or soft-deleted,
                403 when ``user`` may not edit it, 409 when the new slug
                is already used by another live post.
        """
        post = self.get_or_404(post_id)
        self.require_owner(post, user)
        # Only check for a clash when the slug is actually changing.
        if "slug" in changes and changes["slug"] != post.slug:
            existing = post_repo.get_post_by_slug(self.db, changes["slug"])
            if existing is not None:
                raise HTTPException(
                    status.HTTP_409_CONFLICT, "Slug already in use"
                )
        return post_repo.update_post(self.db, post, changes)

    def delete(self, post_id: int, user: User) -> None:
        """Soft-delete a post the user is allowed to remove.

        Raises:
            HTTPException: 404 when the post is missing or already
                soft-deleted, 403 when ``user`` may not delete it.
        """
        post = self.get_or_404(post_id)
        self.require_owner(post, user)
        post_repo.soft_delete_post(self.db, post)
Thin Route Handlers
# app/routers/posts.py — thin handlers that delegate to the service
from fastapi import APIRouter, Depends, Query, status
from sqlalchemy.orm import Session

from app.dependencies import get_db, get_current_user
from app.repositories import post_repo
from app.schemas.pagination import Page
from app.schemas.post import PostCreate, PostUpdate, PostResponse
from app.services.post_service import PostService
router = APIRouter()
@router.get("/", response_model=Page[PostResponse])
def list_posts(
    page: int = Query(1, ge=1),
    page_size: int = Query(10, ge=1),
    author_id: int | None = None,
    search: str | None = None,
    db: Session = Depends(get_db),
):
    """Return one page of published posts, optionally filtered.

    ``page`` and ``page_size`` must be at least 1, because the repository
    computes the offset as ``(page - 1) * page_size``.
    """
    # A plain read has no business rules, so it goes to the repository
    # directly rather than through PostService.
    posts, total = post_repo.list_published_posts(
        db, page, page_size, author_id, search
    )
    return Page.create(items=posts, total=total, page=page, page_size=page_size)
@router.post("/", response_model=PostResponse, status_code=201)
def create_post(
    post_in: PostCreate,
    db: Session = Depends(get_db),
    current_user = Depends(get_current_user),
):
    """Create a post for the current user by delegating to ``PostService``."""
    payload = post_in.model_dump()
    return PostService(db).create(payload, current_user)
Common Mistakes
Mistake 1 — Calling db.commit() inside a repository
❌ Wrong — commits prematurely, loses transactional atomicity:
# Anti-pattern, kept deliberately wrong: the repository commits on its own.
def create_post(db, data):
    post = Post(**data)
    db.add(post)
    db.commit() # committed before the handler might need to do more work!
    return post
✅ Correct — use flush, let get_db commit:
# Corrected version: flush and refresh only, leaving the commit to the caller.
def create_post(db, data):
    post = Post(**data)
    db.add(post)
    db.flush() # ✓ writes to DB within transaction, does not commit
    db.refresh(post)
    return post
Mistake 2 — Fat route handlers (all logic in the handler)
❌ Wrong — all validation, DB access, and business logic in one function:
# Anti-pattern, kept deliberately wrong: the handler does everything itself.
@router.post("/")
def create_post(post: PostCreate, db = Depends(get_db), user = Depends(get_current_user)):
    # Slug lookup, conflict handling and persistence are all inlined here.
    existing = db.scalars(select(Post).where(Post.slug == post.slug)).first()
    if existing: raise HTTPException(409, "Slug taken")
    db_post = Post(**post.model_dump(), author_id=user.id)
    db.add(db_post)
    db.flush()
    # ... 20 more lines ...
✅ Correct — delegate to service:
# Corrected version: the handler only unpacks the request and delegates.
@router.post("/")
def create_post(post: PostCreate, db = Depends(get_db), user = Depends(get_current_user)):
    return PostService(db).create(post.model_dump(), user) # ✓ thin handler
Quick Reference
| Layer | Responsibility | Calls |
|---|---|---|
| Route handler | Parse input, call service, return response | Service |
| Service | Business logic, orchestration, validation | Repository |
| Repository | Database queries (flush, not commit) | SQLAlchemy session |