Some work triggered by an HTTP request should not block the response — sending a welcome email, resizing an image, updating analytics, or clearing a cache. FastAPI’s BackgroundTasks runs these tasks after the response has been sent to the client, in the same process and event loop. The client gets an immediate response while the work continues in the background. This is simpler than a full task queue (no Redis, no separate worker process) but has trade-offs: tasks are lost if the process crashes mid-execution, and they compete with request handling for CPU time.
BackgroundTasks Basics
from fastapi import FastAPI, BackgroundTasks, Depends
from sqlalchemy.orm import Session
import logging
app = FastAPI()
logger = logging.getLogger(__name__)
# ── Define background task functions (plain Python, not async required) ───────
def send_welcome_email(user_email: str, user_name: str) -> None:
"""Runs after the registration response has been sent."""
try:
# send_email is a blocking call — fine in a BackgroundTask
send_email(
to = user_email,
subject = "Welcome to StackLesson!",
body = f"Hi {user_name}, your account is ready.",
)
logger.info(f"Welcome email sent to {user_email}")
except Exception as e:
# Log but don't raise — the response was already sent
logger.error(f"Failed to send welcome email to {user_email}: {e}")
def update_post_view_analytics(post_id: int, ip: str) -> None:
"""Update view count and analytics without blocking the response."""
try:
# Use a new DB session — the request's session may be closed
with SessionLocal() as db:
db.execute(
update(Post).where(Post.id == post_id)
.values(view_count=Post.view_count + 1)
)
db.commit()
except Exception as e:
logger.error(f"Failed to update analytics for post {post_id}: {e}")
# ── Add tasks in route handlers ───────────────────────────────────────────────
@app.post("/auth/register", response_model=UserResponse, status_code=201)
def register(
data: RegisterRequest,
background: BackgroundTasks,
db: Session = Depends(get_db),
):
user = create_user(db, data)
# Schedule work to run AFTER the response is sent
background.add_task(send_welcome_email, user.email, user.name)
return user # response sent immediately; email sent after
@app.get("/posts/{post_id}")
def get_post(
post_id: int,
background: BackgroundTasks,
request: Request,
db: Session = Depends(get_db),
):
post = db.get(Post, post_id)
if not post:
raise HTTPException(404, "Not found")
# Fire-and-forget analytics
background.add_task(update_post_view_analytics, post_id, request.client.host)
return post
with SessionLocal() as db: inside the task function to create a fresh session with its own transaction. Do not pass the request’s db session to a background task, as it may already be committed, rolled back, or closed.BackgroundTasks are not durable — if the server process crashes, restarts, or is scaled down while a background task is in the queue, the task is lost. For tasks that must complete reliably (password reset emails, payment confirmations, important notifications), use a proper task queue with persistence: ARQ with Redis, Celery, or a managed service. BackgroundTasks are appropriate for best-effort work (view counters, analytics, cache warm-ups).Async Background Tasks
from fastapi import BackgroundTasks
import httpx
# Background tasks can also be async functions
async def notify_webhook(url: str, payload: dict) -> None:
"""Send a webhook notification asynchronously."""
try:
async with httpx.AsyncClient(timeout=10.0) as client:
response = await client.post(url, json=payload)
response.raise_for_status()
except Exception as e:
logger.error(f"Webhook to {url} failed: {e}")
@app.post("/posts/{post_id}/publish")
async def publish_post(
post_id: int,
background: BackgroundTasks,
db: Session = Depends(get_db),
user: User = Depends(get_current_user),
):
post = db.get(Post, post_id)
if not post or post.author_id != user.id:
raise HTTPException(404, "Post not found")
post.status = "published"
post.published_at = func.now()
db.flush()
# Notify webhook subscribers asynchronously
background.add_task(
notify_webhook,
url = settings.webhook_url,
payload = {"event": "post.published", "post_id": post_id},
)
return post
BackgroundTasks vs Task Queue Decision
| Criteria | BackgroundTasks | ARQ / Celery |
|---|---|---|
| Setup complexity | None (built-in) | Redis + worker process |
| Durability | Lost on crash | Persisted in Redis/DB |
| Retries on failure | No | Yes, configurable |
| Scheduling | No | Yes (cron-style) |
| Monitoring | Logs only | Dashboards, metrics |
| Best for | Analytics, cache updates, non-critical notifications | Emails, payments, any critical async work |
Common Mistakes
Mistake 1 — Passing the request’s DB session to a background task
❌ Wrong — session may be closed by the time task runs:
background.add_task(update_analytics, post_id, db) # db may be closed!
✅ Correct — create a new session inside the task function.
Mistake 2 — Raising exceptions in background tasks
❌ Wrong — unhandled exception crashes the task silently:
def send_email(to: str):
smtp.send(to) # no try/except — fails silently on SMTP error
✅ Correct — always wrap in try/except and log failures.
Mistake 3 — Using BackgroundTasks for payment processing
❌ Wrong — payment lost on server crash:
background.add_task(charge_credit_card, order_id) # lost if server crashes!
✅ Correct — use a durable task queue (ARQ/Celery) for critical operations.
Quick Reference
| Task | Code |
|---|---|
| Declare dependency | background: BackgroundTasks parameter |
| Add task | background.add_task(fn, arg1, arg2) |
| Async task | Use async def function — FastAPI handles it |
| DB in background | Create new SessionLocal() inside the task |
| Error handling | Wrap task body in try/except, log errors |