Flattening operators are the most powerful — and most misunderstood — operators in RxJS. They all do the same basic thing: take a value from an outer Observable, use it to create an inner Observable, and “flatten” the inner Observable’s emissions into the outer stream. The critical difference between them is how they handle concurrent inner Observables — what happens when a new outer value arrives while an inner Observable is still running. Getting this wrong produces duplicate requests, race conditions, and stale data in production. Getting it right makes your application rock-solid.
Flattening Operators Comparison
| Operator | On new outer value | Concurrent inner Obs | Order of results |
|---|---|---|---|
switchMap |
Cancel current inner, start new | Only latest — 1 at a time | Latest only |
mergeMap (=flatMap) |
Start new inner alongside existing | Unlimited concurrent | First-come first-served |
concatMap |
Queue — start new only when current completes | 1 at a time (queue) | Guaranteed order |
exhaustMap |
Ignore new while inner is running | Only oldest — 1 at a time | First wins |
Choosing the Right Flattening Operator
| Use Case | Operator | Reason |
|---|---|---|
| Search autocomplete | switchMap |
Cancel previous request when user types a new query |
| Route change → load resource | switchMap |
Cancel previous load when route changes |
| Upload multiple files | mergeMap |
Upload all files simultaneously, don’t wait for each |
| Process tasks in order | concatMap |
One at a time, preserve order, queue the rest |
| Submit form once | exhaustMap |
Ignore duplicate clicks while submission is in-flight |
| Login button | exhaustMap |
Prevent double login from impatient double-click |
| Audit log — sequential writes | concatMap |
Preserve order — write N only after N-1 completes |
| Refresh data on interval | switchMap |
Cancel pending request if interval fires again |
switchMap is the default choice for most Angular use cases because Angular applications are primarily user-driven: the user navigates to a page (triggering a load), types in a search (triggering a query), or selects a filter (triggering a reload). In all these cases, if the user triggers a new action before the previous one completes, you want to cancel the previous operation and start the new one — exactly what switchMap does. Reserve mergeMap, concatMap, and exhaustMap for the specific cases where their behaviour is required.exhaustMap for form submit buttons and login buttons. When the user double-clicks submit, exhaustMap ignores the second click while the first submission is in-flight. This prevents duplicate database records from impatient clicking. Using switchMap here would cancel the first request and send a second — potentially after the form data has been cleared. Using mergeMap would send both and create duplicates.mergeMap without a concurrency limit creates an unbounded number of inner Observables — for a rapidly emitting source, this can create thousands of concurrent HTTP requests. When using mergeMap for bulk operations, always set the concurrency argument: mergeMap(item => process(item), 3) limits to 3 concurrent inner Observables. For file uploads, 3-5 concurrent uploads is typically a good balance between speed and server load.Complete Flattening Operator Examples
import { fromEvent, from, interval, Subject, of } from 'rxjs';
import {
switchMap, mergeMap, concatMap, exhaustMap,
map, filter, tap, delay, catchError, finalize,
} from 'rxjs/operators';
import { takeUntilDestroyed } from '@angular/core/rxjs-interop';
// ── switchMap — search / route-driven loading ─────────────────────────────
// Classic search autocomplete
this.searchControl.valueChanges.pipe(
debounceTime(300),
distinctUntilChanged(),
// Each new query CANCELS the previous HTTP request
switchMap(query => query.length >= 2
? this.taskService.search(query).pipe(
catchError(() => of([])) // catch inside inner — outer stream continues
)
: of([])
),
takeUntilDestroyed(),
).subscribe(results => this.results.set(results));
// Route params → load data (cancel previous load on navigation)
this.route.params.pipe(
map(p => p['id']),
switchMap(id => this.taskService.getById(id).pipe(
catchError(err => {
this.error.set(err.message);
return of(null);
})
)),
filter(task => task !== null),
takeUntilDestroyed(),
).subscribe(task => this.task.set(task!));
// Auto-refresh on interval — cancel pending if interval fires before response
interval(30000).pipe(
switchMap(() => this.taskService.getAll()),
takeUntilDestroyed(),
).subscribe(tasks => this.tasks.set(tasks));
// ── exhaustMap — prevent duplicate submissions ────────────────────────────
// Form submit button — ignore while submission in-flight
fromEvent(this.submitBtn.nativeElement, 'click').pipe(
exhaustMap(() => {
this.submitting.set(true);
return this.taskService.create(this.formValue()).pipe(
finalize(() => this.submitting.set(false)),
catchError(err => {
this.error.set(err.message);
return of(null);
}),
);
}),
filter(result => result !== null),
takeUntilDestroyed(),
).subscribe(task => {
this.store.addTask(task!);
this.toast.success('Task created');
this.form.reset();
});
// Login button — exhaustMap prevents double login
this.loginButton$.pipe(
exhaustMap(() =>
this.authService.login(this.credentials()).pipe(
catchError(err => {
this.loginError.set(err.error?.message ?? 'Login failed');
return of(null);
}),
)
),
filter(user => user !== null),
takeUntilDestroyed(),
).subscribe(user => {
this.authStore.setUser(user!);
this.router.navigate(['/tasks']);
});
// ── concatMap — ordered sequential operations ─────────────────────────────
// Process tasks one at a time in order (e.g. sequential API calls with dependencies)
from(taskIds).pipe(
concatMap(id => this.taskService.getById(id).pipe(
catchError(() => of(null))
)),
filter(task => task !== null),
).subscribe(task => this.processTask(task!));
// Undo queue — apply undo operations in the exact reverse order
this.undoQueue$.pipe(
concatMap(operation => this.applyUndo(operation)), // one at a time, in queue order
).subscribe(() => this.toast.info('Action undone'));
// ── mergeMap — parallel processing ───────────────────────────────────────
// Upload multiple files simultaneously (max 3 at once)
from(files).pipe(
mergeMap(
file => this.uploadService.upload(file).pipe(
tap(progress => this.updateProgress(file.name, progress)),
catchError(err => {
this.setFileError(file.name, err.message);
return of(null);
}),
),
3 // max 3 concurrent uploads
),
filter(result => result !== null),
).subscribe(url => this.uploadedUrls.update(urls => [...urls, url!]));
// Batch delete — process all simultaneously
from(selectedIds).pipe(
mergeMap(
id => this.taskService.delete(id).pipe(catchError(() => of(null))),
5 // 5 concurrent deletes
),
).subscribe(() => this.tasks.update(tasks => tasks.filter(t => !selectedIds.includes(t._id))));
How It Works
Step 1 — All Flattening Operators Subscribe to Inner Observables
Every flattening operator takes a “project function” — switchMap(id => http.get('/tasks/' + id)). When the outer Observable emits id, the operator calls the project function to get the inner Observable (http.get(...)), then subscribes to it. The operator is the connection between the outer (trigger) and inner (work) Observables. What distinguishes the four operators is entirely how they manage this subscription when a new outer value arrives while an inner is running.
Step 2 — switchMap Unsubscribes from the Previous Inner Observable
When switchMap receives a new outer value while an inner Observable is running, it unsubscribes from the current inner Observable (cancelling the HTTP request if it hasn’t completed) and subscribes to the new one. Only values from the latest inner Observable flow downstream. This “switch to the latest” behaviour is exactly right for user-driven queries — you never want an old search result to overwrite a newer one.
Step 3 — concatMap Queues Inner Observables
concatMap subscribes to inner Observables one at a time. If a new outer value arrives while the current inner is running, it does not start the new inner — it waits in a queue. Once the current inner completes, it dequeues the next one. This guarantees sequential, ordered execution. It is the slowest of the four operators (no parallelism) but the only one that guarantees both order and completeness.
Step 4 — exhaustMap Ignores New Outer Values During Inner Execution
exhaustMap subscribes to an inner Observable and ignores all outer emissions until the inner completes. It is the opposite of switchMap: instead of “latest wins,” it is “first wins.” For form submissions, this is ideal — the first click triggers the submission and subsequent clicks are ignored until the response arrives. The user cannot accidentally submit twice no matter how many times they click.
Step 5 — mergeMap Creates Unlimited Concurrent Inner Observables
mergeMap subscribes to a new inner Observable for every outer emission, without waiting for previous ones to complete and without cancelling them. Multiple inner Observables run simultaneously. The output stream contains values from all active inner Observables, interleaved in arrival order. The optional second argument limits concurrency: mergeMap(fn, 3) keeps at most 3 active inner Observables — new outer values wait until one inner completes before starting.
Common Mistakes
Mistake 1 — Using mergeMap for search (race condition)
❌ Wrong — multiple concurrent requests can arrive out of order:
search$.pipe(
mergeMap(q => this.search(q)) // OLD result for 'ta' may arrive AFTER result for 'task'
)
// User sees stale results from earlier query overwriting newer ones!
✅ Correct — switchMap cancels previous request:
search$.pipe(
switchMap(q => this.search(q)) // previous request cancelled — only latest results shown
)
Mistake 2 — Using switchMap for form submit (request may be cancelled)
❌ Wrong — double-click sends two requests, second cancels the first:
fromEvent(btn, 'click').pipe(
switchMap(() => this.createTask(data)) // second click cancels first submission!
)
// First task may or may not have been created — data integrity at risk
✅ Correct — exhaustMap ignores clicks while in-flight:
fromEvent(btn, 'click').pipe(
exhaustMap(() => this.createTask(data)) // only one submission at a time
)
Mistake 3 — mergeMap without concurrency limit for large batches
❌ Wrong — 500 concurrent HTTP requests for bulk operations:
from(500Items).pipe(
mergeMap(item => this.process(item)) // 500 simultaneous HTTP requests!
)
✅ Correct — limit concurrency:
from(500Items).pipe(
mergeMap(item => this.process(item), 5) // max 5 concurrent requests
)
Quick Reference
| Scenario | Operator | Behaviour |
|---|---|---|
| Search autocomplete | switchMap |
Cancel on new query |
| Route-driven load | switchMap |
Cancel on navigation |
| Form submit | exhaustMap |
Ignore while in-flight |
| Login button | exhaustMap |
Prevent double-submit |
| Parallel uploads | mergeMap(fn, n) |
n concurrent |
| Sequential processing | concatMap |
One at a time, ordered |
| Auto-refresh | switchMap |
Cancel stale request |
| Undo queue | concatMap |
Preserve order |