Tags connect posts to topics — one post can have many tags, one tag can belong to many posts. This is the classic many-to-many relationship managed through the post_tags association table. Beyond the basic relationship, a production tags API needs: a way to find or create tags on the fly (so clients do not need a separate “create tag” step), a tags listing endpoint that shows each tag’s post count, and clean handling when tags are removed from posts (the tag itself persists, only the association row is deleted). This lesson builds the complete tags management layer.
Tags Model and Association Table
# app/models/tag.py
from sqlalchemy import String, Table, Column, ForeignKey, Integer
from sqlalchemy.orm import Mapped, mapped_column, relationship
from sqlalchemy.sql import func
from datetime import datetime
from app.database import Base
# ── Association table — no ORM class needed ───────────────────────────────────
post_tags = Table(
"post_tags",
Base.metadata,
Column("post_id", ForeignKey("posts.id", ondelete="CASCADE"), primary_key=True),
Column("tag_id", ForeignKey("tags.id", ondelete="CASCADE"), primary_key=True),
)
class Tag(Base):
__tablename__ = "tags"
id: Mapped[int] = mapped_column(primary_key=True)
name: Mapped[str] = mapped_column(String(100), unique=True, index=True)
slug: Mapped[str] = mapped_column(String(100), unique=True, index=True)
created_at: Mapped[datetime] = mapped_column(server_default=func.now())
# Back-reference: list of posts using this tag
posts: Mapped[list["Post"]] = relationship(
secondary = post_tags,
back_populates = "tags",
)
post_tags is defined as a plain SQLAlchemy Table object (not an ORM model class). This is appropriate when the junction table has no extra columns beyond the two foreign keys. If you need extra data on the association — such as the order in which tags were added or who added them — promote it to a full ORM model class with a compound primary key or a surrogate primary key and additional columns.INSERT ... ON CONFLICT DO NOTHING for bulk tag creation or association. When a client submits a post with tags, you may be creating new tags, referencing existing tags, or a mix. Rather than looping and checking for each tag individually, insert all tags in one statement with conflict handling and then query the IDs. This reduces N individual queries to 2 queries regardless of how many tags are submitted.post.tags = new_tag_list, SQLAlchemy replaces the entire tag list — it deletes all existing post_tags rows for that post and inserts new ones for the new list. This is fine for a PATCH where the client sends the complete desired tag list. But if the client sends an empty list [] to mean “remove all tags”, make sure you distinguish between None (not provided — do not change tags) and [] (explicitly provided — remove all tags). Use Pydantic’s model_dump(exclude_none=True) and a separate check for the tags field.Tags Schemas and Router
from pydantic import BaseModel, Field
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy import select, func
from sqlalchemy.orm import Session
class TagCreate(BaseModel):
name: str = Field(..., min_length=1, max_length=100)
slug: str = Field(..., pattern=r"^[a-z0-9-]+$", max_length=100)
class TagResponse(BaseModel):
model_config = {"from_attributes": True}
id: int
name: str
slug: str
post_count: int = 0 # populated from a query, not a column
router = APIRouter(prefix="/tags", tags=["Tags"])
@router.get("/", response_model=list[TagResponse])
def list_tags(db: Session = Depends(get_db)):
"""Return all tags with their post counts, ordered by popularity."""
rows = db.execute(
select(
Tag.id,
Tag.name,
Tag.slug,
func.count(post_tags.c.post_id).label("post_count"),
)
.outerjoin(post_tags, Tag.id == post_tags.c.tag_id)
.outerjoin(Post, (post_tags.c.post_id == Post.id) &
(Post.status == "published") &
Post.deleted_at.is_(None))
.group_by(Tag.id)
.order_by(func.count(post_tags.c.post_id).desc())
).all()
return [
{"id": r.id, "name": r.name, "slug": r.slug, "post_count": r.post_count}
for r in rows
]
@router.get("/{slug}/posts")
def posts_by_tag(slug: str, page: int = 1, page_size: int = 10,
db: Session = Depends(get_db)):
tag = db.scalars(select(Tag).where(Tag.slug == slug)).first()
if not tag:
raise HTTPException(404, "Tag not found")
stmt = (
select(Post)
.join(Post.tags)
.where(Tag.slug == slug, Post.status == "published",
Post.deleted_at.is_(None))
.distinct()
.order_by(Post.created_at.desc())
)
total = db.scalar(select(func.count()).select_from(stmt.subquery()))
posts = db.scalars(stmt.offset((page-1)*page_size).limit(page_size)).all()
return {"tag": tag.name, "total": total, "posts": posts}
Get-or-Create Tag Helper
from sqlalchemy.dialects.postgresql import insert as pg_insert
import re
def slugify(name: str) -> str:
return re.sub(r"[^\w\s-]", "", name.lower()).strip()
def get_or_create_tags(db: Session, tag_names: list[str]) -> list[Tag]:
"""Find existing tags by name or create them. Returns Tag ORM objects."""
if not tag_names:
return []
# Normalise names
normalised = [t.strip().lower() for t in tag_names if t.strip()]
# Fetch existing tags in one query
existing = db.scalars(
select(Tag).where(Tag.name.in_(normalised))
).all()
existing_names = {t.name for t in existing}
# Create missing tags
new_tags = []
for name in normalised:
if name not in existing_names:
tag = Tag(name=name, slug=slugify(name))
db.add(tag)
new_tags.append(tag)
if new_tags:
db.flush() # get IDs for new tags
return list(existing) + new_tags
Common Mistakes
Mistake 1 — Setting post.tags = [] when update does not include tags
❌ Wrong — removes all tags when PATCH body does not include tags key:
post.tags = update.model_dump().get("tags", []) # tags defaults to [] — removes all!
✅ Correct — only update tags if explicitly provided:
if update.tags is not None: # None = not sent, [] = explicitly remove all
post.tags = get_or_create_tags(db, update.tags) # ✓
Mistake 2 — N+1 queries when building tag list with post counts
❌ Wrong — one count query per tag:
tags = db.scalars(select(Tag)).all()
for tag in tags:
count = db.scalar(select(func.count()).where(post_tags.c.tag_id == tag.id))
tag.post_count = count # N+1!
✅ Correct — single JOIN + GROUP BY query for all counts.
Mistake 3 — Deleting a tag when only the association should be removed
❌ Wrong — deletes the tag entirely when removing from a post:
tag = db.get(Tag, tag_id)
db.delete(tag) # removes the tag for ALL posts, not just this one!
✅ Correct — remove from the relationship list:
post.tags = [t for t in post.tags if t.id != tag_id] # ✓ removes association, not tag
Quick Reference
| Operation | Code |
|---|---|
| Add tag to post | post.tags.append(tag) |
| Replace all tags | post.tags = [tag1, tag2] |
| Remove one tag | post.tags = [t for t in post.tags if t.id != tag_id] |
| Get-or-create | Query by name, create if missing, flush for ID |
| Tag post count | JOIN post_tags + GROUP BY + func.count() |
| Posts by tag | .join(Post.tags).where(Tag.slug == slug).distinct() |