Real Angular applications rarely work with a single data stream. A dashboard might need the current user AND their tasks AND unread notifications — all loaded before rendering. A form might combine the current form value with a status Observable and a validation signal. Combination operators let you work with multiple Observables simultaneously, joining their values in ways that match your use case: latest from all, only when all complete, any emission from any source, or strict pairing. Choosing the right combination operator is the key to writing clean, correct multi-stream code.
Combination Operators Comparison
| Operator | Emits When | Output | Completes When |
|---|---|---|---|
| combineLatest([a$, b$]) | Any source emits (after ALL have emitted at least once) | Array of latest values: [latestA, latestB] | All sources complete |
| forkJoin([a$, b$]) | ALL sources complete | Array of last values: [lastA, lastB] | When it emits (once) |
| merge(a$, b$) | Any source emits | Single values as they arrive (no combination) | All sources complete |
| zip(a$, b$) | ALL sources have emitted a new value (pair-wise) | Paired array: [a1, b1], [a2, b2] | First source completes |
| race(a$, b$) | First source to emit | Values from the winning source only | When winning source completes |
| withLatestFrom(b$) | Primary source emits (b$ must have emitted) | [primaryValue, latestB] | Primary source completes |
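The difference between the "latest values" row and the "pair-wise" row is easy to miss, so here is a minimal plain-TypeScript sketch of zip-style pairing. It is an illustrative model under simplifying assumptions (two sources, no completion or error handling), not the RxJS implementation, and `createZipModel`, `pushA`, and `pushB` are hypothetical names.

```typescript
// Simplified model of zip's pairing buffers (not the RxJS source).
// Each side queues values until the other side has a partner for them.
function createZipModel<A, B>(onPair: (pair: [A, B]) => void) {
  const bufferA: A[] = [];
  const bufferB: B[] = [];

  // Emit pairs for as long as both buffers hold an unpaired value.
  const flush = (): void => {
    while (bufferA.length > 0 && bufferB.length > 0) {
      onPair([bufferA.shift() as A, bufferB.shift() as B]);
    }
  };

  return {
    pushA: (value: A): void => { bufferA.push(value); flush(); },
    pushB: (value: B): void => { bufferB.push(value); flush(); },
  };
}

const zipPairs: [string, number][] = [];
const zipModel = createZipModel<string, number>((pair) => {
  zipPairs.push(pair);
});

zipModel.pushA("a1"); // buffered, no partner yet
zipModel.pushA("a2"); // buffered
zipModel.pushB(1);    // pairs with a1
zipModel.pushB(2);    // pairs with a2
zipModel.pushB(3);    // buffered until a third A value arrives

console.log(zipPairs); // [["a1", 1], ["a2", 2]]
```

Given the same sequence of inputs, combineLatest would instead produce [a2, 1], [a2, 2], [a2, 3], because it always reuses the latest value from the other source rather than waiting for a fresh partner.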
Choosing the Right Operator
| Scenario | Use | Why |
|---|---|---|
| Dashboard: load user + tasks + notifications | forkJoin | All are HTTP calls: wait for all to complete, then render |
| Form: update preview when any field changes | combineLatest | Emit whenever any source changes, always have latest from all |
| Multiple event sources (keyboard, mouse, touch) | merge | Handle events from any source as a single stream |
| Button click: grab current filter state | withLatestFrom | Only emit when primary (button) fires, include secondary’s latest |
| Parallel requests that must be paired | zip | Strict 1:1 pairing of emissions |
| First to respond wins (cache vs network) | race | Use fastest source only |
forkJoin() is the go-to operator for parallel HTTP requests that must all complete before you can do anything useful. It is the RxJS equivalent of Promise.all(). An important caveat: if any source Observable errors, forkJoin errors immediately and unsubscribes from the others, which cancels any requests still in flight. Add an individual catchError handler to each source if you need partial results when some requests fail.
combineLatest() does not emit until every source has emitted at least one value. If one source is slow to start (like a plain Subject that has not emitted yet), combineLatest waits silently. A BehaviorSubject does not have this problem, because it replays its current value on subscription. Use startWith(defaultValue) on each slow source to provide an immediate initial value so that combineLatest can emit right away: combineLatest([tasks$.pipe(startWith([])), filter$.pipe(startWith(''))]).
combineLatest() emits every time ANY source changes, which can be very frequent in a busy application. If you have four sources and all of them update in the same tick (for example when a page of data finishes loading), combineLatest emits four times. Use debounceTime(0) (or auditTime(0)) after combineLatest to coalesce synchronous bursts into a single emission: combineLatest([...]).pipe(debounceTime(0)).
Complete Combination Operator Examples
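import { Component, DestroyRef, OnInit, inject, signal } from '@angular/core';
import { takeUntilDestroyed } from '@angular/core/rxjs-interop';
import {
  combineLatest, forkJoin, fromEvent, merge, zip, race, of, EMPTY, throwError,
} from 'rxjs';
import {
  map, catchError, filter, startWith, withLatestFrom,
  switchMap, debounceTime, tap,
} from 'rxjs/operators';
// ── forkJoin: parallel HTTP requests, all complete before render ──────────
@Component({ ... })
export class DashboardComponent implements OnInit {
  // userService and taskService are injected services (not shown)
  private destroyRef = inject(DestroyRef);
  data = signal<{ user: User; tasks: Task[]; stats: Stats } | null>(null);
  loading = signal(true);
  error = signal<string | null>(null);
  ngOnInit(): void {
    forkJoin({
      user: this.userService.getMe(),
      tasks: this.taskService.getAll(),
      stats: this.taskService.getStats(),
    }).pipe(
      tap(() => this.loading.set(false)),
      catchError(err => {
        this.error.set(err.message);
        this.loading.set(false);
        return EMPTY;
      }),
      // ngOnInit runs outside an injection context, so pass DestroyRef explicitly
      takeUntilDestroyed(this.destroyRef),
    ).subscribe(data => this.data.set(data));
  }
}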
// forkJoin with individual error handling: partial success
forkJoin({
  tasks: this.taskService.getAll().pipe(catchError(() => of([]))),
  stats: this.taskService.getStats().pipe(catchError(() => of(null))),
  profile: this.userService.getProfile().pipe(catchError(() => of(null))),
}).subscribe(({ tasks, stats, profile }) => {
  // tasks will be [] if request failed
  // stats and profile will be null if failed
  this.tasks.set(tasks);
  if (stats) this.stats.set(stats);
  if (profile) this.profile.set(profile);
});
// ── combineLatest: react to any change ───────────────────────────────────
// Filter + sort + page all change independently; update the task view when any changes
combineLatest({
  tasks: this.taskStore.tasks$.pipe(startWith([])),
  filter: this.filterState.status$.pipe(startWith('')),
  sort: this.sortState.field$.pipe(startWith('createdAt')),
  page: this.paginationState.page$.pipe(startWith(1)),
}).pipe(
  debounceTime(0), // coalesce simultaneous changes (e.g. on page load)
  map(({ tasks, filter, sort, page }) => {
    const filtered = filter ? tasks.filter(t => t.status === filter) : tasks;
    const sorted = [...filtered].sort((a, b) => (a[sort] > b[sort] ? 1 : -1));
    const start = (page - 1) * 10;
    return sorted.slice(start, start + 10);
  }),
  takeUntilDestroyed(),
).subscribe(tasks => this.displayedTasks.set(tasks));
// ── withLatestFrom: button click + current state ─────────────────────────
// "Export" button takes the current filter state at the time of click
fromEvent(exportBtn, 'click').pipe(
  withLatestFrom(
    this.filterState.status$,
    this.filterState.priority$,
    this.filterState.dateRange$,
  ),
  map(([_, status, priority, dateRange]) => ({
    status, priority, dateRange
  })),
  switchMap(filters => this.exportService.exportTasks(filters)),
  takeUntilDestroyed(),
).subscribe(blob => saveAs(blob, 'tasks.csv'));
// ── merge: multiple event sources ───────────────────────────────────────
// Handle save from button click OR Ctrl+S keyboard shortcut
const saveViaButton$ = fromEvent(saveBtn, 'click');
const saveViaKeyboard$ = fromEvent<KeyboardEvent>(document, 'keydown').pipe(
  filter(e => e.ctrlKey && e.key === 's'),
  tap(e => e.preventDefault()),
);
merge(saveViaButton$, saveViaKeyboard$).pipe(
  debounceTime(300), // prevent double-save from simultaneous triggers
  switchMap(() => this.taskService.save(this.formValue)),
  takeUntilDestroyed(),
).subscribe(() => this.toast.success('Saved'));
// ── combineLatest for real-time form preview ──────────────────────────────
combineLatest([
  this.titleControl.valueChanges.pipe(startWith(this.titleControl.value)),
  this.descControl.valueChanges.pipe(startWith(this.descControl.value)),
  this.priorityControl.valueChanges.pipe(startWith(this.priorityControl.value)),
]).pipe(
  map(([title, description, priority]) => ({ title, description, priority })),
  debounceTime(100),
  takeUntilDestroyed(),
).subscribe(preview => this.preview.set(preview));
How It Works
Step 1 — forkJoin Subscribes to All Sources Simultaneously
forkJoin subscribes to all its source Observables at the same time. It tracks which ones have completed and stores their last emitted value. When the final source completes, forkJoin emits a single value — an array (or object with named keys) containing the last value from each source — and then completes itself. If any source errors before completing, forkJoin unsubscribes from all others and propagates the error.
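The bookkeeping described above can be sketched in plain TypeScript. This is a simplified model, not the RxJS source: it assumes synchronous callback-style sources, ignores errors and unsubscription, and `ForkSource`, `forkJoinModel`, and `fromValues` are hypothetical names introduced only for illustration.

```typescript
// Simplified model of forkJoin's bookkeeping (not the RxJS implementation).
type ForkObserver<T> = { next: (value: T) => void; complete: () => void };
type ForkSource<T> = (observer: ForkObserver<T>) => void;

function forkJoinModel<T>(
  sources: ForkSource<T>[],
  onResult: (values: T[]) => void,
): void {
  const last: T[] = new Array<T>(sources.length);
  const hasValue: boolean[] = sources.map(() => false);
  let remaining = sources.length;
  let done = false;

  sources.forEach((source, i) => {
    source({
      next: (value) => {
        // Overwrite the slot: only the LAST value per source is kept.
        last[i] = value;
        hasValue[i] = true;
      },
      complete: () => {
        if (done) return;
        remaining -= 1;
        if (!hasValue[i]) {
          // A source that completes without emitting ends everything, with no result.
          done = true;
          return;
        }
        if (remaining === 0) {
          done = true;
          onResult([...last]); // single emission once every source has completed
        }
      },
    });
  });
}

// Hypothetical helper: emits the given values synchronously, then completes.
const fromValues = <T>(...values: T[]): ForkSource<T> => (observer) => {
  values.forEach((v) => observer.next(v));
  observer.complete();
};

const forkJoinResults: number[][] = [];
forkJoinModel([fromValues(1, 2, 3), fromValues(10, 20)], (values) => {
  forkJoinResults.push(values);
});

console.log(forkJoinResults); // [[3, 20]]
```

The intermediate values 1, 2, and 10 never reach the consumer, which is why forkJoin suits request-style sources that emit once and complete.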
Step 2 — combineLatest Requires at Least One Emission from Each Source
combineLatest maintains a “latest values” slot for each source. Before it can emit anything, every source must have emitted at least once — filling its slot. After that initial condition is met, any single emission from any source updates that source’s slot and triggers a new combineLatest emission with the updated array of latest values. Sources that are BehaviorSubjects or have startWith() fill their slots immediately.
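A small plain-TypeScript model makes the slot mechanism visible. It is a sketch under simplifying assumptions (no completion, errors, or unsubscription) rather than how RxJS is actually written, and `SlotSource` and `combineLatestModel` are hypothetical names.

```typescript
// Hypothetical minimal push source used only for this illustration.
class SlotSource<T> {
  private listeners: Array<(value: T) => void> = [];
  subscribe(listener: (value: T) => void): void {
    this.listeners.push(listener);
  }
  next(value: T): void {
    this.listeners.forEach((listener) => listener(value));
  }
}

// Simplified model of combineLatest's "latest values" slots.
function combineLatestModel<T>(
  sources: SlotSource<T>[],
  onEmit: (latest: T[]) => void,
): void {
  const slots: T[] = new Array<T>(sources.length);
  const filled: boolean[] = sources.map(() => false);

  sources.forEach((source, i) => {
    source.subscribe((value) => {
      slots[i] = value;
      filled[i] = true;
      // Emit only once every slot has received at least one value.
      if (filled.every(Boolean)) onEmit([...slots]);
    });
  });
}

const slotA = new SlotSource<string>();
const slotB = new SlotSource<string>();
const combinedLog: string[][] = [];
combineLatestModel([slotA, slotB], (latest) => {
  combinedLog.push(latest);
});

slotA.next("a1"); // no output: slot B is still empty
slotA.next("a2"); // no output: slot B is still empty
slotB.next("b1"); // emits ["a2", "b1"]
slotA.next("a3"); // emits ["a3", "b1"]

console.log(combinedLog); // [["a2", "b1"], ["a3", "b1"]]
```

Note that "a1" is lost entirely: values that arrive before every slot is filled are overwritten, not queued.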
Step 3 — withLatestFrom Samples a Secondary Stream on Primary Trigger
withLatestFrom(secondary$) subscribes to secondary$ right away, but unlike combineLatest it never emits in response to secondary$. It emits only when the primary Observable (the one before the pipe) emits. When that happens, it grabs the most recent value from secondary$ and combines the two. If secondary$ has not emitted yet, the primary emission is silently dropped. This is the correct operator when you want “do X when Y happens, using the current state of Z”: the button click is Y, the current filter state is Z.
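The sampling behaviour can be modelled in a few lines of plain TypeScript. This is an illustrative sketch, not the RxJS implementation; `SampleSource` and `withLatestFromModel` are hypothetical names, and completion and error handling are left out.

```typescript
// Hypothetical minimal push source used only for this illustration.
class SampleSource<T> {
  private listeners: Array<(value: T) => void> = [];
  subscribe(listener: (value: T) => void): void {
    this.listeners.push(listener);
  }
  next(value: T): void {
    this.listeners.forEach((listener) => listener(value));
  }
}

// Simplified model of withLatestFrom: the secondary only updates stored state.
function withLatestFromModel<P, S>(
  primary: SampleSource<P>,
  secondary: SampleSource<S>,
  onEmit: (pair: [P, S]) => void,
): void {
  let latest: S | undefined;
  let hasLatest = false;

  // Secondary emissions are remembered but never produce output.
  secondary.subscribe((value) => {
    latest = value;
    hasLatest = true;
  });

  // Primary emissions produce output, sampling the stored secondary value.
  primary.subscribe((value) => {
    if (!hasLatest) return; // nothing to sample yet: the emission is dropped
    onEmit([value, latest as S]);
  });
}

const clickSource = new SampleSource<string>();
const filterSource = new SampleSource<string>();
const exportLog: [string, string][] = [];
withLatestFromModel(clickSource, filterSource, (pair) => {
  exportLog.push(pair);
});

clickSource.next("click-1"); // dropped: the filter has not emitted yet
filterSource.next("open");   // stored, no output
filterSource.next("done");   // stored, no output
clickSource.next("click-2"); // emits ["click-2", "done"]

console.log(exportLog); // [["click-2", "done"]]
```

The dropped first click is the detail to watch for in practice: if the secondary stream may start late, giving it an initial value avoids losing early triggers.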
Step 4 — merge Interleaves All Sources
merge(a$, b$, c$) subscribes to all sources simultaneously and forwards every emission from every source to a single output stream. Values are interleaved as they arrive — there is no combining, just forwarding. The merged stream completes when all sources complete. Use merge when you want to treat multiple event sources as one stream without pairing or combining their values.
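As a rough plain-TypeScript model of this forwarding and completion rule (a sketch, not the RxJS source; `MergeSource`, `MergeObserver`, and `mergeModel` are hypothetical names, and errors are ignored):

```typescript
type MergeObserver<T> = { next: (value: T) => void; complete: () => void };

// Hypothetical minimal source that can emit values and complete.
class MergeSource<T> {
  private observers: MergeObserver<T>[] = [];
  subscribe(observer: MergeObserver<T>): void {
    this.observers.push(observer);
  }
  next(value: T): void {
    this.observers.forEach((o) => o.next(value));
  }
  complete(): void {
    this.observers.forEach((o) => o.complete());
  }
}

// Simplified model of merge: forward everything, complete after the last source.
function mergeModel<T>(sources: MergeSource<T>[], output: MergeObserver<T>): void {
  let active = sources.length;
  sources.forEach((source) => {
    source.subscribe({
      next: (value) => output.next(value), // forwarded untouched, no combining
      complete: () => {
        active -= 1;
        if (active === 0) output.complete();
      },
    });
  });
}

const buttonSource = new MergeSource<string>();
const keyboardSource = new MergeSource<string>();
const mergedLog: string[] = [];
mergeModel([buttonSource, keyboardSource], {
  next: (value) => { mergedLog.push(value); },
  complete: () => { mergedLog.push("complete"); },
});

buttonSource.next("click");
keyboardSource.next("ctrl+s");
buttonSource.complete();             // one source is still active
keyboardSource.next("ctrl+s again"); // still forwarded
keyboardSource.complete();           // last source done: output completes

console.log(mergedLog); // ["click", "ctrl+s", "ctrl+s again", "complete"]
```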
Step 5 — debounceTime(0) Coalesces Synchronous Bursts
When multiple combineLatest sources update in the same JavaScript tick (like when loading page data), combineLatest emits once for each update. A debounceTime(0) after combineLatest holds each emission on a zero-delay timer (a macrotask scheduled by the default async scheduler, not a microtask) and restarts that timer whenever a newer value arrives, so only the last value of a synchronous burst is delivered. The cost is small but not zero: the value arrives asynchronously, on a later turn of the event loop, after all synchronous updates have settled.
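The coalescing effect can be reproduced without RxJS using a zero-delay timer. The sketch below is a simplified stand-in for a zero-delay debounce, assuming a setTimeout-based scheduler; `coalesce` is a hypothetical helper name, not an RxJS API.

```typescript
// Simplified stand-in for a zero-delay debounce: each new value cancels the
// pending timer, so only the last value of a synchronous burst is delivered.
function coalesce<T>(onEmit: (value: T) => void): (value: T) => void {
  let timer: ReturnType<typeof setTimeout> | undefined;
  return (value: T): void => {
    if (timer !== undefined) clearTimeout(timer);
    timer = setTimeout(() => onEmit(value), 0);
  };
}

const coalescedLog: number[] = [];
const pushCoalesced = coalesce<number>((value) => {
  coalescedLog.push(value);
});

// Four updates in the same tick, like four sources settling on page load.
pushCoalesced(1);
pushCoalesced(2);
pushCoalesced(3);
pushCoalesced(4);

console.log(coalescedLog.length); // 0: nothing is delivered synchronously

// Once the timer has fired, only the final value has been delivered.
setTimeout(() => console.log(coalescedLog), 10); // [4]
```

The first log line shows the trade-off: consumers see the value one timer tick later, so code that expects a synchronous result right after subscribing will not find one.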
Common Mistakes
Mistake 1 — Using combineLatest when forkJoin is correct for HTTP
❌ Wrong — combineLatest re-emits if any Observable emits again after initial load:
combineLatest([
  this.userService.getMe(),   // HTTP: completes after 1 emission
  this.taskService.getAll(),  // HTTP: completes after 1 emission
]).subscribe(([user, tasks]) => { ... });
// Works here, but if either Observable were long-lived it would keep re-emitting
✅ Correct — use forkJoin for parallel HTTP that all complete once:
forkJoin([this.userService.getMe(), this.taskService.getAll()])
  .subscribe(([user, tasks]) => { ... });
Mistake 2 — combineLatest never emitting because one source hasn’t started
❌ Wrong — if status$ never emits, combineLatest never emits:
combineLatest([tasks$, status$]).subscribe(...)
// If status$ is a Subject that hasn't emitted yet: nothing happens!
✅ Correct — provide initial values with startWith:
combineLatest([
  tasks$.pipe(startWith([])),
  status$.pipe(startWith('all')),
]).subscribe(...)
// Emits immediately with [[], 'all']
Mistake 3 — Using merge when values need to be combined
❌ Wrong — merge just passes individual values, no combination:
merge(user$, tasks$).subscribe(value => {
  // value is either a User OR a Task[], never both at once!
  // You don't know which one arrived
})
✅ Correct — use combineLatest or forkJoin to get both values together:
combineLatest({ user: user$, tasks: tasks$ })
  .subscribe(({ user, tasks }) => { /* both available */ })
Quick Reference
| Need | Operator | Example |
|---|---|---|
| All HTTP complete | forkJoin | forkJoin({ user: user$, tasks: tasks$ }) |
| React to any change | combineLatest | combineLatest([filter$, sort$, page$]) |
| Multiple event sources | merge | merge(click$, keydown$) |
| Trigger + current state | withLatestFrom | button$.pipe(withLatestFrom(filter$)) |
| Pair emissions 1:1 | zip | zip(request$, response$) |
| First to respond | race | race(cache$, network$) |
| Ensure initial value | startWith | tasks$.pipe(startWith([])) |
| Coalesce burst | debounceTime(0) | After combineLatest |