psycopg2 — The Synchronous PostgreSQL Driver

psycopg2 is the most widely deployed PostgreSQL adapter for Python. It is a C extension that wraps the libpq client library, providing a DB-API 2.0 compliant interface. When you use standard synchronous SQLAlchemy in a FastAPI application (with plain def route handlers), psycopg2 is the driver underneath — SQLAlchemy translates your ORM calls to SQL and hands them to psycopg2 to execute. Understanding psycopg2 directly helps you debug connection errors, write efficient raw queries for complex operations the ORM struggles with, and tune connection pool settings correctly.

Connecting and Executing Queries

import psycopg2
import psycopg2.extras   # for DictCursor

# ── Basic connection ──────────────────────────────────────────────────────────
conn = psycopg2.connect(
    host     = "localhost",
    port     = 5432,
    dbname   = "blog_dev",
    user     = "blog_app",
    password = "your_password",
)
# Or pass a connection URI instead of keyword arguments (use one form, not both):
# conn = psycopg2.connect("postgresql://blog_app:password@localhost:5432/blog_dev")

# ── Execute a query ───────────────────────────────────────────────────────────
cursor = conn.cursor()

# ALWAYS use parameterised queries — never string interpolation
cursor.execute(
    "SELECT id, title, status FROM posts WHERE author_id = %s AND status = %s",
    (1, "published"),     # tuple of parameters — %s placeholders
)

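# Fetch results: every fetch call consumes rows from the same result set,
# so after fetchall() there is nothing left for fetchone() or fetchmany().
rows = cursor.fetchall()           # all remaining rows as a list of tuples
# one_row = cursor.fetchone()      # alternative: next row as a tuple, or None when exhausted
# batch   = cursor.fetchmany(10)   # alternative: next batch, a list of up to 10 tuples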

# Access by index
for row in rows:
    print(row[0], row[1], row[2])  # id, title, status

cursor.close()
conn.close()
Note: psycopg2 uses %s as the positional placeholder for every type (and %(name)s for named parameters), not ? like sqlite3 or :name like SQLAlchemy. Pass the parameters as a tuple or list (or a dict for named placeholders) as the second argument to cursor.execute(); a single parameter still needs a one-element tuple such as (user_id,). psycopg2 handles the escaping and quoting, so passing a Python string "O'Brien" is safe. Never format the SQL string yourself: f"WHERE id = {user_id}" is a SQL injection vulnerability.
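
The placeholder rule in the note also covers queries whose WHERE clause is assembled at runtime. The sketch below is one possible approach, not a psycopg2 API: build_posts_query is a hypothetical helper that only ever concatenates fixed SQL fragments and keeps the values in a separate dict for named %(name)s placeholders.

```python
def build_posts_query(author_id=None, status=None, limit=10):
    """Return (sql, params) for cursor.execute(); values never enter the SQL text."""
    clauses = []
    params = {"limit": limit}
    if author_id is not None:
        clauses.append("author_id = %(author_id)s")
        params["author_id"] = author_id
    if status is not None:
        clauses.append("status = %(status)s")
        params["status"] = status

    # Only fixed fragments are concatenated; user input stays in `params`.
    where = " WHERE " + " AND ".join(clauses) if clauses else ""
    sql = "SELECT id, title, status FROM posts" + where + " ORDER BY id LIMIT %(limit)s"
    return sql, params


sql, params = build_posts_query(author_id=1, status="published")
# With an open cursor: cursor.execute(sql, params)
```

The optional filters change which fragments appear in the SQL text, but the values themselves always travel as parameters, so psycopg2 still does the quoting.
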
Tip: Use psycopg2.extras.DictCursor or RealDictCursor to get results addressable by column name instead of plain tuples. With DictCursor, rows accept both row["title"] and row[1]. With RealDictCursor, rows are RealDictRow objects, a dict subclass keyed only by column name, so they can be passed to json.dumps() or a Pydantic model as long as the values themselves are serialisable. Use RealDictCursor when you need to convert results to JSON or pass them to Pydantic. Create a cursor with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor).
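
One caveat when turning rows into JSON: psycopg2 returns timestamp columns as datetime objects and numeric columns as Decimal, and json.dumps cannot encode either on its own. A minimal sketch of a fallback encoder follows; json_default is a hypothetical helper, the rating column is invented for illustration, and plain dicts stand in for the rows a RealDictCursor would return.

```python
import json
from datetime import date, datetime
from decimal import Decimal


def json_default(value):
    """Convert the types json.dumps cannot encode by itself."""
    if isinstance(value, (datetime, date)):
        return value.isoformat()
    if isinstance(value, Decimal):
        return str(value)   # keep the exact digits instead of rounding through float
    raise TypeError(f"Cannot serialise {type(value).__name__}")


# Stand-in for cur.fetchall() on a RealDictCursor (assumed column names).
rows = [
    {"id": 1, "title": "Hello", "created_at": datetime(2024, 1, 15, 9, 30), "rating": Decimal("4.50")},
]
payload = json.dumps(rows, default=json_default)
```

Pydantic models and FastAPI's own response encoding handle these types already, so a helper like this is mainly useful when calling json.dumps directly.
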
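Warning: Do not share a single psycopg2 connection between request threads. Connection objects can technically be used from several threads, but cursors cannot, and every thread using the same connection shares one transaction: commands are serialised, and a commit or rollback issued by one thread affects the work of all the others. FastAPI runs plain def handlers in a thread pool, so concurrent requests really do execute on different threads. Give each request its own connection from psycopg2.pool.ThreadedConnectionPool or, better, let SQLAlchemy manage the connection pool. Never create a single global psycopg2 connection at module level in a FastAPI application.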

DictCursor and Transaction Control

import psycopg2
import psycopg2.extras

conn = psycopg2.connect("postgresql://blog_app:password@localhost/blog_dev")

# ── RealDictCursor: access results by column name ─────────────────────────────
with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
    cur.execute(
        "SELECT id, title, view_count FROM posts WHERE status = %s LIMIT %s",
        ("published", 10)
    )
    posts = cur.fetchall()   # list of RealDictRow (behaves like dict)
    for post in posts:
        print(post["id"], post["title"], post["view_count"])

# ── Transaction control ────────────────────────────────────────────────────────
# psycopg2 starts a transaction automatically on the first query
# Must commit or rollback explicitly

try:
    with conn.cursor() as cur:
        cur.execute(
            "INSERT INTO users (email, name) VALUES (%s, %s) RETURNING id",
            ("alice@example.com", "Alice Smith")
        )
        user_id = cur.fetchone()[0]

        cur.execute(
            "INSERT INTO posts (author_id, title, slug, body) VALUES (%s, %s, %s, %s)",
            (user_id, "First Post", "first-post", "Hello world")
        )
    conn.commit()   # both INSERTs committed atomically
    print(f"Created user {user_id}")

except psycopg2.IntegrityError as e:
    conn.rollback()   # must rollback before next operation after an error
    print(f"Duplicate email or other constraint violation: {e}")
except Exception:
    conn.rollback()
    raise
finally:
    conn.close()
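
The try/commit/rollback pattern above can also be written with the connection itself as a context manager, which psycopg2 has supported since version 2.5: leaving the with block commits if no exception was raised and rolls back otherwise. The sketch below assumes the same users and posts tables; create_user_with_post is a hypothetical helper name.

```python
def create_user_with_post(conn, email, name, title, slug, body):
    """Insert a user and their first post in one transaction; return the new user id."""
    with conn:  # commit on success, rollback on exception; the connection stays open
        with conn.cursor() as cur:
            cur.execute(
                "INSERT INTO users (email, name) VALUES (%s, %s) RETURNING id",
                (email, name),
            )
            user_id = cur.fetchone()[0]
            cur.execute(
                "INSERT INTO posts (author_id, title, slug, body) VALUES (%s, %s, %s, %s)",
                (user_id, title, slug, body),
            )
    return user_id
```

Note that with conn does not close the connection, so the same connection can run further transactions afterwards; call conn.close() (or return it to a pool) separately.
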

Connection Pooling with ThreadedConnectionPool

import psycopg2.extras   # RealDictCursor, used in get_published_posts
import psycopg2.pool
from contextlib import contextmanager

# Create a pool (at module startup)
pool = psycopg2.pool.ThreadedConnectionPool(
    minconn = 2,
    maxconn = 10,
    dsn     = "postgresql://blog_app:password@localhost/blog_dev",
)

@contextmanager
def get_db_conn():
    """Context manager that checks out a connection and returns it to the pool."""
    conn = pool.getconn()
    try:
        yield conn
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        pool.putconn(conn)   # return to pool (not close!)

# Usage:
def get_published_posts(limit: int = 10) -> list[dict]:
    with get_db_conn() as conn:
        with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
            cur.execute(
                "SELECT id, title, view_count FROM posts "
                "WHERE status = 'published' ORDER BY created_at DESC LIMIT %s",
                (limit,)
            )
            return [dict(row) for row in cur.fetchall()]

# In FastAPI:
from fastapi import FastAPI
app = FastAPI()

@app.get("/posts")
def list_posts(limit: int = 10):
    return get_published_posts(limit)
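
One behaviour of the pool above is worth planning for: when all maxconn connections are checked out, ThreadedConnectionPool.getconn() raises psycopg2.pool.PoolError rather than waiting. If FastAPI's thread pool can run more concurrent handlers than maxconn, one option is to guard the pool with a semaphore so extra requests wait for a free slot. The sketch below is an assumption-laden illustration, not part of psycopg2: BoundedPool is a hypothetical wrapper around any object that exposes getconn() and putconn().

```python
import threading
from contextlib import contextmanager


class BoundedPool:
    """Hypothetical wrapper: wait for a free slot instead of failing when the pool is empty."""

    def __init__(self, pool, maxconn):
        self._pool = pool                                   # object with getconn()/putconn()
        self._slots = threading.BoundedSemaphore(maxconn)   # one slot per pooled connection

    @contextmanager
    def connection(self, timeout=5.0):
        # Block until a slot is free, or give up after `timeout` seconds.
        if not self._slots.acquire(timeout=timeout):
            raise TimeoutError("no database connection became free in time")
        try:
            conn = self._pool.getconn()
            try:
                yield conn
                conn.commit()
            except Exception:
                conn.rollback()
                raise
            finally:
                self._pool.putconn(conn)   # return to the pool, do not close
        finally:
            self._slots.release()
```

Usage would mirror get_db_conn(): create bounded = BoundedPool(pool, maxconn=10) once at startup, with maxconn matching the pool's own limit, and write with bounded.connection() as conn: in each handler. SQLAlchemy's pool already waits for a free connection by default, which is one more reason to prefer it.
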

Executing Multiple Statements Efficiently

import psycopg2.extras

# ── executemany: insert multiple rows ─────────────────────────────────────────
# Simpler than execute_values, but slower: it issues one INSERT per row
with conn.cursor() as cur:
    posts = [
        (1, "Title A", "title-a"),
        (1, "Title B", "title-b"),
    ]
    cur.executemany(
        "INSERT INTO posts (author_id, title, slug) VALUES (%s, %s, %s)",
        posts
    )
conn.commit()

# ── execute_values: fast bulk insert ──────────────────────────────────────────
# Expands %s into multi-row VALUES lists, sending page_size rows (default 100) per statement
with conn.cursor() as cur:
    posts = [(1, "Hello World", "hello-world"), (1, "FastAPI Guide", "fastapi-guide")]
    psycopg2.extras.execute_values(
        cur,
        "INSERT INTO posts (author_id, title, slug) VALUES %s",
        posts,
        template="(%s, %s, %s)"
    )
conn.commit()

# ── copy_expert: fastest bulk load ────────────────────────────────────────────
# Uses PostgreSQL's COPY protocol — fastest for millions of rows
import csv
import io

buf = io.StringIO()
writer = csv.writer(buf)
for post in large_dataset:   # large_dataset: any iterable of dicts, defined elsewhere
    writer.writerow([post["author_id"], post["title"], post["body"]])
buf.seek(0)

with conn.cursor() as cur:
    cur.copy_expert(
        "COPY posts (author_id, title, body) FROM STDIN WITH CSV",
        buf
    )
conn.commit()
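
The buffer-building step before copy_expert can be factored into a small function and checked without a database. The sketch below uses only the standard library; rows_to_csv_buffer is a hypothetical helper and the sample rows are invented for illustration.

```python
import csv
import io


def rows_to_csv_buffer(rows, columns):
    """Serialise an iterable of dicts into an in-memory CSV file, in `columns` order."""
    buf = io.StringIO()
    writer = csv.writer(buf)
    for row in rows:
        # A missing key or None becomes an empty, unquoted field.
        writer.writerow([row.get(col) for col in columns])
    buf.seek(0)   # rewind so COPY reads from the start
    return buf


sample = [
    {"author_id": 1, "title": "Hello", "body": "a, b"},
    {"author_id": 2, "title": "Second", "body": None},
]
buf = rows_to_csv_buffer(sample, ["author_id", "title", "body"])
# With an open cursor:
# cur.copy_expert("COPY posts (author_id, title, body) FROM STDIN WITH CSV", buf)
```

The csv module takes care of quoting commas, quotes and newlines inside values. In CSV mode COPY treats an unquoted empty field as NULL by default, so None values load as NULL, but so do empty strings written by csv.writer; if that distinction matters, the NULL option of COPY is the place to adjust it.
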

Common Mistakes

Mistake 1 — String interpolation instead of parameterised queries

❌ Wrong — SQL injection vulnerability:

user_id = request.query_params["id"]
cur.execute(f"SELECT * FROM users WHERE id = {user_id}")   # INJECTION RISK!

✅ Correct — always parameterise:

cur.execute("SELECT * FROM users WHERE id = %s", (user_id,))   # ✓

Mistake 2 — Sharing a connection across threads

❌ Wrong — one global connection used by all request handlers:

global_conn = psycopg2.connect(...)   # module-level: shared by every request thread!

@app.get("/posts")
def get_posts():
    cur = global_conn.cursor()   # all requests share one transaction on this connection
    ...

✅ Correct — use ThreadedConnectionPool or SQLAlchemy’s pool.

Mistake 3 — Forgetting to rollback after IntegrityError

❌ Wrong — next operation fails because session is in error state:

try:
    cur.execute("INSERT INTO users (email) VALUES (%s)", ("dup@email.com",))
    conn.commit()
except psycopg2.IntegrityError:
    pass   # no rollback! next operation will fail with "InFailedSqlTransaction"

✅ Correct:

except psycopg2.IntegrityError:
    conn.rollback()   # ✓ clears error state

Quick Reference

Task               Code
Connect            psycopg2.connect(dsn)
Execute query      cur.execute(sql, (param1, param2))
Fetch all rows     cur.fetchall()
Fetch one row      cur.fetchone()
Dict results       conn.cursor(cursor_factory=RealDictCursor)
Commit             conn.commit()
Rollback           conn.rollback()
Bulk insert        psycopg2.extras.execute_values(cur, sql, rows)
Connection pool    ThreadedConnectionPool(minconn, maxconn, dsn)

🧠 Test Yourself

After a psycopg2 IntegrityError (duplicate email), you catch the exception but do not call conn.rollback(). What happens when you try to execute the next query on the same connection?