Observables, Subjects, and the Observable Contract

RxJS (Reactive Extensions for JavaScript) is the library that powers Angular’s asynchronous operations. While Angular Signals (covered in Chapter 12) handle synchronous reactive state, RxJS handles the fundamentally async world: HTTP responses, user input streams, WebSocket messages, timers, and any operation where values arrive over time. Understanding what an Observable actually is (how it differs from a Promise, how it is created, when it is “cold” vs “hot”, and how Subjects bridge the imperative and reactive worlds) gives you the foundation for using every RxJS operator correctly.

Observable vs Promise vs Signal

| Concept | Promise | Observable | Signal |
| --- | --- | --- | --- |
| Values over time | One value (or rejection) | Zero, one, or many values | Always has one current value |
| Lazy / Eager | Eager: starts immediately | Cold = lazy (starts on subscribe) | Computed = lazy |
| Cancellable | No (natively) | Yes: unsubscribe() | N/A (synchronous) |
| Composable | Limited (.then chaining) | Rich operator library (100+ operators) | computed() chains |
| Sync or Async | Always async | Can be synchronous or async | Always synchronous |
| Error handling | .catch() | catchError() operator or error callback | N/A |
| Angular use | One-off tasks | HTTP, streams, events | Component/service state |

Subject Types

| Subject | Replay Behaviour | Initial Value | Best For |
| --- | --- | --- | --- |
| Subject<T> | None: only future values | None | Event buses, one-shot notifications |
| BehaviorSubject<T> | Replays last value to new subscribers | Required | State containers: always has a current value |
| ReplaySubject<T>(n) | Replays last n values to new subscribers | None | Late subscribers that need history (logs, undo) |
| AsyncSubject<T> | Replays only the last value, on completion | None | Resource loading: emit once on complete |

Note: Cold Observables (like those returned by HttpClient.get()) create a new execution for each subscriber. Two subscriptions to the same http.get('/api/tasks') Observable make two separate HTTP requests. To share a single execution among multiple subscribers, use shareReplay(1): it multicasts the source (making it effectively “hot”) and caches the last emitted value for late subscribers. This matters in Angular when multiple components subscribe to the same service Observable.

Tip: Use BehaviorSubject in services as a state container when you need the current value to be available synchronously. Expose it as an Observable via .asObservable() to prevent external code from calling .next() on it. Components subscribe to the Observable, while the service controls mutations through methods that call subject.next(newValue). This pattern is the RxJS equivalent of the Signals store from Chapter 12: choose signals for new code, BehaviorSubject for existing RxJS-heavy codebases.

Warning: Always unsubscribe from Observables when a component is destroyed. An active subscription holds a reference to the subscriber function and any closure variables it captures, which prevents garbage collection. Common sources of leaks: interval() timers, fromEvent() DOM listeners, WebSocket streams, and any Observable that never completes. Use takeUntilDestroyed() (Angular 16+), the async pipe, or a manual ngOnDestroy with a Subject + takeUntil pattern.
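
The manual Subject + takeUntil pattern mentioned in the warning can be sketched roughly as follows. This is a minimal illustration, not a complete Angular component: TickerComponent and ticks$ are hypothetical names, the class is a plain TypeScript stand-in so that it runs outside Angular, and ngOnDestroy() is called by hand here where Angular would normally call it.

```typescript
import { Observable, Subject } from 'rxjs';
import { takeUntil } from 'rxjs/operators';

// Hypothetical stand-in for an Angular component; in a real component,
// Angular calls ngOnDestroy() when the component is destroyed.
class TickerComponent {
  readonly received: number[] = [];
  private readonly destroy$ = new Subject<void>();

  constructor(ticks$: Observable<number>) {
    ticks$
      .pipe(takeUntil(this.destroy$))   // end this stream once destroy$ emits
      .subscribe(value => this.received.push(value));
  }

  ngOnDestroy(): void {
    this.destroy$.next();
    this.destroy$.complete();
  }
}

// A Subject stands in for a never-completing source such as interval().
const ticks$ = new Subject<number>();
const component = new TickerComponent(ticks$);

ticks$.next(1);
ticks$.next(2);
component.ngOnDestroy();
ticks$.next(3);   // not delivered: the subscription was torn down
```

Placing takeUntil last in the pipe() chain is the usual convention, so that nothing downstream of it can keep an inner subscription alive after the component is gone.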

Observable and Subject Examples

import {
    Observable, Subject, BehaviorSubject, ReplaySubject,
    of, from, fromEvent, interval, timer, EMPTY, throwError,
    combineLatest, forkJoin,
} from 'rxjs';
import { map, filter, take, takeUntil, share, shareReplay } from 'rxjs/operators';

// ── Creating Observables ──────────────────────────────────────────

// From static values: synchronous
const nums$    = of(1, 2, 3);                      // emits 1, 2, 3, then completes
const array$   = from([10, 20, 30]);               // same, from an array
const promise$ = from(fetch('/api/health'));        // from a Promise

// From events
const clicks$  = fromEvent(document, 'click');
const keyup$   = fromEvent<KeyboardEvent>(document, 'keyup');
const resize$  = fromEvent(window, 'resize');

// Time-based
const every1s$ = interval(1000);                  // 0, 1, 2, ... every second (never completes)
const after5s$ = timer(5000);                     // emits 0 once after 5 seconds, then completes
const periodic$= timer(0, 2000);                  // emits immediately, then every 2 seconds

// Custom Observable: full control over when/what to emit
const custom$ = new Observable<number>(subscriber => {
    subscriber.next(1);
    subscriber.next(2);

    const timeout = setTimeout(() => {
        subscriber.next(3);
        subscriber.complete();     // signal: no more values
    }, 1000);

    // Teardown: called when unsubscribed
    return () => clearTimeout(timeout);
});

// ── Subscribing ───────────────────────────────────────────────────
const sub = nums$.subscribe({
    next:     value    => console.log('Value:', value),
    error:    err      => console.error('Error:', err),
    complete: ()       => console.log('Complete'),
});

sub.unsubscribe();   // cancel subscription

// ── Subjects ──────────────────────────────────────────────────────

// Subject: no initial value, only future emissions
const event$ = new Subject<string>();
event$.subscribe(e => console.log('A received:', e));
event$.next('click');         // A receives 'click'
event$.next('hover');         // A receives 'hover'
// Late subscriber misses past events:
event$.subscribe(e => console.log('B received:', e));
event$.next('scroll');        // BOTH A and B receive 'scroll'

// BehaviorSubject: holds current value, replays to new subscribers
const state$ = new BehaviorSubject<string[]>([]);
state$.subscribe(v => console.log('subscriber 1:', v));  // immediately: []
state$.next(['task1']);
state$.next(['task1', 'task2']);
// Late subscriber gets the CURRENT value immediately:
state$.subscribe(v => console.log('subscriber 2:', v));  // immediately: ['task1', 'task2']
console.log('current:', state$.getValue());              // synchronous read: ['task1', 'task2']

// ReplaySubject: replays last N values to new subscribers
const log$ = new ReplaySubject<string>(3);   // buffer last 3
log$.next('event 1');
log$.next('event 2');
log$.next('event 3');
log$.next('event 4');
log$.subscribe(v => console.log(v));  // replays: 'event 2', 'event 3', 'event 4'

// ── Cold vs Hot Observables ───────────────────────────────────────

// COLD: each subscriber gets its own execution
// httpClient.get() is cold: two subscriptions = two HTTP requests!
const cold$ = new Observable(sub => {
    console.log('Execution started');
    sub.next(Math.random());
    sub.complete();
});
cold$.subscribe(v => console.log('Sub 1:', v));  // Execution started, 0.7234
cold$.subscribe(v => console.log('Sub 2:', v));  // Execution started, 0.1456 (different!)

// HOT: single execution shared among all subscribers
// Use shareReplay(1) to make a cold Observable hot and cache latest value
const hot$ = cold$.pipe(shareReplay(1));
hot$.subscribe(v => console.log('Sub 1:', v));   // Execution started, 0.7234
hot$.subscribe(v => console.log('Sub 2:', v));   // 0.7234: same value, no new execution!

// ── Service pattern with BehaviorSubject ──────────────────────────
import { Injectable } from '@angular/core';

@Injectable({ providedIn: 'root' })
export class ThemeService {
    private _theme$ = new BehaviorSubject<'light' | 'dark'>('light');

    // Expose as read-only Observable
    readonly theme$ = this._theme$.asObservable();

    // Synchronous getter
    get currentTheme(): 'light' | 'dark' { return this._theme$.getValue(); }

    setTheme(theme: 'light' | 'dark'): void {
        this._theme$.next(theme);
        localStorage.setItem('theme', theme);
    }

    toggleTheme(): void {
        this.setTheme(this.currentTheme === 'light' ? 'dark' : 'light');
    }
}

How It Works

Step 1: An Observable Is a Function That Takes a Subscriber

At its core, an Observable is a function that, when called with a subscriber (an object with next, error, and complete callbacks), begins producing values and eventually either completes or errors. Until subscribe() is called, nothing happens; this laziness is the “cold” behaviour. The subscriber receives values via next(), learns about errors via error(), and knows the stream is finished via complete(). The function can optionally return a teardown function, which runs on unsubscription (and also when the stream completes or errors).
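
To make the “an Observable is a function” idea concrete, here is a toy model in plain TypeScript. It is not how RxJS is implemented: ToyObservable, Observer, and log are hypothetical names for illustration, and the sketch leaves out the guarantees the real library adds (for example, ignoring next() after complete(), or running teardown automatically on completion).

```typescript
// A toy model, NOT the real RxJS implementation.
type Observer<T> = {
  next: (value: T) => void;
  error: (err: unknown) => void;
  complete: () => void;
};
type Teardown = () => void;

class ToyObservable<T> {
  // The producer function is stored at construction time, not run.
  constructor(private readonly producer: (observer: Observer<T>) => Teardown) {}

  // subscribe() is what finally calls the producer.
  subscribe(observer: Observer<T>): { unsubscribe: Teardown } {
    const teardown = this.producer(observer);
    return { unsubscribe: teardown };
  }
}

const log: string[] = [];

const numbers$ = new ToyObservable<number>(observer => {
  log.push('producer started');
  observer.next(1);
  observer.next(2);
  observer.complete();
  return () => log.push('teardown');
});

log.push('before subscribe');   // the producer has not run yet
const subscription = numbers$.subscribe({
  next: value => log.push(`next ${value}`),
  error: err => log.push(`error ${String(err)}`),
  complete: () => log.push('complete'),
});
subscription.unsubscribe();

// log is now:
// ['before subscribe', 'producer started', 'next 1', 'next 2', 'complete', 'teardown']
```

The 'before subscribe' entry comes first because creating the ToyObservable did nothing except store the producer.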

Step 2: Operators Transform the Stream Without Subscribing

Operators like map(), filter(), and switchMap() do not subscribe to the Observable when you apply them; they return a new Observable that wraps the original. Each operator in a pipe() chain adds a layer of transformation. When you finally subscribe, the subscription propagates up through all the operator wrappers back to the source. This lazy chain is what makes RxJS so composable and testable: you can describe the entire transformation pipeline before deciding to subscribe.
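
The wrapping behaviour can be illustrated with a stripped-down model in which an “observable” is just a function that accepts a next callback. This is a sketch for intuition only, not RxJS code: Source, Operator, numbers, and the map and filter functions below are hypothetical stand-ins that ignore errors, completion, and unsubscription.

```typescript
// Toy model, not RxJS: an "observable" is a function that takes a next-callback.
type Source<T> = (next: (value: T) => void) => void;
type Operator<A, B> = (source: Source<A>) => Source<B>;

let subscriptions = 0;   // counts how many times the source actually runs

const numbers: Source<number> = next => {
  subscriptions++;
  [1, 2, 3, 4].forEach(value => next(value));
};

// Each operator returns a NEW source that wraps the one it was given.
function map<A, B>(fn: (a: A) => B): Operator<A, B> {
  return source => next => source(value => next(fn(value)));
}

function filter<A>(predicate: (a: A) => boolean): Operator<A, A> {
  return source => next =>
    source(value => {
      if (predicate(value)) next(value);
    });
}

// Building the pipeline only composes functions; the source has not run.
const evens = filter((n: number) => n % 2 === 0)(numbers);
const pipeline = map((n: number) => n * 10)(evens);
const runsBeforeSubscribe = subscriptions;   // still 0

// "Subscribing" to the outermost wrapper runs the whole chain back to the source.
const results: number[] = [];
pipeline(value => results.push(value));      // results: [20, 40]
```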

Step 3: Subjects Are Both Observable and Observer

A Subject implements both the Observable interface (you can subscribe to it) and the Observer interface (you can call next(), error(), complete() on it). This makes Subjects the bridge between imperative code (a button click, an API response arriving) and reactive pipelines. When you call subject.next(value), it immediately notifies all current subscribers with that value. This is multicast, in contrast to the unicast behaviour of cold Observables.
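
A Subject’s dual role can be sketched as a list of listeners plus a next() method. The ToySubject class below is a hypothetical, simplified model (no error or complete handling), meant only to show why every current subscriber sees the same emission.

```typescript
// Toy model, not RxJS: just enough to show multicast behaviour.
type Listener<T> = (value: T) => void;

class ToySubject<T> {
  private listeners: Listener<T>[] = [];

  // Observable side: register a listener and get back an unsubscribe function.
  subscribe(listener: Listener<T>): () => void {
    this.listeners.push(listener);
    return () => {
      this.listeners = this.listeners.filter(l => l !== listener);
    };
  }

  // Observer side: push one value to every listener registered right now.
  next(value: T): void {
    // Iterate over a copy so a listener that subscribes mid-emission does not join this round.
    [...this.listeners].forEach(listener => listener(value));
  }
}

const clicks = new ToySubject<string>();
const a: string[] = [];
const b: string[] = [];

const unsubscribeA = clicks.subscribe(v => a.push(v));
clicks.next('first');               // only A is listening
clicks.subscribe(v => b.push(v));   // B joins late and misses 'first'
clicks.next('second');              // both A and B are notified
unsubscribeA();
clicks.next('third');               // only B is still listening

// a: ['first', 'second']   b: ['second', 'third']
```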

Step 4: BehaviorSubject Solves the Late Subscriber Problem

When a component subscribes to a service’s Observable after the data has already been loaded (e.g. navigating back to a page), a regular Subject would show nothing, because the emissions already happened. A BehaviorSubject stores the last emitted value and immediately replays it to any new subscriber. This ensures late-arriving components always have the current state rather than seeing an empty view while waiting for the next emission.
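
The difference comes down to one stored value. The sketch below contrasts two hypothetical toy classes, ToySubject and ToyBehaviorSubject; neither is the RxJS implementation, and both ignore errors, completion, and unsubscription to keep the replay logic visible.

```typescript
// Toy model, not RxJS: a plain subject versus one that remembers its last value.
type Listener<T> = (value: T) => void;

class ToySubject<T> {
  private listeners: Listener<T>[] = [];

  subscribe(listener: Listener<T>): void {
    this.listeners.push(listener);
  }

  next(value: T): void {
    this.listeners.forEach(l => l(value));
  }
}

class ToyBehaviorSubject<T> {
  private listeners: Listener<T>[] = [];

  constructor(private current: T) {}   // an initial value is required

  subscribe(listener: Listener<T>): void {
    this.listeners.push(listener);
    listener(this.current);            // replay the stored value immediately
  }

  next(value: T): void {
    this.current = value;              // remember it for future subscribers
    this.listeners.forEach(l => l(value));
  }

  getValue(): T {
    return this.current;
  }
}

const plain = new ToySubject<string[]>();
const behavior = new ToyBehaviorSubject<string[]>([]);

// The data "loads" before any component is listening.
plain.next(['task1']);
behavior.next(['task1']);

// A component subscribes late, for example after navigating back to the page.
const plainSeen: string[][] = [];
const behaviorSeen: string[][] = [];
plain.subscribe(tasks => plainSeen.push(tasks));
behavior.subscribe(tasks => behaviorSeen.push(tasks));

// plainSeen: []   behaviorSeen: [['task1']]
```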

Step 5: shareReplay(1) Makes HTTP Observables Shareable

shareReplay(1) makes a cold Observable hot and caches its last value. Multiple subscribers get the same execution: one HTTP request, shared result. For API calls where the data does not change frequently (user profile, configuration, reference data), caching with shareReplay(1) avoids duplicate network requests. The 1 parameter means cache one value, so late subscribers immediately receive the last cached value without triggering a new request. Caching only works when subscribers share the same Observable instance, so store the piped Observable in a field rather than rebuilding it on every call.
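
The effect of sharing plus a one-value cache can be modelled without RxJS. In the sketch below, fetchProfile is a hypothetical synchronous stand-in for a cold HTTP call and cacheLatest is a hypothetical helper that mimics the idea behind shareReplay(1); the real operator also handles errors, completion, and asynchronous sources, which this toy version ignores.

```typescript
// Toy model, not RxJS: a stand-in for a cold HTTP call plus a one-value cache.
type Source<T> = (next: (value: T) => void) => void;

let requestCount = 0;

// Hypothetical cold source: every call performs the "request" again.
const fetchProfile: Source<string> = next => {
  requestCount++;
  next(`profile #${requestCount}`);
};

// Wrap a source so it runs at most once; later subscribers get the cached value.
function cacheLatest<T>(source: Source<T>): Source<T> {
  let started = false;
  let hasValue = false;
  let cached: T | undefined;
  const listeners: Array<(value: T) => void> = [];

  return next => {
    if (hasValue) next(cached as T);   // replay to a late subscriber
    listeners.push(next);
    if (!started) {
      started = true;                  // the first subscriber triggers the single execution
      source(value => {
        hasValue = true;
        cached = value;
        listeners.forEach(l => l(value));
      });
    }
  };
}

// Cold: two subscriptions mean two executions.
const coldResults: string[] = [];
fetchProfile(v => coldResults.push(v));
fetchProfile(v => coldResults.push(v));
// coldResults: ['profile #1', 'profile #2']

// Shared: one execution, and the second subscriber is served from the cache.
const shared = cacheLatest(fetchProfile);
const sharedResults: string[] = [];
shared(v => sharedResults.push(v));
shared(v => sharedResults.push(v));
// sharedResults: ['profile #3', 'profile #3'], requestCount: 3
```

Note that shared is created once and reused; calling cacheLatest(fetchProfile) again would build a separate cache and trigger another execution.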

Common Mistakes

Mistake 1: Subscribing multiple times to a cold HttpClient Observable

❌ Wrong (two HTTP requests for the same data):

const tasks$ = this.http.get<Task[]>('/api/tasks');
tasks$.subscribe(t => this.tasks = t);   // HTTP request 1
tasks$.subscribe(t => this.count = t.length);  // HTTP request 2: duplicate!

✅ Correct (share a single execution):

const tasks$ = this.http.get<Task[]>('/api/tasks').pipe(shareReplay(1));
tasks$.subscribe(t => this.tasks = t);       // one request
tasks$.subscribe(t => this.count = t.length); // gets cached result

Mistake 2: Not unsubscribing (memory leak)

❌ Wrong (interval keeps running after component is destroyed):

ngOnInit(): void {
    interval(5000).subscribe(() => this.refresh());  // never cleaned up!
}

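✅ Correct (use takeUntilDestroyed(); outside an injection context such as ngOnInit, pass it a DestroyRef):

// takeUntilDestroyed is imported from '@angular/core/rxjs-interop'; inject and DestroyRef from '@angular/core'
private readonly destroyRef = inject(DestroyRef);

ngOnInit(): void {
    interval(5000)
        .pipe(takeUntilDestroyed(this.destroyRef))  // no-argument form only works in a constructor or field initializer
        .subscribe(() => this.refresh());
}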

Mistake 3: Exposing a Subject directly (external code can emit values)

❌ Wrong (any code can call tasks$.next()):

@Injectable({ providedIn: 'root' })
export class TaskService {
    tasks$ = new BehaviorSubject<Task[]>([]);  // public Subject!
}
// Anywhere: taskService.tasks$.next([]) bypasses service logic

✅ Correct (expose as read-only Observable):

private _tasks$ = new BehaviorSubject<Task[]>([]);
readonly tasks$ = this._tasks$.asObservable();   // no .next() access

Quick Reference

| Need | Use |
| --- | --- |
| Static values | of(1, 2, 3) or from([1, 2, 3]) |
| From Promise | from(promise) |
| DOM events | fromEvent(element, 'click') |
| Timer | timer(delay) or interval(period) |
| Emit + complete now | of(value) |
| Emit nothing | EMPTY |
| Throw error | throwError(() => new Error('msg')) |
| Event bus | new Subject<T>() |
| State with current value | new BehaviorSubject<T>(initial) |
| Share single HTTP execution | http.get().pipe(shareReplay(1)) |

🧠 Test Yourself

A service has private _data$ = new BehaviorSubject<string[]>(['a']). A component subscribes after ‘b’ has been emitted. What does it receive immediately on subscription?