The real value of decorators emerges when you apply them to real problems: measuring execution time, caching expensive results, retrying failed operations, and adding structured logging — all without cluttering the functions themselves with that logic. These cross-cutting concerns affect many functions but have nothing to do with each function’s core purpose. Extracting them into decorators keeps each function focused on its single responsibility and makes the cross-cutting behaviour easy to apply, test, and change globally. This lesson builds four production-quality decorators and shows how the standard library’s functools.lru_cache and functools.cache handle caching.
Timing Decorator
import functools
import time
import logging
logger = logging.getLogger(__name__)


def timed(func=None, *, label: str = None, log_level: int = logging.DEBUG):
    """Measure and log how long the decorated callable takes to run.

    Works bare (``@timed``) or with keyword options
    (``@timed(label="DB query", log_level=logging.INFO)``). Coroutine
    functions get an ``async`` wrapper that awaits the call, so the logged
    duration covers the awaited work instead of just coroutine creation.

    Args:
        func: The callable to wrap; ``None`` when only options are given.
        label: Name used in the log line; falls back to ``func.__qualname__``.
        log_level: ``logging`` level used for both the success and failure line.

    Returns:
        The wrapped callable, or a decorator when ``func`` is ``None``.
    """
    import inspect  # local import so this snippet does not need a new top-level import

    if func is None:
        # Called with arguments: @timed(label="DB query")
        return functools.partial(timed, label=label, log_level=log_level)

    # Called without arguments: @timed
    name = label or getattr(func, "__qualname__", repr(func))

    def _log(phrase, start):
        # Lazy %-style arguments: the message is only built if the level is enabled.
        logger.log(log_level, "%s %s %.3fs", name, phrase, time.perf_counter() - start)

    if inspect.iscoroutinefunction(func):

        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            start = time.perf_counter()
            try:
                result = await func(*args, **kwargs)
            except Exception:
                _log("FAILED after", start)
                raise
            # Logged outside the try so a logging error is never reported as FAILED.
            _log("completed in", start)
            return result

    else:

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            start = time.perf_counter()
            try:
                result = func(*args, **kwargs)
            except Exception:
                _log("FAILED after", start)
                raise
            _log("completed in", start)
            return result

    return wrapper
# Both forms work:
@timed
def quick_operation():
    """Return the sum of the integers 0 through 999,999 (bare ``@timed`` form)."""
    total = sum(range(1_000_000))
    return total
@timed(label="Heavy database query", log_level=logging.INFO)
def get_all_posts(db):
    """Return ``db.query(Post).all()`` (``@timed`` with keyword options).

    ``db`` is presumably a SQLAlchemy session and ``Post`` a mapped model;
    neither is defined in this snippet.
    """
    post_query = db.query(Post)
    return post_query.all()
The signature def timed(func=None, *, label=None) allows a decorator to work both with and without arguments. When used as @timed, Python passes the function as the first argument. When used as @timed(label="x"), Python calls timed(label="x") first (with func=None), which returns the actual decorator. This is the cleanest way to make a decorator optionally accept arguments without the calling syntax looking inconsistent.
Use time.perf_counter() rather than time.time() for measuring execution durations. perf_counter() uses the highest-resolution clock available on the system and is not affected by system clock adjustments (e.g. NTP sync). time.time() measures wall-clock time, which can jump forwards or backwards. perf_counter() is monotonic — it never goes backwards — making it reliable for duration measurement.
Async functions need await func(*args, **kwargs) instead of func(*args, **kwargs). If you apply a synchronous wrapper to an async function, the wrapper calls the function, which returns a coroutine object (not the result). The coroutine is never awaited inside the wrapper, so the wrapper never sees the real result or any exception the function raises. Always check asyncio.iscoroutinefunction(func) and provide the appropriate wrapper, as shown in Lesson 2.
Caching Decorators
import functools
# ── Manual LRU cache ──────────────────────────────────────────────────────────
def lru_cache_manual(maxsize: int = 128):
    """Return a decorator that memoises results with least-recently-used eviction.

    Args:
        maxsize: Maximum number of results kept. When a new result pushes the
            cache past this size, the least recently used entry is dropped.

    Returns:
        A decorator. The decorated function accepts positional and keyword
        arguments; all argument values must be hashable.
    """
    from collections import OrderedDict  # local import: only this helper needs it

    def decorator(func):
        # Ordered oldest -> newest, so both "touch" and "evict" are O(1)
        # (a plain list needs an O(n) remove()/pop(0) on every call).
        cache = OrderedDict()

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # Always a 2-tuple so positional and keyword calls cannot collide.
            key = (args, tuple(sorted(kwargs.items())))
            if key in cache:
                cache.move_to_end(key)  # mark as most recently used
                return cache[key]
            result = func(*args, **kwargs)
            cache[key] = result
            if len(cache) > maxsize:
                cache.popitem(last=False)  # drop the least recently used entry
            return result

        return wrapper

    return decorator
