Back to articles

RxJS: Reactive Programming in JavaScript

15 min
Front-endJavaScriptAngular

RxJS: Reactive Programming in JavaScript

RxJS (Reactive Extensions for JavaScript) is a library for handling asynchronous data streams.

The core idea is the Observable — a stream that can emit multiple values over time. You subscribe to it, transform it, and compose it with other streams.

RxJS is deeply integrated into Angular, and can be used in React and Vue as well — though the depth of integration differs significantly.


Observable Basics

Observable

An Observable represents a stream of values that arrive over time. Think of it as a Promise that can emit multiple values:

TypeScript
import { Observable } from 'rxjs';

const observable = new Observable(subscriber => {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
  subscriber.complete();
});

Observer

An Observer defines what to do with incoming values. It has three callbacks:

  • next — called when a new value arrives
  • error — called when an error occurs
  • complete — called when the stream ends

Subscription

Nothing happens until you call subscribe:

TypeScript
const subscription = observable.subscribe({
  next: value => console.log(value),
  error: err => console.error(err),
  complete: () => console.log('done'),
});

subscription.unsubscribe();

Output:

Text
1
2
3
done

Observable vs Promise

PromiseObservable
Values emittedOneZero to many
When it runsExecutor runs on creationOnly when subscribed
CancellableNoYes (unsubscribe)
OperatorsNoYes

Common Operators

Operators let you transform and manipulate streams in a declarative way, chained with pipe:

TypeScript
observable.pipe(
  operator1(),
  operator2(),
).subscribe(value => console.log(value));

Transformation

map — transforms each value:

TypeScript
import { of } from 'rxjs';
import { map } from 'rxjs/operators';

of(1, 2, 3).pipe(
  map(x => x * 2)
).subscribe(console.log);
// 2, 4, 6

switchMap — maps to a new Observable, cancelling the previous one when a new value arrives. Classic use case: search:

TypeScript
fromEvent(input, 'input').pipe(
  debounceTime(300),
  switchMap(event => searchApi(event.target.value))
).subscribe(results => console.log(results));

mergeMap — maps to a new Observable, runs all concurrently:

TypeScript
of(1, 2, 3).pipe(
  mergeMap(id => fetchUser(id))
).subscribe(user => console.log(user));

concatMap — maps to a new Observable, runs them one at a time in order:

TypeScript
of(1, 2, 3).pipe(
  concatMap(id => fetchUser(id))
).subscribe(user => console.log(user));

Filtering

filter — only lets values through that pass a condition:

TypeScript
of(1, 2, 3, 4, 5).pipe(
  filter(x => x % 2 === 0)
).subscribe(console.log);
// 2, 4

debounceTime — waits for a quiet period before emitting:

TypeScript
fromEvent(input, 'input').pipe(
  debounceTime(300)
).subscribe(event => console.log(event.target.value));

distinctUntilChanged — only emits when the value differs from the previous one:

TypeScript
of(1, 1, 2, 2, 3).pipe(
  distinctUntilChanged()
).subscribe(console.log);
// 1, 2, 3

take — takes the first n values then completes:

TypeScript
of(1, 2, 3, 4, 5).pipe(
  take(3)
).subscribe(console.log);
// 1, 2, 3

Combination

combineLatest — emits whenever any source emits, combining the latest value from each:

TypeScript
combineLatest([userStream$, postsStream$]).subscribe(
  ([user, posts]) => console.log(user, posts)
);

forkJoin — waits for all Observables to complete, similar to Promise.all:

TypeScript
forkJoin([fetchUser(1), fetchPosts(1)]).subscribe(
  ([user, posts]) => console.log(user, posts)
);

Subject

A Subject is both an Observable and an Observer — you can subscribe to it and push values into it manually:

TypeScript
import { Subject } from 'rxjs';

const subject = new Subject<number>();

subject.subscribe(value => console.log('Subscriber A:', value));
subject.subscribe(value => console.log('Subscriber B:', value));

subject.next(1);
subject.next(2);

Output:

Text
Subscriber A: 1
Subscriber B: 1
Subscriber A: 2
Subscriber B: 2

BehaviorSubject

