Master RxJS: Functional Foundations and Reactive Programming Essentials
This article introduces RxJS, explaining its roots in functional and reactive programming, detailing core concepts such as Observables, Observers, Subscriptions, Subjects, and operators, and demonstrates practical code examples that illustrate how to manage asynchronous event streams effectively in modern front‑end development.
RxJS Introduction and Application
Why is RxJS Called Functional Reactive Programming?
Functional Programming
Functional programming allows functions to be used as arguments or return values; the main idea is to decompose complex calculations into a series of nested functions, progressing step by step until the computation is complete. Common array methods such as map and filter embody functional programming concepts.
const arr = [4,1,5,2,3];
const newArr = arr
.sort((a,b) => a-b)
.filter(value => value>2);
console.log(newArr); // [3,4,5]Another crucial characteristic of functional programming is purity (pure functions).
A pure function has no side effects; its output depends solely on its input, meaning that calling f(x) will always produce the same result.
Side effects occur when a function, besides returning a value, also influences the calling context, such as modifying global variables, parameters, or external storage.
Pure functions satisfy two conditions:
1) The execution process is completely determined by input parameters, unaffected by any external data.
2) The function does not modify any external state, such as global variables or the passed‑in argument objects.
Creating a non‑pure function that shares variables outside its scope can make application state chaotic and hard to maintain.
var count = 0;
var button = document.querySelector('button');
button.addEventListener('click',
() => console.log(`Clicked ${++count} times`)
);Using RxJS, application state can be isolated so that it neither affects nor is affected by the external environment.
import { fromEvent, scan } from 'rxjs';
var button = document.querySelector('button');
const example = fromEvent(button, 'click').pipe(
scan(count => count + 1, 0)
);
example.subscribe(count => console.log(`Clicked ${count} times`));Reactive Programming
Wiki definition: In computing, reactive programming (or responsive programming ) is a declarative programming paradigm oriented around data streams and the propagation of change. It allows easy expression of static or dynamic data flows, with the underlying computation model automatically propagating changes through the data stream.
What is a data stream?
A data stream is the path through which data propagates within a system, representing a series of events occurring over a period of time.
Anything can be a stream: variables, user input, network responses, timers, data structures, etc.
What is change propagation?
During data‑stream propagation, events may be combined, created, or filtered, mapping an old stream to a new one. Instead of polling for changes, we listen to events; after an event executes, the system automatically responds—this is change propagation.
Combining Front‑End Frameworks with RxJS
Front‑end frameworks (e.g., React) synchronize data with UI views; when data updates, the view updates.
UI = f(data);Reactive programming (e.g., RxJS) focuses on data, from source through processing to subscription and consumption.
data = g(origin data)The two are not conflicting; in some scenarios they complement each other, with the framework acting as a consumer of reactive data.
UI = f(g(origin data))What is RxJS Used For?
RxJS is a library for handling asynchronous event streams. By using observable sequences, developers can write asynchronous and event‑driven programs. Typical use cases include wrapping requests as observables and applying basic operators (map, filter, etc.) to process returned data and handle errors, treating asynchronous operations as collections.
Why use RxJS? (Excerpt from Zhihu): Consider the essence of asynchrony: asynchronous operations differ from synchronous ones by having timing . Synchronous operations can be understood as data + function . Asynchronous operations are data + function + timing . Rx extracts timing into a timeline and performs synchronous operations on that timeline, delegating timing handling to Rx operators. If your application is timing‑intensive, RxJS helps clarify complex async logic; otherwise, it may be unnecessary.
Basic Concepts for Managing Asynchronous Events in RxJS
1. Observable (Observable Object)
An observable treats a data stream as a collection of events that occur during its propagation.
Single Value
Multiple Values
Pull
Function
Iterator
Push
Promise
Observable
Pull and push describe two protocols for how a producer communicates with a consumer .
Pull system
In JavaScript, every function belongs to the pull system; the function produces data, and the consumer calls the function to obtain a single return value. An iterator allows the consumer to call iterator.next() to obtain multiple values.
Push system
In modern JavaScript, Promise is the most common push system. A promise, as a producer, delivers a resolved value to a consumer‑registered callback. In RxJS, observable also belongs to the push system and can push one or many values.
Example illustrating the difference:
Function
function foo() {
console.log('Hello');
return 'world';
}
const x = foo();
console.log(x);
const y = foo();
console.log(y); Observable
const foo = Observable.create(function (observer) {
console.log('Hello');
observer.next('world');
});
// .subscribe() is similar to calling a function
foo.subscribe(function (x) {
console.log(x);
});
foo.subscribe(function (y) {
console.log(y);
});Observable can return (push) multiple values over time, which a plain function cannot.
Function
function foo() {
return 'Hello';
return 'world'; // never executed
}
const a = foo();
console.log(a);
// console output: 'Hello' Observable
const foo = Observable.create(function (observer) {
observer.next('Hello');
observer.next('world');
});
foo.subscribe(function (x) {
console.log(x);
});
// console output: 'Hello' 'world'
// Asynchronous push example
const foo = Observable.create(function (observer) {
observer.next('Hello');
setTimeout(() => {
observer.next('rxjs');
}, 0);
observer.next('world');
});
// console output: 'Hello' 'world' 'rxjs'2. Observer (Observer)
An observer is a collection of callback functions, each corresponding to a notification type sent by an observable: next, error, and complete.
const observer = {
next: () => {}, // called on next notification
error: () => {}, // called on error notification
complete: () => {} // called on completion
};
// Subscribe with observer object
observable.subscribe(observer);
// Or pass callbacks in order
observable.subscribe(value => {}, error => {}, () => {});3. Subscription (Subscription)
A subscription is a resource‑clean‑up object representing the execution of an observable. It allows you to call unsubscribe() to release resources or cancel execution.
4. Subject (Subject)
RxJS introduces Cold Observable / Hot Observable concepts. A Subject is a special type of observable that multicasts values to multiple observers. Unlike a regular observable (unicast), a subject shares a single execution among all subscribed observers.
Four Types of Subjects
Observable
Subject
BehaviorSubject
AsyncSubject
ReplaySubject
Push values from source to observers each time
Multicast values to subscribed observers
Send the latest (current) value to new observers (requires initial value)
Send only the last value when the observable completes
Replay missed values to new observers
4.1.1 BehaviorSubject
BehaviorSubject holds a "current value" and immediately emits it to any new subscriber.
var subject = new BehaviorSubject(0); // 0 is the initial value
subject.subscribe({
next: v => console.log('observerA: ' + v)
});
subject.next(1);
subject.next(2);
subject.subscribe({
next: v => console.log('observerB: ' + v)
});
subject.next(3);
// Output:
// observerA: 0
// observerA: 1
// observerA: 2
// observerB: 2
// observerA: 3
// observerB: 34.1.2 AsyncSubject
AsyncSubject emits only the last value when the observable completes.
var subject = new Rx.AsyncSubject();
subject.subscribe({
next: v => console.log('observerA: ' + v)
});
subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);
subject.subscribe({
next: v => console.log('observerB: ' + v)
});
subject.next(5);
subject.complete();
// Output after completion:
// observerA: 5
// observerB: 54.1.3 ReplaySubject
ReplaySubject can buffer a number of previous values (or a time window) and replay them to new subscribers.
var subject = new ReplaySubject(3); // buffer last 3 values
subject.subscribe({
next: v => console.log('observerA: ' + v)
});
subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);
subject.subscribe({
next: v => console.log('observerB: ' + v)
});
subject.next(5);
// Output:
// observerA: 1
// observerA: 2
// observerA: 3
// observerA: 4
// observerB: 2
// observerB: 3
// observerB: 4
// observerA: 5
// observerB: 5
// ReplaySubject can also specify a time window, e.g., new ReplaySubject(3, 500) buffers 3 values for 500 ms.5. Operators (Operators)
Operators are the fundamental building blocks that allow complex asynchronous code to be composed declaratively. An operator is a pure function; invoking it does not mutate the existing observable but creates a new observable based on the current one. In practice, an observable is rarely created directly and subscribed to; instead, a series of operators transform the data stream before it reaches the observer, much like a pipeline.
Each operator links an upstream to a downstream.
Marble Diagrams
To explain how streams evolve over time, textual descriptions are often insufficient, so marble diagrams are used to visualize the temporal behavior of operators.
References
https://cn.rx.js.org/manual/overview.html
https://rxjs-cn.github.io/learn-rxjs-operators/
[1] Creation operators: https://rxjs-cn.github.io/learn-rxjs-operators/operators/creation/
How this landed with the community
Was this worth your time?
0 Comments
Thoughtful readers leave field notes, pushback, and hard-won operational detail here.