# ── Standard library: functools.lru_cache ────────────────────────────────────
from functools import lru_cache, cache
@lru_cache(maxsize=256)  # keep at most 256 memoised results
def expensive_calculation(n: int) -> int:
    """Return the n-th Fibonacci number, memoised through ``lru_cache``.

    Any ``n`` at or below 1 is returned unchanged.
    """
    if n <= 1:
        return n
    one_back = expensive_calculation(n - 1)
    two_back = expensive_calculation(n - 2)
    return one_back + two_back
@cache  # functools.cache: unbounded memoisation, same as lru_cache(maxsize=None) (Python 3.9+)
def fetch_config(env: str) -> dict:
    """Return ``load_settings(env)``, computed once per distinct ``env``.

    Entries are never evicted, so this suits configuration that does not
    change while the process runs. Every caller receives the same dict object.
    """
    settings = load_settings(env)
    return settings
# Check cache stats: cache_info() reports hits, misses, maxsize and currsize
print(expensive_calculation.cache_info())
# Example output only; the actual counts depend on the calls made so far:
# CacheInfo(hits=28, misses=10, maxsize=256, currsize=10)
expensive_calculation.cache_clear() # invalidate cache: drops every stored result
# ── TTL cache for FastAPI (time-limited) ───────────────────────────────────────
def ttl_cache(seconds: int = 300):
    """Cache results for a fixed number of seconds.

    Args:
        seconds: Lifetime of each cached result, counted from the moment the
            wrapped function returned it.

    Returns:
        A decorator. The decorated function gains a ``cache_clear()``
        attribute that discards every stored result. Argument values must be
        hashable. Intended for synchronous functions: an ``async def`` would
        have its coroutine object cached, not its result.
    """

    def decorator(func):
        cache: dict = {}  # key -> (result, monotonic deadline)

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            key = (args, tuple(sorted(kwargs.items())))
            # monotonic() cannot jump with NTP or manual clock changes, so an
            # entry can neither expire early nor outlive its TTL.
            now = time.monotonic()
            entry = cache.get(key)
            if entry is not None:
                result, expires = entry
                if now < expires:
                    return result
            # Miss or expired: drop every stale entry so results for keys that
            # are never requested again do not accumulate forever.
            stale = [k for k, (_, deadline) in cache.items() if deadline <= now]
            for k in stale:
                del cache[k]
            result = func(*args, **kwargs)
            cache[key] = (result, time.monotonic() + seconds)
            return result

        # Manual invalidation, mirroring functools.lru_cache's cache_clear().
        wrapper.cache_clear = cache.clear
        return wrapper

    return decorator
@ttl_cache(seconds=60)
def get_trending_posts():
    """Return the ten posts with the highest ``view_count``, cached for 60 seconds."""
    # NOTE(review): this caches whatever .all() returns. If those are ORM
    # instances, the "Common Mistakes" section warns that cached objects
    # become detached from their session. Confirm, or cache plain data.
    by_views = db.query(Post).order_by(Post.view_count.desc())
    return by_views.limit(10).all()
Retry Decorator with Exponential Backoff
import functools
import time
import asyncio
import logging
from typing import Type
logger = logging.getLogger(__name__)


