RxJS Patterns — Search, Polling and Request Cancellation

Production Angular applications require RxJS patterns that go beyond simple subscribe-and-display. Live search with cancellation, periodic polling with pause-on-blur, retry with exponential backoff, and proper cleanup on component destruction are the patterns that separate applications that work under real conditions from those that work only in happy-path demos. Each pattern is a short operator pipeline — the challenge is knowing which operators combine to solve each specific problem.

Live Search with Cancellation

@Component({ standalone: true, imports: [ReactiveFormsModule, AsyncPipe], template: `
  <input [formControl]="searchControl" placeholder="Search posts...">
  @if (isSearching()) { <span>Searching...</span> }
  @for (result of results$ | async; track result.id) {
    <app-post-card [post]="result" />
  }
` })
export class LiveSearchComponent {
  private api         = inject(PostsApiService);
  searchControl       = new FormControl('');
  isSearching         = signal(false);

  // ── Complete live search pipeline ──────────────────────────────────────
  results$ = this.searchControl.valueChanges.pipe(
    debounceTime(350),              // wait for typing to pause
    distinctUntilChanged(),         // skip if value unchanged
    filter(query => (query?.length ?? 0) >= 2),  // min 2 chars
    tap(() => this.isSearching.set(true)),         // show loading
    switchMap(query =>              // cancel previous, start new
      this.api.searchPosts(query!).pipe(
        catchError(() => of([] as PostSummaryDto[])),  // error → empty list
      )
    ),
    tap(() => this.isSearching.set(false)),         // hide loading
  );
}

// ── Polling with page visibility ──────────────────────────────────────────
@Injectable({ providedIn: 'root' })
export class LiveStatsService {
  private stats$?: Observable<DashboardStats>;

  getStats(): Observable<DashboardStats> {
    if (!this.stats$) {
      this.stats$ = merge(
        of(null),          // trigger immediately on subscribe
        interval(30_000),  // then every 30 seconds
      ).pipe(
        // Pause polling when the browser tab is hidden
        switchMap(() => document.visibilityState === 'hidden'
          ? EMPTY
          : this.api.getDashboardStats()
        ),
        shareReplay(1),    // share one poller across all subscribers
      );
    }
    return this.stats$;
  }
}

// ── Retry with exponential backoff ────────────────────────────────────────
function exponentialBackoff(maxRetries = 3, initialDelayMs = 1000) {
  return (source$: Observable<any>) =>
    source$.pipe(
      retryWhen(errors$ =>
        errors$.pipe(
          mergeMap((err, attempt) => {
            if (attempt >= maxRetries) return throwError(() => err);
            const delayMs = initialDelayMs * Math.pow(2, attempt);  // 1s, 2s, 4s
            console.log(`Retrying in ${delayMs}ms (attempt ${attempt + 1})`);
            return timer(delayMs);
          })
        )
      )
    );
}

// Usage:
this.api.getCriticalData().pipe(
  exponentialBackoff(3, 1000),
).subscribe(data => this.criticalData.set(data));

// ── Cancel request on navigation ─────────────────────────────────────────
@Component({ standalone: true, template: '...' })
export class PostDetailComponent implements OnInit {
  private api        = inject(PostsApiService);
  private destroyRef = inject(DestroyRef);
  @Input() slug!: string;

  post = signal<PostDto | null>(null);

  ngOnInit() {
    this.api.getBySlug(this.slug).pipe(
      takeUntilDestroyed(this.destroyRef),  // cancel if user navigates away
      catchError(() => of(null)),
    ).subscribe(post => this.post.set(post));
  }
}
Note: The switchMap inside the search pipeline cancels the previous HTTP request when a new search query arrives. Angular’s HttpClient uses the browser’s XMLHttpRequest or Fetch API — when the Observable is unsubscribed (by switchMap cancelling it), Angular sends an abort() signal to the browser, which cancels the in-flight network request. This means the server may never process the cancelled request (if aborted early enough), and the browser frees the connection. This is a real bandwidth and server load saving for high-frequency search inputs.
Tip: Use shareReplay(1) for polling Observables shared across multiple components. Without it, each component that subscribes starts its own independent polling interval — 3 components subscribing to a 30-second poller creates 3 independent sets of API calls. With shareReplay(1), all subscribers share one polling interval and receive the same (replayed) latest value. The single polling subscription is active as long as at least one subscriber exists.
Warning: The retryWhen operator is deprecated in RxJS 7.5+ — use retry({ count, delay }) with a delay function instead. The modern equivalent of exponential backoff: retry({ count: 3, delay: (err, retryCount) => timer(1000 * Math.pow(2, retryCount)) }). Always bound the maximum retry count — unbounded retries on a persistently failing endpoint can cause a retry storm that exacerbates server problems.

Common Mistakes

Mistake 1 — Not catching errors inside switchMap (error terminates the entire stream)

❌ Wrong — one failed search request terminates the valueChanges stream; subsequent searches never fire.

✅ Correct — catch errors inside the switchMap callback: switchMap(q => api.search(q).pipe(catchError(() => of([])))).

Mistake 2 — Polling while the tab is hidden (wastes battery and bandwidth)

❌ Wrong — interval(30000) polls regardless of tab visibility; wastes resources on hidden tabs.

✅ Correct — check document.visibilityState inside the poll and return EMPTY when hidden.

🧠 Test Yourself

A search pipeline uses switchMap. Request A is in-flight for “ang”. The user types “angu” before A completes. What happens to request A?