asyncio Tasks, gather() and Concurrent Execution

Sequential await calls run one after the other: each completes before the next starts. For multiple independent I/O calls (fetching data from several APIs, running several database queries), running them concurrently reduces total time from the sum of all durations to approximately the duration of the slowest one. asyncio.gather() and asyncio.create_task() are the two primary tools for concurrent execution. Understanding when and how to use them is one of the most impactful async patterns for FastAPI, particularly for route handlers that need data from multiple sources.

asyncio.gather(): Run Coroutines Concurrently

import asyncio
import httpx

async def fetch(client: httpx.AsyncClient, url: str) -> dict:
    response = await client.get(url, timeout=10.0)
    response.raise_for_status()
    return response.json()

async def main():
    base = "https://jsonplaceholder.typicode.com"

    async with httpx.AsyncClient() as client:
        # ── Sequential: total ≈ sum of each request time ──────────────────────
        user  = await fetch(client, f"{base}/users/1")       # wait
        posts = await fetch(client, f"{base}/posts?userId=1") # then wait
        todos = await fetch(client, f"{base}/todos?userId=1") # then wait
        # Total ≈ 300ms (if each takes 100ms)

        # ── Concurrent: total ≈ slowest single request ────────────────────────
        user, posts, todos = await asyncio.gather(
            fetch(client, f"{base}/users/1"),
            fetch(client, f"{base}/posts?userId=1"),
            fetch(client, f"{base}/todos?userId=1"),
        )
        # Total ≈ 100ms: all three run concurrently!

asyncio.run(main())
Note: asyncio.gather() returns a list of results in the same order as the input coroutines, regardless of which completes first. If coro3 finishes before coro1, the result list is still [result1, result2, result3]. This ordering guarantee makes it safe to unpack: user, posts, todos = await asyncio.gather(...). By default, if any coroutine raises an exception, gather() immediately re-raises it to the caller; the other coroutines are not cancelled and continue to run in the background (they are only cancelled if the gather() future itself is cancelled).
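A minimal demonstration of the ordering guarantee (label_after is a made-up helper; the coroutine with the shortest delay finishes first, yet the results still follow input order):

```python
import asyncio

async def label_after(label: str, delay: float) -> str:
    # Finishes after `delay` seconds and returns its label
    await asyncio.sleep(delay)
    return label

async def demo() -> list[str]:
    # "c" completes first and "a" last, but results follow input order
    return await asyncio.gather(
        label_after("a", 0.3),
        label_after("b", 0.2),
        label_after("c", 0.1),
    )

results = asyncio.run(demo())
print(results)  # ['a', 'b', 'c'] - input order, not completion order
```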
Tip: Use asyncio.gather(*coroutines, return_exceptions=True) when you want all coroutines to complete even if some fail. With return_exceptions=True, exceptions are returned as values in the results list instead of being re-raised. You can then inspect each result: if isinstance(result, Exception): handle_error(result). This is the right approach for fanout operations where partial success is acceptable (e.g., sending webhooks to multiple endpoints).
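A self-contained sketch of that inspection pattern, with asyncio.sleep standing in for real I/O and ok/boom as made-up stand-ins for coroutines that succeed or fail:

```python
import asyncio

async def ok(value: int) -> int:
    await asyncio.sleep(0.01)
    return value

async def boom() -> int:
    await asyncio.sleep(0.01)
    raise ValueError("simulated failure")

async def fanout():
    # With return_exceptions=True, the failure becomes a value in the list
    results = await asyncio.gather(ok(1), boom(), ok(3), return_exceptions=True)
    good = [r for r in results if not isinstance(r, BaseException)]
    bad = [r for r in results if isinstance(r, BaseException)]
    return good, bad

good, bad = asyncio.run(fanout())
print(good)  # [1, 3]
print(bad)   # [ValueError('simulated failure')]
```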
Warning: Gathering too many concurrent coroutines simultaneously can overwhelm external services, hit API rate limits, or exhaust database connection pools. If you have 1000 URLs to fetch, gathering all 1000 at once is likely to fail. Use a semaphore to limit concurrency: async with asyncio.Semaphore(10): await fetch(url) allows at most 10 concurrent fetches at any time. This pattern is called bounded concurrency.

asyncio.create_task(): Background Tasks

import asyncio

async def slow_operation(name: str, delay: float) -> str:
    await asyncio.sleep(delay)
    return f"{name} done after {delay}s"

async def main():
    # create_task() schedules a coroutine to run "in the background"
    # Returns a Task object immediately; the coroutine starts running concurrently
    task1 = asyncio.create_task(slow_operation("A", 1.0))
    task2 = asyncio.create_task(slow_operation("B", 0.5))
    task3 = asyncio.create_task(slow_operation("C", 1.5))

    # Do other work while tasks run in background...
    print("Tasks started - doing other work")
    await asyncio.sleep(0)   # yield to let tasks start

    # Wait for all tasks to complete
    result1 = await task1   # 1.0s from start
    result2 = await task2   # already done (only 0.5s)
    result3 = await task3   # 1.5s from start
    # Total ≈ 1.5s (not 3.0s)

    print(result1, result2, result3)

# ── gather vs create_task ─────────────────────────────────────────────────────
# gather() is simpler when you have all coroutines up front
# create_task() gives more control: cancel, add callbacks, check status

async def with_tasks():
    task = asyncio.create_task(slow_operation("X", 2.0))

    # Check task status
    print(task.done())       # False - still running
    print(task.cancelled())  # False

    # Cancel a task
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print("Task was cancelled")

asyncio.run(main())
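The "add callbacks, check status" point can be sketched with Task.add_done_callback and named tasks (work and on_done are illustrative names, not part of asyncio):

```python
import asyncio

async def work(delay: float) -> str:
    await asyncio.sleep(delay)
    return "payload"

completed: list[str] = []

def on_done(task: asyncio.Task) -> None:
    # Runs once the task finishes; result(), exception(), cancelled() are usable here
    completed.append(task.get_name())

async def demo() -> str:
    task = asyncio.create_task(work(0.05), name="worker-1")
    task.add_done_callback(on_done)   # fires when the task completes
    result = await task
    await asyncio.sleep(0)            # let scheduled callbacks run before returning
    return result

result = asyncio.run(demo())
print(result, completed)  # payload ['worker-1']
```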

Bounded Concurrency with Semaphore

import asyncio
import httpx

async def fetch_with_semaphore(
    client: httpx.AsyncClient,
    url: str,
    semaphore: asyncio.Semaphore,
) -> dict:
    async with semaphore:   # blocks while max_concurrent fetches are in flight
        response = await client.get(url, timeout=30.0)
        return {"url": url, "status": response.status_code, "data": response.json()}

async def fetch_all_bounded(urls: list[str], max_concurrent: int = 10) -> list[dict]:
    """Fetch all URLs concurrently but with a maximum of max_concurrent at once."""
    semaphore = asyncio.Semaphore(max_concurrent)

    async with httpx.AsyncClient() as client:
        tasks = [
            asyncio.create_task(fetch_with_semaphore(client, url, semaphore))
            for url in urls
        ]
        results = await asyncio.gather(*tasks, return_exceptions=True)

    return [r for r in results if not isinstance(r, Exception)]

# Process 100 URLs, at most 10 at once
urls = [f"https://api.example.com/posts/{i}" for i in range(1, 101)]
results = asyncio.run(fetch_all_bounded(urls, max_concurrent=10))
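To see the bound actually hold without real HTTP, this sketch counts in-flight coroutines and records the peak (all names are made up; asyncio.sleep simulates the request):

```python
import asyncio

in_flight = 0
peak = 0

async def bounded_work(sem: asyncio.Semaphore) -> None:
    global in_flight, peak
    async with sem:
        in_flight += 1
        peak = max(peak, in_flight)
        await asyncio.sleep(0.01)   # simulated I/O while holding the semaphore
        in_flight -= 1

async def run_bounded(n_tasks: int, limit: int) -> int:
    sem = asyncio.Semaphore(limit)
    await asyncio.gather(*(bounded_work(sem) for _ in range(n_tasks)))
    return peak

peak_seen = asyncio.run(run_bounded(50, 10))
print(peak_seen)   # at most 10, despite 50 tasks
```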

asyncio.wait(): Fine-Grained Control

import asyncio

# slow_operation() is the helper defined in the create_task section above

async def main():
    tasks = [
        asyncio.create_task(slow_operation("A", 1.0)),
        asyncio.create_task(slow_operation("B", 0.5)),
        asyncio.create_task(slow_operation("C", 2.0)),
    ]

    # Wait until ALL tasks complete
    done, pending = await asyncio.wait(tasks)
    for task in done:
        print(await task)

    # Wait until FIRST task completes (return_when=FIRST_COMPLETED)
    tasks2 = [asyncio.create_task(slow_operation(f"T{i}", i)) for i in range(1, 4)]
    done, pending = await asyncio.wait(tasks2, return_when=asyncio.FIRST_COMPLETED)
    # Cancel remaining
    for task in pending:
        task.cancel()
    for task in done:
        print(f"First completed: {task.result()}")

asyncio.run(main())
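asyncio.wait() also accepts a timeout: when it expires, you get whatever finished in done and everything else in pending. A runnable sketch with made-up delays:

```python
import asyncio

async def sleepy(delay: float) -> float:
    await asyncio.sleep(delay)
    return delay

async def demo() -> tuple[int, int]:
    tasks = [asyncio.create_task(sleepy(d)) for d in (0.05, 5.0)]
    # Returns after 0.5s even though the second task needs 5s
    done, pending = await asyncio.wait(tasks, timeout=0.5)
    for task in pending:
        task.cancel()   # don't leak the still-running slow task
    return len(done), len(pending)

counts = asyncio.run(demo())
print(counts)  # (1, 1)
```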

Common Mistakes

Mistake 1: Gathering without error handling

โŒ Wrong โ€” one failure cancels all:

results = await asyncio.gather(fetch(url1), fetch(url2), fetch(url3))
# If fetch(url2) raises โ€” all three are cancelled and exception propagates

✅ Correct - handle partial failures:

results = await asyncio.gather(fetch(url1), fetch(url2), fetch(url3),
                                return_exceptions=True)
good = [r for r in results if not isinstance(r, Exception)]   # ✓

Mistake 2: Unbounded concurrency overwhelming an API

โŒ Wrong โ€” 1000 concurrent requests at once:

await asyncio.gather(*[fetch(url) for url in urls])   # 1000 concurrent - rate limited!

✅ Correct - use a semaphore:

sem = asyncio.Semaphore(10)
await asyncio.gather(*[fetch_with_semaphore(url, sem) for url in urls])   # ✓

Mistake 3: Awaiting tasks after gather (double wait)

โŒ Wrong โ€” gather already awaited them:

tasks = [asyncio.create_task(f()) for f in funcs]
results = await asyncio.gather(*tasks)
for task in tasks:
    r = await task   # task is already done - redundant but harmless for completed tasks

✅ Correct - use gather's results directly:

results = await asyncio.gather(*[f() for f in funcs])   # ✓ results already available

Quick Reference

Pattern             Code                                                Use When
Concurrent all      await asyncio.gather(c1, c2, c3)                    All coroutines known up front
Ignore failures     gather(..., return_exceptions=True)                 Partial success acceptable
Background task     task = asyncio.create_task(coro())                  Fire and forget / cancel later
Limit concurrency   async with asyncio.Semaphore(N):                    Rate limiting, connection pools
First completed     asyncio.wait(tasks, return_when=FIRST_COMPLETED)    Race to first response
Cancel a task       task.cancel()                                       Timeout, user abort

🧠 Test Yourself

A FastAPI route handler needs data from three independent database queries, each taking 100ms. What is the total response time if you use asyncio.gather() with three async queries vs sequential await calls?
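One way to check the answer empirically: a timing sketch with asyncio.sleep standing in for the 100 ms queries (wall-clock numbers are approximate):

```python
import asyncio
import time

async def query() -> str:
    await asyncio.sleep(0.1)   # simulated 100 ms database query
    return "row"

async def compare() -> tuple[float, float]:
    t0 = time.perf_counter()
    await query(); await query(); await query()       # sequential
    sequential = time.perf_counter() - t0

    t0 = time.perf_counter()
    await asyncio.gather(query(), query(), query())   # concurrent
    concurrent = time.perf_counter() - t0
    return sequential, concurrent

sequential, concurrent = asyncio.run(compare())
print(f"sequential ≈ {sequential:.2f}s, gather ≈ {concurrent:.2f}s")
# sequential ≈ 0.30s, gather ≈ 0.10s
```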