psycopg2 is the most widely deployed PostgreSQL adapter for Python. It is a C extension that wraps the libpq client library, providing a DB-API 2.0 compliant interface. When you use standard synchronous SQLAlchemy in a FastAPI application (with plain def route handlers), psycopg2 is the driver underneath — SQLAlchemy translates your ORM calls to SQL and hands them to psycopg2 to execute. Understanding psycopg2 directly helps you debug connection errors, write efficient raw queries for complex operations the ORM struggles with, and tune connection pool settings correctly.
Connecting and Executing Queries
```python
import psycopg2
import psycopg2.extras  # for DictCursor / RealDictCursor

# ── Basic connection ──────────────────────────────────────────────────────────
conn = psycopg2.connect(
    host="localhost",
    port=5432,
    dbname="blog_dev",
    user="blog_app",
    password="your_password",
)

# Or use a connection string:
conn = psycopg2.connect("postgresql://blog_app:password@localhost:5432/blog_dev")

# ── Execute a query ───────────────────────────────────────────────────────────
cursor = conn.cursor()

# ALWAYS use parameterised queries — never string interpolation
cursor.execute(
    "SELECT id, title, status FROM posts WHERE author_id = %s AND status = %s",
    (1, "published"),  # tuple of parameters — %s placeholders
)

# Fetch results
rows = cursor.fetchall()       # list of tuples
one_row = cursor.fetchone()    # single tuple or None
n_rows = cursor.fetchmany(10)  # list of up to 10 tuples

# Access by index
for row in rows:
    print(row[0], row[1], row[2])  # id, title, status

cursor.close()
conn.close()
```
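To make the injection risk concrete, here is a stdlib-only illustration (the malicious input is invented for the example, and no database is touched):

```python
# Hypothetical attacker-controlled input, invented for illustration.
user_id = "1; DROP TABLE users; --"

# ❌ f-string interpolation: the attacker's SQL becomes part of the statement.
unsafe_sql = f"SELECT * FROM posts WHERE author_id = {user_id}"
print(unsafe_sql)
# SELECT * FROM posts WHERE author_id = 1; DROP TABLE users; --

# ✅ With cursor.execute("... WHERE author_id = %s", (user_id,)), psycopg2
# escapes and quotes the value, so the server sees one harmless string literal:
#   SELECT * FROM posts WHERE author_id = '1; DROP TABLE users; --'
```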
psycopg2 uses %s as the parameter placeholder for all types — not ? like sqlite3 or :name like SQLAlchemy. The parameters are passed as a tuple (or list) as the second argument to cursor.execute(). psycopg2 handles the escaping and quoting — passing a Python string "O'Brien" is safe because psycopg2 properly escapes the single quote. Never format the SQL string yourself: f"WHERE id = {user_id}" is a SQL injection vulnerability.

Use psycopg2.extras.DictCursor or RealDictCursor to get results as dictionaries instead of tuples. With DictCursor, rows behave like dicts: row["title"] instead of row[1]. With RealDictCursor, rows are real Python dicts (fully serialisable). Use RealDictCursor when you need to convert results to JSON or pass them to Pydantic. Create a cursor with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor).

In multi-threaded applications, use psycopg2.pool.ThreadedConnectionPool or, better, let SQLAlchemy manage the connection pool. Never create a single global psycopg2 connection at module level in a FastAPI application.

DictCursor and Transaction Control
```python
import psycopg2
import psycopg2.extras

conn = psycopg2.connect("postgresql://blog_app:password@localhost/blog_dev")

# ── DictCursor — access results by column name ────────────────────────────────
with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
    cur.execute(
        "SELECT id, title, view_count FROM posts WHERE status = %s LIMIT %s",
        ("published", 10),
    )
    posts = cur.fetchall()  # list of RealDictRow (behaves like dict)
    for post in posts:
        print(post["id"], post["title"], post["view_count"])

# ── Transaction control ───────────────────────────────────────────────────────
# psycopg2 starts a transaction automatically on the first query —
# you must commit or rollback explicitly
try:
    with conn.cursor() as cur:
        cur.execute(
            "INSERT INTO users (email, name) VALUES (%s, %s) RETURNING id",
            ("alice@example.com", "Alice Smith"),
        )
        user_id = cur.fetchone()[0]
        cur.execute(
            "INSERT INTO posts (author_id, title, slug, body) VALUES (%s, %s, %s, %s)",
            (user_id, "First Post", "first-post", "Hello world"),
        )
    conn.commit()  # both INSERTs committed atomically
    print(f"Created user {user_id}")
except psycopg2.IntegrityError as e:
    conn.rollback()  # must rollback before the next operation after an error
    print(f"Duplicate email or other constraint violation: {e}")
except Exception:
    conn.rollback()
    raise
finally:
    conn.close()
```
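Since psycopg2 2.5, the connection itself is also a context manager: `with conn:` commits the transaction on a clean exit and rolls back on an exception, without closing the connection. A stdlib-only stand-in sketching that documented behaviour (FakeConn is invented purely for illustration):

```python
class FakeConn:
    """Mimics psycopg2's documented `with conn:` semantics (invented stand-in)."""

    def __init__(self):
        self.committed = False
        self.rolled_back = False
        self.closed = False

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        # Commit on clean exit, rollback on exception. Never closes.
        if exc_type is None:
            self.committed = True
        else:
            self.rolled_back = True
        return False  # propagate the exception, as psycopg2 does

conn = FakeConn()
with conn:
    pass  # happy path: commits
assert conn.committed and not conn.closed

conn = FakeConn()
try:
    with conn:
        raise ValueError("constraint violation")
except ValueError:
    pass
assert conn.rolled_back and not conn.closed  # rolled back, still usable
```

Because `with conn:` does not close the connection, you can reuse it for the next transaction; closing it (or returning it to a pool) is still your job.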
Connection Pooling with ThreadedConnectionPool
```python
import psycopg2.extras  # needed below for RealDictCursor
import psycopg2.pool
from contextlib import contextmanager

# Create a pool (at module startup)
pool = psycopg2.pool.ThreadedConnectionPool(
    minconn=2,
    maxconn=10,
    dsn="postgresql://blog_app:password@localhost/blog_dev",
)

@contextmanager
def get_db_conn():
    """Context manager that checks out a connection and returns it to the pool."""
    conn = pool.getconn()
    try:
        yield conn
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        pool.putconn(conn)  # return to pool (not close!)

# Usage:
def get_published_posts(limit: int = 10) -> list[dict]:
    with get_db_conn() as conn:
        with conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) as cur:
            cur.execute(
                "SELECT id, title, view_count FROM posts "
                "WHERE status = 'published' ORDER BY created_at DESC LIMIT %s",
                (limit,),
            )
            return [dict(row) for row in cur.fetchall()]

# In FastAPI:
from fastapi import FastAPI

app = FastAPI()

@app.get("/posts")
def list_posts(limit: int = 10):
    return get_published_posts(limit)
```
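The checkout/return mechanics that ThreadedConnectionPool implements can be sketched with a stdlib queue (TinyPool and its names are invented for illustration; use the real pool in production):

```python
import queue

class TinyPool:
    """Minimal thread-safe checkout/return pool, like ThreadedConnectionPool."""

    def __init__(self, factory, maxconn: int):
        self._q: queue.Queue = queue.Queue()
        for _ in range(maxconn):
            self._q.put(factory())  # pre-open maxconn "connections"

    def getconn(self, timeout: float = 1.0):
        # Blocks until a connection is free; raises queue.Empty on timeout,
        # similar to the real pool erroring when exhausted.
        return self._q.get(timeout=timeout)

    def putconn(self, conn) -> None:
        self._q.put(conn)  # return to the pool, do NOT close

pool = TinyPool(factory=object, maxconn=2)
a = pool.getconn()
b = pool.getconn()
try:
    pool.getconn(timeout=0.01)  # pool exhausted: third checkout times out
except queue.Empty:
    print("pool exhausted")
pool.putconn(a)      # returning one frees a slot...
c = pool.getconn()   # ...so this checkout succeeds immediately
```

This is why `putconn` (not `close`) matters: a leaked connection permanently shrinks the pool until checkouts start timing out.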
Executing Multiple Statements Efficiently
```python
import psycopg2.extras

# (assumes an open `conn` from the earlier examples)

# ── executemany: insert multiple rows ─────────────────────────────────────────
# Slower than execute_values but simpler — one statement per row
with conn.cursor() as cur:
    posts = [
        ("Title A", "author@example.com"),
        ("Title B", "author@example.com"),
    ]
    cur.executemany(
        "INSERT INTO posts (title, email) VALUES (%s, %s)",
        posts,
    )
conn.commit()

# ── execute_values: fastest bulk insert ───────────────────────────────────────
# Generates a single multi-row VALUES statement
with conn.cursor() as cur:
    posts = [(1, "Hello World", "hello-world"), (1, "FastAPI Guide", "fastapi-guide")]
    psycopg2.extras.execute_values(
        cur,
        "INSERT INTO posts (author_id, title, slug) VALUES %s",
        posts,
        template="(%s, %s, %s)",
    )
conn.commit()

# ── copy_expert: fastest bulk load ────────────────────────────────────────────
# Uses PostgreSQL's COPY protocol — fastest for millions of rows
import csv
import io

buf = io.StringIO()
writer = csv.writer(buf)
for post in large_dataset:  # large_dataset: any iterable of dicts with these keys
    writer.writerow([post["author_id"], post["title"], post["body"]])
buf.seek(0)

with conn.cursor() as cur:
    cur.copy_expert(
        "COPY posts (author_id, title, body) FROM STDIN WITH CSV",
        buf,
    )
conn.commit()
```
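Why execute_values is faster: executemany issues one INSERT per row, while execute_values expands the single %s into one multi-row VALUES clause (batched by its page_size parameter, 100 rows per statement by default). A stdlib sketch of roughly what it builds; the real function also adapts and escapes each value before substitution:

```python
rows = [(1, "Hello World", "hello-world"), (1, "FastAPI Guide", "fastapi-guide")]

# Roughly what execute_values builds from "VALUES %s" and template "(%s, %s, %s)":
placeholders = ", ".join(["(%s, %s, %s)"] * len(rows))
sql = f"INSERT INTO posts (author_id, title, slug) VALUES {placeholders}"
print(sql)
# INSERT INTO posts (author_id, title, slug) VALUES (%s, %s, %s), (%s, %s, %s)

# Flattened parameter list matching the placeholders, one value per %s:
params = [value for row in rows for value in row]
```

One statement for many rows means one parse/plan and one round trip per batch, which is where the speedup comes from.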
Common Mistakes
Mistake 1 — String interpolation instead of parameterised queries
❌ Wrong — SQL injection vulnerability:

```python
user_id = request.query_params["id"]
cur.execute(f"SELECT * FROM users WHERE id = {user_id}")  # INJECTION RISK!
```

✅ Correct — always parameterise:

```python
cur.execute("SELECT * FROM users WHERE id = %s", (user_id,))  # ✓
```
Mistake 2 — Sharing a connection across threads
❌ Wrong — one global connection used by all request handlers:

```python
global_conn = psycopg2.connect(...)  # module-level — shared across threads!

@app.get("/posts")
def get_posts():
    cur = global_conn.cursor()  # race condition: concurrent requests share one transaction
    ...
```

✅ Correct — use ThreadedConnectionPool or SQLAlchemy's pool, checking out one connection per request.
Mistake 3 — Forgetting to rollback after IntegrityError
❌ Wrong — the next operation fails because the session is in an error state:

```python
try:
    cur.execute("INSERT INTO users (email) VALUES (%s)", ("dup@email.com",))
    conn.commit()
except psycopg2.IntegrityError:
    pass  # no rollback! the next operation will fail with "InFailedSqlTransaction"
```

✅ Correct:

```python
except psycopg2.IntegrityError:
    conn.rollback()  # ✓ clears the error state
```
Quick Reference
| Task | Code |
|---|---|
| Connect | `psycopg2.connect(dsn)` |
| Execute query | `cur.execute(sql, (param1, param2))` |
| Fetch all rows | `cur.fetchall()` |
| Fetch one row | `cur.fetchone()` |
| Dict results | `conn.cursor(cursor_factory=RealDictCursor)` |
| Commit | `conn.commit()` |
| Rollback | `conn.rollback()` |
| Bulk insert | `psycopg2.extras.execute_values(cur, sql, rows)` |
| Connection pool | `ThreadedConnectionPool(minconn, maxconn, dsn)` |