RxJS (Reactive Extensions for JavaScript) is the library that powers Angular’s asynchronous operations. While Angular Signals (covered in Chapter 12) handle synchronous reactive state, RxJS handles the fundamentally async world: HTTP responses, user input streams, WebSocket messages, timers, and any operation where values arrive over time. Understanding what an Observable actually is (how it differs from a Promise, how it is created, when it is “cold” vs “hot”, and how Subjects bridge the imperative and reactive worlds) gives you the foundation for using every RxJS operator correctly.
Observable vs Promise vs Signal
| Concept | Promise | Observable | Signal |
|---|---|---|---|
| Values over time | One value (or rejection) | Zero, one, or many values | Always has one current value |
| Lazy / Eager | Eager: starts immediately | Cold = lazy (starts on subscribe) | Computed = lazy |
| Cancellable | No (natively) | Yes: unsubscribe() | N/A (synchronous) |
| Composable | Limited (.then chaining) | Rich operator library (100+ operators) | computed() chains |
| Sync or Async | Always async | Can be synchronous or async | Always synchronous |
| Error handling | .catch() | catchError() operator or error callback | N/A |
| Angular use | One-off tasks | HTTP, streams, events | Component/service state |
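The "Lazy / Eager" and "Sync or Async" rows can be observed with a few lines of plain TypeScript. The sketch below is only an illustration: `lazy$` is a hypothetical stand-in for a cold Observable (a function that does nothing until it is called with a callback), not an RxJS API.

```typescript
const log: string[] = [];

// Promise: the executor runs as soon as the Promise is constructed (eager).
const promise = new Promise<number>(resolve => {
  log.push('promise executor ran');
  resolve(42);
});

// Hypothetical stand-in for a cold Observable: nothing runs until it is called.
const lazy$ = (next: (value: number) => void): void => {
  log.push('lazy producer ran');
  next(42);
};

log.push('nothing subscribed yet');

// Promise callbacks are always deferred to a microtask, even if already resolved.
promise.then(value => { log.push('promise value ' + value); });

// The lazy producer starts now and, in this sketch, delivers synchronously.
lazy$(value => { log.push('lazy value ' + value); });

log.push('end of script');
// At this point the log contains 'lazy value 42' but not yet 'promise value 42'.
```

The Promise has already done its work before anyone asked for the result, while the lazy producer only runs when it is invoked, which is the same distinction the table draws between a Promise and a cold Observable.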
Subject Types
| Subject | Replay Behaviour | Initial Value | Best For |
|---|---|---|---|
| Subject<T> | None: only future values | None | Event buses, one-shot notifications |
| BehaviorSubject<T> | Replays last value to new subscribers | Required | State containers: always has a current value |
| ReplaySubject<T>(n) | Replays last n values to new subscribers | None | Late subscribers that need history (logs, undo) |
| AsyncSubject<T> | Replays only the last value, on completion | None | Resource loading: emit once on complete |
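AsyncSubject is the least intuitive row in the table, so here is a minimal model of its behaviour. `SimpleAsyncSubject` is a hypothetical class written for this illustration; it leaves out error handling and unsubscription and is not the RxJS implementation.

```typescript
type Listener<T> = (value: T) => void;

// Hypothetical, simplified model of AsyncSubject semantics.
class SimpleAsyncSubject<T> {
  private listeners: Listener<T>[] = [];
  private hasValue = false;
  private last: T | undefined;
  private completed = false;

  subscribe(listener: Listener<T>): void {
    if (this.completed) {
      // Late subscriber: replay the final value, if there was one.
      if (this.hasValue) listener(this.last as T);
      return;
    }
    this.listeners.push(listener);
  }

  next(value: T): void {
    if (this.completed) return;
    this.hasValue = true;
    this.last = value; // remembered, but not delivered yet
  }

  complete(): void {
    if (this.completed) return;
    this.completed = true;
    if (this.hasValue) this.listeners.forEach(l => l(this.last as T));
    this.listeners = [];
  }
}

const config$ = new SimpleAsyncSubject<string>();
const early: string[] = [];
const late: string[] = [];

config$.subscribe(v => { early.push(v); });
config$.next('loading');
config$.next('ready');
const earlyBeforeComplete = early.length; // still 0: nothing is delivered before complete()

config$.complete();                       // only now does 'ready' reach the early subscriber
config$.subscribe(v => { late.push(v); }); // a late subscriber also receives 'ready'
```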
Note: Cold Observables (such as those returned by HttpClient.get()) create a new execution for each subscriber. Two subscriptions to the same http.get('/api/tasks') call make two separate HTTP requests. To share a single execution among multiple subscribers, use shareReplay(1): this makes the Observable “hot” and caches the last emitted value for late subscribers. This matters in Angular when multiple components subscribe to the same service Observable.
Tip: Use BehaviorSubject in services as a state container when you need the current value to be available synchronously. Expose it as an Observable via .asObservable() to prevent external code from calling .next() on it. Components subscribe to the Observable, while the service controls mutations through methods that call subject.next(newValue). This pattern is the RxJS equivalent of the Signals store from Chapter 12: choose signals for new code, BehaviorSubject for existing RxJS-heavy codebases.
Warning: Always unsubscribe from long-lived Observables: interval() timers, fromEvent() DOM listeners, WebSocket streams, and any Observable that never completes. Use takeUntilDestroyed() (Angular 16+), the async pipe, or a manual ngOnDestroy with a Subject + takeUntil pattern.
Observable and Subject Examples
import {
  Observable, Subject, BehaviorSubject, ReplaySubject,
  of, from, fromEvent, interval, timer, EMPTY, throwError,
  combineLatest, forkJoin,
} from 'rxjs';
import { map, filter, take, takeUntil, share, shareReplay } from 'rxjs/operators';
// ---- Creating Observables ----
// From static values: synchronous
const nums$ = of(1, 2, 3); // emits 1, 2, 3, then completes
const array$ = from([10, 20, 30]); // same, from an array
const promise$ = from(fetch('/api/health')); // from a Promise
// From events
const clicks$ = fromEvent(document, 'click');
const keyup$ = fromEvent<KeyboardEvent>(document, 'keyup');
const resize$ = fromEvent(window, 'resize');
// Time-based
const every1s$ = interval(1000); // 0, 1, 2, ... every second (never completes)
const after5s$ = timer(5000); // emits 0 once after 5 seconds, then completes
const periodic$ = timer(0, 2000); // emits immediately, then every 2 seconds
// Custom Observable: full control over when/what to emit
const custom$ = new Observable<number>(subscriber => {
  subscriber.next(1);
  subscriber.next(2);
  const timeout = setTimeout(() => {
    subscriber.next(3);
    subscriber.complete(); // signal: no more values
  }, 1000);
  // Teardown: called when unsubscribed
  return () => clearTimeout(timeout);
});
// ---- Subscribing ----
const sub = nums$.subscribe({
  next: value => console.log('Value:', value),
  error: err => console.error('Error:', err),
  complete: () => console.log('Complete'),
});
sub.unsubscribe(); // cancel subscription
// ---- Subjects ----
// Subject: no initial value, only future emissions
const event$ = new Subject<string>();
event$.subscribe(e => console.log('A received:', e));
event$.next('click'); // A receives 'click'
event$.next('hover'); // A receives 'hover'
// Late subscriber misses past events:
event$.subscribe(e => console.log('B received:', e));
event$.next('scroll'); // BOTH A and B receive 'scroll'
// BehaviorSubject: holds current value, replays to new subscribers
const state$ = new BehaviorSubject<string[]>([]);
state$.subscribe(v => console.log('subscriber 1:', v)); // immediately: []
state$.next(['task1']);
state$.next(['task1', 'task2']);
// Late subscriber gets the CURRENT value immediately:
state$.subscribe(v => console.log('subscriber 2:', v)); // immediately: ['task1', 'task2']
console.log('current:', state$.getValue()); // synchronous read: ['task1', 'task2']
// ReplaySubject: replays last N values to new subscribers
const log$ = new ReplaySubject<string>(3); // buffer last 3
log$.next('event 1');
log$.next('event 2');
log$.next('event 3');
log$.next('event 4');
log$.subscribe(v => console.log(v)); // replays: 'event 2', 'event 3', 'event 4'
// ---- Cold vs Hot Observables ----
// COLD: each subscriber gets its own execution
// httpClient.get() is cold: two subscriptions = two HTTP requests!
const cold$ = new Observable(sub => {
  console.log('Execution started');
  sub.next(Math.random());
  sub.complete();
});
cold$.subscribe(v => console.log('Sub 1:', v)); // Execution started, e.g. 0.7234
cold$.subscribe(v => console.log('Sub 2:', v)); // Execution started, e.g. 0.1456 (different!)
// HOT: single execution shared among all subscribers
// Use shareReplay(1) to make a cold Observable hot and cache latest value
const hot$ = cold$.pipe(shareReplay(1));
hot$.subscribe(v => console.log('Sub 1:', v)); // Execution started, e.g. 0.7234
hot$.subscribe(v => console.log('Sub 2:', v)); // same value as Sub 1, no new execution!
// ---- Service pattern with BehaviorSubject ----
import { Injectable } from '@angular/core';
@Injectable({ providedIn: 'root' })
export class ThemeService {
  private _theme$ = new BehaviorSubject<'light' | 'dark'>('light');
  // Expose as read-only Observable
  readonly theme$ = this._theme$.asObservable();
  // Synchronous getter
  get currentTheme(): 'light' | 'dark' { return this._theme$.getValue(); }
  setTheme(theme: 'light' | 'dark'): void {
    this._theme$.next(theme);
    localStorage.setItem('theme', theme);
  }
  toggleTheme(): void {
    this.setTheme(this.currentTheme === 'light' ? 'dark' : 'light');
  }
}
How It Works
Step 1: An Observable Is a Function That Takes a Subscriber
At its core, an Observable is a function that, when called with a subscriber (an object with next, error, complete callbacks), begins producing values and eventually either completes or errors. Until subscribe() is called, nothing happens: this is the “cold” behaviour. The subscriber receives values via next(), learns about errors via error(), and knows the stream is finished via complete(). The function can optionally return a teardown function called on unsubscription.
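To make "a function that takes a subscriber" concrete, here is a dependency-free sketch. `SimpleObservable`, `Observer`, and `numbers$` are hypothetical names for this illustration; the real RxJS Observable class adds safety guarantees and operator support that this model omits.

```typescript
// Simplified model for illustration only; this is not RxJS's Observable class.
interface Observer<T> {
  next: (value: T) => void;
  error: (err: unknown) => void;
  complete: () => void;
}
type Teardown = () => void;
// In this model an "observable" is just a function waiting for an observer.
type SimpleObservable<T> = (observer: Observer<T>) => Teardown;

const executions: string[] = [];

const numbers$: SimpleObservable<number> = observer => {
  executions.push('started'); // runs once per subscription
  observer.next(1);
  observer.next(2);
  const timeout = setTimeout(() => {
    observer.next(3);
    observer.complete();
  }, 1000);
  // Teardown: cancels the pending emission when the consumer unsubscribes.
  return () => clearTimeout(timeout);
};

// Defining numbers$ produced no values: nothing has executed yet.
const startedBeforeSubscribe = executions.length;

const received: number[] = [];
const unsubscribe = numbers$({
  next: value => { received.push(value); },
  error: err => { console.error(err); },
  complete: () => { console.log('complete'); },
});

// Unsubscribing runs the teardown, so the delayed value 3 never arrives.
unsubscribe();
console.log(startedBeforeSubscribe, executions.length, received);
```

Calling `numbers$` a second time would start a second, independent execution, which is exactly the cold behaviour described above.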
Step 2: Operators Transform the Stream Without Subscribing
Operators like map(), filter(), and switchMap() do not subscribe to the Observable; they return a new Observable that wraps the original. Each operator in a pipe() chain adds a layer of transformation. When you finally subscribe, the subscription propagates up through all the operator wrappers back to the source. This lazy chain is what makes RxJS so composable and testable: you can describe the entire transformation pipeline before deciding to subscribe.
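The wrapping idea can be sketched without RxJS. In the model below, `map` and `filter` are hypothetical hand-written helpers (not the RxJS operators) that each take a source and return a new function that subscribes to that source only when it is itself subscribed to.

```typescript
// Simplified model for illustration only; not the RxJS operator implementation.
interface Observer<T> { next: (value: T) => void; complete: () => void; }
type SimpleObservable<T> = (observer: Observer<T>) => void;
type Operator<A, B> = (source: SimpleObservable<A>) => SimpleObservable<B>;

let sourceRuns = 0;
const source$: SimpleObservable<number> = observer => {
  sourceRuns++;
  [1, 2, 3, 4].forEach(n => observer.next(n));
  observer.complete();
};

// Each operator returns a NEW observable wrapping the source.
function map<A, B>(fn: (a: A) => B): Operator<A, B> {
  return source => observer =>
    source({
      next: a => observer.next(fn(a)),
      complete: () => observer.complete(),
    });
}

function filter<A>(predicate: (a: A) => boolean): Operator<A, A> {
  return source => observer =>
    source({
      next: a => { if (predicate(a)) observer.next(a); },
      complete: () => observer.complete(),
    });
}

// Building the pipeline subscribes to nothing.
const evens$ = filter((n: number) => n % 2 === 0)(source$);
const evenTimesTen$ = map((n: number) => n * 10)(evens$);
const runsAfterBuilding = sourceRuns; // 0

// Subscribing at the end propagates up through map, then filter, to the source.
const out: number[] = [];
evenTimesTen$({ next: v => { out.push(v); }, complete: () => {} });
// out is now [20, 40] and the source has run exactly once.
```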
Step 3: Subjects Are Both Observable and Observer
A Subject implements both the Observable interface (you can subscribe to it) and the Observer interface (you can call next(), error(), complete() on it). This makes Subjects the bridge between imperative code (a button click, an API response arriving) and reactive pipelines. When you call subject.next(value), it immediately notifies all current subscribers with that value: this is multicast, not unicast like cold Observables.
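A stripped-down model shows why a Subject is multicast: it keeps a list of observers and forwards each `next()` call to all of them. `SimpleSubject` is a hypothetical class for this sketch and omits `error()`, `complete()`, and the other guarantees of the RxJS Subject.

```typescript
// Simplified model for illustration only; not the RxJS Subject class.
interface Observer<T> { next: (value: T) => void; }

class SimpleSubject<T> {
  private observers: Observer<T>[] = [];

  // Observable side: register an observer, return an unsubscribe function.
  subscribe(observer: Observer<T>): () => void {
    this.observers.push(observer);
    return () => {
      this.observers = this.observers.filter(o => o !== observer);
    };
  }

  // Observer side: push a value to every current observer (multicast).
  next(value: T): void {
    // Iterate over a copy so unsubscribing during delivery is safe.
    this.observers.slice().forEach(o => o.next(value));
  }
}

const event$ = new SimpleSubject<string>();
const a: string[] = [];
const b: string[] = [];

const unsubscribeA = event$.subscribe({ next: e => { a.push(e); } });
event$.next('click');                            // only A is listening
event$.subscribe({ next: e => { b.push(e); } }); // B joins late and missed 'click'
event$.next('scroll');                           // both A and B receive this
unsubscribeA();
event$.next('hover');                            // only B receives this
// a: ['click', 'scroll'], b: ['scroll', 'hover']
```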
Step 4: BehaviorSubject Solves the Late Subscriber Problem
When a component subscribes to a service’s Observable after the data has already been loaded (e.g. navigating back to a page), a regular Subject would show nothing because the emissions already happened. A BehaviorSubject stores the last emitted value and immediately replays it to any new subscriber. This ensures late-arriving components always have the current state rather than seeing an empty view while waiting for the next emission.
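The only change needed to turn the multicast idea into BehaviorSubject semantics is to remember the latest value and hand it to each new subscriber. The sketch below uses a hypothetical `SimpleBehaviorSubject` class; it is a model of the behaviour, not the RxJS source.

```typescript
// Simplified model for illustration only; not the RxJS BehaviorSubject class.
type Listener<T> = (value: T) => void;

class SimpleBehaviorSubject<T> {
  private listeners: Listener<T>[] = [];

  // An initial value is required, so there is always a current value.
  constructor(private current: T) {}

  getValue(): T {
    return this.current;
  }

  subscribe(listener: Listener<T>): () => void {
    this.listeners.push(listener);
    listener(this.current); // replay the current value immediately
    return () => {
      this.listeners = this.listeners.filter(l => l !== listener);
    };
  }

  next(value: T): void {
    this.current = value;
    this.listeners.slice().forEach(l => l(value));
  }
}

const tasks$ = new SimpleBehaviorSubject<string[]>([]);

// Data is "loaded" before any component has subscribed.
tasks$.next(['task1']);
tasks$.next(['task1', 'task2']);

// A component that subscribes late still sees the current state at once.
const seenByLateComponent: string[][] = [];
tasks$.subscribe(v => { seenByLateComponent.push(v); });
// seenByLateComponent: [['task1', 'task2']]
```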
Step 5: shareReplay(1) Makes HTTP Observables Shareable
shareReplay(1) makes a cold Observable hot and caches its last value. Multiple subscribers get the same execution: one HTTP request, shared result. For API calls where the data does not change frequently (user profile, configuration, reference data), caching with shareReplay(1) avoids duplicate network requests. The 1 parameter means cache one value, so late subscribers immediately receive the last cached value without triggering a new request.
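The caching behaviour can be modelled in a few lines. `shareReplayOne` and `fakeGetTasks$` below are hypothetical helpers written for this sketch: the first imitates the "one execution, replay the last value" idea, the second simulates a cold HTTP call by counting how often it runs. The real shareReplay operator also deals with errors, unsubscription, and reference counting, all of which are left out here.

```typescript
// Simplified model for illustration only; not the RxJS shareReplay operator.
interface Observer<T> { next: (value: T) => void; complete: () => void; }
type SimpleObservable<T> = (observer: Observer<T>) => void;

function shareReplayOne<T>(source: SimpleObservable<T>): SimpleObservable<T> {
  const observers: Observer<T>[] = [];
  let started = false;
  let completed = false;
  let hasValue = false;
  let last: T | undefined;

  return observer => {
    if (hasValue) observer.next(last as T); // replay the cached value
    if (completed) { observer.complete(); return; }
    observers.push(observer);
    if (!started) {
      started = true; // the source is subscribed to only once
      source({
        next: v => {
          hasValue = true;
          last = v;
          observers.slice().forEach(o => o.next(v));
        },
        complete: () => {
          completed = true;
          observers.slice().forEach(o => o.complete());
        },
      });
    }
  };
}

// Simulated cold HTTP call: every subscription would be a new request.
let requests = 0;
const fakeGetTasks$: SimpleObservable<string[]> = observer => {
  requests++;
  observer.next(['task1', 'task2']);
  observer.complete();
};

const sharedTasks$ = shareReplayOne(fakeGetTasks$);
let tasks: string[] = [];
let count = 0;
sharedTasks$({ next: t => { tasks = t; }, complete: () => {} });        // triggers the one request
sharedTasks$({ next: t => { count = t.length; }, complete: () => {} }); // served from the cache
// requests is 1, even though there were two subscribers.
```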
Common Mistakes
Mistake 1: Subscribing multiple times to a cold HttpClient Observable
❌ Wrong: two HTTP requests for the same data:
const tasks$ = this.http.get<Task[]>('/api/tasks');
tasks$.subscribe(t => this.tasks = t); // HTTP request 1
tasks$.subscribe(t => this.count = t.length); // HTTP request 2: duplicate!
✅ Correct: share a single execution:
const tasks$ = this.http.get<Task[]>('/api/tasks').pipe(shareReplay(1));
tasks$.subscribe(t => this.tasks = t); // one request
tasks$.subscribe(t => this.count = t.length); // gets cached result
Mistake 2: Not unsubscribing, which causes a memory leak
❌ Wrong: interval keeps running after component is destroyed:
ngOnInit(): void {
  interval(5000).subscribe(() => this.refresh()); // never cleaned up!
}
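✅ Correct: use takeUntilDestroyed(). Outside an injection context (such as ngOnInit) it needs an explicit DestroyRef:
// inject, DestroyRef: '@angular/core'; takeUntilDestroyed: '@angular/core/rxjs-interop'
private readonly destroyRef = inject(DestroyRef);
ngOnInit(): void {
  interval(5000)
    .pipe(takeUntilDestroyed(this.destroyRef))
    .subscribe(() => this.refresh());
}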
Mistake 3: Exposing a Subject directly, so external code can emit values
❌ Wrong: any code can call tasks$.next():
@Injectable({ providedIn: 'root' })
export class TaskService {
  tasks$ = new BehaviorSubject<Task[]>([]); // public Subject!
}
// Anywhere: taskService.tasks$.next([]) bypasses service logic
✅ Correct: expose as read-only Observable:
private _tasks$ = new BehaviorSubject<Task[]>([]);
readonly tasks$ = this._tasks$.asObservable(); // no .next() access
Quick Reference
| Need | Use |
|---|---|
| Static values | of(1, 2, 3) or from([1, 2, 3]) |
| From Promise | from(promise) |
| DOM events | fromEvent(element, 'click') |
| Timer | timer(delay) or interval(period) |
| Emit + complete now | of(value) |
| Emit nothing | EMPTY |
| Throw error | throwError(() => new Error('msg')) |
| Event bus | new Subject<T>() |
| State with current value | new BehaviorSubject<T>(initial) |
| Share single HTTP execution | http.get().pipe(shareReplay(1)) |