Flattening Operators — switchMap, mergeMap, concatMap, exhaustMap

Flattening operators are the most powerful — and most misunderstood — operators in RxJS. They all do the same basic thing: take a value from an outer Observable, use it to create an inner Observable, and “flatten” the inner Observable’s emissions into the outer stream. The critical difference between them is how they handle concurrent inner Observables — what happens when a new outer value arrives while an inner Observable is still running. Getting this wrong produces duplicate requests, race conditions, and stale data in production. Getting it right makes your application rock-solid.

Flattening Operators Comparison

| Operator | On new outer value | Concurrent inner Observables | Order of results |
|---|---|---|---|
| switchMap | Cancel current inner, start new | Only latest (1 at a time) | Latest only |
| mergeMap (= flatMap) | Start new inner alongside existing | Unlimited concurrent | First-come, first-served |
| concatMap | Queue: start new only when current completes | 1 at a time (queue) | Guaranteed order |
| exhaustMap | Ignore new while inner is running | Only oldest (1 at a time) | First wins |
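
To make the comparison concrete, here is a minimal sketch that pushes one fixed sequence of events through each of the four operators. The `runScript` helper and the hand-driven Subjects `a` and `b` are illustrative assumptions, not part of any application code; Subjects are used so that the whole script runs synchronously and the order of events is explicit.

```typescript
import { Observable, OperatorFunction, Subject } from 'rxjs';
import { switchMap, mergeMap, concatMap, exhaustMap } from 'rxjs/operators';

type Key = 'a' | 'b';
type Flatten = (project: (key: Key) => Observable<string>) => OperatorFunction<Key, string>;

// Runs the same script through the given operator and records what reaches the subscriber.
function runScript(flatten: Flatten): string[] {
    const outer = new Subject<Key>();
    const inner = { a: new Subject<string>(), b: new Subject<string>() };
    const seen: string[] = [];

    outer.pipe(flatten(key => inner[key])).subscribe(value => seen.push(value));

    outer.next('a');        // inner "a" starts
    inner.a.next('a1');
    outer.next('b');        // arrives while "a" is still running
    inner.b.next('b1');
    inner.a.next('a2');
    inner.a.complete();
    inner.b.next('b2');
    inner.b.complete();
    return seen;
}

const results = {
    switchMap: runScript(project => switchMap(project)),     // a1, b1, b2
    mergeMap: runScript(project => mergeMap(project)),       // a1, b1, a2, b2
    concatMap: runScript(project => concatMap(project)),     // a1, a2, b2
    exhaustMap: runScript(project => exhaustMap(project)),   // a1, a2
};
console.log(results);
```

Note that concatMap misses `b1` only because a Subject is hot: it was not subscribed to `b` yet when `b1` was emitted. With a cold inner Observable such as an HTTP call, nothing is lost; the request simply starts later.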

Choosing the Right Flattening Operator

| Use Case | Operator | Reason |
|---|---|---|
| Search autocomplete | switchMap | Cancel previous request when user types a new query |
| Route change → load resource | switchMap | Cancel previous load when route changes |
| Upload multiple files | mergeMap | Upload files in parallel, don't wait for each |
| Process tasks in order | concatMap | One at a time, preserve order, queue the rest |
| Submit form once | exhaustMap | Ignore duplicate clicks while submission is in-flight |
| Login button | exhaustMap | Prevent double login from impatient double-click |
| Audit log (sequential writes) | concatMap | Preserve order: write N only after N-1 completes |
| Refresh data on interval | switchMap | Cancel pending request if interval fires again |

Note: switchMap is the default choice for most Angular use cases because Angular applications are primarily user-driven: the user navigates to a page (triggering a load), types in a search (triggering a query), or selects a filter (triggering a reload). In all these cases, if the user triggers a new action before the previous one completes, you want to cancel the previous operation and start the new one, which is exactly what switchMap does. Reserve mergeMap, concatMap, and exhaustMap for the specific cases where their behaviour is required.

Tip: Prefer exhaustMap for form submit buttons and login buttons. When the user double-clicks submit, exhaustMap ignores the second click while the first submission is in-flight, which prevents duplicate database records. Using switchMap here would unsubscribe from the first request and send a second, potentially after the form data has been cleared; and because unsubscribing only aborts the request on the client, the server may already have processed the first one. Using mergeMap would send both requests and create duplicates.

Warning: mergeMap without a concurrency limit creates an unbounded number of inner Observables. For a rapidly emitting source, this can mean thousands of concurrent HTTP requests. When using mergeMap for bulk operations, always set the concurrency argument: mergeMap(item => process(item), 3) limits it to 3 concurrent inner Observables, and further outer values are buffered until a slot frees up. For file uploads, 3-5 concurrent uploads is typically a good balance between speed and server load.
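
The effect of the concurrency argument can be observed directly. The sketch below is an illustration under assumptions: `fakeUpload` is a hypothetical stand-in for a real upload call, and the `finishers` array lets the example complete each "upload" by hand so that the number of simultaneously active inner Observables can be counted.

```typescript
import { Observable, from } from 'rxjs';
import { mergeMap } from 'rxjs/operators';

let active = 0;                          // inner Observables currently running
let peak = 0;                            // highest value `active` ever reached
const finishers: Array<() => void> = []; // call one to complete the matching upload

// Hypothetical upload: counts as active from subscription until its finisher is called.
function fakeUpload(name: string): Observable<string> {
    return new Observable<string>(subscriber => {
        active++;
        peak = Math.max(peak, active);
        finishers.push(() => {
            active--;
            subscriber.next(name);
            subscriber.complete();
        });
    });
}

const done: string[] = [];
from(['a', 'b', 'c', 'd', 'e']).pipe(
    mergeMap(name => fakeUpload(name), 2),   // at most 2 uploads at once
).subscribe(name => done.push(name));

// Only the first two uploads have been subscribed to; c, d and e are buffered.
const startedImmediately = finishers.length;

// Completing an upload frees a slot, and mergeMap starts the next buffered item.
while (finishers.length > 0) {
    finishers.shift()!();
}
console.log({ startedImmediately, peak, done });
```

With the limit set to 2, `peak` never exceeds 2 even though the source emits all five values at once.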

Complete Flattening Operator Examples

import { fromEvent, from, interval, Subject, of } from 'rxjs';
import {
    switchMap, mergeMap, concatMap, exhaustMap,
    map, filter, tap, delay, catchError, finalize,
    debounceTime, distinctUntilChanged,   // used by the search example below
} from 'rxjs/operators';
import { takeUntilDestroyed } from '@angular/core/rxjs-interop';

// ── switchMap — search / route-driven loading ─────────────────────────────

// Classic search autocomplete
this.searchControl.valueChanges.pipe(
    debounceTime(300),
    distinctUntilChanged(),
    // Each new query CANCELS the previous HTTP request
    switchMap(query => query.length >= 2
        ? this.taskService.search(query).pipe(
            catchError(() => of([]))  // catch inside inner — outer stream continues
          )
        : of([])
    ),
    takeUntilDestroyed(),
).subscribe(results => this.results.set(results));

// Route params → load data (cancel previous load on navigation)
this.route.params.pipe(
    map(p => p['id']),
    switchMap(id => this.taskService.getById(id).pipe(
        catchError(err => {
            this.error.set(err.message);
            return of(null);
        })
    )),
    filter(task => task !== null),
    takeUntilDestroyed(),
).subscribe(task => this.task.set(task!));

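// Auto-refresh on interval: cancel the pending request if the interval fires before the response
interval(30000).pipe(
    switchMap(() => this.taskService.getAll().pipe(
        catchError(() => of(null)),   // catch inside inner so a failed refresh does not stop the polling
    )),
    filter(tasks => tasks !== null),
    takeUntilDestroyed(),
).subscribe(tasks => this.tasks.set(tasks!));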

// ── exhaustMap — prevent duplicate submissions ────────────────────────────

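// Form submit button: ignore clicks while a submission is in-flight.
// The ViewChild is only available from ngAfterViewInit, which is outside an injection
// context, so takeUntilDestroyed needs an explicit DestroyRef here.
// Assumes the component declares: private destroyRef = inject(DestroyRef);
fromEvent(this.submitBtn.nativeElement, 'click').pipe(
    exhaustMap(() => {
        this.submitting.set(true);
        return this.taskService.create(this.formValue()).pipe(
            finalize(() => this.submitting.set(false)),
            catchError(err => {
                this.error.set(err.message);
                return of(null);
            }),
        );
    }),
    filter(result => result !== null),
    takeUntilDestroyed(this.destroyRef),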
).subscribe(task => {
    this.store.addTask(task!);
    this.toast.success('Task created');
    this.form.reset();
});

// Login button — exhaustMap prevents double login
this.loginButton$.pipe(
    exhaustMap(() =>
        this.authService.login(this.credentials()).pipe(
            catchError(err => {
                this.loginError.set(err.error?.message ?? 'Login failed');
                return of(null);
            }),
        )
    ),
    filter(user => user !== null),
    takeUntilDestroyed(),
).subscribe(user => {
    this.authStore.setUser(user!);
    this.router.navigate(['/tasks']);
});

// ── concatMap — ordered sequential operations ─────────────────────────────

// Process tasks one at a time in order (e.g. sequential API calls with dependencies)
from(taskIds).pipe(
    concatMap(id => this.taskService.getById(id).pipe(
        catchError(() => of(null))
    )),
    filter(task => task !== null),
).subscribe(task => this.processTask(task!));

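// Undo queue: apply undo operations one at a time, in the order the queue emits them
// (the queue itself is responsible for emitting the most recent action first)
this.undoQueue$.pipe(
    concatMap(operation => this.applyUndo(operation)),   // next undo starts only after the previous completes
).subscribe(() => this.toast.info('Action undone'));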

// ── mergeMap — parallel processing ───────────────────────────────────────

// Upload multiple files simultaneously (max 3 at once)
from(files).pipe(
    mergeMap(
        file => this.uploadService.upload(file).pipe(
            tap(progress => this.updateProgress(file.name, progress)),
            catchError(err => {
                this.setFileError(file.name, err.message);
                return of(null);
            }),
        ),
        3   // max 3 concurrent uploads
    ),
    filter(result => result !== null),
).subscribe(url => this.uploadedUrls.update(urls => [...urls, url!]));

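// Batch delete: up to 5 requests in flight at a time
from(selectedIds).pipe(
    mergeMap(
        id => this.taskService.delete(id).pipe(
            map(() => id),                // emit the id of each successful delete
            catchError(() => of(null)),   // a failed delete emits null instead
        ),
        5   // max 5 concurrent deletes
    ),
    filter(id => id !== null),            // only remove tasks that were actually deleted
).subscribe(id => this.tasks.update(tasks => tasks.filter(t => t._id !== id)));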

How It Works

Step 1 — All Flattening Operators Subscribe to Inner Observables

Every flattening operator takes a “project function” — switchMap(id => http.get('/tasks/' + id)). When the outer Observable emits id, the operator calls the project function to get the inner Observable (http.get(...)), then subscribes to it. The operator is the connection between the outer (trigger) and inner (work) Observables. What distinguishes the four operators is entirely how they manage this subscription when a new outer value arrives while an inner is running.
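
A small sketch shows what "flattening" adds over a plain `map`. Here `loadTask` is a hypothetical stand-in for `http.get('/tasks/' + id)`; it returns a synchronous Observable so the example needs no server.

```typescript
import { Observable, isObservable, of } from 'rxjs';
import { map, mergeMap } from 'rxjs/operators';

// Hypothetical stand-in for http.get('/tasks/' + id)
const loadTask = (id: number): Observable<string> => of(`task-${id}`);

// map only calls the project function: the subscriber receives the inner
// Observables themselves, and nobody ever subscribes to them.
const withMap: unknown[] = [];
of(1, 2).pipe(map(id => loadTask(id))).subscribe(value => withMap.push(value));
const mapEmittedObservables = withMap.every(value => isObservable(value));

// A flattening operator also subscribes to each inner Observable and
// forwards its values into the outer stream.
const withMergeMap: string[] = [];
of(1, 2).pipe(mergeMap(id => loadTask(id))).subscribe(value => withMergeMap.push(value));

console.log(mapEmittedObservables, withMergeMap);
```

Any of the four operators could replace `mergeMap` here; with synchronous inner Observables they behave identically, because no inner is ever still running when the next outer value arrives.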

Step 2 — switchMap Unsubscribes from the Previous Inner Observable

When switchMap receives a new outer value while an inner Observable is running, it unsubscribes from the current inner Observable (cancelling the HTTP request if it hasn’t completed) and subscribes to the new one. Only values from the latest inner Observable flow downstream. This “switch to the latest” behaviour is exactly right for user-driven queries — you never want an old search result to overwrite a newer one.
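
The unsubscription can be made visible with an inner Observable that logs its own teardown. In this sketch `pendingRequest` is a hypothetical request that never responds; its teardown function plays the role of aborting the HTTP call.

```typescript
import { Observable, Subject } from 'rxjs';
import { switchMap } from 'rxjs/operators';

const log: string[] = [];

// Hypothetical request that never emits; the returned function runs on unsubscribe.
function pendingRequest(query: string): Observable<string> {
    return new Observable<string>(() => {
        log.push(`start ${query}`);
        return () => {
            log.push(`cancel ${query}`);
        };
    });
}

const queries = new Subject<string>();
const subscription = queries.pipe(
    switchMap(query => pendingRequest(query)),
).subscribe();

queries.next('ta');
queries.next('task');         // 'ta' is torn down before 'task' is subscribed to
subscription.unsubscribe();   // unsubscribing the outer stream tears down the active inner too

console.log(log);
```

The log reads `start ta`, `cancel ta`, `start task`, `cancel task`: at no point are two inner subscriptions alive at once.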

Step 3 — concatMap Queues Inner Observables

concatMap subscribes to inner Observables one at a time. If a new outer value arrives while the current inner is running, it does not start the new inner — it waits in a queue. Once the current inner completes, it dequeues the next one. This guarantees sequential, ordered execution. It is the slowest of the four operators (no parallelism) but the only one that guarantees both order and completeness.
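
The queueing is easiest to see with cold inner Observables that record when they are subscribed to. The following is a sketch with assumed names: `writeEntry` stands in for a sequential write such as an audit-log call, and `completers` lets the example finish each write by hand.

```typescript
import { Observable, from } from 'rxjs';
import { concatMap } from 'rxjs/operators';

const log: string[] = [];
const completers: Array<() => void> = [];   // call one to finish the matching write

// Hypothetical write: does nothing until subscribed, completes when its completer is called.
function writeEntry(n: number): Observable<number> {
    return new Observable<number>(subscriber => {
        log.push(`start ${n}`);
        completers.push(() => {
            log.push(`done ${n}`);
            subscriber.next(n);
            subscriber.complete();
        });
    });
}

from([1, 2, 3]).pipe(concatMap(n => writeEntry(n))).subscribe();

// All three outer values have been emitted, but only the first write has started.
const startedUpFront = log.slice();

// Finishing a write lets concatMap subscribe to the next queued one.
while (completers.length > 0) {
    completers.shift()!();
}
console.log(startedUpFront, log);
```

Each `start n` appears only after `done n-1`, which is the ordering guarantee described above. The queue itself is unbounded, so a source that emits faster than the inner work completes will keep accumulating pending values.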

Step 4 — exhaustMap Ignores New Outer Values During Inner Execution

exhaustMap subscribes to an inner Observable and ignores all outer emissions until the inner completes. It is the opposite of switchMap: instead of “latest wins,” it is “first wins.” For form submissions, this is ideal — the first click triggers the submission and subsequent clicks are ignored until the response arrives. The user cannot accidentally submit twice no matter how many times they click.
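
A short sketch of this behaviour, assuming a hypothetical `save` function whose response is resolved by hand. Counting how many times `save` is called shows that ignored clicks never even reach the project function.

```typescript
import { Subject } from 'rxjs';
import { exhaustMap } from 'rxjs/operators';

const clicks = new Subject<void>();
const responses: Subject<string>[] = [];   // one entry per request that was actually started

// Hypothetical save call: returns a response the example can resolve manually.
function save(): Subject<string> {
    const response = new Subject<string>();
    responses.push(response);
    return response;
}

const saved: string[] = [];
clicks.pipe(exhaustMap(() => save())).subscribe(result => saved.push(result));

clicks.next();   // starts request #1
clicks.next();   // ignored: #1 is still pending
clicks.next();   // ignored
const startedWhilePending = responses.length;

responses[0].next('saved #1');
responses[0].complete();   // inner completed, so exhaustMap accepts clicks again

clicks.next();   // starts request #2
const startedAfterCompletion = responses.length;

console.log({ startedWhilePending, startedAfterCompletion, saved });
```

Three clicks during the first request produce exactly one call to `save`; the fourth click, arriving after completion, starts a second one.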

Step 5 — mergeMap Creates Unlimited Concurrent Inner Observables

mergeMap subscribes to a new inner Observable for every outer emission, without waiting for previous ones to complete and without cancelling them. Multiple inner Observables run simultaneously. The output stream contains values from all active inner Observables, interleaved in arrival order. The optional second argument limits concurrency: mergeMap(fn, 3) keeps at most 3 active inner Observables — new outer values wait until one inner completes before starting.

Common Mistakes

Mistake 1 — Using mergeMap for search (race condition)

❌ Wrong — multiple concurrent requests can arrive out of order:

search$.pipe(
    mergeMap(q => this.search(q))   // OLD result for 'ta' may arrive AFTER result for 'task'
)
// User sees stale results from earlier query overwriting newer ones!

✅ Correct — switchMap cancels previous request:

search$.pipe(
    switchMap(q => this.search(q))  // previous request cancelled — only latest results shown
)

Mistake 2 — Using switchMap for form submit (request may be cancelled)

❌ Wrong — double-click sends two requests, second cancels the first:

fromEvent(btn, 'click').pipe(
    switchMap(() => this.createTask(data))  // second click cancels first submission!
)
// First task may or may not have been created — data integrity at risk

✅ Correct — exhaustMap ignores clicks while in-flight:

fromEvent(btn, 'click').pipe(
    exhaustMap(() => this.createTask(data))  // only one submission at a time
)

Mistake 3 — mergeMap without concurrency limit for large batches

❌ Wrong — 500 concurrent HTTP requests for bulk operations:

from(items).pipe(   // items: an array of 500 elements
    mergeMap(item => this.process(item))  // 500 simultaneous HTTP requests!
)

✅ Correct — limit concurrency:

from(items).pipe(   // items: an array of 500 elements
    mergeMap(item => this.process(item), 5)  // max 5 concurrent requests
)

Quick Reference

| Scenario | Operator | Behaviour |
|---|---|---|
| Search autocomplete | switchMap | Cancel on new query |
| Route-driven load | switchMap | Cancel on navigation |
| Form submit | exhaustMap | Ignore while in-flight |
| Login button | exhaustMap | Prevent double-submit |
| Parallel uploads | mergeMap(fn, n) | n concurrent |
| Sequential processing | concatMap | One at a time, ordered |
| Auto-refresh | switchMap | Cancel stale request |
| Undo queue | concatMap | Preserve order |

🧠 Test Yourself

A save button emits when clicked. While the save request is pending, the user clicks again. Which operator correctly sends only one request and ignores the second click until the first completes?