Combination Operators — combineLatest, forkJoin, merge, withLatestFrom

Real Angular applications rarely work with a single data stream. A dashboard might need the current user AND their tasks AND unread notifications — all loaded before rendering. A form might combine the current form value with a status Observable and a validation signal. Combination operators let you work with multiple Observables simultaneously, joining their values in ways that match your use case: latest from all, only when all complete, any emission from any source, or strict pairing. Choosing the right combination operator is the key to writing clean, correct multi-stream code.

Combination Operators Comparison

Operator | Emits When | Output | Completes When
--- | --- | --- | ---
combineLatest([a$, b$]) | Any source emits (after ALL have emitted at least once) | Array of latest values: [latestA, latestB] | All sources complete
forkJoin([a$, b$]) | ALL sources complete | Array of last values: [lastA, lastB] | When it emits (once)
merge(a$, b$) | Any source emits | Single values as they arrive (no combination) | All sources complete
zip(a$, b$) | ALL sources have emitted a new value (pair-wise) | Paired array: [a1, b1], [a2, b2] | Any source completes and has no buffered values left to pair
race(a$, b$) | First source to emit | Values from the winning source only | When winning source completes
withLatestFrom(b$) | Primary source emits (b$ must have emitted) | [primaryValue, latestB] | Primary source completes
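zip and race appear in the table but not in the examples later in this lesson, so here is a minimal sketch of their rules. It is plain TypeScript that replays a fixed list of emissions, not RxJS itself; the names Emission, replayZip and replayRace are hypothetical and exist only for this illustration.

```typescript
// One emission on a shared timeline: which source fired, and with what value.
interface Emission {
  source: number; // index of the source that emitted
  value: string;
}

// zip model: buffer values per source and emit a tuple only when every
// source has an unconsumed value waiting (strict 1:1 pairing).
function replayZip(timeline: Emission[], sourceCount: number): string[][] {
  const buffers: string[][] = Array.from({ length: sourceCount }, (): string[] => []);
  const output: string[][] = [];
  for (const { source, value } of timeline) {
    const buffer = buffers[source];
    if (buffer === undefined) continue; // ignore unknown source indexes
    buffer.push(value);
    if (buffers.every(pending => pending.length > 0)) {
      output.push(buffers.map(pending => pending.shift() as string));
    }
  }
  return output;
}

// race model: the first source to emit wins; emissions from every
// other source are ignored from then on.
function replayRace(timeline: Emission[]): string[] {
  const first = timeline[0];
  if (first === undefined) return [];
  return timeline
    .filter(emission => emission.source === first.source)
    .map(emission => emission.value);
}

const sampleTimeline: Emission[] = [
  { source: 0, value: 'a1' },
  { source: 0, value: 'a2' },
  { source: 1, value: 'b1' },
  { source: 0, value: 'a3' },
  { source: 1, value: 'b2' },
];

const zipped = replayZip(sampleTimeline, 2);
const raced = replayRace(sampleTimeline);
console.log(zipped); // [['a1', 'b1'], ['a2', 'b2']]  ('a3' is still waiting for a partner)
console.log(raced);  // ['a1', 'a2', 'a3']            (source 0 emitted first)
```

The same timeline gives very different results: zip holds 'a2' back until 'b2' arrives, while race discards source 1 entirely.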

Choosing the Right Operator

Scenario | Use | Why
--- | --- | ---
Dashboard: load user + tasks + notifications | forkJoin | All are HTTP calls: wait for all to complete, then render
Form: update preview when any field changes | combineLatest | Emit whenever any source changes, always have latest from all
Multiple event sources (keyboard, mouse, touch) | merge | Handle events from any source as a single stream
Button click: grab current filter state | withLatestFrom | Only emit when primary (button) fires, include secondary’s latest
Parallel requests that must be paired | zip | Strict 1:1 pairing of emissions
First to respond wins (cache vs network) | race | Use fastest source only
Note: forkJoin() is the go-to operator for parallel HTTP requests that all need to complete before you can do anything useful. It is the RxJS equivalent of Promise.all(). Two caveats: if any of the source Observables errors, forkJoin errors immediately and all other requests are cancelled; and if any source completes without emitting a value, forkJoin completes without emitting anything. Use forkJoin with individual catchError handlers on each source if you need partial results when some requests fail.
Tip: combineLatest() does not emit until every source has emitted at least one value. If one source is slow to start (like a plain Subject that has not emitted yet, or an HTTP call still in flight), combineLatest will wait silently. A BehaviorSubject does not have this problem, because it replays its current value on subscribe. Use startWith(defaultValue) on each source that may start late to provide an immediate initial value, ensuring combineLatest can emit right away: combineLatest([tasks$.pipe(startWith([])), filter$.pipe(startWith(''))]).
Warning: combineLatest() emits every time ANY source changes — which can be very frequent in a busy application. If you have four sources and they all update simultaneously (like on page load), combineLatest emits four times. Use debounceTime(0) (or auditTime(0)) after combineLatest to coalesce synchronous bursts into a single emission: combineLatest([...]).pipe(debounceTime(0)).

Complete Combination Operator Examples

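import { Component, DestroyRef, OnInit, inject, signal } from '@angular/core';
import { takeUntilDestroyed } from '@angular/core/rxjs-interop';
import {
    combineLatest, forkJoin, fromEvent, merge, of, EMPTY,
} from 'rxjs';
import {
    map, filter, catchError, startWith, withLatestFrom,
    switchMap, debounceTime, tap,
} from 'rxjs/operators';

// takeUntilDestroyed() with no argument only works in an injection context
// (constructor or field initializer); anywhere else, pass a DestroyRef.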

// ── forkJoin — parallel HTTP requests, all complete before render ──────────
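@Component({ ... })
export class DashboardComponent implements OnInit {
    // inject and DestroyRef come from '@angular/core';
    // takeUntilDestroyed comes from '@angular/core/rxjs-interop'.
    // UserService and TaskService are assumed class names for the services used below.
    private userService = inject(UserService);
    private taskService = inject(TaskService);
    private destroyRef  = inject(DestroyRef);

    data = signal<{ user: User; tasks: Task[]; stats: Stats } | null>(null);
    loading = signal(true);
    error = signal<string | null>(null);

    ngOnInit(): void {
        forkJoin({
            user:  this.userService.getMe(),
            tasks: this.taskService.getAll(),
            stats: this.taskService.getStats(),
        }).pipe(
            tap(() => this.loading.set(false)),
            catchError(err => {
                this.error.set(err.message);
                this.loading.set(false);
                return EMPTY;
            }),
            // ngOnInit runs outside an injection context, so pass the DestroyRef explicitly
            takeUntilDestroyed(this.destroyRef),
        ).subscribe(data => this.data.set(data));
    }
}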

// forkJoin with individual error handling — partial success
forkJoin({
    tasks:   this.taskService.getAll().pipe(catchError(() => of([]))),
    stats:   this.taskService.getStats().pipe(catchError(() => of(null))),
    profile: this.userService.getProfile().pipe(catchError(() => of(null))),
}).subscribe(({ tasks, stats, profile }) => {
    // tasks will be [] if request failed
    // stats and profile will be null if failed
    this.tasks.set(tasks);
    if (stats)   this.stats.set(stats);
    if (profile) this.profile.set(profile);
});

// ── combineLatest — react to any change ───────────────────────────────────
// Filter + sort + page all change independently — update the task view when any changes
combineLatest({
    tasks:  this.taskStore.tasks$.pipe(startWith([])),
    filter: this.filterState.status$.pipe(startWith('')),
    sort:   this.sortState.field$.pipe(startWith('createdAt')),
    page:   this.paginationState.page$.pipe(startWith(1)),
}).pipe(
    debounceTime(0),     // coalesce simultaneous changes (e.g. on page load)
    map(({ tasks, filter, sort, page }) => {
        const filtered = filter ? tasks.filter(t => t.status === filter) : tasks;
        const sorted   = [...filtered].sort((a, b) => (a[sort] > b[sort] ? 1 : a[sort] < b[sort] ? -1 : 0));
        const start    = (page - 1) * 10;
        return sorted.slice(start, start + 10);
    }),
    takeUntilDestroyed(),
).subscribe(tasks => this.displayedTasks.set(tasks));

// ── withLatestFrom — button click + current state ─────────────────────────
// "Export" button takes the current filter state at the time of click
fromEvent(exportBtn, 'click').pipe(
    withLatestFrom(
        this.filterState.status$,
        this.filterState.priority$,
        this.filterState.dateRange$,
    ),
    map(([_, status, priority, dateRange]) => ({
        status, priority, dateRange
    })),
    switchMap(filters => this.exportService.exportTasks(filters)),
    takeUntilDestroyed(),
).subscribe(blob => saveAs(blob, 'tasks.csv'));

// ── merge — multiple event sources ───────────────────────────────────────
// Handle save from button click OR Ctrl+S keyboard shortcut
const saveViaButton$ = fromEvent(saveBtn, 'click');
const saveViaKeyboard$ = fromEvent<KeyboardEvent>(document, 'keydown').pipe(
    filter(e => e.ctrlKey && e.key === 's'),
    tap(e => e.preventDefault()),
);

merge(saveViaButton$, saveViaKeyboard$).pipe(
    debounceTime(300),    // prevent double-save from simultaneous triggers
    switchMap(() => this.taskService.save(this.formValue)),
    takeUntilDestroyed(),
).subscribe(() => this.toast.success('Saved'));

// ── combineLatest for real-time form preview ──────────────────────────────
combineLatest([
    this.titleControl.valueChanges.pipe(startWith(this.titleControl.value)),
    this.descControl.valueChanges.pipe(startWith(this.descControl.value)),
    this.priorityControl.valueChanges.pipe(startWith(this.priorityControl.value)),
]).pipe(
    map(([title, description, priority]) => ({ title, description, priority })),
    debounceTime(100),
    takeUntilDestroyed(),
).subscribe(preview => this.preview.set(preview));

How It Works

Step 1 — forkJoin Subscribes to All Sources Simultaneously

forkJoin subscribes to all its source Observables at the same time. It tracks which ones have completed and stores their last emitted value. When the final source completes, forkJoin emits a single value — an array (or object with named keys) containing the last value from each source — and then completes itself. If any source errors before completing, forkJoin unsubscribes from all others and propagates the error.
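The bookkeeping described above can be sketched as a small replay function. This is a simplified model in plain TypeScript, not the RxJS implementation; SourceSignal, ForkJoinResult and replayForkJoin are hypothetical names, and the timeline is assumed to be well-formed (no signals from a source after it completes).

```typescript
// A notification from one of the sources on a shared timeline.
type SourceSignal =
  | { source: number; kind: 'next'; value: string }
  | { source: number; kind: 'complete' }
  | { source: number; kind: 'error'; message: string };

type ForkJoinResult =
  | { kind: 'emitted'; values: string[] }   // every source completed
  | { kind: 'errored'; message: string }    // a source errored first
  | { kind: 'completedEmpty' }              // a source completed without emitting
  | { kind: 'pending' };                    // some source has not completed yet

function replayForkJoin(timeline: SourceSignal[], sourceCount: number): ForkJoinResult {
  const last: (string | undefined)[] = new Array<string | undefined>(sourceCount).fill(undefined);
  const done = new Set<number>();
  for (const signal of timeline) {
    // Any error ends the whole join immediately.
    if (signal.kind === 'error') return { kind: 'errored', message: signal.message };
    // Remember only the most recent value per source.
    if (signal.kind === 'next') {
      last[signal.source] = signal.value;
      continue;
    }
    // Completion: a source with no value means there is nothing to join.
    if (last[signal.source] === undefined) return { kind: 'completedEmpty' };
    done.add(signal.source);
    if (done.size === sourceCount) {
      return { kind: 'emitted', values: last.map(value => value as string) };
    }
  }
  return { kind: 'pending' };
}

const joined = replayForkJoin([
  { source: 0, kind: 'next', value: 'user' },
  { source: 1, kind: 'next', value: 'tasks-v1' },
  { source: 1, kind: 'next', value: 'tasks-v2' },
  { source: 0, kind: 'complete' },
  { source: 1, kind: 'complete' },
], 2);

const failed = replayForkJoin([
  { source: 0, kind: 'next', value: 'user' },
  { source: 1, kind: 'error', message: '500' },
  { source: 0, kind: 'complete' },
], 2);

console.log(joined); // { kind: 'emitted', values: ['user', 'tasks-v2'] }
console.log(failed); // { kind: 'errored', message: '500' }
```

Note that only 'tasks-v2' survives in the first run: intermediate values from a source are overwritten, which is why forkJoin suits one-shot sources such as HTTP calls.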

Step 2 — combineLatest Requires at Least One Emission from Each Source

combineLatest maintains a “latest values” slot for each source. Before it can emit anything, every source must have emitted at least once — filling its slot. After that initial condition is met, any single emission from any source updates that source’s slot and triggers a new combineLatest emission with the updated array of latest values. Sources that are BehaviorSubjects or have startWith() fill their slots immediately.
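To make the slot mechanism concrete, here is a minimal sketch in plain TypeScript. It is a model of the rule rather than RxJS code; SlotUpdate and replayCombineLatest are hypothetical names, and the seeds parameter stands in for sources that start with a value (a BehaviorSubject or startWith()).

```typescript
// An update to one source's "latest value" slot.
interface SlotUpdate {
  source: number;
  value: string;
}

// seeds[i] is the value source i has from the start, or undefined if it has none.
function replayCombineLatest(
  updates: SlotUpdate[],
  seeds: (string | undefined)[],
): string[][] {
  const slots = [...seeds];
  const output: string[][] = [];
  const allFilled = (): boolean => slots.every(slot => slot !== undefined);
  const snapshot = (): string[] => slots.map(slot => slot as string);

  // If every source is seeded, there is one initial emission.
  if (slots.length > 0 && allFilled()) output.push(snapshot());

  for (const { source, value } of updates) {
    slots[source] = value;                      // overwrite this source's slot
    if (allFilled()) output.push(snapshot());   // emit only once all slots are filled
  }
  return output;
}

const slotUpdates: SlotUpdate[] = [
  { source: 0, value: 'tasks-1' },
  { source: 0, value: 'tasks-2' },
  { source: 1, value: 'open' },
];

const unseeded = replayCombineLatest(slotUpdates, [undefined, undefined]);
const seeded = replayCombineLatest(slotUpdates, ['[]', 'all']);

console.log(unseeded); // [['tasks-2', 'open']]
console.log(seeded);
// [['[]', 'all'], ['tasks-1', 'all'], ['tasks-2', 'all'], ['tasks-2', 'open']]
```

Without seeds, 'tasks-1' is never seen downstream because the second slot was still empty when it arrived. With seeds, every update produces an emission.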

Step 3 — withLatestFrom Samples a Secondary Stream on Primary Trigger

withLatestFrom(secondary$) subscribes to secondary$ too, but unlike combineLatest it never emits just because secondary$ changed. It emits only when the primary Observable (the one before the pipe) emits. At that moment it grabs the most recent value from secondary$ and combines the two. If secondary$ has not emitted yet, the primary emission is silently dropped. This is the correct operator when you want “do X when Y happens, using the current state of Z”: the button click is Y, the current filter state is Z.
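The sampling rule can be sketched in a few lines of plain TypeScript. This is an illustrative model, not RxJS; TriggerEvent and replayWithLatestFrom are hypothetical names, and a single secondary stream is assumed for brevity.

```typescript
// An event on a shared timeline, tagged with the stream it came from.
interface TriggerEvent {
  from: 'primary' | 'secondary';
  value: string;
}

function replayWithLatestFrom(timeline: TriggerEvent[]): [string, string][] {
  let latestSecondary: string | undefined;
  const output: [string, string][] = [];
  for (const item of timeline) {
    if (item.from === 'secondary') {
      latestSecondary = item.value; // remember it, but do not emit
      continue;
    }
    // Primary emission: emit only if the secondary already has a value.
    if (latestSecondary !== undefined) {
      output.push([item.value, latestSecondary]);
    }
  }
  return output;
}

const exportClicks = replayWithLatestFrom([
  { from: 'primary', value: 'click-1' },   // dropped: no filter value yet
  { from: 'secondary', value: 'open' },
  { from: 'secondary', value: 'done' },    // overwrites 'open' without emitting
  { from: 'primary', value: 'click-2' },
  { from: 'secondary', value: 'all' },
  { from: 'primary', value: 'click-3' },
]);

console.log(exportClicks); // [['click-2', 'done'], ['click-3', 'all']]
```

Three filter changes and three clicks produce only two outputs, one per click that happened after a filter value existed.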

Step 4 — merge Interleaves All Sources

merge(a$, b$, c$) subscribes to all sources simultaneously and forwards every emission from every source to a single output stream. Values are interleaved as they arrive — there is no combining, just forwarding. The merged stream completes when all sources complete. Use merge when you want to treat multiple event sources as one stream without pairing or combining their values.
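As a rough sketch of the forwarding and completion rules, the following plain TypeScript replays a timeline through a merge-like function. It is a model only; MergeSignal and replayMerge are hypothetical names.

```typescript
// A notification from one of the merged sources.
type MergeSignal =
  | { source: number; kind: 'next'; value: string }
  | { source: number; kind: 'complete' };

function replayMerge(
  timeline: MergeSignal[],
  sourceCount: number,
): { values: string[]; completed: boolean } {
  const values: string[] = [];
  const finished = new Set<number>();
  for (const signal of timeline) {
    if (signal.kind === 'next') {
      values.push(signal.value);   // forward as-is, in arrival order
    } else {
      finished.add(signal.source); // one source finishing does not end the output
    }
  }
  return { values, completed: finished.size === sourceCount };
}

const mergedSaves = replayMerge([
  { source: 0, kind: 'next', value: 'click' },
  { source: 1, kind: 'next', value: 'ctrl+s' },
  { source: 0, kind: 'complete' },
  { source: 1, kind: 'next', value: 'ctrl+s again' },
], 2);

console.log(mergedSaves.values);    // ['click', 'ctrl+s', 'ctrl+s again']
console.log(mergedSaves.completed); // false (source 1 is still open)
```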

Step 5 — debounceTime(0) Coalesces Synchronous Bursts

When multiple combineLatest sources update in the same JavaScript tick (like when loading page data), combineLatest emits once for each update. A debounceTime(0) after combineLatest schedules the emission on a zero-delay timer (a macrotask, like setTimeout(0)) and restarts that timer on every new value, so only the last value of a synchronous burst gets through. The cost is small but not zero: the emission becomes asynchronous and arrives one timer tick later, after all synchronous updates have settled.
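The coalescing idea can be sketched without RxJS as "cancel the pending emission whenever a newer value arrives". The code below is a simplified model, not how debounceTime is implemented; coalesceBurst, ScheduleFn and the manual scheduler are hypothetical names, and the manual scheduler exists only so the example can run synchronously instead of waiting for a real timer.

```typescript
type CancelFn = () => void;
type ScheduleFn = (task: () => void) => CancelFn;

// Default scheduler: a zero-delay timer, which runs after the current
// synchronous code has finished.
const timerSchedule: ScheduleFn = task => {
  const handle = setTimeout(task, 0);
  return () => clearTimeout(handle);
};

// Returns a function that accepts values and forwards only the last
// value of each burst to `emit`, once the scheduled task runs.
function coalesceBurst<T>(
  emit: (value: T) => void,
  schedule: ScheduleFn = timerSchedule,
): (value: T) => void {
  let cancelPending: CancelFn | undefined;
  return value => {
    cancelPending?.(); // a newer value replaces the pending one
    cancelPending = schedule(() => {
      cancelPending = undefined;
      emit(value);
    });
  };
}

// Manual scheduler for the demo: tasks run only when flushed.
const queuedTasks: (() => void)[] = [];
const manualSchedule: ScheduleFn = task => {
  queuedTasks.push(task);
  return () => {
    const index = queuedTasks.indexOf(task);
    if (index >= 0) queuedTasks.splice(index, 1);
  };
};
function flushQueuedTasks(): void {
  while (queuedTasks.length > 0) queuedTasks.shift()?.();
}

const received: string[] = [];
const pushValue = coalesceBurst<string>(value => received.push(value), manualSchedule);

// Three updates in the same tick, as on page load.
pushValue('user loaded');
pushValue('tasks loaded');
pushValue('stats loaded');
flushQueuedTasks(); // stands in for the timer firing
const afterBurst = [...received];

// A later, isolated update passes through on its own.
pushValue('filter changed');
flushQueuedTasks();
const afterSecond = [...received];

console.log(afterBurst);  // ['stats loaded']
console.log(afterSecond); // ['stats loaded', 'filter changed']
```

Passing the scheduler in as a parameter is a design choice for testability; with the default timerSchedule the behaviour is the same, but the result only becomes visible after the current synchronous code has finished.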

Common Mistakes

Mistake 1 — Using combineLatest when forkJoin is correct for HTTP

❌ Wrong — combineLatest re-emits if any Observable emits again after initial load:

combineLatest([
    this.userService.getMe(),      // HTTP — completes after 1 emission
    this.taskService.getAll(),     // HTTP — completes after 1 emission
]).subscribe(([user, tasks]) => { ... });
// Works here but if either Observable were long-lived, would keep re-emitting

✅ Correct — use forkJoin for parallel HTTP that all complete once:

forkJoin([this.userService.getMe(), this.taskService.getAll()])
    .subscribe(([user, tasks]) => { ... });

Mistake 2 — combineLatest never emitting because one source hasn’t started

❌ Wrong — if status$ never emits, combineLatest never emits:

combineLatest([tasks$, status$]).subscribe(...)
// If status$ is a Subject that hasn't emitted yet: nothing happens!

✅ Correct — provide initial values with startWith:

combineLatest([
    tasks$.pipe(startWith([])),
    status$.pipe(startWith('all')),
]).subscribe(...)
// Emits immediately with [[], 'all']

Mistake 3 — Using merge when values need to be combined

❌ Wrong — merge just passes individual values, no combination:

merge(user$, tasks$).subscribe(value => {
    // value is either a User OR a Task[] — not both at once!
    // You don't know which one arrived
})

✅ Correct — use combineLatest or forkJoin to get both values together:

combineLatest({ user: user$, tasks: tasks$ })
    .subscribe(({ user, tasks }) => { /* both available */ })

Quick Reference

Need | Operator | Example
--- | --- | ---
All HTTP complete | forkJoin | forkJoin({ user: user$, tasks: tasks$ })
React to any change | combineLatest | combineLatest([filter$, sort$, page$])
Multiple event sources | merge | merge(click$, keydown$)
Trigger + current state | withLatestFrom | button$.pipe(withLatestFrom(filter$))
Pair emissions 1:1 | zip | zip(request$, response$)
First to respond | race | race(cache$, network$)
Ensure initial value | startWith | tasks$.pipe(startWith([]))
Coalesce burst | debounceTime(0) | After combineLatest

🧠 Test Yourself

A component needs to load a user profile AND their tasks simultaneously, then render both only when both requests have completed. Which operator is correct?