Task Queues — Durable Background Jobs with ARQ and Redis

ARQ (Async Redis Queue) is a lightweight async task queue built on Redis and asyncio, designed for FastAPI/asyncio applications. Where Celery brings its own broker abstraction, configuration surface, and worker daemon, an ARQ worker is a plain async Python process that pulls jobs from a Redis queue. Jobs are enqueued from FastAPI route handlers and executed by ARQ workers running in separate processes. If the application crashes, enqueued jobs remain in Redis and are picked up when a worker restarts — providing the durability that FastAPI’s BackgroundTasks lacks.

ARQ Setup

pip install arq   # includes redis dependency
# app/tasks/worker.py — ARQ worker definition
from arq import Retry
from arq.connections import RedisSettings

from app.config import settings   # assumed path: adjust to wherever your settings live

# ── Task functions: these run in the worker process ───────────────────────────
async def send_welcome_email(ctx: dict, user_id: int, email: str, name: str) -> None:
    """
    ctx is injected by ARQ and contains shared resources (db pool, etc.)
    set up in on_startup and torn down in on_shutdown.
    """
    try:
        await _send_email(email, "Welcome!", f"Hi {name}!")
        print(f"Welcome email sent to {email}")
    except Exception as exc:
        print(f"Email failed: {exc}")
        # A plain exception marks the job as failed; ARQ does not retry it.
        # Raise arq's Retry to re-enqueue the job. ctx["job_try"] is the
        # current attempt number, giving a simple backoff: 10s, 20s, 30s.
        raise Retry(defer=ctx["job_try"] * 10) from exc

async def resize_post_image(ctx: dict, image_path: str, post_id: int) -> str:
    """Resize an uploaded post image in the background."""
    result_path = await _process_and_resize(image_path)
    # Update database with new image path
    async with ctx["db_pool"].acquire() as conn:
        await conn.execute(
            "UPDATE posts SET image_url = $1 WHERE id = $2",
            result_path, post_id
        )
    return result_path

# ── Worker startup/shutdown: runs once when worker starts ────────────────────
async def on_startup(ctx: dict) -> None:
    import asyncpg
    ctx["db_pool"] = await asyncpg.create_pool(
        dsn      = settings.database_url,
        min_size = 2,
        max_size = 5,
    )

async def on_shutdown(ctx: dict) -> None:
    await ctx["db_pool"].close()

# ── Worker configuration ──────────────────────────────────────────────────────
class WorkerSettings:
    functions       = [send_welcome_email, resize_post_image]
    on_startup      = on_startup
    on_shutdown     = on_shutdown
    redis_settings  = RedisSettings(host=settings.redis_host, port=6379)
    max_jobs        = 10              # concurrent jobs per worker
    job_timeout     = 300             # max seconds a job may run before being cancelled
    keep_result     = 3600            # keep result in Redis for 1 hour
    retry_jobs      = True            # re-run jobs interrupted by shutdown or raising Retry
    max_tries       = 3               # maximum attempts per job (first try plus retries)

Note: The ctx parameter passed to every ARQ task function is a shared context dictionary set up in on_startup and torn down in on_shutdown. Use it to store resources that should be created once per worker process — database connection pools, HTTP clients, Redis connections — rather than creating them fresh for each task. This is analogous to FastAPI’s lifespan context manager.

Tip: ARQ retries a job only when it raises arq’s Retry exception (or when the worker shuts down mid-job and retry_jobs is enabled); a plain unhandled exception marks the job as failed. Retries run up to max_tries attempts, with whatever delay you pass via Retry(defer=...). Design idempotent tasks where possible — a task that can safely run multiple times without causing duplicate side effects. For example, use INSERT ... ON CONFLICT DO NOTHING for database writes, and check whether the email was already sent before sending again. This makes retries safe even if the task partially completed before failing.

Warning: ARQ requires Redis. Adding Redis to your infrastructure adds operational complexity — it needs to be deployed, monitored, and backed up. For applications that rarely need background processing, FastAPI’s BackgroundTasks may be sufficient. Choose ARQ (or Celery, or a managed service) when you have identified specific tasks that must survive process restarts and/or need retry logic. Do not add a task queue preemptively if you do not need its features yet.
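
The idempotent-write pattern from the tip above can be sketched in a self-contained way. This uses sqlite3's INSERT OR IGNORE as a stand-in for Postgres's INSERT ... ON CONFLICT DO NOTHING; the table and helper names are illustrative, not part of ARQ:

```python
# A retried task should skip side effects it has already performed.
# Here a primary-key marker row records "welcome email sent for this user";
# replaying the task becomes a no-op instead of a duplicate email.
import sqlite3

def record_welcome_email(conn: sqlite3.Connection, user_id: int) -> bool:
    """Mark the welcome email as sent. Returns True only on the first call."""
    cur = conn.execute(
        "INSERT OR IGNORE INTO sent_emails (user_id) VALUES (?)", (user_id,)
    )
    conn.commit()
    return cur.rowcount == 1   # 0 on replay: the marker row already exists

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE sent_emails (user_id INTEGER PRIMARY KEY)")

first  = record_welcome_email(conn, 42)   # first attempt: send the email
replay = record_welcome_email(conn, 42)   # retry: marker exists, skip sending
print(first, replay)
```

Inside the real task, the `send` call would only run when the marker insert reports a new row.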

Enqueueing Jobs from FastAPI

# app/main.py — set up ARQ pool during FastAPI startup
from contextlib import asynccontextmanager
from typing import Annotated

from arq import create_pool
from arq.connections import ArqRedis, RedisSettings
from fastapi import Depends, FastAPI, Request

from app.config import settings   # assumed path: adjust to wherever your settings live

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Create ARQ Redis pool shared across all requests
    app.state.arq_pool = await create_pool(
        RedisSettings(host=settings.redis_host)
    )
    yield
    await app.state.arq_pool.close()

app = FastAPI(lifespan=lifespan)

# Dependency: inject ARQ pool into route handlers
def get_arq_pool(request: Request) -> ArqRedis:
    return request.app.state.arq_pool

ArqPool = Annotated[ArqRedis, Depends(get_arq_pool)]

# ── Enqueue jobs from route handlers ─────────────────────────────────────────
@router.post("/auth/register", response_model=UserResponse, status_code=201)
async def register(
    data:     RegisterRequest,
    arq_pool: ArqPool,                # Annotated dependency: no default needed
    db:       Session = Depends(get_db),
):
    user = create_user(db, data)

    # Enqueue a durable background job.
    # The job record persists in Redis and survives server crashes.
    await arq_pool.enqueue_job(
        "send_welcome_email",   # matches the function name in WorkerSettings
        user.id,
        user.email,
        user.name,
        _job_id   = f"welcome:{user.id}",  # dedup: returns None if this ID is already queued
        _defer_by = 2,          # run 2 seconds from now (let the DB transaction commit first)
    )

    return user

# ── Run the worker (separate process) ────────────────────────────────────────
# python -m arq app.tasks.worker.WorkerSettings
# Or in Dockerfile: CMD ["python", "-m", "arq", "app.tasks.worker.WorkerSettings"]

Common Mistakes

Mistake 1 — Non-idempotent tasks without deduplication

❌ Wrong — the same job enqueued twice (e.g. a double-submitted form) sends two welcome emails:

await arq_pool.enqueue_job("send_welcome_email", user_id)
await arq_pool.enqueue_job("send_welcome_email", user_id)   # duplicate job!

✅ Correct — use a stable job ID for deduplication:

await arq_pool.enqueue_job("send_welcome_email", user_id,
    _job_id=f"welcome:{user_id}")   # ✓ same ID: the second enqueue is a no-op (returns None)

Note that _job_id guards against duplicate enqueues, not retries — a retried job keeps its ID and runs again, which is why the task body itself must be idempotent.

Mistake 2 — Sharing DB session between FastAPI and ARQ worker

❌ Wrong — worker cannot use FastAPI’s session (different process):

await arq_pool.enqueue_job("task", db_session)   # session not serialisable!

✅ Correct — pass only IDs and other serialisable primitives; the worker re-fetches what it needs through the pool created in on_startup.
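
Why this fails is mechanical: ARQ serialises job arguments with pickle by default, and objects wrapping live OS handles cannot cross a process boundary. A self-contained demonstration, with a raw socket standing in for a DB session:

```python
# Live resources (sockets, DB sessions, connections) can't be pickled into
# a Redis job; plain IDs and primitives can.
import pickle
import socket

sock = socket.socket()               # holds an OS-level handle
try:
    pickle.dumps(sock)
    serialisable = True
except TypeError:                    # "cannot pickle 'socket' object"
    serialisable = False
finally:
    sock.close()

print("socket serialisable:", serialisable)
print("ids serialisable:", bool(pickle.dumps({"user_id": 42})))
```

Anything your task needs beyond primitives should be rebuilt on the worker side from the ctx resources.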

Mistake 3 — Not running the worker separately

❌ Wrong — jobs accumulate in the Redis queue and are never executed:

# Forgot to start: python -m arq app.tasks.worker.WorkerSettings

✅ Correct — always run at least one worker process alongside the FastAPI server.

Quick Reference

Task              Code
Install           pip install arq
Define task       async def task(ctx: dict, arg1, arg2): ...
Create pool       await create_pool(RedisSettings(host=...))
Enqueue job       await pool.enqueue_job("task_name", arg1, arg2)
Deduplicate       _job_id="unique-key" prevents duplicate enqueue
Defer execution   _defer_by=5 (seconds) or _defer_until=datetime
Run worker        python -m arq app.tasks.worker.WorkerSettings

🧠 Test Yourself

A user registration enqueues a “send_welcome_email” job. The ARQ worker crashes after dequeuing the job but before the email is sent. What happens when the worker restarts?