Practical Decorators — Timing, Caching, Retry and Logging

The real value of decorators emerges when you apply them to real problems: measuring execution time, caching expensive results, retrying failed operations, and adding structured logging — all without cluttering the functions themselves with that logic. These cross-cutting concerns affect many functions but have nothing to do with each function’s core purpose. Extracting them into decorators keeps each function focused on its single responsibility and makes the cross-cutting behaviour easy to apply, test, and change globally. This lesson builds four production-quality decorators and shows how the standard library’s functools.lru_cache and functools.cache handle caching.

Timing Decorator

import functools
import time
import logging

logger = logging.getLogger(__name__)

def timed(func=None, *, label: str | None = None, log_level: int = logging.DEBUG):
    """Measure and log execution time. Works as @timed or @timed(label="...")."""
    if func is None:
        # Called with arguments: @timed(label="DB query")
        return lambda f: timed(f, label=label, log_level=log_level)

    # Called without arguments: @timed
    name = label or func.__qualname__

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        try:
            result = func(*args, **kwargs)
            elapsed = time.perf_counter() - start
            logger.log(log_level, f"{name} completed in {elapsed:.3f}s")
            return result
        except Exception:
            elapsed = time.perf_counter() - start
            logger.log(log_level, f"{name} FAILED after {elapsed:.3f}s")
            raise
    return wrapper

# Both forms work:
@timed
def quick_operation():
    return sum(range(1_000_000))

@timed(label="Heavy database query", log_level=logging.INFO)
def get_all_posts(db):
    return db.query(Post).all()

Note: The pattern def timed(func=None, *, label=None) allows a decorator to work both with and without arguments. When called as @timed, Python passes the function as the first argument. When called as @timed(label="x"), Python first calls timed(label="x") (so func is None), which returns the actual decorator. This is the cleanest way to make a decorator optionally accept arguments without the calling syntax looking inconsistent.
Tip: Use time.perf_counter() rather than time.time() for measuring execution durations. perf_counter() uses the highest-resolution clock available on the system and is not affected by system clock adjustments (e.g. NTP sync). time.time() measures wall-clock time, which can jump forwards or backwards. perf_counter() is monotonic — it always increases — making it reliable for duration measurement.
Warning: Decorators that measure time on async functions must themselves be async — calling await func(*args, **kwargs) instead of func(*args, **kwargs). If you apply a synchronous wrapper to an async function, the wrapper calls the function, which returns a coroutine object (not the result). The coroutine never executes and the result is never returned. Always check asyncio.iscoroutinefunction(func) and provide the appropriate wrapper, as shown in Lesson 2.
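
A minimal sketch of that dual-wrapper approach (timed_any is an illustrative name, and it prints instead of logging to stay self-contained):

```python
import asyncio
import functools
import time

def timed_any(func):
    """Time sync and async functions with a single decorator."""
    if asyncio.iscoroutinefunction(func):
        @functools.wraps(func)
        async def async_wrapper(*args, **kwargs):
            start = time.perf_counter()
            try:
                return await func(*args, **kwargs)   # await, so the coroutine actually runs
            finally:
                print(f"{func.__qualname__}: {time.perf_counter() - start:.3f}s")
        return async_wrapper

    @functools.wraps(func)
    def sync_wrapper(*args, **kwargs):
        start = time.perf_counter()
        try:
            return func(*args, **kwargs)
        finally:
            print(f"{func.__qualname__}: {time.perf_counter() - start:.3f}s")
    return sync_wrapper

@timed_any
async def fetch():
    await asyncio.sleep(0.01)
    return "done"

@timed_any
def compute():
    return sum(range(1000))
```

The dispatch happens once, at decoration time, so there is no per-call cost for checking the function type.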

Caching Decorators

import functools
import time

# ── Manual LRU cache ──────────────────────────────────────────────────────────
def lru_cache_manual(maxsize: int = 128):
    def decorator(func):
        cache = {}
        access_order = []   # track LRU order

        @functools.wraps(func)
        def wrapper(*args):   # only positional args — must be hashable
            if args in cache:
                access_order.remove(args)
                access_order.append(args)
                return cache[args]

            result = func(*args)
            cache[args] = result
            access_order.append(args)

            if len(cache) > maxsize:
                oldest = access_order.pop(0)
                del cache[oldest]

            return result
        return wrapper
    return decorator
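
One refinement worth noting: access_order.remove(args) scans the whole list on every cache hit, which is O(n). collections.OrderedDict gives O(1) recency updates via move_to_end. A sketch of the same cache on that basis (lru_cache_od is an illustrative name):

```python
import functools
from collections import OrderedDict

def lru_cache_od(maxsize: int = 128):
    """Manual LRU cache backed by an OrderedDict for O(1) recency updates."""
    def decorator(func):
        cache: OrderedDict = OrderedDict()

        @functools.wraps(func)
        def wrapper(*args):
            if args in cache:
                cache.move_to_end(args)        # mark as most recently used
                return cache[args]
            result = func(*args)
            cache[args] = result
            if len(cache) > maxsize:
                cache.popitem(last=False)      # evict the least recently used entry
            return result
        return wrapper
    return decorator

calls = []

@lru_cache_od(maxsize=2)
def square(n):
    calls.append(n)
    return n * n

square(1)
square(2)
square(1)   # hit: 1 becomes most recently used
square(3)   # cache full: evicts 2, the least recently used
```

popitem(last=False) removes the oldest entry, which is exactly the LRU eviction rule the list-based version implements with pop(0).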

# ── Standard library: functools.lru_cache ────────────────────────────────────
from functools import lru_cache, cache

@lru_cache(maxsize=256)   # cache up to 256 results
def expensive_calculation(n: int) -> int:
    """Fibonacci with memoisation."""
    if n <= 1:
        return n
    return expensive_calculation(n - 1) + expensive_calculation(n - 2)

@cache   # functools.cache — unbounded cache (Python 3.9+), equivalent to lru_cache(maxsize=None)
def fetch_config(env: str) -> dict:
    # Cached forever — suitable for immutable config
    return load_settings(env)

# Check cache stats
expensive_calculation(10)
print(expensive_calculation.cache_info())
# CacheInfo(hits=8, misses=11, maxsize=256, currsize=11)

expensive_calculation.cache_clear()   # invalidate cache
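
One more lru_cache knob worth knowing: by default, arguments that compare equal share a cache entry, so f(1) and f(1.0) return the same cached result. Passing typed=True keys the cache on argument types as well:

```python
from functools import lru_cache

calls = []

@lru_cache(maxsize=None, typed=True)
def describe(x):
    calls.append(x)
    return f"{type(x).__name__}: {x}"

describe(1)      # miss — int entry
describe(1.0)    # miss — separate float entry, because typed=True
describe(1)      # hit
```

Without typed=True, the second call would have been a hit and the function body would have run only once.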

# ── TTL cache for FastAPI (time-limited) ───────────────────────────────────────
def ttl_cache(seconds: int = 300):
    """Cache results for a fixed number of seconds."""
    def decorator(func):
        cache: dict = {}

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            key = (args, tuple(sorted(kwargs.items())))
            if key in cache:
                result, expires = cache[key]
                if time.monotonic() < expires:
                    return result
                del cache[key]

            result = func(*args, **kwargs)
            cache[key] = (result, time.monotonic() + seconds)
            return result
        return wrapper
    return decorator

@ttl_cache(seconds=60)
def get_trending_posts():
    return db.query(Post).order_by(Post.view_count.desc()).limit(10).all()
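
Be aware that ttl_cache is not thread-safe: two threads that miss at the same time will both call the function and race on the dict. A sketch of a locked variant (ttl_cache_locked is an illustrative name; time.monotonic() is used so expiry is immune to wall-clock jumps):

```python
import functools
import threading
import time

def ttl_cache_locked(seconds: float = 300):
    """TTL cache with a lock to prevent concurrent recomputation."""
    def decorator(func):
        cache: dict = {}
        lock = threading.Lock()

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            key = (args, tuple(sorted(kwargs.items())))
            with lock:                      # serialises cache access and recomputation
                hit = cache.get(key)
                if hit is not None and time.monotonic() < hit[1]:
                    return hit[0]
                result = func(*args, **kwargs)
                cache[key] = (result, time.monotonic() + seconds)
                return result
        return wrapper
    return decorator

calls = []

@ttl_cache_locked(seconds=60)
def lookup(x):
    calls.append(x)
    return x * 2
```

Holding the lock while func runs is the simplest way to guarantee a single recomputation, but it serialises all callers; per-key locks would allow more concurrency at the cost of complexity.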

Retry Decorator with Exponential Backoff

import functools
import time
import asyncio
import logging
logger = logging.getLogger(__name__)

def retry(
    max_attempts: int = 3,
    delay: float = 1.0,
    backoff: float = 2.0,
    exceptions: tuple[type[Exception], ...] = (Exception,),
):
):
    """Retry a function on failure with exponential backoff."""
    def decorator(func):
        @functools.wraps(func)
        async def async_wrapper(*args, **kwargs):
            current_delay = delay
            last_exc = None
            for attempt in range(1, max_attempts + 1):
                try:
                    return await func(*args, **kwargs)
                except exceptions as e:
                    last_exc = e
                    if attempt < max_attempts:
                        logger.warning(
                            f"{func.__name__} attempt {attempt}/{max_attempts} failed: {e}. "
                            f"Retrying in {current_delay:.1f}s..."
                        )
                        await asyncio.sleep(current_delay)
                        current_delay *= backoff
                    else:
                        logger.error(f"{func.__name__} failed after {max_attempts} attempts")
            raise last_exc

        @functools.wraps(func)
        def sync_wrapper(*args, **kwargs):
            current_delay = delay
            last_exc = None
            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except exceptions as e:
                    last_exc = e
                    if attempt < max_attempts:
                        logger.warning(
                            f"{func.__name__} attempt {attempt}/{max_attempts} failed: {e}. "
                            f"Retrying in {current_delay:.1f}s..."
                        )
                        time.sleep(current_delay)
                        current_delay *= backoff
                    else:
                        logger.error(f"{func.__name__} failed after {max_attempts} attempts")
            raise last_exc

        if asyncio.iscoroutinefunction(func):
            return async_wrapper
        return sync_wrapper
    return decorator

# Usage (requires the httpx package)
import httpx

@retry(max_attempts=3, delay=1.0, backoff=2.0, exceptions=(httpx.RequestError,))
async def send_webhook(url: str, payload: dict) -> dict:
    async with httpx.AsyncClient() as client:
        response = await client.post(url, json=payload, timeout=10.0)
        response.raise_for_status()
        return response.json()
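
A refinement commonly layered on exponential backoff is jitter: when many clients fail at the same moment, identical retry schedules make them all retry in lockstep, hammering the recovering service again. Randomising each delay spreads the retries out. An illustrative helper (backoff_delays is not part of the retry decorator above):

```python
import random

def backoff_delays(base: float, factor: float, attempts: int, jitter: float = 0.5):
    """Yield exponential backoff delays, each perturbed by up to ±jitter (as a fraction)."""
    for n in range(attempts):
        nominal = base * factor ** n
        yield nominal * random.uniform(1 - jitter, 1 + jitter)

delays = list(backoff_delays(base=1.0, factor=2.0, attempts=3))
# nominal delays are 1s, 2s, 4s; each is scaled by a random factor in [0.5, 1.5]
```

Feeding these delays into the sleep calls in place of the fixed current_delay progression desynchronises clients without changing the average wait.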

Common Mistakes

Mistake 1 — Using lru_cache with mutable arguments

❌ Wrong — lists are not hashable:

@lru_cache()
def process(items: list):   # TypeError: unhashable type: 'list'
    return sum(items)

✅ Correct — convert to tuple (hashable):

@lru_cache()
def process(items: tuple):   # ✓ tuples are hashable
    return sum(items)

# Or wrap the call:
result = process(tuple(my_list))
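
If callers genuinely need to pass lists, a thin adapter can normalise them to tuples before the cached call. A sketch (list_safe_cache is an illustrative name; it only handles top-level positional list arguments):

```python
import functools
from functools import lru_cache

def list_safe_cache(func):
    """Convert list arguments to tuples before delegating to an lru_cache."""
    cached = lru_cache(maxsize=None)(func)

    @functools.wraps(func)
    def wrapper(*args):
        safe = tuple(tuple(a) if isinstance(a, list) else a for a in args)
        return cached(*safe)
    return wrapper

@list_safe_cache
def total(items):
    return sum(items)
```

Note that the decorated function receives tuples even when the caller passed lists, so its body must not rely on list-only methods like append.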

Mistake 2 — Caching database queries that return ORM objects

❌ Wrong — cached SQLAlchemy objects become detached from the session:

@lru_cache()
def get_user(user_id: int):
    return db.query(User).get(user_id)   # ORM object detached after session closes!

✅ Correct — cache serialisable data:

@lru_cache()
def get_user_dict(user_id: int) -> dict:
    user = db.query(User).get(user_id)
    return {"id": user.id, "name": user.name}   # ✓ plain dict, no session needed

Mistake 3 — Retry on non-transient errors

❌ Wrong — retrying a 404 is pointless:

@retry(exceptions=(Exception,))   # retries everything including 404!
async def get_resource(id: int): ...

✅ Correct — only retry transient errors:

@retry(exceptions=(httpx.TimeoutException, httpx.ConnectError))   # ✓ network errors only
async def get_resource(id: int): ...

Quick Reference

Decorator               Use Case                    Key Notes
@timed                  Measure execution time      Use perf_counter()
@lru_cache(maxsize=N)   Cache function results      Args must be hashable
@cache                  Unbounded cache (3.9+)      Use for pure functions
@ttl_cache(seconds=N)   Time-limited cache          For data that changes
@retry(...)             Retry on transient errors   Only for transient exceptions

🧠 Test Yourself

You apply @lru_cache() to a function that accepts a list of user IDs: def get_users(user_ids: list). What happens when you call it, and how do you fix it?