Create, Update and Delete — Session Operations and Transactions

Read operations are the majority of database work, but write operations — create, update, delete — are where transaction management and error handling become critical. SQLAlchemy’s unit-of-work pattern tracks all changes made to ORM objects within a session and writes them to the database in a single efficient batch when you commit. Understanding when SQLAlchemy flushes, how to use db.refresh() to get server-generated values, and how to handle constraint violations cleanly are the skills that separate robust CRUD endpoints from fragile ones.

Creating Records

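from sqlalchemy import select   # used by create_post_with_tags below
from sqlalchemy.orm import Session
from sqlalchemy.exc import IntegrityError
from fastapi import HTTPException

# User, Post and Tag are assumed to be the application's ORM models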

# ── Simple create ─────────────────────────────────────────────────────────────
def create_user(db: Session, email: str, name: str, password_hash: str) -> User:
    user = User(email=email, name=name, password_hash=password_hash)
    db.add(user)
    db.commit()
    db.refresh(user)   # reload from DB to get server defaults (id, created_at)
    return user

# ── Create with IntegrityError handling ──────────────────────────────────────
def create_user_safe(db: Session, data: dict) -> User:
    try:
        user = User(**data)
        db.add(user)
        db.commit()
        db.refresh(user)
        return user
    except IntegrityError as e:
        db.rollback()
        if "users_email_key" in str(e.orig) or "unique" in str(e.orig).lower():
            raise HTTPException(409, "A user with this email already exists")
        raise HTTPException(400, "Database constraint violation")

# ── Create with flush (get id before commit) ─────────────────────────────────
def create_post_with_tags(db: Session, post_data: dict, tag_names: list[str]) -> Post:
    post = Post(**post_data)
    db.add(post)
    db.flush()   # writes to DB without committing — post.id is now available

    # Attach existing tags in the same transaction (unknown tag names are skipped)
    for tag_name in tag_names:
        tag = db.scalars(select(Tag).where(Tag.name == tag_name)).first()
        if tag:
            post.tags.append(tag)

    db.commit()
    db.refresh(post)
    return post
Note: db.refresh(obj) reloads the object’s attributes from the database. This is necessary after db.commit() if you need server-generated values like id (from SERIAL), created_at (from server_default=func.now()), or any database trigger-computed column. Without db.refresh(), these attributes remain in an “expired” state — accessing them after commit would trigger a lazy reload from the database anyway, but calling refresh explicitly is more predictable and readable.
Tip: db.flush() writes pending changes to the database within the current transaction but does not commit them. This is useful when you need the auto-generated ID of a newly-created record to create related records in the same transaction. The flush sends the INSERT to the database (so the ID is assigned by the sequence), but the entire transaction is still uncommitted and can be rolled back if a later step fails.
Warning: Always call db.rollback() after catching an IntegrityError and before reusing the session. An IntegrityError raised during a flush or commit leaves the session in a “needs rollback” state: any subsequent operation on the same session raises PendingRollbackError until you roll back. In the get_db dependency pattern, the rollback happens automatically in the except block, but if you catch exceptions yourself inside a handler, you must roll back explicitly.

Updating Records

from sqlalchemy import func, select, text, update   # func and text are used by archive_old_posts
from sqlalchemy.orm import Session

# ── Update by loading and modifying object (recommended) ─────────────────────
def update_post(db: Session, post_id: int, update_data: dict) -> Post | None:
    post = db.get(Post, post_id)
    if post is None:
        return None

    for field, value in update_data.items():
        setattr(post, field, value)

    db.commit()
    db.refresh(post)
    return post

# ── Bulk update with UPDATE statement (no ORM object load) ───────────────────
def archive_old_posts(db: Session) -> int:
    result = db.execute(
        update(Post)
        .where(Post.created_at < func.now() - text("INTERVAL '1 year'"))
        .where(Post.status == "draft")
        .values(status="archived", updated_at=func.now())
    )
    db.commit()
    return result.rowcount   # number of rows updated

# ── Increment a counter (atomic: computed inside the database) ───────────────
def increment_view_count(db: Session, post_id: int) -> None:
    db.execute(
        update(Post)
        .where(Post.id == post_id)
        .values(view_count=Post.view_count + 1)
    )
    db.commit()

Deleting Records

from sqlalchemy import delete, func, text   # func and text are used by the functions below
from sqlalchemy.orm import Session

# ── Delete a single ORM object ────────────────────────────────────────────────
def delete_post(db: Session, post_id: int) -> bool:
    post = db.get(Post, post_id)
    if post is None:
        return False

    db.delete(post)   # marks for deletion; assuming a "delete" cascade on Post.comments, its comments are removed too
    db.commit()
    return True

# ── Bulk delete (no ORM load — direct DELETE statement) ──────────────────────
def delete_archived_posts(db: Session, days_old: int = 90) -> int:
    result = db.execute(
        delete(Post)
        .where(Post.status == "archived")
        # int() ensures only a number is interpolated into the SQL text
        .where(Post.updated_at < func.now() - text(f"INTERVAL '{int(days_old)} days'"))
    )
    db.commit()
    return result.rowcount

# ── Soft delete (set deleted_at instead of removing row) ─────────────────────
def soft_delete_post(db: Session, post_id: int, deleted_by: int) -> Post | None:
    post = db.get(Post, post_id)
    if post is None:
        return None
    post.deleted_at = func.now()
    post.deleted_by_id = deleted_by
    db.commit()
    db.refresh(post)
    return post

Bulk Insert for Efficiency

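from sqlalchemy import func
from sqlalchemy.orm import Session
from sqlalchemy.dialects.postgresql import insert as pg_insert

# ── Bulk insert: single INSERT with multiple VALUES rows (PostgreSQL) ─────────
def bulk_create_tags(db: Session, tag_data: list[dict]) -> int:
    if not tag_data:
        return 0
    # on_conflict_do_nothing() exists only on dialect-specific insert constructs,
    # not on the generic sqlalchemy.insert()
    stmt = (
        pg_insert(Tag)
        .values(tag_data)   # [{"name": "python", "slug": "python"}, ...]
        .on_conflict_do_nothing(index_elements=["name"])
    )
    db.execute(stmt)
    db.commit()
    return len(tag_data)   # rows submitted; rows skipped on conflict are still counted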

# ── Upsert (insert or update on conflict) ─────────────────────────────────────
from sqlalchemy.dialects.postgresql import insert as pg_insert

def upsert_post_view(db: Session, post_id: int, ip: str) -> None:
    stmt = pg_insert(PostView).values(post_id=post_id, ip_address=ip, count=1)
    stmt = stmt.on_conflict_do_update(
        index_elements=["post_id", "ip_address"],
        set_={"count": PostView.count + 1, "last_seen": func.now()},
    )
    db.execute(stmt)
    db.commit()

Common Mistakes

Mistake 1 — Forgetting db.refresh() after commit (stale attributes)

❌ Wrong — returning object with expired server-default attributes:

db.add(user); db.commit()
return user   # id and created_at are expired: access triggers a lazy reload, or DetachedInstanceError once the session is closed

✅ Correct:

db.add(user); db.commit(); db.refresh(user)   # ✓ fresh values from DB
return user

Mistake 2 — Not rolling back after IntegrityError

❌ Wrong — session stuck in error state:

except IntegrityError:
    raise HTTPException(409, "Conflict")   # no rollback — next operation fails!

✅ Correct:

except IntegrityError:
    db.rollback()   # ✓
    raise HTTPException(409, "Conflict")

Mistake 3 — Using ORM load + delete for bulk operations

❌ Wrong — loads all rows into memory just to delete them:

for post in db.scalars(select(Post).where(Post.status == "archived")).all():
    db.delete(post)   # loads 10,000 rows into memory!

✅ Correct — use a bulk DELETE statement:

db.execute(delete(Post).where(Post.status == "archived"))   # ✓ one DELETE query

Quick Reference

Operation         Code
----------------  --------------------------------------------------------------
Create            db.add(obj); db.commit(); db.refresh(obj)
Flush (get ID)    db.add(obj); db.flush()   # id available, not committed
Update (ORM)      obj.field = value; db.commit()
Bulk UPDATE       db.execute(update(Model).where(...).values(...))
Delete (ORM)      db.delete(obj); db.commit()
Bulk DELETE       db.execute(delete(Model).where(...))
Bulk INSERT       db.execute(insert(Model), list_of_dicts)
Handle conflict   except IntegrityError: db.rollback(); raise HTTPException(409)

🧠 Test Yourself

You create a new User object, call db.add(user) and db.commit(), then return user.id to the client. What happens if you do NOT call db.refresh(user)?