Sequential await calls run one after the other: each completes before the next starts. When you make multiple independent I/O calls (fetching data from several APIs, running several database queries), running them concurrently reduces total time from the sum of all durations to approximately the duration of the slowest one. asyncio.gather() and asyncio.create_task() are the two primary tools for concurrent execution. Understanding when and how to use them is one of the most impactful async patterns for FastAPI, particularly for route handlers that need data from multiple sources.
asyncio.gather() - Run Coroutines Concurrently
```python
import asyncio
import httpx

async def fetch(client: httpx.AsyncClient, url: str) -> dict:
    response = await client.get(url, timeout=10.0)
    response.raise_for_status()
    return response.json()

async def main():
    base = "https://jsonplaceholder.typicode.com"
    async with httpx.AsyncClient() as client:
        # -- Sequential: total ≈ sum of each request time ----------------------
        user = await fetch(client, f"{base}/users/1")          # wait
        posts = await fetch(client, f"{base}/posts?userId=1")  # then wait
        todos = await fetch(client, f"{base}/todos?userId=1")  # then wait
        # Total ≈ 300ms (if each takes 100ms)

        # -- Concurrent: total ≈ slowest single request ------------------------
        user, posts, todos = await asyncio.gather(
            fetch(client, f"{base}/users/1"),
            fetch(client, f"{base}/posts?userId=1"),
            fetch(client, f"{base}/todos?userId=1"),
        )
        # Total ≈ 100ms: all three run concurrently!

asyncio.run(main())
```
asyncio.gather() returns a list of results in the same order as the input coroutines, regardless of which completes first. If coro3 finishes before coro1, the result list is still [result1, result2, result3]. This ordering guarantee makes it safe to unpack: user, posts, todos = await asyncio.gather(...).

By default, if any coroutine raises an exception, gather() immediately re-raises it to the caller; the other coroutines are not cancelled and keep running, but their results are lost. Pass return_exceptions=True when you want all coroutines to complete and report back even if some fail: exceptions are then returned as values in the results list instead of being re-raised, and you can inspect each result with if isinstance(result, Exception): handle_error(result). This is the right approach for fan-out operations where partial success is acceptable (e.g., sending webhooks to multiple endpoints).

To cap how many coroutines run at once, wrap each call in a semaphore: async with asyncio.Semaphore(10): await fetch(url) allows at most 10 concurrent fetches at any time. This pattern is called bounded concurrency and is covered in detail below.

asyncio.create_task() - Background Tasks
```python
import asyncio

async def slow_operation(name: str, delay: float) -> str:
    await asyncio.sleep(delay)
    return f"{name} done after {delay}s"

async def main():
    # create_task() schedules a coroutine to run "in the background".
    # It returns a Task object immediately; the coroutine starts running concurrently.
    task1 = asyncio.create_task(slow_operation("A", 1.0))
    task2 = asyncio.create_task(slow_operation("B", 0.5))
    task3 = asyncio.create_task(slow_operation("C", 1.5))

    # Do other work while tasks run in the background...
    print("Tasks started - doing other work")
    await asyncio.sleep(0)  # yield to let tasks start

    # Wait for all tasks to complete
    result1 = await task1  # 1.0s from start
    result2 = await task2  # already done (only 0.5s)
    result3 = await task3  # 1.5s from start
    # Total ≈ 1.5s (not 3.0s)
    print(result1, result2, result3)

# -- gather vs create_task -----------------------------------------------------
# gather() is simpler when you have all coroutines up front
# create_task() gives more control: cancel, add callbacks, check status

async def with_tasks():
    task = asyncio.create_task(slow_operation("X", 2.0))

    # Check task status
    print(task.done())       # False - still running
    print(task.cancelled())  # False

    # Cancel the task
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print("Task was cancelled")

asyncio.run(main())
asyncio.run(with_tasks())
```
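The "add callbacks" control mentioned in the comment above refers to Task.add_done_callback, which runs a plain function on the event loop as soon as the task finishes. A minimal sketch (the `notify` name and the `events` list are illustrative, not part of the asyncio API):

```python
import asyncio

async def slow_operation(name: str, delay: float) -> str:
    await asyncio.sleep(delay)
    return f"{name} done after {delay}s"

events: list[str] = []

def notify(task: asyncio.Task) -> None:
    # Called once the task finishes, whether it succeeded,
    # failed, or was cancelled.
    if task.cancelled():
        events.append("cancelled")
    elif task.exception() is not None:
        events.append(f"failed: {task.exception()!r}")
    else:
        events.append(f"finished: {task.result()}")

async def main() -> str:
    task = asyncio.create_task(slow_operation("X", 0.1))
    task.add_done_callback(notify)
    result = await task
    await asyncio.sleep(0)  # yield once so the scheduled callback has run
    return result

result = asyncio.run(main())
print(result)  # X done after 0.1s
print(events)  # ['finished: X done after 0.1s']
```

Because the callback must check for all three outcomes, this is more boilerplate than simply awaiting the task; it earns its keep for fire-and-forget tasks you never await directly.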
Bounded Concurrency with Semaphore
```python
import asyncio
import httpx

async def fetch_with_semaphore(
    client: httpx.AsyncClient,
    url: str,
    semaphore: asyncio.Semaphore,
) -> dict:
    async with semaphore:  # blocks while max_concurrent fetches are already in flight
        response = await client.get(url, timeout=30.0)
        return {"url": url, "status": response.status_code, "data": response.json()}

async def fetch_all_bounded(urls: list[str], max_concurrent: int = 10) -> list[dict]:
    """Fetch all URLs concurrently but with a maximum of max_concurrent at once."""
    semaphore = asyncio.Semaphore(max_concurrent)
    async with httpx.AsyncClient() as client:
        tasks = [
            asyncio.create_task(fetch_with_semaphore(client, url, semaphore))
            for url in urls
        ]
        results = await asyncio.gather(*tasks, return_exceptions=True)
    return [r for r in results if not isinstance(r, Exception)]

# Process 100 URLs, at most 10 at once
urls = [f"https://api.example.com/posts/{i}" for i in range(1, 101)]
results = asyncio.run(fetch_all_bounded(urls, max_concurrent=10))
```
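To see the bound in action without any network calls, here is a self-contained sketch that tracks how many workers hold the semaphore at once; asyncio.sleep stands in for the HTTP request, and the `counter` dict is just instrumentation for the demo:

```python
import asyncio

async def worker(i: int, sem: asyncio.Semaphore, counter: dict) -> int:
    async with sem:
        counter["active"] += 1
        counter["peak"] = max(counter["peak"], counter["active"])
        await asyncio.sleep(0.01)  # simulated I/O
        counter["active"] -= 1
    return i

async def run_bounded(n: int, max_concurrent: int) -> dict:
    sem = asyncio.Semaphore(max_concurrent)
    counter = {"active": 0, "peak": 0}
    await asyncio.gather(*(worker(i, sem, counter) for i in range(n)))
    return counter

counter = asyncio.run(run_bounded(n=50, max_concurrent=10))
print(counter["peak"])  # 10 - never exceeds the semaphore's limit
```

All 50 workers are scheduled immediately, but only 10 ever get past `async with sem` at the same time; the rest queue up inside the semaphore.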
asyncio.wait() - Fine-Grained Control
```python
import asyncio

async def slow_operation(name: str, delay: float) -> str:  # as defined above
    await asyncio.sleep(delay)
    return f"{name} done after {delay}s"

async def main():
    tasks = [
        asyncio.create_task(slow_operation("A", 1.0)),
        asyncio.create_task(slow_operation("B", 0.5)),
        asyncio.create_task(slow_operation("C", 2.0)),
    ]
    # Wait until ALL tasks complete
    done, pending = await asyncio.wait(tasks)
    for task in done:
        print(await task)

    # Wait until the FIRST task completes (return_when=FIRST_COMPLETED)
    tasks2 = [asyncio.create_task(slow_operation(f"T{i}", i)) for i in range(1, 4)]
    done, pending = await asyncio.wait(tasks2, return_when=asyncio.FIRST_COMPLETED)
    # Cancel the rest
    for task in pending:
        task.cancel()
    for task in done:
        print(f"First completed: {task.result()}")

asyncio.run(main())
```
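asyncio.wait() also accepts a timeout; tasks that have not finished by then come back in the pending set, and crucially, wait() does not cancel them for you. A minimal sketch (delays chosen so "slow" cannot finish in time):

```python
import asyncio

async def slow_operation(name: str, delay: float) -> str:
    await asyncio.sleep(delay)
    return f"{name} done after {delay}s"

async def main() -> tuple[set, int]:
    tasks = [
        asyncio.create_task(slow_operation("fast", 0.05)),
        asyncio.create_task(slow_operation("slow", 10.0)),
    ]
    # Returns after 0.5s even though "slow" needs 10s
    done, pending = await asyncio.wait(tasks, timeout=0.5)
    for task in pending:
        task.cancel()  # asyncio.wait does NOT cancel stragglers for you
    results = {task.result() for task in done}
    return results, len(pending)

results, n_pending = asyncio.run(main())
print(results)    # {'fast done after 0.05s'}
print(n_pending)  # 1
```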
Common Mistakes
Mistake 1 - Gathering without error handling

❌ Wrong - one failure discards all results:

```python
results = await asyncio.gather(fetch(url1), fetch(url2), fetch(url3))
# If fetch(url2) raises, the exception propagates immediately; the other
# two keep running, but their results are lost
```

✅ Correct - handle partial failures:

```python
results = await asyncio.gather(fetch(url1), fetch(url2), fetch(url3),
                               return_exceptions=True)
good = [r for r in results if not isinstance(r, Exception)]  # ✅
```
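A runnable sketch of the return_exceptions pattern, with asyncio.sleep and a `fake_fetch` helper (illustrative, not a real HTTP client) standing in for the network calls:

```python
import asyncio

async def fake_fetch(url: str, fail: bool = False) -> str:
    await asyncio.sleep(0.01)  # simulated network latency
    if fail:
        raise ValueError(f"request to {url} failed")
    return f"data from {url}"

async def main() -> list:
    return await asyncio.gather(
        fake_fetch("https://example.com/a"),
        fake_fetch("https://example.com/b", fail=True),
        fake_fetch("https://example.com/c"),
        return_exceptions=True,
    )

results = asyncio.run(main())
good = [r for r in results if not isinstance(r, Exception)]
bad = [r for r in results if isinstance(r, Exception)]
print(good)  # ['data from https://example.com/a', 'data from https://example.com/c']
print(bad)   # [ValueError('request to https://example.com/b failed')]
```

Note that the failed call occupies its original slot in `results`, so positional unpacking still works if you check types first.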
Mistake 2 - Unbounded concurrency overwhelming an API

❌ Wrong - 1000 concurrent requests at once:

```python
await asyncio.gather(*[fetch(url) for url in urls])  # 1000 concurrent - rate limited!
```

✅ Correct - use a semaphore:

```python
sem = asyncio.Semaphore(10)
await asyncio.gather(*[fetch_with_semaphore(url, sem) for url in urls])  # ✅
```
Mistake 3 - Awaiting tasks again after gather (double wait)

❌ Wrong - gather already awaited them:

```python
tasks = [asyncio.create_task(f()) for f in funcs]
results = await asyncio.gather(*tasks)
for task in tasks:
    r = await task  # task is already done; redundant, though harmless for completed tasks
```

✅ Correct - use gather's results directly:

```python
results = await asyncio.gather(*[f() for f in funcs])  # ✅ results already available
```
Quick Reference
| Pattern | Code | Use When |
|---|---|---|
| Run all concurrently | await asyncio.gather(c1, c2, c3) | All coroutines known up front |
| Collect failures | gather(..., return_exceptions=True) | Partial success acceptable |
| Background task | task = asyncio.create_task(coro()) | Fire and forget / cancel later |
| Limit concurrency | async with asyncio.Semaphore(N): | Rate limiting, connection pools |
| First completed | asyncio.wait(tasks, return_when=FIRST_COMPLETED) | Race to first response |
| Cancel a task | task.cancel() | Timeout, user abort |