A BehaviorSubject holds a current value and immediately delivers it to new subscribers:

TypeScript
import { BehaviorSubject } from 'rxjs';

const subject = new BehaviorSubject(0);

subject.subscribe(value => console.log('Subscriber:', value)); // immediately gets 0

subject.next(1);
subject.next(2);

Output:

Text
Subscriber: 0
Subscriber: 1
Subscriber: 2

BehaviorSubject is commonly used for sharing state between components.


Framework Integration

Angular

RxJS is a core Angular dependency — it's built into the framework at every level.

HttpClient returns Observables

TypeScript
@Injectable({ providedIn: 'root' })
export class UserService {
  constructor(private http: HttpClient) {}

  getUsers(): Observable<User[]> {
    return this.http.get<User[]>('/api/users');
  }
}

AsyncPipe

Use the async pipe in templates to subscribe to Observables. Angular handles unsubscribing automatically:

TypeScript
export class UserComponent {
  users$ = this.userService.getUsers();
}
HTML
<li *ngFor="let user of users$ | async">{{ user.name }}</li>

Unsubscribing

When subscribing manually, you must unsubscribe on destroy to avoid memory leaks. takeUntilDestroyed (Angular 16+) is the cleanest approach:

TypeScript
import { takeUntilDestroyed } from '@angular/core/rxjs-interop';

export class UserComponent {
  constructor(private userService: UserService) {
    this.userService.getUsers()
      .pipe(takeUntilDestroyed())
      .subscribe(users => {
        this.users = users;
      });
  }
}

React

React has no official RxJS integration. You subscribe and unsubscribe manually inside useEffect:

JSX
import { useEffect, useState } from 'react';
import { fetchUsers } from './userService'; // returns an Observable

function UserList() {
  const [users, setUsers] = useState([]);

  useEffect(() => {
    const subscription = fetchUsers().subscribe(users => {
      setUsers(users);
    });

    return () => subscription.unsubscribe();
  }, []);

  return (
    <ul>
      {users.map(user => <li key={user.id}>{user.name}</li>)}
    </ul>
  );
}

You can wrap this in a custom hook to keep things clean:

JavaScript
function useObservable(observable) {
  const [value, setValue] = useState(null);

  useEffect(() => {
    const subscription = observable.subscribe(setValue);
    return () => subscription.unsubscribe();
  }, [observable]);

  return value;
}

// usage
function UserList() {
  const users = useObservable(fetchUsers());
  // ...
}

In practice, RxJS sees relatively little use in the React ecosystem. Most async data needs are handled with Promise + async/await or libraries like React Query. RxJS shines in React when you need complex stream handling — debouncing, switchMap, composing multiple data sources — scenarios where its operators genuinely simplify the code.


Vue

Vue also has no official RxJS integration. You can use it manually inside setup():

vue
<template>
  <ul>
    <li v-for="user in users" :key="user.id">{{ user.name }}</li>
  </ul>
</template>

<script setup>
import { ref, onMounted, onUnmounted } from 'vue';
import { fetchUsers } from './userService';

const users = ref([]);
let subscription;

onMounted(() => {
  subscription = fetchUsers().subscribe(data => {
    users.value = data;
  });
});

onUnmounted(() => {
  subscription?.unsubscribe();
});
</script>

@vueuse/rxjs provides a more ergonomic integration:

JavaScript
import { useObservable } from '@vueuse/rxjs';
import { fetchUsers } from './userService';

const users = useObservable(fetchUsers());

Similar to React, RxJS isn't common in Vue projects. Most state and async data handling is done with Pinia + async/await. RxJS is worth reaching for in Vue when the problem genuinely calls for reactive streams.


Summary

The core pieces of RxJS:

  • Observable — a stream that emits zero to many values over time
  • Operators — transform and manipulate streams, chained with pipe
  • Subject — both Observable and Observer; push values in manually
  • BehaviorSubject — holds a current value and replays it to new subscribers

Framework integration at a glance:

  • Angular — deeply integrated; HttpClient, AsyncPipe, and more all use RxJS directly
  • React / Vue — usable but requires manual setup; less common in practice, both ecosystems favor async/await for most use cases