Advanced Async Patterns — Combinators, Retry, Generators, and Mutexes

JavaScript’s concurrency model is fundamentally single-threaded, yet modern applications must orchestrate dozens of simultaneous I/O operations: database queries, HTTP calls, cache lookups, file reads. Doing this efficiently requires moving beyond sequential await chains to concurrent execution with Promise.all, understanding why naive concurrency can overwhelm rate-limited resources, implementing retry and circuit breaker patterns for resilient async code, and composing complex async workflows with async generators and iterators. This lesson covers the advanced async patterns that separate correctly concurrent Node.js code from accidentally sequential code.

Promise Combinators Comparison

| Combinator | Resolves When | Rejects When | Use For |
| --- | --- | --- | --- |
| Promise.all([...]) | ALL resolve | ANY rejects (immediately) | Parallel work that all must succeed |
| Promise.allSettled([...]) | ALL settle (resolve or reject) | Never | Parallel work where partial failure is acceptable |
| Promise.race([...]) | FIRST promise to settle resolves | FIRST promise to settle rejects | Timeouts, fastest-wins cache vs network |
| Promise.any([...]) | FIRST resolves | ALL reject (AggregateError) | Fallback chain: try multiple sources |

Note: Promise.all() fails fast. If one of ten promises rejects, the other nine are not cancelled (they keep running), but the combined promise rejects immediately with that first error. If you need the outcome of every promise regardless of individual failures, use Promise.allSettled(), which always resolves with an array of { status: 'fulfilled', value } or { status: 'rejected', reason } objects. This is the correct choice for bulk operations where you want to process successes and log failures separately.
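The fail-fast behaviour described in the note can be observed with a small sketch. The helpers `delayed` and `failing` and the function `compareCombinators` are hypothetical names introduced only for this illustration; they stand in for real I/O calls.

```javascript
// Hypothetical helpers: settle after `ms` milliseconds.
const delayed = (value, ms) =>
    new Promise(resolve => setTimeout(() => resolve(value), ms));
const failing = (message, ms) =>
    new Promise((_, reject) => setTimeout(() => reject(new Error(message)), ms));

async function compareCombinators() {
    // Build a fresh set of jobs for each combinator: two succeed, one fails.
    const makeJobs = () => [delayed('a', 30), failing('boom', 10), delayed('c', 20)];

    // Promise.all rejects as soon as the 10 ms job fails; the other jobs keep running.
    let allOutcome;
    try {
        allOutcome = await Promise.all(makeJobs());
    } catch (err) {
        allOutcome = `rejected: ${err.message}`;
    }

    // Promise.allSettled waits for every job and reports each outcome in input order.
    const settled = await Promise.allSettled(makeJobs());
    const values  = settled.filter(r => r.status === 'fulfilled').map(r => r.value);
    const reasons = settled.filter(r => r.status === 'rejected').map(r => r.reason.message);

    return { allOutcome, values, reasons };
}

compareCombinators().then(console.log);
// { allOutcome: 'rejected: boom', values: [ 'a', 'c' ], reasons: [ 'boom' ] }
```

With Promise.all the two successful values are lost to the caller; with Promise.allSettled they remain available alongside the failure reason.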
Tip: Implement concurrency limiting for bulk operations. Promise.all(items.map(fn)) starts every operation at once. With 10,000 items that means 10,000 operations in flight simultaneously, which can saturate a connection pool, exhaust memory, or trip an upstream rate limit. Use a semaphore or the p-limit package to cap concurrency: const limit = pLimit(10); await Promise.all(items.map(item => limit(() => processItem(item)))) runs at most 10 at a time, queuing the rest.
Warning: Async functions always return Promises — they never block. await pauses the current async function but not the event loop or any other concurrent async functions. A common mistake is assuming await slowOperation() prevents other requests from being processed — it does not. The event loop continues handling other events while this function awaits. If you need true serialisation (one request at a time), implement an explicit queue or mutex.
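A short sketch makes the warning concrete. `sleep`, `handleRequest`, and `demoInterleaving` are hypothetical names used only here; the two calls stand in for two requests handled by the same process.

```javascript
const sleep = ms => new Promise(resolve => setTimeout(resolve, ms));

async function demoInterleaving() {
    const log = [];

    async function handleRequest(name, ms) {
        log.push(`${name} start`);
        await sleep(ms);              // suspends only this function, not the event loop
        log.push(`${name} end`);
    }

    // Both "requests" are started before either one finishes.
    await Promise.all([handleRequest('slow', 50), handleRequest('fast', 10)]);
    return log;
}

demoInterleaving().then(log => console.log(log.join(' -> ')));
// slow start -> fast start -> fast end -> slow end
```

The fast request starts and finishes while the slow one is still awaiting, which is exactly why await alone gives no serialisation guarantee.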

Complete Advanced Async Patterns

// ── Parallel vs sequential vs batched ────────────────────────────────────
const Task = require('../models/task.model');

// Sequential — each awaits the previous, total = sum of all durations
async function loadTasksSequential(ids) {
    const tasks = [];
    for (const id of ids) {
        tasks.push(await Task.findById(id));  // one at a time
    }
    return tasks;
}

// Parallel — all fire at once, total = max of all durations
async function loadTasksParallel(ids) {
    return Promise.all(ids.map(id => Task.findById(id)));  // all at once
}

// Batched: limited concurrency, respects connection pool
// (p-limit 4+ is ESM-only; require() works with p-limit 3.x, otherwise use import)
const pLimit = require('p-limit');
const limit  = pLimit(5);  // max 5 concurrent

async function loadTasksBatched(ids) {
    return Promise.all(ids.map(id => limit(() => Task.findById(id))));
}

// ── Promise.allSettled for partial failure tolerance ──────────────────────
async function bulkSendNotifications(userIds, message) {
    const results = await Promise.allSettled(
        userIds.map(id => sendNotification(id, message))
    );

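    let succeeded = 0;
    let failed    = 0;

    // results[i] corresponds to userIds[i], so iterate the unfiltered array
    // (indexes into a filtered array would point at the wrong user)
    results.forEach((result, i) => {
        if (result.status === 'fulfilled') {
            succeeded++;
            return;
        }
        failed++;
        const reason = result.reason instanceof Error ? result.reason.message : String(result.reason);
        console.error(`Notification failed for user ${userIds[i]}: ${reason}`);
    });

    return { succeeded, failed };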
}

// ── Promise.race for timeout pattern ─────────────────────────────────────
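function withTimeout(promise, ms, message = 'Operation timed out') {
    let timer;
    const timeout = new Promise((_, reject) => {
        timer = setTimeout(() => reject(new Error(message)), ms);
    });
    // Clear the timer once the race settles so a finished operation does not
    // leave a pending timeout behind. The losing operation itself is not cancelled.
    return Promise.race([promise, timeout]).finally(() => clearTimeout(timer));
}

// Fetch with 5-second timeout (run inside an async function: CommonJS modules
// such as this one do not support top-level await)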
const task = await withTimeout(
    Task.findById(taskId),
    5000,
    'Database query timed out'
);

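// ── Promise.any for fastest-wins fallback ────────────────────────────────
// Both sources are queried at the same time; the first one to fulfil wins.
async function getTaskWithFallback(taskId) {
    const fromCache = redis.get(`task:${taskId}`).then(raw => {
        // A cache miss resolves with null, so reject to let the database answer win
        if (raw == null) throw new Error('Cache miss');
        return JSON.parse(raw);
    });
    const fromDb = Task.findById(taskId).lean().exec();

    try {
        return await Promise.any([fromCache, fromDb]);
    } catch (err) {
        if (err instanceof AggregateError) {
            throw new Error('All sources failed', { cause: err });
        }
        throw err;
    }
}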

// ── Retry with exponential backoff ────────────────────────────────────────
async function withRetry(fn, { retries = 3, baseDelay = 100, maxDelay = 5000 } = {}) {
    for (let attempt = 0; attempt <= retries; attempt++) {
        try {
            return await fn();
        } catch (err) {
            if (attempt === retries) throw err;   // final attempt failed

            const isRetryable = err.code === 'ECONNRESET' ||
                                 err.code === 'ETIMEDOUT'  ||
                                 err.status === 503;
            if (!isRetryable) throw err;          // non-retryable — fail fast

            const delay = Math.min(baseDelay * 2 ** attempt + Math.random() * 100, maxDelay);
            console.warn(`Attempt ${attempt + 1} failed, retrying in ${Math.round(delay)}ms`);
            await new Promise(resolve => setTimeout(resolve, delay));
        }
    }
}

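// Usage: withRetry(() => fetch('/api/data'), { retries: 3, baseDelay: 200 })
// Caveat: fetch() resolves (does not throw) on HTTP errors such as 503, so fn must
// check the response and throw an error with a `status` property to trigger a retry.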

// ── Async generators for paginated data streaming ─────────────────────────
async function* paginateTasks(userId, pageSize = 100) {
    let page = 1;
    let hasMore = true;

    while (hasMore) {
        const tasks = await Task.find({ user: userId })
            .sort({ _id: 1 })             // stable order so skip/limit pages do not overlap
            .skip((page - 1) * pageSize)
            .limit(pageSize)
            .lean();

        for (const task of tasks) {
            yield task;               // one task at a time — lazy evaluation
        }

        hasMore = tasks.length === pageSize;
        page++;
    }
}

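// Consumer: process all tasks without loading them all into memory
const { once } = require('events');

async function exportUserTasks(userId, outputStream) {
    for await (const task of paginateTasks(userId)) {
        // write() returns false when the stream's buffer is full; wait for 'drain'
        // so unwritten data does not pile up in memory
        if (!outputStream.write(JSON.stringify(task) + '\n')) {
            await once(outputStream, 'drain');
        }
    }
}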

// ── Async mutex for critical sections ────────────────────────────────────
class AsyncMutex {
    constructor() { this._queue = []; this._locked = false; }

    acquire() {
        return new Promise(resolve => {
            if (!this._locked) {
                this._locked = true;
                resolve(() => this._release());
            } else {
                this._queue.push(resolve);
            }
        });
    }

    _release() {
        if (this._queue.length > 0) {
            const next = this._queue.shift();
            next(() => this._release());
        } else {
            this._locked = false;
        }
    }
}

const mutex = new AsyncMutex();

async function criticalSection(data) {
    const release = await mutex.acquire();
    try {
        // Only one execution at a time
        return await performAtomicUpdate(data);
    } finally {
        release();   // always release, even on error
    }
}

How It Works

Step 1 — Promise.all Fires All Promises Simultaneously

Promise.all([p1, p2, p3]) does not start anything itself: each operation begins when its promise is created, so by the time the array is built all three are already running. Promise.all subscribes to all of them at once instead of waiting for each to settle before looking at the next, so the total time is approximately max(t1, t2, t3) rather than t1 + t2 + t3. The operations overlap because their I/O is in flight concurrently, not because JavaScript runs them on separate threads. Replacing a chain of independent awaits with Promise.all is one of the most common performance optimisations in Node.js applications.
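The timing claim can be checked with a rough measurement. This is a sketch under stated assumptions: `fetchUser`, `fetchTasks`, and `fetchNotifications` are hypothetical stand-ins that simply wait 60, 40, and 20 ms, and `timed` is a helper invented for this example.

```javascript
const sleep = (ms, value) => new Promise(resolve => setTimeout(() => resolve(value), ms));

// Hypothetical stand-ins for three independent I/O calls.
const fetchUser          = () => sleep(60, 'user');
const fetchTasks         = () => sleep(40, 'tasks');
const fetchNotifications = () => sleep(20, 'notifications');

// Run fn and report how long it took in milliseconds.
async function timed(fn) {
    const start = Date.now();
    const result = await fn();
    return { ms: Date.now() - start, result };
}

// Each call starts only after the previous one has finished: roughly 60 + 40 + 20 ms.
async function sequential() {
    const user = await fetchUser();
    const tasks = await fetchTasks();
    const notifications = await fetchNotifications();
    return [user, tasks, notifications];
}

// All three calls are started before any is awaited: roughly max(60, 40, 20) ms.
function parallel() {
    return Promise.all([fetchUser(), fetchTasks(), fetchNotifications()]);
}

async function compare() {
    const seq = await timed(sequential);
    const par = await timed(parallel);
    console.log(`sequential: ${seq.ms} ms, parallel: ${par.ms} ms`);
}

compare();
```

Exact numbers vary with timer resolution and machine load, but the parallel version should finish in about the time of the slowest call.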

Step 2 — Exponential Backoff Prevents Thundering Herd

When a service fails and many clients retry simultaneously at fixed intervals, they create a “thundering herd” — a spike of traffic exactly when the service is struggling to recover. Exponential backoff (baseDelay * 2^attempt) spreads retries out over time. Adding jitter (+ Math.random() * 100) desynchronises clients that started retrying simultaneously, preventing coordinated spikes. The maxDelay cap prevents waits from growing indefinitely.
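To see what the formula produces, the delay calculation can be pulled out into a standalone function. `backoffDelay` is a hypothetical helper that mirrors the expression used in withRetry; the `jitter` and `random` options are assumptions added here so the jitter can be switched off for a deterministic printout.

```javascript
// Same formula as withRetry: exponential growth, plus jitter, capped at maxDelay.
function backoffDelay(attempt, { baseDelay = 100, maxDelay = 5000, jitter = 100, random = Math.random } = {}) {
    return Math.min(baseDelay * 2 ** attempt + random() * jitter, maxDelay);
}

// Delays for attempts 0..6 with jitter disabled, to show the doubling and the cap.
const schedule = [0, 1, 2, 3, 4, 5, 6].map(attempt =>
    backoffDelay(attempt, { random: () => 0 })
);

console.log(schedule);
// [ 100, 200, 400, 800, 1600, 3200, 5000 ]
```

The seventh value would be 6400 ms without the cap. With jitter enabled, each client adds up to 100 ms of its own, so clients that failed at the same instant no longer retry at the same instant.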

Step 3 — Async Generators Enable Lazy Paginated Streams

An async generator function (async function*) can yield values asynchronously. The consumer (for await...of) pulls values on demand — the generator only advances when the consumer is ready. This enables processing millions of database records without loading them all into memory: one page is fetched, its items are yielded one by one, then the next page is fetched only when needed. Memory usage stays proportional to page size, not total record count.
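The pull-based behaviour can be demonstrated without a database. In this sketch `createSource`, `paginate`, and `takeFirst` are hypothetical names; `createSource` fakes a paginated query over an in-memory array and counts how many pages were requested.

```javascript
// Hypothetical in-memory data source that counts page fetches.
function createSource(total) {
    const records = Array.from({ length: total }, (_, i) => ({ id: i + 1 }));
    const source = {
        pagesFetched: 0,
        async fetchPage(page, pageSize) {
            source.pagesFetched++;
            const start = (page - 1) * pageSize;
            return records.slice(start, start + pageSize);
        },
    };
    return source;
}

// Fetch one page at a time and hand its items out individually.
async function* paginate(source, pageSize) {
    for (let page = 1; ; page++) {
        const items = await source.fetchPage(page, pageSize);
        for (const item of items) {
            yield item;
        }
        if (items.length < pageSize) return;   // a short page means no more data
    }
}

// Consume only the first n items, then stop pulling.
async function takeFirst(n, source, pageSize) {
    const ids = [];
    for await (const item of paginate(source, pageSize)) {
        ids.push(item.id);
        if (ids.length === n) break;           // break ends the generator early
    }
    return ids;
}

const source = createSource(7);
takeFirst(4, source, 3).then(ids => console.log(ids, 'pages fetched:', source.pagesFetched));
// [ 1, 2, 3, 4 ] pages fetched: 2
```

Seven records at three per page would need three fetches to read in full, but the consumer stopped after four items, so the third page was never requested.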

Step 4 — Concurrency Limiting Respects Resource Constraints

pLimit(n) creates a limiting function that allows at most n concurrent executions. Additional calls queue until a slot opens. For database operations, keep the limit at or below the connection pool size: running more concurrent queries than available connections just causes queueing inside the driver, adding overhead without increasing throughput. For external APIs, note that a concurrency cap bounds requests in flight, not requests per second; choose n so that n divided by the typical response time stays under the allowed rate, or add a time-based rate limiter.
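The queueing behaviour can be sketched in a few lines. This is not p-limit's implementation, only a minimal illustration of the same idea; `createLimiter` and `measurePeak` are hypothetical names, and the real package handles more edge cases.

```javascript
const sleep = ms => new Promise(resolve => setTimeout(resolve, ms));

// Minimal limiter sketch: at most maxConcurrent functions run at once, the rest wait.
function createLimiter(maxConcurrent) {
    let active = 0;
    const waiting = [];

    const next = () => {
        if (active >= maxConcurrent || waiting.length === 0) return;
        active++;
        const { fn, resolve, reject } = waiting.shift();
        Promise.resolve()
            .then(() => fn())                           // also catches synchronous throws
            .then(resolve, reject)                      // forward the outcome to the caller
            .finally(() => { active--; next(); });      // free the slot, start the next job
    };

    return fn => new Promise((resolve, reject) => {
        waiting.push({ fn, resolve, reject });
        next();
    });
}

// Run jobCount fake jobs through the limiter and record the highest overlap seen.
async function measurePeak(jobCount, maxConcurrent) {
    const limit = createLimiter(maxConcurrent);
    let running = 0;
    let peak = 0;

    const job = async i => {
        running++;
        peak = Math.max(peak, running);
        await sleep(5);
        running--;
        return i * 2;
    };

    const results = await Promise.all(
        Array.from({ length: jobCount }, (_, i) => limit(() => job(i)))
    );
    return { peak, results };
}

measurePeak(10, 3).then(({ peak }) => console.log('peak concurrency:', peak));
// peak concurrency: 3
```

All ten jobs are submitted immediately, yet no more than three ever overlap, and Promise.all still returns the results in submission order.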

Step 5 — Async Mutex Serialises Access to Shared Resources

Node.js’s single-threaded event loop does not prevent race conditions in async code. Synchronous code between two await points runs without interruption, but whenever a function is suspended at an await, other async functions can run. If two requests both read a counter (count = 5), both compute count + 1, and both write back 6, one increment is lost: the second write overwrites the first instead of building on it. An async mutex serialises the read-modify-write cycle, ensuring each operation sees the result of the previous one. It only coordinates code within a single Node.js process.
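The lost update can be reproduced in isolation. The sketch below uses a hypothetical in-memory store (`createStore`) with asynchronous read and write, and, to stay short and self-contained, a promise-chain lock (`createLock`) rather than the queue-based AsyncMutex class; both serialise callers, but the lock here is an alternative technique chosen only for brevity.

```javascript
const sleep = ms => new Promise(resolve => setTimeout(resolve, ms));

// Hypothetical shared store with async read and write, standing in for a database row.
function createStore() {
    let count = 0;
    return {
        read:  async () => { await sleep(5); return count; },
        write: async value => { await sleep(5); count = value; },
    };
}

// Unsafe on its own: other callers can read the old value while this one awaits.
async function increment(store) {
    const current = await store.read();
    await store.write(current + 1);
}

// Promise-chain lock: each submitted function runs only after the previous one settles.
function createLock() {
    let tail = Promise.resolve();
    return fn => {
        const run = tail.then(() => fn());
        tail = run.catch(() => {});     // keep the chain usable after a failure
        return run;
    };
}

// Fire `times` increments at once and return the final count.
async function runIncrements(doIncrement, times) {
    const store = createStore();
    await Promise.all(Array.from({ length: times }, () => doIncrement(store)));
    return store.read();
}

async function demo() {
    const withLock = createLock();
    const unsafe = await runIncrements(increment, 5);
    const safe   = await runIncrements(store => withLock(() => increment(store)), 5);
    return { unsafe, safe };
}

demo().then(console.log);
// safe is 5; unsafe ends up lower because concurrent callers overwrote each other
```

Without the lock, the five callers read the counter before any of them has written, so increments are lost. With the lock, each read-modify-write completes before the next begins.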

Quick Reference

| Pattern | Code |
| --- | --- |
| Parallel (all must succeed) | await Promise.all([fn1(), fn2(), fn3()]) |
| Parallel (partial failure ok) | await Promise.allSettled([...]) |
| Timeout | await Promise.race([operation(), timeoutPromise(ms)]) |
| Fastest wins | await Promise.any([cache(), database()]) |
| Concurrency limit | const limit = pLimit(10); Promise.all(items.map(i => limit(() => fn(i)))) |
| Retry with backoff | await withRetry(() => fn(), { retries: 3 }) |
| Paginated stream | async function* paginate() { while (...) { ... yield item; } } |
| Mutex | const release = await mutex.acquire(); try { ... } finally { release(); } |

🧠 Test Yourself

A route needs to fetch a user’s profile, their tasks, and their notifications simultaneously. Using await user(), then await tasks(), then await notifications() takes 300ms total. What does Promise.all([user(), tasks(), notifications()]) achieve and approximately how long does it take?