Master RxJS: Build Your Own Observable, Observer, and Subject from Scratch
This article explains the core concepts of RxJS—including Observable, Observer, Subject, and various operators—by walking through their fundamental principles, implementation details, and practical code examples that illustrate unicast and multicast data flows in reactive JavaScript programming.
Fundamental Concepts
RxJS is a JavaScript library that uses Observables to manage asynchronous, event-based code.
It applies reactive programming on top of the observer pattern: producers push data to the consumers that subscribed to them.
Operators are pure functions, which keeps application state isolated and data transformations free of side effects.
Observable, Subject, and Observer enable chainable subscriptions and data transformation via operators.
Core Modules and Basic Principles
Below we implement a simplified version of RxJS to illustrate the core principles.
Observable
Concept: an observable object, a collection of future values or events that can be subscribed to.
Basic Usage
// create observable
let observable = new Observable(function publish(observer) {
  observer.next("hello");
  var id = setTimeout(() => {
    observer.next("world");
    observer.complete();
  }, 1000);
});
// subscribe observable
observable.subscribe({
  next: value => console.log(value),
  error: err => console.log(err),
  complete: () => console.log("done")
});
// output: hello -> world -> done
Implementation Principle
Based on the basic usage, an Observable can perform synchronous or asynchronous tasks and push data to observers. Implementing the core functionality requires two steps:
Create: as a producer, the Observable defines a publish method that receives an observer and calls its callbacks to deliver values.
Subscribe: the publish method runs when subscribe is called, making Observable a lazy data source, independent for each observer.
class Observable {
  constructor(publishFn) {
    this.publish = publishFn;
  }
  subscribe(observer) {
    this.publish(observer);
    return observer;
  }
}
Static Factory Methods
To simplify creation of common publish tasks, Observable provides static methods such as of, from, fromEvent, interval, etc.
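As a rough illustration of how such factories might be built on top of the publish function, here is a minimal sketch of of and interval. The simplified subscribe (a single next callback that returns an object with unsubscribe) is an assumption made to keep the example self-contained; it is not the article's full Observable class, and fromEvent is left out because it needs a DOM.

```javascript
// Minimal Observable for illustration only (not the article's full class).
class Observable {
  constructor(publishFn) {
    this.publish = publishFn;
  }

  // Simplified subscribe: takes a next callback, returns { unsubscribe }.
  subscribe(next) {
    let stopped = false;
    const teardown = this.publish({
      next: value => { if (!stopped) next(value); },
      complete: () => { stopped = true; },
    });
    return {
      unsubscribe() {
        stopped = true;
        if (typeof teardown === "function") teardown();
      },
    };
  }

  // Emit the given arguments synchronously, then complete.
  static of(...values) {
    return new Observable(observer => {
      values.forEach(value => observer.next(value));
      observer.complete();
    });
  }

  // Emit 0, 1, 2, ... every `period` milliseconds until unsubscribed.
  static interval(period) {
    return new Observable(observer => {
      let count = 0;
      const id = setInterval(() => observer.next(count++), period);
      return () => clearInterval(id); // teardown stops the timer
    });
  }
}

const received = [];
Observable.of(1, 2, 3).subscribe(value => received.push(value));
console.log(received); // [1, 2, 3]

// The interval keeps running until unsubscribe is called.
const ticks = [];
const subscription = Observable.interval(10).subscribe(value => {
  ticks.push(value);
  if (ticks.length === 3) subscription.unsubscribe();
});
```

Each factory only wraps a publish function, so the resulting Observable stays lazy: no timer is created until subscribe is called.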
// every 200ms emit an incrementing number starting from 0
const numbers = Observable.interval(200);
numbers.subscribe(value => console.log(value));
// output: 0 -> 1 -> 2 -> ...
// listen to document click events and emit the event object
const clicks = Observable.fromEvent(document, "click");
clicks.subscribe(event => console.log(event));
Observer
Concept: an observer is a collection of callback functions that knows how to listen to values provided by an Observable.
Implementation Principle
An observer must implement next, error, and complete callbacks; the simplest observer is an object containing these methods.
A class can encapsulate observer state with an isStopped flag and expose an unsubscribe method that triggers a provided unsubscribeCb.
Calling unsubscribe sets isStopped to true, stops data push, and runs the cleanup callback.
class Observer {
  isStopped = false;
  unsubscribeCb;
  constructor(next, error, complete) {
    this._next = next || (() => {});
    this._error = error || (() => {});
    this._complete = complete || (() => {});
  }
  next(value) { if (!this.isStopped) this._next(value); }
  error(err) { if (!this.isStopped) { this._error(err); this.unsubscribe(); } }
  complete() { if (!this.isStopped) { this._complete(); this.unsubscribe(); } }
  onUnsubscribe(cb) { this.unsubscribeCb = cb; }
  unsubscribe() { this.isStopped = true; this.unsubscribeCb && this.unsubscribeCb(); }
}
With the Observer class, Observable can be enhanced to accept either an observer object or callback functions.
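class Observable {
  // Subclasses such as Subject define publish on the prototype,
  // so only assign an own property when a publish function is given.
  constructor(publishFn) { if (publishFn) this.publish = publishFn; }
  subscribe(observerOrNext, error, complete) {
    let observer;
    if (observerOrNext instanceof Observer || observerOrNext instanceof Subject) {
      observer = observerOrNext;
    } else if (typeof observerOrNext === "function") {
      observer = new Observer(observerOrNext, error, complete);
    } else {
      observer = new Observer(observerOrNext.next, observerOrNext.error, observerOrNext.complete);
    }
    const unsubscribeCb = this.publish(observer);
    // A Subject used as an observer has no onUnsubscribe method, so guard the call.
    if (typeof observer.onUnsubscribe === "function") observer.onUnsubscribe(unsubscribeCb);
    return observer;
  }
}
let observable = new Observable(function publish(observer) {
  const id = setTimeout(() => {
    observer.next("helloworld");
    observer.complete();
  }, 1000);
  // cleanup: the timer was created with setTimeout, so clear it with clearTimeout
  return () => { console.log("clear"); clearTimeout(id); };
});
const observer = observable.subscribe(
  value => console.log(value),
  err => console.log(err),
  () => console.log("done")
);
setTimeout(() => observer.unsubscribe(), 2000);
// output: helloworld -> done -> clear
// (unsubscribe is not guarded against repeated calls, so the call at 2s logs "clear" a second time)
Subject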
Concept: a Subject acts like an EventEmitter and is the sole way to multicast values or events to multiple observers.
Observable is unicast (each observer gets its own execution). A Subject bridges Observable and Observer, subscribing to an Observable and then distributing the data to multiple observers (multicast).
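The difference between unicast and multicast can be made concrete with a small, self-contained sketch. The classes below are deliberately stripped down (no error handling, no unsubscribe) and are assumptions for illustration rather than the full implementation; the counter runs is a hypothetical helper that records how many times the producer executes.

```javascript
// Stripped-down classes for illustration only.
class Observable {
  // Only assign publish when given, so subclasses can define it as a method.
  constructor(publishFn) { if (publishFn) this.publish = publishFn; }
  subscribe(observer) { this.publish(observer); return observer; }
}

class Subject extends Observable {
  subscribers = [];
  publish(observer) { this.subscribers.push(observer); }
  next(value) { this.subscribers.forEach(observer => observer.next(value)); }
}

let runs = 0; // how many times the producer function has executed
const source = new Observable(observer => {
  runs++;
  observer.next(`run ${runs}`);
});

const log = [];

// Unicast: each subscribe call triggers its own execution of the producer.
source.subscribe({ next: value => log.push(`A: ${value}`) });
source.subscribe({ next: value => log.push(`B: ${value}`) });

// Multicast: two observers register with the Subject, then the Subject
// subscribes to the source once and forwards each value to both.
const subject = new Subject();
subject.subscribe({ next: value => log.push(`C: ${value}`) });
subject.subscribe({ next: value => log.push(`D: ${value}`) });
source.subscribe(subject);

console.log(runs); // 3
console.log(log);  // ["A: run 1", "B: run 2", "C: run 3", "D: run 3"]
```

Observers A and B each see a different execution, while C and D share the third one through the Subject.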
Implementation Principle
Subject extends Observable and implements Observer callbacks (next/error/complete).
It maintains a subscribers array; when an observer subscribes, it is added to this array.
When the source Observable pushes data, Subject forwards it to every subscriber.
class Subject extends Observable {
  subscribers = [];
  isStopped = false;
  publish(observer) {
    // A stopped Subject completes late subscribers immediately instead of registering them.
    if (this.isStopped) { observer.complete(); return; }
    this.subscribers.push(observer);
  }
  next(value) {
    if (this.isStopped) return;
    this.subscribers.forEach(observer => observer.next(value));
  }
  error(err) {
    if (this.isStopped) return;
    this.subscribers.forEach(observer => observer.error(err));
    this.isStopped = true;
    this.subscribers = [];
  }
  complete() {
    if (this.isStopped) return;
    this.subscribers.forEach(observer => observer.complete());
    this.isStopped = true;
    this.subscribers = [];
  }
}
BehaviorSubject
Extends Subject, stores the latest value (lastValue), and immediately emits it to new subscribers, so late subscribers do not miss the current state.
class BehaviorSubject extends Subject {
  constructor(value) { super(); this.lastValue = value; }
  publish(observer) {
    if (!observer.isStopped) observer.next(this.lastValue);
    super.publish(observer);
  }
  next(value) { this.lastValue = value; super.next(value); }
}
ReplaySubject
Similar to BehaviorSubject but caches multiple recent values based on bufferSize and optional windowSize. When a new observer subscribes, it receives all cached values.
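To see how bufferSize and windowSize interact, the trimming rule can be isolated as a pure function. This is a sketch under assumptions: trimEvents is a hypothetical helper name, events are [timestamp, value] pairs ordered oldest first, and the current time is passed in explicitly so the result is deterministic.

```javascript
// Return the events a new subscriber would receive.
// events: array of [timestamp, value] pairs, oldest first.
// windowSize of 0 means "no time limit".
function trimEvents(events, now, bufferSize, windowSize) {
  let start = 0;
  if (windowSize > 0) {
    // Skip every event that is older than the time window.
    const beginTime = now - windowSize;
    while (start < events.length && events[start][0] <= beginTime) start++;
  }
  // Keep at most bufferSize of the remaining events.
  start = Math.max(start, events.length - bufferSize);
  return events.slice(start);
}

const events = [[1000, "a"], [1100, "b"], [1200, "c"], [1300, "d"]];

// Buffer limit only: the three most recent values survive.
const byBuffer = trimEvents(events, 1350, 3, 0).map(event => event[1]);
console.log(byBuffer); // ["b", "c", "d"]

// Buffer of 3 plus a 200ms window at time 1350: "a" and "b" are too old.
const byWindow = trimEvents(events, 1350, 3, 200).map(event => event[1]);
console.log(byWindow); // ["c", "d"]
```

Whichever limit is stricter wins: the time window can shrink the replayed set below bufferSize, but never enlarge it.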
class ReplaySubject extends Subject {
  constructor(bufferSize, windowSize) {
    super();
    this.bufferSize = Math.max(1, bufferSize);
    this.windowSize = windowSize || 0;
    this.events = [];
  }
  getEvents() {
    let spliceIndex = 0;
    const len = this.events.length;
    if (this.windowSize > 0) {
      const beginTime = Date.now() - this.windowSize;
      while (spliceIndex < len && this.events[spliceIndex][0] <= beginTime) spliceIndex++;
    }
    spliceIndex = Math.max(spliceIndex, len - this.bufferSize);
    if (spliceIndex > 0) this.events.splice(0, spliceIndex);
  }
  publish(observer) {
    this.getEvents();
    this.events.forEach(event => !observer.isStopped && observer.next(event[1]));
    super.publish(observer);
  }
  next(value) {
    this.events.push([Date.now(), value]);
    this.getEvents();
    super.next(value);
  }
}
AsyncSubject
Only emits the final value when the source completes.
class AsyncSubject extends Subject {
  hasNext = false;
  hasComplete = false;
  value;
  publish(observer) {
    if (this.hasComplete && this.hasNext) observer.next(this.value);
    super.publish(observer);
  }
  next(value) { if (!this.hasComplete) { this.value = value; this.hasNext = true; } }
  error(err) { if (!this.hasComplete) super.error(err); }
  complete() {
    this.hasComplete = true;
    if (this.hasNext) super.next(this.value);
    super.complete();
  }
}
Operator
Pure functions such as map, filter, concat, and flatMap are used to transform streams in a functional programming style.
Basic Usage
Operators are applied via the pipe method, which composes multiple operator functions.
const observable = Observable.interval(200).pipe(
  take(6),
  map(item => item * 2)
);
observable.subscribe(value => console.log(value));
// output: 0 -> 2 -> 4 -> 6 -> 8 -> 10
Implementation Principle
An operator like map returns an operation function that receives a source Observable and returns a new Observable via source.lift. pipe reduces the list of operation functions, chaining them to produce a final Observable.
When the resulting Observable is subscribed, the operator’s call method wraps the original observer, intercepting its next calls.
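The Observable class shown earlier does not define pipe or lift, so the following is a minimal sketch of how they might look. It assumes a stripped-down Observable whose subscribe accepts a plain observer object; the source and operator properties on the derived Observable are assumed names chosen for this illustration.

```javascript
// Stripped-down Observable with pipe and lift, for illustration only.
class Observable {
  constructor(publishFn) { if (publishFn) this.publish = publishFn; }

  subscribe(observer) {
    if (this.operator) {
      // Derived Observable: the operator wires the observer to the source.
      return this.operator.call(observer, this.source);
    }
    this.publish(observer);
    return observer;
  }

  // Create a new Observable that remembers its source and operator.
  lift(operator) {
    const derived = new Observable();
    derived.source = this;
    derived.operator = operator;
    return derived;
  }

  // Feed the result of each operation function into the next one.
  pipe(...operations) {
    return operations.reduce((source, operation) => operation(source), this);
  }
}

// A compact map whose operator object exposes the same call(observer, source) shape.
function map(mapFn) {
  return source => source.lift({
    call(observer, upstream) {
      return upstream.subscribe({
        next: value => observer.next(mapFn(value)),
        error: err => observer.error && observer.error(err),
        complete: () => observer.complete && observer.complete(),
      });
    },
  });
}

const results = [];
new Observable(observer => {
  [1, 2, 3].forEach(value => observer.next(value));
  observer.complete();
})
  .pipe(map(x => x + 1), map(x => x * 10))
  .subscribe({
    next: value => results.push(value),
    complete: () => results.push("done"),
  });

console.log(results); // [20, 30, 40, "done"]
```

Subscribing to the final Observable walks the chain backwards: each operator subscribes to its own source with a wrapping observer until the original publish function runs.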
function map(mapFn) {
  return function mapOperation(source) {
    return source.lift(new MapOperator(mapFn));
  };
}
class MapOperator {
  constructor(mapFn) { this.mapFn = mapFn; }
  call(observer, source) {
    return source.subscribe(new MapObserver(observer, this.mapFn));
  }
}
class MapObserver extends Observer {
  constructor(destination, mapFn) { super(); this.destination = destination; this.mapFn = mapFn; }
  next(value) { const result = this.mapFn(value); this.destination.next(result); }
  // forward errors as well, so a failing source is not silently swallowed
  error(err) { this.destination.error(err); }
  complete() { this.destination.complete(); }
}
takeUntil Example
Stops an interval when a notifier Observable emits (e.g., a click event).
const notifier = Observable.fromEvent(document, "click");
const observable = Observable.interval(1000).pipe(takeUntil(notifier));
The implementation adds a NotifierObserver that listens to the notifier and notifies the outer observer to complete.
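The article does not show the takeUntil code, so the following is only a sketch of the idea under stated assumptions. It uses a stripped-down Observable whose subscribe returns an unsubscribe function, replaces the NotifierObserver class with an inline observer object, and drives both streams with a hypothetical manualSource helper so the example runs without timers or a DOM.

```javascript
// Stripped-down Observable: subscribe returns an unsubscribe function.
class Observable {
  constructor(publishFn) { this.publish = publishFn; }
  subscribe(observer) {
    const teardown = this.publish(observer);
    return () => { if (typeof teardown === "function") teardown(); };
  }
}

function takeUntil(notifier) {
  return source => new Observable(observer => {
    let stopped = false;
    let stopSource = () => {};
    let stopNotifier = () => {};

    // Complete the outer observer once and release both subscriptions.
    const finish = () => {
      if (stopped) return;
      stopped = true;
      stopSource();
      stopNotifier();
      if (observer.complete) observer.complete();
    };

    // Plays the NotifierObserver role: the first notifier value ends the stream.
    stopNotifier = notifier.subscribe({ next: finish });
    if (stopped) { stopNotifier(); return; }

    stopSource = source.subscribe({
      next: value => { if (!stopped) observer.next(value); },
      complete: finish,
    });
    if (stopped) stopSource();

    return () => { stopped = true; stopSource(); stopNotifier(); };
  });
}

// Hypothetical helper: an Observable whose values are pushed by hand.
function manualSource() {
  const observers = new Set();
  const observable = new Observable(observer => {
    observers.add(observer);
    return () => observers.delete(observer);
  });
  return { observable, emit: value => observers.forEach(o => o.next(value)) };
}

const numbers = manualSource();
const clicks = manualSource();
const seen = [];

takeUntil(clicks.observable)(numbers.observable).subscribe({
  next: value => seen.push(value),
  complete: () => seen.push("done"),
});

numbers.emit(0);
numbers.emit(1);
clicks.emit("click"); // the notifier fires: the stream completes
numbers.emit(2);      // ignored, the source subscription is already released

console.log(seen); // [0, 1, "done"]
```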