A follow system — where users follow other users to see their content — is a self-referential many-to-many relationship on the users table: the “followers” of user A are users who have a follow relationship pointing TO user A; the “following” of user A are users that A has a follow relationship pointing FROM A. This is also called an adjacency list or self-join many-to-many. Building a follow system introduces the feed endpoint — which returns posts from all users the current user follows — and demonstrates how to write efficient queries for social graph traversal without loading the entire graph into memory.
UserFollow Model — Self-Referential Many-to-Many
# app/models/user_follow.py
from sqlalchemy import ForeignKey, Table, Column, UniqueConstraint
from sqlalchemy.orm import Mapped, mapped_column, relationship
from sqlalchemy.sql import func
from datetime import datetime
from app.database import Base
# ── Association table for user follows ────────────────────────────────────────
user_follows = Table(
"user_follows",
Base.metadata,
Column("follower_id", ForeignKey("users.id", ondelete="CASCADE"),
primary_key=True),
Column("following_id", ForeignKey("users.id", ondelete="CASCADE"),
primary_key=True),
Column("created_at", server_default=func.now()),
)
# ── Add to User model ─────────────────────────────────────────────────────────
# (In app/models/user.py, add these two relationships)
#
# following: Mapped[list["User"]] = relationship(
# secondary = user_follows,
# primaryjoin = id == user_follows.c.follower_id,
# secondaryjoin = id == user_follows.c.following_id,
# back_populates = "followers",
# )
#
# followers: Mapped[list["User"]] = relationship(
# secondary = user_follows,
# primaryjoin = id == user_follows.c.following_id,
# secondaryjoin = id == user_follows.c.follower_id,
# back_populates = "following",
# )
#
# Also add integer counter caches:
# follower_count: Mapped[int] = mapped_column(Integer, default=0)
# following_count: Mapped[int] = mapped_column(Integer, default=0)
primaryjoin and secondaryjoin parameters because SQLAlchemy cannot automatically determine which end of the foreign key is the “from” side and which is the “to” side when both foreign keys point to the same table. The primaryjoin specifies how the current model connects to the association table; the secondaryjoin specifies how the association table connects back to the related model (which in a self-referential case is the same model).SELECT posts.* FROM posts JOIN user_follows ON posts.author_id = user_follows.following_id WHERE user_follows.follower_id = :user_id. This scales to thousands of followed users — the database joins the data rather than your Python code iterating through lists.UPDATE users SET follower_count = follower_count + 1) rather than read-modify-write (user.follower_count += 1) to avoid lost updates under concurrency.Follow and Unfollow Endpoints
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy import select, insert, delete, update
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import Session
router = APIRouter(prefix="/users", tags=["Users"])
@router.post("/{user_id}/follow", status_code=200)
def follow_user(
user_id: int,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
if user_id == current_user.id:
raise HTTPException(400, "Cannot follow yourself")
target = db.get(User, user_id)
if not target:
raise HTTPException(404, "User not found")
# Check if already following
existing = db.execute(
select(user_follows).where(
user_follows.c.follower_id == current_user.id,
user_follows.c.following_id == user_id,
)
).first()
if existing:
return {"following": True, "follower_count": target.follower_count}
try:
db.execute(
insert(user_follows).values(
follower_id=current_user.id, following_id=user_id
)
)
# Atomic counter update
db.execute(
update(User).where(User.id == user_id)
.values(follower_count=User.follower_count + 1)
)
db.execute(
update(User).where(User.id == current_user.id)
.values(following_count=User.following_count + 1)
)
db.flush()
return {"following": True, "follower_count": target.follower_count + 1}
except IntegrityError:
db.rollback()
return {"following": True, "follower_count": target.follower_count}
@router.delete("/{user_id}/follow", status_code=200)
def unfollow_user(
user_id: int,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
result = db.execute(
delete(user_follows).where(
user_follows.c.follower_id == current_user.id,
user_follows.c.following_id == user_id,
)
)
if result.rowcount == 0:
raise HTTPException(400, "Not following this user")
db.execute(update(User).where(User.id == user_id)
.values(follower_count=User.follower_count - 1))
db.execute(update(User).where(User.id == current_user.id)
.values(following_count=User.following_count - 1))
db.flush()
return {"following": False}
Feed and Follower List Endpoints
from sqlalchemy.orm import joinedload, selectinload
@router.get("/me/feed", response_model=Page[PostResponse])
def get_feed(
page: int = 1,
page_size: int = 20,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_user),
):
"""Return posts from users the current user follows."""
stmt = (
select(Post)
.options(joinedload(Post.author), selectinload(Post.tags))
.join(user_follows,
(Post.author_id == user_follows.c.following_id) &
(user_follows.c.follower_id == current_user.id))
.where(Post.status == "published", Post.deleted_at.is_(None))
.order_by(Post.created_at.desc(), Post.id.desc())
)
total = db.scalar(select(func.count()).select_from(stmt.subquery()))
posts = db.scalars(
stmt.offset((page - 1) * page_size).limit(page_size)
).all()
return Page.create(items=posts, total=total, page=page, page_size=page_size)
@router.get("/{user_id}/followers", response_model=Page[UserSummary])
def list_followers(user_id: int, page: int = 1, page_size: int = 20,
db: Session = Depends(get_db)):
stmt = (
select(User)
.join(user_follows,
(User.id == user_follows.c.follower_id) &
(user_follows.c.following_id == user_id))
.where(User.is_active == True)
.order_by(user_follows.c.created_at.desc())
)
total = db.scalar(select(func.count()).select_from(stmt.subquery()))
users = db.scalars(stmt.offset((page-1)*page_size).limit(page_size)).all()
return Page.create(items=users, total=total, page=page, page_size=page_size)
Common Mistakes
Mistake 1 — Allowing self-follow
❌ Wrong — user can follow themselves:
db.execute(insert(user_follows).values(follower_id=current_user.id, following_id=user_id))
# No check: current_user.id == user_id → appears in own feed!
✅ Correct — check at the start of the endpoint:
if user_id == current_user.id:
raise HTTPException(400, "Cannot follow yourself") # ✓
Mistake 2 — N+1 feed query (one post query per followed user)
❌ Wrong — loads followed users, then queries posts for each:
followed = db.scalars(select(User).join(user_follows, ...)).all()
for user in followed:
posts = db.scalars(select(Post).where(Post.author_id == user.id)).all()
# N queries for N followed users!
✅ Correct — single JOIN to get all feed posts.
Mistake 3 — Non-atomic counter update (read-modify-write race condition)
❌ Wrong — read-modify-write can lose concurrent updates:
target = db.get(User, user_id)
target.follower_count += 1 # reads current value, another thread may have updated it!
✅ Correct — SQL atomic increment:
db.execute(update(User).where(User.id==user_id)
.values(follower_count=User.follower_count + 1)) # ✓ atomic
Quick Reference
| Endpoint | Auth | Key Logic |
|---|---|---|
| POST /users/{id}/follow | Yes | No self-follow, check existing, atomic counter |
| DELETE /users/{id}/follow | Yes | Delete row, atomic counter decrement |
| GET /me/feed | Yes | JOIN on user_follows, paginated |
| GET /users/{id}/followers | No | JOIN users → user_follows, paginated |
| GET /users/{id}/following | No | JOIN users → user_follows (reversed), paginated |