def retry(
    max_attempts: int = 3,
    delay: float = 1.0,
    backoff: float = 2.0,
    exceptions: tuple[Type[Exception], ...] = (Exception,),
):
    """Retry a function on failure with exponential backoff.

    Works on both regular and ``async def`` functions; the async variant
    waits with ``asyncio.sleep`` so the event loop is not blocked.

    Args:
        max_attempts: Total number of calls to make, including the first.
        delay: Seconds to wait before the first retry.
        backoff: Factor the wait is multiplied by after every retry.
        exceptions: Exception types that trigger a retry. Anything else
            propagates immediately.

    Returns:
        A decorator. The decorated function re-raises the last exception,
        with its original traceback, once ``max_attempts`` calls have failed.

    Raises:
        ValueError: If ``max_attempts`` is below 1. The function would never
            be called and there would be no exception to re-raise.
    """
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")

    def decorator(func):
        name = getattr(func, "__name__", repr(func))

        def _should_retry(attempt, exc, wait):
            """Log the failed attempt and report whether another one remains."""
            if attempt < max_attempts:
                logger.warning(
                    "%s attempt %s/%s failed: %s. Retrying in %.1fs...",
                    name, attempt, max_attempts, exc, wait,
                )
                return True
            logger.error("%s failed after %s attempts", name, max_attempts)
            return False

        @functools.wraps(func)
        async def async_wrapper(*args, **kwargs):
            wait = delay
            for attempt in range(1, max_attempts + 1):
                try:
                    return await func(*args, **kwargs)
                except exceptions as exc:
                    if not _should_retry(attempt, exc, wait):
                        raise  # bare raise keeps the original traceback
                # Sleep outside the except block so the handled exception is
                # not kept alive while waiting.
                await asyncio.sleep(wait)
                wait *= backoff

        @functools.wraps(func)
        def sync_wrapper(*args, **kwargs):
            wait = delay
            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except exceptions as exc:
                    if not _should_retry(attempt, exc, wait):
                        raise
                time.sleep(wait)
                wait *= backoff

        if asyncio.iscoroutinefunction(func):
            return async_wrapper
        return sync_wrapper

    return decorator
# Usage
@retry(max_attempts=3, delay=1.0, backoff=2.0, exceptions=(httpx.RequestError,))
async def send_webhook(url: str, payload: dict) -> dict:
    """POST ``payload`` as JSON to ``url`` and return the decoded JSON reply.

    Only ``httpx.RequestError`` triggers a retry. ``raise_for_status()`` is
    still called on the response, so whatever it raises reaches the caller
    unless it is a ``RequestError``.
    """
    async with httpx.AsyncClient() as http:
        reply = await http.post(url, json=payload, timeout=10.0)
        reply.raise_for_status()
        return reply.json()
Common Mistakes
Mistake 1 — Using lru_cache with mutable arguments
❌ Wrong — lists are not hashable:
@lru_cache()
def process(items: list):
    # Deliberately wrong: lru_cache hashes its arguments, so calling this
    # with a list raises TypeError: unhashable type: 'list'
    total = sum(items)
    return total
✅ Correct — convert to tuple (hashable):
@lru_cache()
def process(items: tuple):  # ✓ tuples are hashable, so they work as cache keys
    """Return the sum of ``items``, memoised per distinct tuple."""
    total = sum(items)
    return total
# Or keep the list in the caller and convert it to a tuple at the call site:
result = process(tuple(my_list))
Mistake 2 — Caching database queries that return ORM objects
❌ Wrong — cached SQLAlchemy objects become detached from the session:
@lru_cache()
def get_user(user_id: int):
    # Deliberately wrong: the cached ORM object is handed out again after
    # the session that loaded it has closed, i.e. detached.
    user = db.query(User).get(user_id)
    return user
✅ Correct — cache serialisable data:
@lru_cache()
def get_user_dict(user_id: int) -> dict:
    """Return the user's ``id`` and ``name`` as a plain dict, memoised per ``user_id``."""
    row = db.query(User).get(user_id)
    # ✓ plain dict, no session needed
    return dict(id=row.id, name=row.name)
Mistake 3 — Retry on non-transient errors
❌ Wrong — retrying a 404 is pointless:
@retry(exceptions=(Exception,))  # retries everything including 404!
async def get_resource(id: int):
    """Stub for a resource fetch; the body is omitted in this example."""
✅ Correct — only retry transient errors:
@retry(exceptions=(httpx.TimeoutException, httpx.ConnectError))  # ✓ network errors only
async def get_resource(id: int):
    """Stub for a resource fetch; the body is omitted in this example."""
Quick Reference
| Decorator | Use Case | Key Notes |
|---|---|---|
| @timed | Measure execution time | Use perf_counter() |
| @lru_cache(maxsize=N) | Cache function results | Args must be hashable |
| @cache | Unbounded cache (3.9+) | Use for pure functions |
| @ttl_cache(seconds=N) | Time-limited cache | For data that changes |
| @retry(...) | Retry on transient errors | Only for transient exceptions |