Mastering RxJS: Build Your Own Observable, Observer, and Subject from Scratch
This article explains the core concepts of RxJS (Observable, Observer, Subject, and operators) by walking through their definitions, basic usage, static creation methods, and JavaScript implementations, and by comparing unicast and multicast behavior.
Basic Concepts
RxJS is a JavaScript library that uses Observables to manage asynchronous events.
It applies reactive programming built on the observer pattern: producers actively push zero or more values to subscribed consumers.
Operators are pure functions, which keeps application state isolated and the data flow free of side effects.
Observable, Observer, Subject, and operators can be chained, so data stays inside the Observable pipeline from producer to subscriber.
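To make the push model in the list above concrete, here is a minimal sketch contrasting a pull-based iterator with a push-based producer. The names `pullSource` and `pushSource` are illustrative only and are not part of RxJS.

```javascript
// Pull: the consumer decides when to ask for the next value.
function* pullSource() {
  yield 1;
  yield 2;
  yield 3;
}

const pulled = [];
for (const value of pullSource()) {
  pulled.push(value); // the loop requests each value in turn
}

// Push: the producer decides when to deliver values to the consumer's callbacks.
function pushSource(observer) {
  [1, 2, 3].forEach(value => observer.next(value));
  observer.complete();
}

const pushed = [];
pushSource({
  next: value => pushed.push(value),
  complete: () => pushed.push("done")
});

console.log(pulled); // [ 1, 2, 3 ]
console.log(pushed); // [ 1, 2, 3, 'done' ]
```

In the push version the consumer only supplies callbacks, which is the same shape an observer has in the rest of this article.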
Core Modules and Basic Principles
Below we implement a simplified version of RxJS to illustrate its core principles.
Observable
Concept: an observable object, a collection of future values or events.
Basic Usage
// create observable
let observable = new Observable(function publish(observer) {
  observer.next("hello");
  setTimeout(() => {
    observer.next("world");
    observer.complete();
  }, 1000);
});
// subscribe observable
observable.subscribe({
  next: value => console.log(value),
  error: err => console.log(err),
  complete: () => console.log("done")
});
// output: hello -> world -> done
Implementation Principle
Observable can execute synchronous or asynchronous tasks and push data to an observer. Implementing its core functionality requires two steps:
Creation: the Observable sets a publish method that receives an observer and calls the observer's callbacks to deliver values.
Subscription: the publish method runs when the Observable is subscribed, making the Observable lazy and ensuring each observer gets an independent execution.
class Observable {
  constructor(publishFn) {
    this.publish = publishFn;
  }
  subscribe(observer) {
    this.publish(observer);
    return observer;
  }
}
Static Construction Methods
Observable provides static factories such as of, from, fromEvent, and interval for common publishing patterns.
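The article does not show how these factories are built. The following is a minimal sketch of `of`, `from`, and `interval` under some assumptions: it uses a stripped-down `Observable` whose `subscribe` takes a plain observer object and returns whatever teardown function the publish function produces. That return value is a simplification for this sketch, not the article's or the library's API.

```javascript
// Stripped-down Observable for this sketch (assumption: subscribe returns the teardown).
class Observable {
  constructor(publishFn) {
    this.publish = publishFn;
  }
  subscribe(observer) {
    return this.publish(observer);
  }

  // of(...values): emit each argument synchronously, then complete.
  static of(...values) {
    return new Observable(observer => {
      values.forEach(value => observer.next(value));
      observer.complete();
    });
  }

  // from(iterable): emit each item of an array or other iterable, then complete.
  static from(iterable) {
    return new Observable(observer => {
      for (const value of iterable) observer.next(value);
      observer.complete();
    });
  }

  // interval(period): emit 0, 1, 2, ... every `period` ms until torn down.
  static interval(period) {
    return new Observable(observer => {
      let count = 0;
      const id = setInterval(() => observer.next(count++), period);
      return () => clearInterval(id); // teardown stops the timer
    });
  }
}

const received = [];
const collector = {
  next: value => received.push(value),
  complete: () => received.push("done")
};
Observable.of(1, 2).subscribe(collector);
Observable.from("ab").subscribe(collector);
console.log(received); // [ 1, 2, 'done', 'a', 'b', 'done' ]

// interval hands back a teardown; calling it cancels the timer before it fires.
const stop = Observable.interval(1000).subscribe({ next() {} });
stop();
```

Each factory is just a publish function wrapped in `new Observable(...)`, so nothing runs until `subscribe` is called.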
// interval example – emit an incrementing number every 200 ms
const ticks = Observable.interval(200);
ticks.subscribe(value => console.log(value)); // 0 → 1 → 2 → …
// fromEvent example – listen to document clicks and emit the event object
const clicks = Observable.fromEvent(document, "click");
clicks.subscribe(event => console.log(event));
Observer
Concept: a collection of callback functions that know how to listen to values provided by an Observable.
Implementation Principle
An observer must implement next, error, and complete callbacks; the simplest observer is an object containing these methods.
A wrapper class can maintain subscription state via an isStopped flag and expose an onUnsubscribe hook.
The unsubscribe method sets isStopped to true, stops data emission, and invokes the provided unsubscribe callback.
class Observer {
  isStopped = false;
  unsubscribeCb;
  constructor(next, error, complete) {
    this._next = next || (() => {});
    this._error = error || (() => {});
    this._complete = complete || (() => {});
  }
  next(value) { if (!this.isStopped) this._next(value); }
  error(err) { if (!this.isStopped) { this._error(err); this.unsubscribe(); } }
  complete() { if (!this.isStopped) { this._complete(); this.unsubscribe(); } }
  onUnsubscribe(cb) { this.unsubscribeCb = cb; }
  // Idempotent: the teardown callback runs at most once.
  unsubscribe() {
    if (this.isStopped) return;
    this.isStopped = true;
    this.unsubscribeCb && this.unsubscribeCb();
  }
}
With this Observer class, the Observable implementation can be enhanced to accept either an observer object or separate callbacks.
class Observable {
  // Subclasses such as Subject define publish on the prototype, so only
  // assign the instance property when a publish function is passed in.
  constructor(publishFn) { if (publishFn) this.publish = publishFn; }
  subscribe(observerOrNext, error, complete) {
    let observer;
    if (observerOrNext instanceof Observer || observerOrNext instanceof Subject) {
      observer = observerOrNext;
    } else if (typeof observerOrNext === "function") {
      observer = new Observer(observerOrNext, error, complete);
    } else {
      observer = new Observer(observerOrNext.next, observerOrNext.error, observerOrNext.complete);
    }
    const unsubscribeCb = this.publish(observer);
    // A Subject used as an observer has no onUnsubscribe hook.
    if (typeof observer.onUnsubscribe === "function") observer.onUnsubscribe(unsubscribeCb);
    return observer;
  }
}
let observable = new Observable(function publish(observer) {
  const id = setTimeout(() => {
    observer.next("helloworld");
    observer.complete();
  }, 1000);
  return () => { console.log("clear"); clearTimeout(id); };
});
const observer = observable.subscribe(
  value => console.log(value),
  err => console.log(err),
  () => console.log("done")
);
// complete() has already run the teardown by now; this later call logs "clear" again unless unsubscribe() is idempotent
setTimeout(() => observer.unsubscribe(), 2000);
// output: helloworld -> done -> clear
Subject
Concept: similar to an EventEmitter; the sole way to multicast values or events to multiple observers.
Observable is unicast—each observer triggers a separate execution of the publish function. Subject acts as both an Observable and an Observer, allowing multicast behavior.
// unicast observable example
const observable = Observable.interval(200).pipe(take(6));
const observerA = new Observer(x => console.log(`A next ${x}`));
const observerB = new Observer(x => console.log(`B next ${x}`));
observable.subscribe(observerA);
setTimeout(() => observable.subscribe(observerB), 500);
// A receives 0‑5, B receives 0‑5 after its later subscription
// multicast using Subject
const subject = new Subject();
observable.subscribe(subject);
subject.subscribe(observerA);
setTimeout(() => subject.subscribe(observerB), 500);
// A receives 0‑5, B receives 2‑5 (missed first two values)
Implementation Principle
Subject extends Observable and implements the Observer callbacks (next, error, complete).
It maintains a subscribers array; when an observer subscribes, it is pushed into this array.
When the source Observable pushes data, Subject forwards the data to every stored observer.
class Subject extends Observable {
  subscribers = [];
  isStopped = false;
  publish(observer) {
    // A stopped Subject only signals completion; the observer is not stored.
    if (this.isStopped) { observer.complete(); return; }
    this.subscribers.push(observer);
  }
  next(value) {
    if (this.isStopped) return;
    this.subscribers.forEach(observer => observer.next(value));
  }
  error(err) {
    this.subscribers.forEach(observer => observer.error(err));
    this.isStopped = true;
    this.subscribers = [];
  }
  complete() {
    this.subscribers.forEach(observer => observer.complete());
    this.isStopped = true;
    this.subscribers = [];
  }
}
BehaviorSubject
Extends Subject and holds the latest value (lastValue). When a new observer subscribes, it immediately receives this latest value, preventing loss due to late subscription.
class BehaviorSubject extends Subject {
  constructor(value) { super(); this.lastValue = value; }
  publish(observer) {
    if (!observer.isStopped) observer.next(this.lastValue);
    super.publish(observer);
  }
  next(value) { this.lastValue = value; super.next(value); }
}
ReplaySubject
Caches a configurable number of recent values (or values within a time window) and re‑emits them to new subscribers.
class ReplaySubject extends Subject {
  // bufferSize: maximum number of cached values; windowSize: maximum age in ms (0 = no limit)
  constructor(bufferSize = Infinity, windowSize = 0) {
    super();
    this.bufferSize = Math.max(1, bufferSize);
    this.windowSize = windowSize || 0;
    this.events = []; // [timestamp, value] pairs
  }
  // Drop cached events that are too old or exceed the buffer size.
  getEvents() {
    let spliceIndex = 0;
    const len = this.events.length;
    if (this.windowSize > 0) {
      const beginTime = Date.now() - this.windowSize;
      while (spliceIndex < len && this.events[spliceIndex][0] <= beginTime) spliceIndex++;
    }
    spliceIndex = Math.max(spliceIndex, len - this.bufferSize);
    if (spliceIndex > 0) this.events.splice(0, spliceIndex);
  }
  publish(observer) {
    this.getEvents();
    this.events.forEach(event => !observer.isStopped && observer.next(event[1]));
    super.publish(observer);
  }
  next(value) {
    this.events.push([Date.now(), value]);
    this.getEvents();
    super.next(value);
  }
}
AsyncSubject
Emits only the last value, and only when the source completes.
class AsyncSubject extends Subject {
  hasNext = false;
  hasComplete = false;
  value;
  publish(observer) {
    if (this.hasComplete && this.hasNext) observer.next(this.value);
    super.publish(observer);
  }
  next(value) { if (!this.hasComplete) { this.value = value; this.hasNext = true; } }
  error(err) { if (!this.hasComplete) super.error(err); }
  complete() {
    this.hasComplete = true;
    if (this.hasNext) super.next(this.value);
    super.complete();
  }
}
Comparison
Observing the data flow for Observer A (synchronous subscription) and Observer B (asynchronous subscription after 500 ms) reveals how unicast Observable, multicast Subject, and the various Subject subclasses differ in the values they receive.
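The comparison can be reproduced with a short synchronous sketch. The classes below are compact stand-ins that reuse the article's class names but are not its implementations: they take plain-object observers, skip unsubscribe handling, and `ReplaySubject` supports only a buffer size. Subscribing B after two values have been emitted stands in for the 500 ms delay, and the unicast Observable case is not covered here.

```javascript
class Subject {
  constructor() { this.subscribers = []; this.isStopped = false; }
  subscribe(observer) {
    if (this.isStopped) { observer.complete(); return; }
    this.subscribers.push(observer);
  }
  next(value) {
    if (!this.isStopped) this.subscribers.forEach(o => o.next(value));
  }
  complete() {
    if (this.isStopped) return;
    this.isStopped = true;
    this.subscribers.forEach(o => o.complete());
    this.subscribers = [];
  }
}

class BehaviorSubject extends Subject {
  constructor(value) { super(); this.lastValue = value; }
  subscribe(observer) {
    if (!this.isStopped) observer.next(this.lastValue); // replay the latest value
    super.subscribe(observer);
  }
  next(value) { this.lastValue = value; super.next(value); }
}

class ReplaySubject extends Subject {
  constructor(bufferSize) { super(); this.bufferSize = bufferSize; this.buffer = []; }
  subscribe(observer) {
    this.buffer.forEach(v => observer.next(v)); // replay cached values
    super.subscribe(observer);
  }
  next(value) {
    this.buffer.push(value);
    if (this.buffer.length > this.bufferSize) this.buffer.shift();
    super.next(value);
  }
}

class AsyncSubject extends Subject {
  constructor() { super(); this.hasNext = false; this.value = undefined; }
  subscribe(observer) {
    if (this.isStopped && this.hasNext) observer.next(this.value);
    super.subscribe(observer);
  }
  next(value) { if (!this.isStopped) { this.value = value; this.hasNext = true; } }
  complete() {
    if (this.hasNext) super.next(this.value); // deliver only the final value
    super.complete();
  }
}

// A subscribes before any value; B subscribes after the values 1 and 2.
function run(subject) {
  const log = { A: [], B: [] };
  const observerFor = name => ({
    next: v => log[name].push(v),
    complete: () => log[name].push("done")
  });
  subject.subscribe(observerFor("A"));
  subject.next(1);
  subject.next(2);
  subject.subscribe(observerFor("B"));
  subject.next(3);
  subject.complete();
  return log;
}

const results = {
  Subject: run(new Subject()),                  // A: 1,2,3,done    B: 3,done
  BehaviorSubject: run(new BehaviorSubject(0)), // A: 0,1,2,3,done  B: 2,3,done
  ReplaySubject: run(new ReplaySubject(2)),     // A: 1,2,3,done    B: 1,2,3,done
  AsyncSubject: run(new AsyncSubject())         // A: 3,done        B: 3,done
};
console.log(results);
```

The only difference between the four runs is what each class does at subscription time and on `next`, which is what decides how much a late subscriber such as B gets to see.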
Operator
Pure functions such as map, filter, concat, and flatMap are used in a functional‑programming style to transform streams.
Basic Usage
The pipe method accepts operator functions to transform, filter, or otherwise process emitted values.
const observable = Observable.interval(200).pipe(
  take(6),
  map(item => item * 2)
);
observable.subscribe(value => console.log(value)); // 0 → 2 → 4 → 6 → 8 → 10
Implementation Principle
Operators like map return an operation function that receives a source Observable and returns a new Observable via source.lift. pipe reduces the list of operation functions, chaining them together.
When the resulting Observable is subscribed, the operator’s call method wraps the original observer, applying the transformation before forwarding values.
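The `lift` and `pipe` methods themselves are not shown in this article. The sketch below is one way they might be written, assuming operator objects expose `call(observer, source)` as described above. It uses plain-object observers and a hypothetical simplified `filter` operator so that it runs on its own; the `source` and `operator` property names are assumptions of this sketch.

```javascript
class Observable {
  constructor(publishFn) {
    if (publishFn) this.publish = publishFn;
  }
  subscribe(observer) {
    // If lift attached an operator, let it wrap the observer and subscribe
    // to the source; otherwise run the publish function directly.
    if (this.operator) return this.operator.call(observer, this.source);
    return this.publish(observer);
  }
  // lift: create a new Observable that remembers its source and operator.
  lift(operator) {
    const result = new Observable();
    result.source = this;
    result.operator = operator;
    return result;
  }
  // pipe: feed the Observable through each operation function in order.
  pipe(...operations) {
    return operations.reduce((source, operation) => operation(source), this);
  }
}

// filter, written in the same shape as an operator built on lift.
function filter(predicate) {
  return function filterOperation(source) {
    return source.lift({
      call(observer, src) {
        return src.subscribe({
          next: value => { if (predicate(value)) observer.next(value); },
          error: err => observer.error(err),
          complete: () => observer.complete()
        });
      }
    });
  };
}

const numbers = new Observable(observer => {
  [1, 2, 3, 4, 5, 6].forEach(n => observer.next(n));
  observer.complete();
});

const output = [];
numbers
  .pipe(filter(n => n % 2 === 0), filter(n => n > 2))
  .subscribe({
    next: value => output.push(value),
    error: err => output.push(err),
    complete: () => output.push("done")
  });
console.log(output); // [ 4, 6, 'done' ]
```

Because `lift` only records the source and the operator, no work happens until the last Observable in the chain is subscribed; each operator then subscribes to the one before it.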
function map(mapFn) {
  return function mapOperation(source) {
    return source.lift(new MapOperator(mapFn));
  };
}
class MapOperator {
  constructor(mapFn) { this.mapFn = mapFn; }
  call(observer, source) {
    return source.subscribe(new MapObserver(observer, this.mapFn));
  }
}
class MapObserver extends Observer {
  constructor(destination, mapFn) { super(); this.destination = destination; this.mapFn = mapFn; }
  next(value) { this.destination.next(this.mapFn(value)); }
  // Forward errors as well, so they are not swallowed by the default no-op handler.
  error(err) { this.destination.error(err); }
  complete() { this.destination.complete(); }
}
takeUntil Example
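takeUntil passes source values through until a notifier Observable emits. In the simplified implementation below, the stream also stops when the notifier completes; in RxJS itself, a notifier that completes without emitting does not end the stream.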
const notifier = Observable.fromEvent(document, "click");
const observable = Observable.interval(1000).pipe(takeUntil(notifier));
The implementation creates a notifier observer that, upon receiving a value or completion, signals the outer observer to complete.
function takeUntil(notifier) {
  return function takeUntilOperation(source) {
    return source.lift(new TakeUntilOperator(notifier));
  };
}
class TakeUntilOperator {
  constructor(notifier) { this.notifier = notifier; }
  call(observer, source) {
    const outer = new TakeUntilObserver(observer);
    const notifierObs = new NotifierObserver(outer);
    this.notifier.subscribe(notifierObs);
    if (!outer.seenValue) return source.subscribe(outer);
  }
}
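class NotifierObserver extends Observer {
  constructor(outer) { super(); this.outer = outer; }
  next(value) { this.outer.notifyNext(value); }
  error(err) { this.outer.notifyError(err); this.unsubscribe(); }
  complete() { this.outer.notifyComplete(); this.unsubscribe(); }
}
class TakeUntilObserver extends Observer {
  constructor(dest) { super(); this.destination = dest; this.seenValue = false; }
  // Notifier emitted or completed: complete downstream once and stop the source.
  notifyNext() { if (!this.seenValue) { this.seenValue = true; this.destination.complete(); this.unsubscribe(); } }
  notifyComplete() { this.notifyNext(); }
  // Propagate a notifier error downstream.
  notifyError(err) { if (!this.seenValue) { this.seenValue = true; this.destination.error(err); this.unsubscribe(); } }
  next(value) { if (!this.seenValue) this.destination.next(value); }
  // Forward the source's own termination to the destination.
  error(err) { if (!this.seenValue) this.destination.error(err); this.unsubscribe(); }
  complete() { if (!this.seenValue) this.destination.complete(); this.unsubscribe(); }
}
The article concludes with references to the RxJS Chinese documentation.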
