Core Operators — map, filter, tap, debounceTime, scan, catchError

RxJS operators are the verbs of reactive programming: they transform, filter, delay, accumulate, and reshape the data flowing through an Observable stream. Understanding the most-used operators well is more valuable than knowing all 100+. This lesson covers the essential operators you will use in virtually every Angular service and component: map for transformation, filter for selection, tap for side effects, debounceTime for rate-limiting user input, distinctUntilChanged for suppressing consecutive duplicates, take/takeUntil for completion, scan for running state, catchError for error recovery, and retry for resilience.

Essential Operators Reference

| Operator | What It Does | Marble: input → output |
|---|---|---|
| map(fn) | Transform each value with a function | 1, 2, 3 → 2, 4, 6 (×2) |
| filter(pred) | Pass through only values that satisfy the predicate | 1, 2, 3, 4 → 2, 4 (even) |
| tap(fn) | Side effect without transforming; the value passes through unchanged | 1, 2, 3 → 1, 2, 3 (logs each) |
| take(n) | Take only the first n values, then complete | 1, 2, 3, 4, 5 → 1, 2, 3 (n = 3) |
| takeUntil(notifier$) | Complete when notifier emits | Completes stream when destroy$ fires |
| skip(n) | Skip the first n values | 1, 2, 3, 4 → 3, 4 (n = 2) |
| debounceTime(ms) | Emit the latest value only after ms of silence; intermediate values are dropped | a-b-c-----d----- → -------c-----d-- |
| throttleTime(ms) | Emit first value, ignore rest for ms duration | a-b-c---d → a-------d |
| distinctUntilChanged() | Suppress consecutive duplicate values | 1, 1, 2, 2, 3 → 1, 2, 3 |
| distinctUntilKeyChanged(key) | Suppress consecutive duplicates by object key | {id:1}, {id:1}, {id:2} → {id:1}, {id:2} |
| scan(accumulator, seed) | Stateful reduce that emits the running total | 1, 2, 3 → 1, 3, 6 (running sum) |
| catchError(fn) | Catch errors and return a fallback Observable | Error → recovery Observable |
| retry(n) | Resubscribe on error up to n times | Error → retry up to n times |
| finalize(fn) | Run fn on complete, error, or unsubscribe (like try-finally) | Always runs on stream end |
| startWith(value) | Prepend an initial value before the Observable emits | 2, 3 → 1, 2, 3 (value = 1) |
| pairwise() | Emit previous and current value as a pair | 1, 2, 3 → [1,2], [2,3] |
| bufferTime(ms) | Collect emissions into arrays over a time window | 1, 2, 3, 4, 5 → [1,2,3], [4,5] |

Note: tap() is the correct operator for side effects inside an Observable pipeline — logging, setting loading state, dispatching analytics events. The key rule is that tap() must not change the value that flows through — it receives the value, performs a side effect, and passes the unchanged value downstream. If you need to transform the value, use map(). If you find yourself using tap(v => modifiedV = transform(v)) and then using modifiedV downstream, you should be using map() instead.
Tip: The search input pattern, debounceTime(300) + distinctUntilChanged() + switchMap(), is one of the most common RxJS patterns in Angular. It waits until the user stops typing for 300ms, drops the emission if it is identical to the previous query (for example, the user deleted a character and retyped it), and cancels any pending HTTP request when a new query arrives. This pattern covers most typeahead search implementations correctly and efficiently.
Warning: catchError() catches errors and replaces them with a new Observable. If you return EMPTY, the stream completes silently. If you return of(fallback), the stream emits the fallback and completes. If you return throwError(() => err), you re-throw the error. Critically: after catchError() runs, the original Observable has terminated — a caught error does NOT allow the stream to continue emitting. If you need to recover and continue, you need catchError inside a higher-order operator like switchMap.

Complete Operator Examples

import { Component, DestroyRef, OnInit, inject } from '@angular/core';
import { FormControl } from '@angular/forms';
import {
    fromEvent, interval, of, from, EMPTY, throwError, Subject,
} from 'rxjs';
import {
    map, filter, tap, take, takeUntil, skip, debounceTime,
    throttleTime, distinctUntilChanged, scan, catchError, retry,
    retryWhen, delay, finalize, startWith, pairwise, bufferTime,
    shareReplay, switchMap,
} from 'rxjs/operators';
import { takeUntilDestroyed } from '@angular/core/rxjs-interop';

// ── map — transform each value ────────────────────────────────────────────
of(1, 2, 3).pipe(
    map(n => n * 2),             // transform: double each
    map(n => `Item ${n}`),       // chain: to string
).subscribe(console.log);        // 'Item 2', 'Item 4', 'Item 6'

// Transform API response
this.http.get<ApiResponse<Task[]>>('/api/tasks').pipe(
    map(response => response.data),          // extract .data
    map(tasks => tasks.filter(t => !t.deletedAt)),  // filter deleted
    map(tasks => tasks.sort((a, b) => b.priority.localeCompare(a.priority))),
).subscribe(tasks => this.tasks = tasks);

// ── filter — pass only matching values ───────────────────────────────────
of(1, 2, 3, 4, 5, 6).pipe(
    filter(n => n % 2 === 0),    // only even numbers
).subscribe(console.log);        // 2, 4, 6

// Filter keyboard events to specific keys
fromEvent<KeyboardEvent>(document, 'keyup').pipe(
    filter(e => e.key === 'Enter' || e.key === 'Escape'),
    map(e => e.key),
).subscribe(key => this.handleKey(key));

// ── tap — side effects without transformation ─────────────────────────────
this.loading.set(true);                          // set before subscribing: tap only runs when a value arrives
this.taskService.getAll().pipe(
    tap(tasks => console.log('Loaded:', tasks.length)),  // inspect without changing
    finalize(() => this.loading.set(false)),      // always runs on complete, error, or unsubscribe
).subscribe(tasks => this.tasks.set(tasks));

// ── debounceTime + distinctUntilChanged — search input pattern ─────────────
@Component({ ... })
export class SearchComponent implements OnInit {
    searchControl = new FormControl('');
    // takeUntilDestroyed() needs an injection context, and ngOnInit is not one,
    // so capture DestroyRef here (DestroyRef and inject come from '@angular/core').
    private destroyRef = inject(DestroyRef);

    ngOnInit(): void {
        this.searchControl.valueChanges.pipe(
            debounceTime(350),              // wait 350ms after last keystroke
            distinctUntilChanged(),         // skip if same as previous query
            filter(q => (q?.length ?? 0) >= 2 || q === ''),  // min 2 chars, or empty to clear
            tap(() => this.loading.set(true)),
            switchMap(q => q
                ? this.taskService.search(q)
                : of([])
            ),
            tap(() => this.loading.set(false)),
            takeUntilDestroyed(this.destroyRef),
        ).subscribe(results => this.results.set(results));
    }
}

// ── scan — stateful accumulation ──────────────────────────────────────────
// Running count of completed tasks
this.taskEvents$.pipe(
    filter(e => e.type === 'COMPLETED'),
    scan((count, _) => count + 1, 0),         // count: 0 → 1 → 2 → ...
).subscribe(count => this.completedCount.set(count));

// Accumulate items into an array
const cartItems$ = this.addItem$.pipe(
    scan((items, newItem) => [...items, newItem], [] as Item[]),
);

// ── catchError — handle errors gracefully ────────────────────────────────
this.taskService.getById(id).pipe(
    catchError(err => {
        if (err.status === 404) {
            this.router.navigate(['/tasks']);
            return EMPTY;             // complete without error
        }
        if (err.status === 403) {
            return of(null);          // return null for forbidden
        }
        return throwError(() => err); // re-throw unexpected errors
    }),
    filter(task => task !== null),
).subscribe(task => this.task.set(task!));

// ── retry — automatic retry on transient failures ─────────────────────────
this.taskService.getAll().pipe(
    retry(3),                         // retry up to 3 times on any error
).subscribe(tasks => this.tasks.set(tasks));

// Retry with a fixed delay between attempts
// (RxJS 7+ config form; it replaces the deprecated retryWhen)
this.taskService.getAll().pipe(
    retry({ count: 3, delay: 1000 }),   // up to 3 retries, 1 second apart
    // For exponential backoff, delay also accepts a function, e.g.
    // delay: (_err, attempt) => timer(1000 * 2 ** (attempt - 1)), with timer from 'rxjs'
).subscribe(tasks => this.tasks.set(tasks));

// ── take — limit emissions ────────────────────────────────────────────────
// Get exactly one value from a stream
const user$ = this.userSubject$.pipe(
    filter(u => u !== null),
    take(1),              // complete after first non-null user
);

// ── startWith — add initial value ────────────────────────────────────────
// Show loading state immediately before data arrives
this.taskService.getAll().pipe(
    map(tasks => ({ loading: false, tasks })),
    startWith({ loading: true, tasks: [] }),
).subscribe(state => {
    this.loading.set(state.loading);
    this.tasks.set(state.tasks);
});

// ── pairwise — compare consecutive values ────────────────────────────────
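// Note: pairwise() emits nothing until the source has produced two values,
// so the first id is not handled by this subscription.
this.route.params.pipe(
    map(p => p['id']),
    pairwise(),           // [previousId, currentId]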
).subscribe(([prevId, currentId]) => {
    if (prevId !== currentId) this.loadTask(currentId);
});

How It Works

Step 1 — Operators Are Pure Functions That Return Observables

Each operator is a function that takes an Observable and returns a new Observable. Calling map(fn) produces such a function; when pipe() applies it to the source, the result is a new Observable that subscribes to the source, transforms each value with fn, and emits the result. Operators do not subscribe on their own; they only wrap. Nothing runs until subscribe() is called at the end of the chain, so you can build the whole pipeline, store it in a variable, and start it only when needed.
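
A minimal sketch of that laziness, assuming RxJS 6 or later. The `double` operator and the `sideEffects` counter are hypothetical names used only for illustration.

```typescript
import { Observable, of } from 'rxjs';
import { map, tap } from 'rxjs/operators';

// A hypothetical custom operator: just a function from Observable to Observable.
const double = (source: Observable<number>): Observable<number> =>
    source.pipe(map(n => n * 2));

let sideEffects = 0;

// Building the pipeline runs nothing; it only describes the work.
const doubled$ = of(1, 2, 3).pipe(
    tap(() => sideEffects++),   // counts values that actually flow through
    double,                     // a plain function is a valid operator
);
const effectsBeforeSubscribe = sideEffects;   // no subscriber yet, so still 0

// subscribe() starts the chain; of() emits synchronously.
const received: number[] = [];
doubled$.subscribe(v => received.push(v));
const effectsAfterSubscribe = sideEffects;    // one increment per emitted value
```

Because `doubled$` is only a description, subscribing to it a second time would run the whole chain again from the start.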

Step 2 — debounceTime Waits for Silence

When the source emits a value, debounceTime(300) starts a 300ms timer. If the source emits again before the timer expires, the timer resets. Only when 300ms passes without a new emission does the latest value flow downstream. This is perfect for search inputs — it means the search query only triggers after the user has paused typing, not on every keystroke. The intermediate values (each character typed) are discarded.
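
The timer behaviour can be observed without real waiting by using the TestScheduler from rxjs/testing, which runs time virtually. This is a sketch under assumed timings: the keystroke values and the gaps between them are made up for the example.

```typescript
import { TestScheduler } from 'rxjs/testing';
import { debounceTime } from 'rxjs/operators';

const emitted: string[] = [];

// TestScheduler requires an assertion callback; this sketch does not use it.
const scheduler = new TestScheduler(() => {});

scheduler.run(({ cold }) => {
    // Three keystrokes about 100ms apart, a 500ms pause, one more keystroke,
    // then another 500ms pause before the stream completes.
    const keystrokes$ = cold('a 100ms b 100ms c 500ms d 500ms |', {
        a: 'r', b: 'rx', c: 'rxj', d: 'rxjs',
    });

    keystrokes$.pipe(
        debounceTime(300),   // each new value restarts the 300ms timer
    ).subscribe(v => emitted.push(v));
});

// After run() returns, virtual time has been flushed:
// 'r' and 'rx' were replaced before 300ms of silence; 'rxj' and 'rxjs' got through.
console.log(emitted);
```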

Step 3 — distinctUntilChanged Uses Reference Equality by Default

distinctUntilChanged() compares the current value to the previous value using strict equality (===). For primitives (strings, numbers, booleans), this works as expected. Objects and arrays are compared by reference, so if every emission is a newly created object, distinctUntilChanged() never suppresses anything, even when the contents are identical. Use distinctUntilChanged((prev, curr) => prev.id === curr.id) to compare by a specific property, or distinctUntilKeyChanged('id') for the common case.
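
The difference is easy to see side by side. In this sketch the `Task` shape and the sample data are assumptions made for the example.

```typescript
import { of } from 'rxjs';
import { distinctUntilChanged, distinctUntilKeyChanged } from 'rxjs/operators';

interface Task { id: number; title: string; }   // hypothetical shape

const tasks: Task[] = [
    { id: 1, title: 'Write' },
    { id: 1, title: 'Write' },    // same contents, but a different object
    { id: 2, title: 'Review' },
];

// Default comparison is ===, so two distinct objects are never "equal".
const byReference: number[] = [];
of(...tasks).pipe(
    distinctUntilChanged(),
).subscribe(t => byReference.push(t.id));

// Custom comparator: treat tasks with the same id as duplicates.
const byComparator: number[] = [];
of(...tasks).pipe(
    distinctUntilChanged((prev, curr) => prev.id === curr.id),
).subscribe(t => byComparator.push(t.id));

// Shorthand for the same comparison on a single key.
const byKey: number[] = [];
of(...tasks).pipe(
    distinctUntilKeyChanged('id'),
).subscribe(t => byKey.push(t.id));

console.log(byReference, byComparator, byKey);
```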

Step 4 — scan Maintains State Across Emissions

scan(accumulator, seed) works like Array.prototype.reduce() but emits the running total after each emission rather than waiting until the source completes. The accumulator function receives the current accumulated value and the new emission, and returns the new accumulated state. This is the operator for building stateful streams: running totals, accumulated arrays, state machines, and undo history.
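
A small comparison of scan with RxJS's own reduce operator and with Array.prototype.reduce(), using the same accumulator in all three. The variable names are illustrative only.

```typescript
import { of } from 'rxjs';
import { reduce, scan } from 'rxjs/operators';

const add = (sum: number, n: number): number => sum + n;

// scan emits the accumulated state after every source value.
const running: number[] = [];
of(1, 2, 3).pipe(
    scan(add, 0),
).subscribe(v => running.push(v));

// reduce emits once, when the source completes.
const final: number[] = [];
of(1, 2, 3).pipe(
    reduce(add, 0),
).subscribe(v => final.push(v));

// The array method for comparison: a single result, no intermediate values.
const arrayTotal = [1, 2, 3].reduce(add, 0);

console.log(running, final, arrayTotal);
```

On a stream that never completes, such as user events, reduce would never emit, which is why scan is the usual choice for state held inside a long-lived stream.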

Step 5 — catchError Replaces Error with a New Observable

When the source Observable errors, catchError(fn) calls your function with the error and expects an Observable in return. This new Observable becomes the replacement stream — its values flow to subscribers as if nothing happened. Once the original stream has errored, it is finished — catchError cannot restore it. The replacement Observable you return (whether EMPTY, of(fallback), or a retry Observable) is what subscribers receive from that point.
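
The sketch below contrasts catchError on the outer pipe with catchError on the inner Observable. The `search` function is a hypothetical stand-in for an HTTP call that fails for the query 'bad'; everything here runs synchronously so the order of events is easy to follow.

```typescript
import { Observable, of } from 'rxjs';
import { catchError, map, switchMap } from 'rxjs/operators';

// Hypothetical search call: errors for 'bad', otherwise returns one result.
const search = (q: string): Observable<string[]> =>
    of(q).pipe(map(query => {
        if (query === 'bad') throw new Error('search failed');
        return [`result for ${query}`];
    }));

const queries$ = of('a', 'bad', 'b');

// Outer catchError: the fallback replaces the whole stream, which then completes.
const outerLog: string[] = [];
queries$.pipe(
    switchMap(q => search(q)),
    catchError(() => of(['fallback'])),
).subscribe({
    next: r => outerLog.push(r[0]),
    complete: () => outerLog.push('complete'),
});

// Inner catchError: only the failed inner Observable is replaced,
// so the outer stream goes on to process the next query.
const innerLog: string[] = [];
queries$.pipe(
    switchMap(q => search(q).pipe(
        catchError(() => of(['fallback'])),
    )),
).subscribe({
    next: r => innerLog.push(r[0]),
    complete: () => innerLog.push('complete'),
});

console.log(outerLog);
console.log(innerLog);
```

In the outer variant the query 'b' is never searched, because the source subscription has already ended by the time the fallback is emitted.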

Common Mistakes

Mistake 1 — Using tap() for transformation instead of map()

❌ Wrong — tap doesn’t change the value flowing downstream:

source$.pipe(
    tap(v => transformed = v * 2),   // v still flows unchanged!
    // downstream still gets the original v, not transformed
)

✅ Correct — map() transforms the value:

source$.pipe(
    map(v => v * 2),    // v * 2 flows downstream
)
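
A runnable version of the same contrast, with one caveat added: tap ignores the callback's return value, but it passes the same object reference through, so mutating an object inside tap is still visible downstream. The sample values are arbitrary.

```typescript
import { of } from 'rxjs';
import { map, tap } from 'rxjs/operators';

// tap: the return value of the callback is discarded.
const afterTap: number[] = [];
of(1, 2, 3).pipe(
    tap(v => v * 2),
).subscribe(v => afterTap.push(v));

// map: the return value of the callback replaces the original.
const afterMap: number[] = [];
of(1, 2, 3).pipe(
    map(v => v * 2),
).subscribe(v => afterMap.push(v));

// Caveat: tap hands the same reference downstream, so in-place
// mutation leaks through even though nothing was returned.
const seen: string[] = [];
of({ title: 'draft' }).pipe(
    tap(task => { task.title = 'mutated'; }),
).subscribe(task => seen.push(task.title));

console.log(afterTap, afterMap, seen);
```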

Mistake 2 — Catching errors on the outer stream instead of inside switchMap

❌ Wrong — error causes the outer stream to complete silently:

searchQuery$.pipe(
    switchMap(q => this.search(q)),
    catchError(() => EMPTY),   // error anywhere → stream ends! future queries ignored
)

✅ Correct — catch errors inside the inner Observable:

searchQuery$.pipe(
    switchMap(q => this.search(q).pipe(
        catchError(() => of([]))   // only the inner obs fails, outer stream continues
    )),
)

Mistake 3 — Forgetting to chain debounceTime before distinctUntilChanged

❌ Wrong — duplicates are compared on raw keystrokes, so the same settled query can be emitted twice:

input$.pipe(
    distinctUntilChanged(),   // compares every raw keystroke; consecutive keystrokes almost always differ
    debounceTime(300),        // wrong order: a settled value equal to the previous settled value still passes
)

✅ Correct — debounce first, then de-duplicate:

input$.pipe(
    debounceTime(300),         // wait for pause in typing
    distinctUntilChanged(),    // then skip if same as before
)
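
The effect of the ordering can be checked in virtual time with the TestScheduler from rxjs/testing. The scenario is an assumption made for illustration: the user settles on 'ab', then types 'abc' and quickly deletes back to 'ab'.

```typescript
import { TestScheduler } from 'rxjs/testing';
import { debounceTime, distinctUntilChanged } from 'rxjs/operators';

const debounceFirst: string[] = [];
const distinctFirst: string[] = [];

// TestScheduler requires an assertion callback; this sketch does not use it.
const scheduler = new TestScheduler(() => {});

scheduler.run(({ cold }) => {
    // 'ab', a long pause, then 'abc' and back to 'ab' within 50ms, then a pause.
    const input$ = cold('a 500ms b 50ms c 500ms |', { a: 'ab', b: 'abc', c: 'ab' });

    // Correct order: the second settled 'ab' equals the first and is suppressed.
    input$.pipe(
        debounceTime(300),
        distinctUntilChanged(),
    ).subscribe(v => debounceFirst.push(v));

    // Wrong order: 'ab', 'abc', 'ab' all differ from their predecessor,
    // so nothing is suppressed before the debounce.
    input$.pipe(
        distinctUntilChanged(),
        debounceTime(300),
    ).subscribe(v => distinctFirst.push(v));
});

console.log(debounceFirst);
console.log(distinctFirst);
```

With a switchMap to an HTTP call after these operators, the second ordering would send the same request twice.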

Quick Reference

| Task | Operator |
|---|---|
| Transform value | map(v => transform(v)) |
| Select values | filter(v => condition) |
| Side effect only | tap(v => sideEffect(v)) |
| Debounce input | debounceTime(300) |
| Skip duplicates | distinctUntilChanged() |
| Take first n | take(n) |
| Running total | scan((acc, v) => acc + v, 0) |
| Handle error | catchError(err => of(fallback)) |
| Retry on error | retry(3) |
| Always cleanup | finalize(() => cleanup()) |
| Initial value | startWith(initialValue) |

🧠 Test Yourself

A search input stream uses catchError(() => EMPTY) at the end of the pipe. When the API returns an error, what happens to future search queries?