asyncpg — The Async PostgreSQL Driver

asyncpg is the high-performance async PostgreSQL driver for Python. Written in Cython, it speaks PostgreSQL's binary wire protocol directly (rather than wrapping the libpq C library, as psycopg2 does), which makes it significantly faster than psycopg2 for data-intensive queries. asyncpg is the driver used by async SQLAlchemy (with the postgresql+asyncpg:// connection string) and for direct async database access in FastAPI async-def route handlers. Its API differs from psycopg2's: numbered $1-style parameters instead of %s, connection pools managed with async context managers, and prepared statements for repeated queries.

Connecting and Executing Queries

import asyncio
import asyncpg

async def main():
    # ── Single connection ─────────────────────────────────────────────────────
    conn = await asyncpg.connect(
        host     = "localhost",
        port     = 5432,
        database = "blog_dev",
        user     = "blog_app",
        password = "your_password",
    )
    # Or a DSN string (pick one style or the other — reassigning conn
    # like this would leak the first connection in real code):
    conn = await asyncpg.connect("postgresql://blog_app:password@localhost/blog_dev")

    # ── Execute a query ───────────────────────────────────────────────────────
    # asyncpg uses $1, $2, $3 for positional parameters (not %s)
    rows = await conn.fetch(
        "SELECT id, title, view_count FROM posts WHERE author_id = $1 AND status = $2",
        1, "published"   # positional arguments, no tuple needed
    )

    # rows is a list of asyncpg.Record objects (dict-like)
    for row in rows:
        print(row["id"], row["title"], row["view_count"])

    # ── fetch, fetchrow, fetchval ─────────────────────────────────────────────
    rows    = await conn.fetch("SELECT * FROM posts LIMIT 10")       # list of Records
    one_row = await conn.fetchrow("SELECT * FROM posts WHERE id = $1", 42)  # Record or None
    count   = await conn.fetchval("SELECT COUNT(*) FROM posts")      # single scalar value

    await conn.close()

asyncio.run(main())
Note: asyncpg uses $1, $2, $3 as parameter placeholders — the same syntax as PostgreSQL’s internal prepared statement format. This is different from psycopg2’s %s. The parameters are passed as positional arguments after the SQL string, not in a tuple: conn.fetch(sql, param1, param2) — not conn.fetch(sql, (param1, param2)). This is a common source of errors when switching from psycopg2 to asyncpg.
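The placeholder mismatch is mechanical enough that a migration from psycopg2 can be partly scripted. As a rough illustration (the to_asyncpg_placeholders helper below is not part of asyncpg, and it naively ignores %s occurring inside string literals or comments), positional %s markers map one-to-one onto $1, $2, …:

```python
import re

def to_asyncpg_placeholders(sql: str) -> str:
    """Rewrite psycopg2-style %s markers as $1, $2, ...

    Illustration only: does not skip %s inside string
    literals or SQL comments.
    """
    counter = 0

    def number(_match: re.Match) -> str:
        nonlocal counter
        counter += 1
        return f"${counter}"

    return re.sub(r"%s", number, sql)

print(to_asyncpg_placeholders(
    "SELECT id FROM posts WHERE author_id = %s AND status = %s"
))
# SELECT id FROM posts WHERE author_id = $1 AND status = $2
```

Remember that the call site changes too: the tuple of parameters becomes plain positional arguments.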
Tip: asyncpg’s Record objects behave like read-only dicts — access values by column name (row["title"]) or by index (row[0]). They are memory-efficient and fast. To convert a Record to a mutable dict (for Pydantic or JSON serialisation), use dict(row). For a list of records: [dict(row) for row in rows]. This conversion is necessary when passing asyncpg results to Pydantic’s model_validate().
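The conversion step itself is ordinary Python. In this sketch a dataclass stands in for a Pydantic model, and a plain dict stands in for an asyncpg.Record (dict(record) on a real Record produces the same kind of mapping):

```python
from dataclasses import dataclass

@dataclass
class PostOut:
    id: int
    title: str

# Stand-in for an asyncpg.Record fetched from the database
row = {"id": 42, "title": "Async All the Way Down"}

# dict(row) gives a mutable mapping; ** unpacks it into the model
post = PostOut(**dict(row))
print(post.id, post.title)
# 42 Async All the Way Down
```

With Pydantic v2 the equivalent would be PostOut.model_validate(dict(row)); the dict(row) step is the same either way.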
Warning: Never use asyncpg connection objects directly in FastAPI without a connection pool. Creating a new asyncpg connection per request takes ~5–10ms and does not scale. Always use asyncpg.create_pool() for production — the pool maintains a set of persistent connections and hands them out for each request. The pool is typically created in the FastAPI lifespan event handler (startup) and stored in app.state.pool.

Connection Pooling

import asyncpg
from contextlib import asynccontextmanager
from fastapi import FastAPI, Request

# ── FastAPI lifespan: create pool on startup, close on shutdown ───────────────
@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup: create connection pool
    app.state.pool = await asyncpg.create_pool(
        dsn      = "postgresql://blog_app:password@localhost/blog_dev",
        min_size = 5,
        max_size = 20,
    )
    yield
    # Shutdown: close all connections
    await app.state.pool.close()

app = FastAPI(lifespan=lifespan)

# ── FastAPI dependency: check out a connection ────────────────────────────────
async def get_db(request: Request):
    async with request.app.state.pool.acquire() as conn:
        yield conn

# ── Route handler ─────────────────────────────────────────────────────────────
from fastapi import Depends
import asyncpg

@app.get("/posts")
async def list_posts(
    limit: int = 10,
    conn: asyncpg.Connection = Depends(get_db)
):
    rows = await conn.fetch(
        "SELECT id, title, view_count FROM posts WHERE status = $1 "
        "ORDER BY created_at DESC LIMIT $2",
        "published", limit
    )
    return [dict(row) for row in rows]

Transactions with asyncpg

import asyncpg

async def create_user_with_post(pool: asyncpg.Pool, email: str, title: str) -> dict:
    async with pool.acquire() as conn:
        # ── Transaction as async context manager ─────────────────────────────
        async with conn.transaction():
            # All operations in this block are one atomic transaction
            user_id = await conn.fetchval(
                "INSERT INTO users (email, name) VALUES ($1, $2) RETURNING id",
                email, email.split("@")[0]
            )
            post_id = await conn.fetchval(
                "INSERT INTO posts (author_id, title, slug, body) "
                "VALUES ($1, $2, $3, $4) RETURNING id",
                user_id, title, title.lower().replace(" ", "-"), ""
            )
            # If any statement raises, the transaction context manager
            # issues ROLLBACK on exit
        # If both succeed, COMMIT happens on context-manager exit
        return {"user_id": user_id, "post_id": post_id}
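The inline slug expression above (title.lower().replace(" ", "-")) is deliberately minimal and would produce ugly slugs for punctuated titles. A slightly more robust sketch (slugify is a hypothetical helper, not part of asyncpg):

```python
import re

def slugify(title: str) -> str:
    """Lowercase the title and collapse runs of non-alphanumeric
    characters into single hyphens, trimming any at the edges."""
    slug = re.sub(r"[^a-z0-9]+", "-", title.lower())
    return slug.strip("-")

print(slugify("Hello,  World!"))
# hello-world
```

In production you would also handle slug collisions (e.g. a unique index plus a numeric suffix), which is out of scope here.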

# ── Savepoints ────────────────────────────────────────────────────────────────
async with conn.transaction():
    await conn.execute("UPDATE accounts SET balance = balance - $1 WHERE id = $2", 100, 1)

    # Catch the error OUTSIDE the inner block: the savepoint rolls back
    # only when the exception propagates through the inner context manager
    try:
        async with conn.transaction():   # nested transaction = SAVEPOINT
            await conn.execute("UPDATE accounts SET balance = balance + $1 WHERE id = $2", 100, 2)
    except asyncpg.PostgresError:
        pass   # inner changes rolled back to the savepoint; outer transaction continues

Prepared Statements

import asyncpg

async def high_frequency_queries(conn: asyncpg.Connection):
    # Prepare a statement once — PostgreSQL parses and plans it once
    # Subsequent executions reuse the plan (faster for repeated queries)
    stmt = await conn.prepare(
        "SELECT id, title FROM posts WHERE author_id = $1 AND status = $2"
    )

    # Execute the prepared statement many times
    for author_id in range(1, 100):
        rows = await stmt.fetch(author_id, "published")
        # No re-parsing or re-planning — fastest for repeated queries

    # asyncpg also prepares and caches statements automatically for plain
    # conn.fetch()/fetchrow()/fetchval() calls (per-connection LRU cache,
    # controlled by the statement_cache_size connect parameter)

Common Mistakes

Mistake 1 — Using psycopg2-style %s placeholders with asyncpg

❌ Wrong — asyncpg uses $1, $2 not %s:

await conn.fetch("SELECT * FROM users WHERE id = %s", user_id)
# the %s reaches the server unchanged, so PostgreSQL raises a syntax error

✅ Correct:

await conn.fetch("SELECT * FROM users WHERE id = $1", user_id)   # ✓

Mistake 2 — Passing parameters as a tuple instead of positional args

❌ Wrong — psycopg2-style tuple:

await conn.fetch("SELECT * FROM posts WHERE id = $1", (42,))
# asyncpg tries to encode the whole tuple as a single parameter and fails

✅ Correct — positional args:

await conn.fetch("SELECT * FROM posts WHERE id = $1", 42)   # ✓

Mistake 3 — Creating a new pool per request

❌ Wrong — pool created inside request handler:

@app.get("/posts")
async def get_posts():
    pool = await asyncpg.create_pool(dsn=...)   # new pool every request!
    ...

✅ Correct — create pool once at startup, reuse across requests:

# In lifespan: app.state.pool = await asyncpg.create_pool(...)
# In handler: async with request.app.state.pool.acquire() as conn: ...   ✓

Quick Reference

Task                  asyncpg Code
Connect               await asyncpg.connect(dsn)
Create pool           await asyncpg.create_pool(dsn, min_size=5, max_size=20)
Acquire connection    async with pool.acquire() as conn:
Fetch all rows        await conn.fetch(sql, param1, param2)
Fetch one row         await conn.fetchrow(sql, param1)
Fetch scalar          await conn.fetchval(sql, param1)
Execute (no return)   await conn.execute(sql, param1)
Transaction           async with conn.transaction():
Row to dict           dict(row) or [dict(r) for r in rows]
Placeholder syntax    $1, $2, $3

🧠 Test Yourself

When should you use asyncpg directly vs using SQLAlchemy with the asyncpg driver in a FastAPI application?