RxJS: Reactive Programming in JavaScript
RxJS (Reactive Extensions for JavaScript) is a library for handling asynchronous data streams.
The core idea is the Observable — a stream that can emit multiple values over time. You subscribe to it, transform it, and compose it with other streams.
RxJS is deeply integrated into Angular, and can be used in React and Vue as well — though the depth of integration differs significantly.
Observable Basics
Observable
An Observable represents a stream of values that arrive over time. Think of it as a Promise that can emit multiple values:
import { Observable } from 'rxjs';
const observable = new Observable(subscriber => {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
  subscriber.complete();
});
Observer
An Observer defines what to do with incoming values. It has three callbacks:
- next: called when a new value arrives
- error: called when an error occurs
- complete: called when the stream ends
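The interplay of the three callbacks can be sketched as follows, assuming RxJS 7. Once `error` (or `complete`) has fired, the stream is finished and later `next` calls are ignored. The names `failing$` and `log` are illustrative only.

```typescript
import { Observable } from 'rxjs';

// Collect everything the observer sees so the order is easy to inspect.
const log: string[] = [];

// Hypothetical stream that emits one value and then fails.
const failing$ = new Observable<number>(subscriber => {
  subscriber.next(1);
  subscriber.error(new Error('boom'));
  subscriber.next(2);    // ignored: the stream has already errored
  subscriber.complete(); // ignored as well
});

failing$.subscribe({
  next: value => log.push(`next ${value}`),
  error: (err: Error) => log.push(`error ${err.message}`),
  complete: () => log.push('complete'),
});

console.log(log); // [ 'next 1', 'error boom' ]
```

Note that `complete` never runs here: a stream ends with either an error or a completion, not both.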
Subscription
Nothing happens until you call subscribe:
const subscription = observable.subscribe({
  next: value => console.log(value),
  error: err => console.error(err),
  complete: () => console.log('done'),
});
subscription.unsubscribe();
Output:
1
2
3
done
Observable vs Promise
| | Promise | Observable |
|---|---|---|
| Values emitted | One | Zero to many |
| When it runs | Executor runs on creation | Only when subscribed |
| Cancellable | No | Yes (unsubscribe) |
| Operators | No | Yes |
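The "when it runs" and "cancellable" rows can be checked with a small sketch, assuming RxJS 7. The `events` array and the `lazy$` stream are illustrative names; the function returned from the Observable constructor is the teardown that `unsubscribe` triggers.

```typescript
import { Observable } from 'rxjs';

const events: string[] = [];

// A Promise executor runs as soon as the Promise is constructed.
new Promise<void>(resolve => {
  events.push('promise executor ran');
  resolve();
});

// An Observable's producer function runs once per subscribe() call.
const lazy$ = new Observable<number>(subscriber => {
  events.push('observable producer ran');
  const timer = setInterval(() => subscriber.next(Date.now()), 1000);
  // Teardown: runs on unsubscribe(), which is what makes cancellation possible.
  return () => {
    clearInterval(timer);
    events.push('teardown ran');
  };
});

const beforeSubscribe = [...events]; // only the Promise executor has run so far

const subscription = lazy$.subscribe();
subscription.unsubscribe(); // stops the interval before it ever fires

console.log(beforeSubscribe); // [ 'promise executor ran' ]
console.log(events);
// [ 'promise executor ran', 'observable producer ran', 'teardown ran' ]
```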
Common Operators
Operators let you transform and manipulate streams in a declarative way, chained with pipe:
observable.pipe(
  operator1(),
  operator2(),
).subscribe(value => console.log(value));
Transformation
map — transforms each value:
import { of } from 'rxjs';
import { map } from 'rxjs/operators';
of(1, 2, 3).pipe(
  map(x => x * 2)
).subscribe(console.log);
// 2, 4, 6
switchMap maps each value to a new inner Observable, cancelling the previous one when a new value arrives. The classic use case is search:
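import { fromEvent } from 'rxjs';
import { debounceTime, switchMap } from 'rxjs/operators';
// `input` is an <input> element and searchApi(term) returns an Observable (both assumed)
fromEvent(input, 'input').pipe(
  debounceTime(300), // wait for 300 ms of silence before searching
  switchMap(event => searchApi(event.target.value)) // drop the stale request
).subscribe(results => console.log(results));
mergeMap maps each value to an inner Observable and subscribes to all of them concurrently, so results may arrive out of order: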
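import { mergeMap } from 'rxjs/operators';
// fetchUser(id) is assumed to return an Observable
of(1, 2, 3).pipe(
  mergeMap(id => fetchUser(id))
).subscribe(user => console.log(user));
concatMap maps each value to an inner Observable and runs them one at a time, in order; the next one starts only after the previous one completes: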
import { concatMap } from 'rxjs/operators';
of(1, 2, 3).pipe(
  concatMap(id => fetchUser(id))
).subscribe(user => console.log(user));
Filtering
filter — only lets values through that pass a condition:
import { filter } from 'rxjs/operators';
of(1, 2, 3, 4, 5).pipe(
  filter(x => x % 2 === 0)
).subscribe(console.log);
// 2, 4
debounceTime waits for a quiet period, then emits the most recent value:
fromEvent(input, 'input').pipe(
  debounceTime(300)
).subscribe(event => console.log(event.target.value));
distinctUntilChanged emits only when the value differs from the previous one (compared with === by default), so it drops consecutive duplicates only:
import { distinctUntilChanged } from 'rxjs/operators';
of(1, 1, 2, 2, 3).pipe(
  distinctUntilChanged()
).subscribe(console.log);
// 1, 2, 3
take emits the first n values, then completes:
import { take } from 'rxjs/operators';
of(1, 2, 3, 4, 5).pipe(
  take(3)
).subscribe(console.log);
// 1, 2, 3
Combination
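combineLatest waits until every source has emitted at least once, then emits whenever any source emits, combining the latest value from each:
import { combineLatest } from 'rxjs';
combineLatest([userStream$, postsStream$]).subscribe(
  ([user, posts]) => console.log(user, posts)
);
forkJoin waits for all Observables to complete, then emits the last value from each exactly once, similar to Promise.all: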
import { forkJoin } from 'rxjs';
forkJoin([fetchUser(1), fetchPosts(1)]).subscribe(
  ([user, posts]) => console.log(user, posts)
);
Subject
A Subject is both an Observable and an Observer — you can subscribe to it and push values into it manually:
import { Subject } from 'rxjs';
const subject = new Subject<number>();
subject.subscribe(value => console.log('Subscriber A:', value));
subject.subscribe(value => console.log('Subscriber B:', value));
subject.next(1);
subject.next(2);
Output:
Subscriber A: 1
Subscriber B: 1
Subscriber A: 2
Subscriber B: 2
BehaviorSubject
A BehaviorSubject holds a current value and immediately delivers it to new subscribers:
import { BehaviorSubject } from 'rxjs';
const subject = new BehaviorSubject(0);
subject.subscribe(value => console.log('Subscriber:', value)); // immediately gets 0
subject.next(1);
subject.next(2);
Output:
Subscriber: 0
Subscriber: 1
Subscriber: 2
BehaviorSubject is commonly used for sharing state between components.
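Why a BehaviorSubject suits shared state shows up with late subscribers: a plain Subject has no memory, while a BehaviorSubject hands over its current value on subscribe. The sketch below assumes RxJS 7; `CounterStore`, `count$`, and `increment` are hypothetical names for one possible store shape, not a prescribed pattern.

```typescript
import { BehaviorSubject, Observable, Subject } from 'rxjs';

// Emit once before anyone has subscribed.
const plain = new Subject<number>();
const behavior = new BehaviorSubject<number>(0);
plain.next(1);
behavior.next(1);

const fromPlain: number[] = [];
const fromBehavior: number[] = [];
plain.subscribe(v => fromPlain.push(v));       // missed the 1
behavior.subscribe(v => fromBehavior.push(v)); // receives the current value 1 right away
plain.next(2);
behavior.next(2);
console.log(fromPlain, fromBehavior); // [ 2 ] [ 1, 2 ]

// Hypothetical store: the BehaviorSubject stays private, consumers get a read-only stream.
class CounterStore {
  private readonly state = new BehaviorSubject<number>(0);
  readonly count$: Observable<number> = this.state.asObservable();

  increment(): void {
    this.state.next(this.state.getValue() + 1);
  }
}

const store = new CounterStore();
store.increment();
const seen: number[] = [];
store.count$.subscribe(v => seen.push(v)); // starts with the current value, 1
store.increment();
console.log(seen); // [ 1, 2 ]
```

Exposing `asObservable()` rather than the subject itself keeps `next` calls inside the store, so components can read the state but only change it through the store's methods.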
Framework Integration
Angular
RxJS is a core Angular dependency — it's built into the framework at every level.
HttpClient returns Observables
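import { Injectable } from '@angular/core';
import { HttpClient } from '@angular/common/http';
import { Observable } from 'rxjs';
// User is the application's own model type (assumed to be defined elsewhere)
@Injectable({ providedIn: 'root' })
export class UserService {
  constructor(private http: HttpClient) {}
  getUsers(): Observable<User[]> {
    return this.http.get<User[]>('/api/users');
  }
}
AsyncPipe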
Use the async pipe in templates to subscribe to Observables. Angular handles unsubscribing automatically:
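import { inject } from '@angular/core';
export class UserComponent {
  // inject() (Angular 14+) makes the service available to field initializers
  private userService = inject(UserService);
  users$ = this.userService.getUsers();
}
In the template:
<li *ngFor="let user of users$ | async">{{ user.name }}</li>
Unsubscribing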
When subscribing manually, you must unsubscribe on destroy to avoid memory leaks. takeUntilDestroyed (Angular 16+) is the cleanest approach:
import { takeUntilDestroyed } from '@angular/core/rxjs-interop';
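export class UserComponent {
  users: User[] = [];
  constructor(private userService: UserService) {
    // Without arguments, takeUntilDestroyed() must be called in an injection context such as the constructor.
    this.userService.getUsers()
      .pipe(takeUntilDestroyed())
      .subscribe(users => {
        this.users = users;
      });
  }
}
React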
React has no official RxJS integration. You subscribe and unsubscribe manually inside useEffect:
import { useEffect, useState } from 'react';
import { fetchUsers } from './userService'; // returns an Observable
function UserList() {
  const [users, setUsers] = useState([]);
  useEffect(() => {
    const subscription = fetchUsers().subscribe(users => {
      setUsers(users);
    });
    return () => subscription.unsubscribe();
  }, []);
  return (
    <ul>
      {users.map(user => <li key={user.id}>{user.name}</li>)}
    </ul>
  );
}
You can wrap this in a custom hook to keep things clean:
function useObservable(observable) {
  const [value, setValue] = useState(null);
  useEffect(() => {
    const subscription = observable.subscribe(setValue);
    return () => subscription.unsubscribe();
  }, [observable]);
  return value;
}
// usage
function UserList() {
  // Memoize the Observable (useMemo comes from 'react'): creating a new one on
  // every render would change the effect dependency and re-subscribe each time.
  const users$ = useMemo(() => fetchUsers(), []);
  const users = useObservable(users$);
  // ...
}
In practice, RxJS sees relatively little use in the React ecosystem. Most async data needs are handled with Promises and async/await or with libraries like React Query. RxJS shines in React when you need complex stream handling (debouncing, switchMap, composing multiple data sources), where its operators genuinely simplify the code.
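To make the switchMap case concrete, here is a framework-free sketch, assuming RxJS 7, that pushes the same sequence of events through switchMap, mergeMap, and concatMap. Manually driven Subjects stand in for API requests so the result is deterministic; `run`, `Key`, and the `a`/`b` streams are illustrative names, not part of any library.

```typescript
import { Observable, OperatorFunction, Subject } from 'rxjs';
import { concatMap, mergeMap, switchMap } from 'rxjs/operators';

type Key = 'a' | 'b';
type Project = (key: Key) => Observable<string>;

// Runs one fixed sequence of events through the given flattening operator
// and returns everything the subscriber received.
function run(flatten: (project: Project) => OperatorFunction<Key, string>): string[] {
  const inner: Record<Key, Subject<string>> = {
    a: new Subject<string>(),
    b: new Subject<string>(),
  };
  const source = new Subject<Key>();
  const received: string[] = [];

  source.pipe(flatten(key => inner[key])).subscribe(v => received.push(v));

  source.next('a');   // "request a" starts
  inner.a.next('a1');
  source.next('b');   // "request b" arrives while a is still open
  inner.a.next('a2');
  inner.b.next('b1');
  inner.a.complete();
  inner.b.next('b2');
  inner.b.complete();
  return received;
}

console.log(run(p => switchMap(p))); // [ 'a1', 'b1', 'b2' ]
console.log(run(p => mergeMap(p)));  // [ 'a1', 'a2', 'b1', 'b2' ]
console.log(run(p => concatMap(p))); // [ 'a1', 'a2', 'b2' ]
```

switchMap drops `a2` because it unsubscribed from `a` as soon as `b` arrived, which is the behaviour a search box wants. concatMap misses `b1` only because these Subjects emit whether or not anyone is listening; with a request-style Observable that starts on subscribe, the second request would simply not begin until the first had completed.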
Vue
Vue's core also has no built-in RxJS integration. You can subscribe manually inside <script setup>:
<template>
<ul>
<li v-for="user in users" :key="user.id">{{ user.name }}</li>
</ul>
</template>
<script setup>
import { ref, onMounted, onUnmounted } from 'vue';
import { fetchUsers } from './userService';
const users = ref([]);
let subscription;
onMounted(() => {
  subscription = fetchUsers().subscribe(data => {
    users.value = data;
  });
});
onUnmounted(() => {
  subscription?.unsubscribe();
});
</script>
@vueuse/rxjs provides a more ergonomic integration:
import { useObservable } from '@vueuse/rxjs';
import { fetchUsers } from './userService';
const users = useObservable(fetchUsers());
Similar to React, RxJS isn't common in Vue projects. Most state and async data handling is done with Pinia + async/await. RxJS is worth reaching for in Vue when the problem genuinely calls for reactive streams.
Summary
The core pieces of RxJS:
- Observable: a stream that emits zero to many values over time
- Operators: transform and manipulate streams, chained with pipe
- Subject: both Observable and Observer; push values in manually
- BehaviorSubject: holds a current value and replays it to new subscribers
Framework integration at a glance:
- Angular: deeply integrated; HttpClient, AsyncPipe, and more all use RxJS directly
- React / Vue: usable but requires manual setup; less common in practice, since both ecosystems favor async/await for most use cases