Mastering RxJS: Build Your Own Observable, Observer, and Subject from Scratch

This article explains the core concepts of RxJS (Observable, Observer, Subject, and operators) by walking through their definitions, basic usage, static creation methods, and a from-scratch JavaScript implementation, and by comparing unicast and multicast behavior.

ELab Team

Basic Concepts

RxJS is a JavaScript library that uses Observables to manage asynchronous events.

It applies reactive programming based on the observer pattern: producers actively push multiple values to subscribed consumers.

Pure functions keep application state isolated and data free of side effects.

Observable, Subject, Observer, and operators are connected through chainable subscriptions, so data keeps flowing inside the Observable pipeline.

Core Modules and Basic Principles

Below we implement a simple RxJS to illustrate the core principles.

Observable

Concept: an observable object, a collection of future values or events.

Basic Usage

// create observable
let observable = new Observable(function publish(observer) {
  observer.next("hello");
  var id = setTimeout(() => {
    observer.next("world");
    observer.complete();
  }, 1000);
});

// subscribe observable
observable.subscribe({
  next: value => console.log(value),
  error: err => console.log(err),
  complete: () => console.log("done")
});
// output: hello -> world -> done

Implementation Principle

Observable can execute synchronous or asynchronous tasks and push data to an observer. Implementing its core functionality requires two steps:

Creation: the Observable sets a publish method that receives an observer and calls the observer's callbacks to deliver values.

Subscription: the publish method runs when the Observable is subscribed, making the Observable lazy and ensuring each observer gets an independent execution.

class Observable {
  constructor(publishFn) {
    this.publish = publishFn;
  }
  subscribe(observer) {
    this.publish(observer);
    return observer;
  }
}
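
Both properties can be checked by counting how often the publish function runs. The sketch below repeats the minimal Observable class; the `runs` counter and the `log` array are illustrative names that do not appear in the original code.

```javascript
// Minimal Observable as described above: subscribe simply runs publish.
class Observable {
  constructor(publishFn) {
    this.publish = publishFn;
  }
  subscribe(observer) {
    this.publish(observer);
    return observer;
  }
}

let runs = 0;     // hypothetical counter: how many times publish executed
const log = [];   // hypothetical sink for received values

const source = new Observable(observer => {
  runs += 1;
  observer.next(`execution #${runs}`);
});

// Creating the Observable does not run publish (lazy).
const runsBeforeSubscribe = runs;

// Each subscribe call triggers its own, independent execution (unicast).
source.subscribe({ next: value => log.push(`A: ${value}`) });
source.subscribe({ next: value => log.push(`B: ${value}`) });

console.log(runsBeforeSubscribe); // 0
console.log(log); // [ 'A: execution #1', 'B: execution #2' ]
```

Because nothing is shared between the two executions, any state created inside publish (timers, counters, connections) is created once per subscriber.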

Static Construction Methods

Observable provides static factories such as of, from, fromEvent, and interval for common publishing patterns.

// interval example – emit an incrementing number every 200 ms
const ticks = Observable.interval(200);
ticks.subscribe(value => console.log(value)); // 0 → 1 → 2 → …

// fromEvent example – listen to document clicks and emit the event object
const clicks = Observable.fromEvent(document, "click");
clicks.subscribe(event => console.log(event));
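
The article does not show how these factories are built. One possible sketch of of, from, and interval follows. It is an assumption rather than the original implementation, and it simplifies subscribe so that it takes two callbacks and returns an unsubscribe function.

```javascript
// Sketch only: a stripped-down Observable with three static factories.
// subscribe() returns an unsubscribe function here, which is a simplification
// of the Observer-based version built later in the article.
class Observable {
  constructor(publishFn) {
    this.publish = publishFn;
  }
  subscribe(next, complete = () => {}) {
    let stopped = false;
    const teardown = this.publish({
      next: value => { if (!stopped) next(value); },
      complete: () => { if (!stopped) { stopped = true; complete(); } }
    });
    return () => { stopped = true; if (teardown) teardown(); };
  }

  // of(1, 2, 3): emit the arguments synchronously, then complete.
  static of(...values) {
    return Observable.from(values);
  }

  // from(iterable): emit every item of an array or other iterable, then complete.
  static from(iterable) {
    return new Observable(observer => {
      for (const value of iterable) observer.next(value);
      observer.complete();
    });
  }

  // interval(ms): emit 0, 1, 2, ... every `ms` milliseconds until unsubscribed.
  static interval(ms) {
    return new Observable(observer => {
      let count = 0;
      const id = setInterval(() => observer.next(count++), ms);
      return () => clearInterval(id); // teardown stops the timer
    });
  }
}

const received = [];
Observable.of(1, 2, 3).subscribe(v => received.push(v), () => received.push("done"));
Observable.from("ab").subscribe(v => received.push(v));
console.log(received); // [ 1, 2, 3, 'done', 'a', 'b' ]

// interval keeps emitting until its teardown is called.
const stop = Observable.interval(200).subscribe(v => console.log("tick", v));
setTimeout(stop, 700);
```

fromEvent would follow the same shape as interval: call addEventListener inside publish and return a teardown that calls removeEventListener.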

Observer

Concept: a collection of callback functions that know how to listen to values provided by an Observable.

Implementation Principle

An observer must implement next, error, and complete callbacks; the simplest observer is an object containing these methods.

A wrapper class can maintain subscription state via an isStopped flag and expose an onUnsubscribe hook.

The unsubscribe method sets isStopped to true, stops data emission, and invokes the provided unsubscribe callback.

class Observer {
  isStopped = false;
  unsubscribeCb;
  constructor(next, error, complete) {
    this._next = next || (() => {});
    this._error = error || (() => {});
    this._complete = complete || (() => {});
  }
  next(value) { if (!this.isStopped) this._next(value); }
  error(err) { if (!this.isStopped) { this._error(err); this.unsubscribe(); } }
  complete() { if (!this.isStopped) { this._complete(); this.unsubscribe(); } }
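  onUnsubscribe(cb) { this.unsubscribeCb = cb; }
  // Run the teardown callback only on the first call, so that completing
  // and then unsubscribing manually does not clean up twice.
  unsubscribe() {
    if (this.isStopped) return;
    this.isStopped = true;
    this.unsubscribeCb && this.unsubscribeCb();
  }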
}
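
The effect of the isStopped flag is easiest to see by driving an Observer by hand. The sketch below repeats the Observer class in condensed form; the `seen` array is an illustrative name.

```javascript
// Condensed copy of the Observer wrapper described above.
class Observer {
  isStopped = false;
  unsubscribeCb;
  constructor(next, error, complete) {
    this._next = next || (() => {});
    this._error = error || (() => {});
    this._complete = complete || (() => {});
  }
  next(value) { if (!this.isStopped) this._next(value); }
  error(err) { if (!this.isStopped) { this._error(err); this.unsubscribe(); } }
  complete() { if (!this.isStopped) { this._complete(); this.unsubscribe(); } }
  onUnsubscribe(cb) { this.unsubscribeCb = cb; }
  unsubscribe() { this.isStopped = true; this.unsubscribeCb && this.unsubscribeCb(); }
}

const seen = []; // hypothetical sink recording every callback that fires
const observer = new Observer(
  value => seen.push(value),
  err => seen.push(`error: ${err}`),
  () => seen.push("done")
);
observer.onUnsubscribe(() => seen.push("teardown"));

observer.next(1);
observer.complete();    // runs the complete callback, then unsubscribes
observer.next(2);       // ignored: the observer is already stopped
observer.error("late"); // ignored as well

console.log(seen); // [ 1, 'done', 'teardown' ]
```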

With this Observer class, the Observable implementation can be enhanced to accept either an observer object or separate callbacks.

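class Observable {
  // Assign publish only when a function is supplied; otherwise an undefined
  // own property would shadow a publish method defined by a subclass such as Subject.
  constructor(publishFn) { if (publishFn) this.publish = publishFn; }
  subscribe(observerOrNext, error, complete) {
    let observer;
    if (observerOrNext instanceof Observer || observerOrNext instanceof Subject) {
      observer = observerOrNext;
    } else if (typeof observerOrNext === "function") {
      observer = new Observer(observerOrNext, error, complete);
    } else {
      observer = new Observer(observerOrNext.next, observerOrNext.error, observerOrNext.complete);
    }
    const unsubscribeCb = this.publish(observer);
    // A Subject passed in as the observer has no onUnsubscribe hook.
    if (typeof observer.onUnsubscribe === "function") observer.onUnsubscribe(unsubscribeCb);
    return observer;
  }
}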

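let observable = new Observable(function publish(observer) {
  const id = setTimeout(() => {
    observer.next("helloworld");
    observer.complete();
  }, 1000);
  // teardown: runs when the observer unsubscribes or the stream ends
  return () => { console.log("clear"); clearTimeout(id); };
});

const observer = observable.subscribe(
  value => console.log(value),
  err => console.log(err),
  () => console.log("done")
);
// manual unsubscribe; by 2000 ms the stream has already completed
setTimeout(() => observer.unsubscribe(), 2000);
// output after 1 s: helloworld -> done -> clear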

Subject

Concept: similar to an EventEmitter; the sole way to multicast values or events to multiple observers.

Observable is unicast—each observer triggers a separate execution of the publish function. Subject acts as both an Observable and an Observer, allowing multicast behavior.

// unicast observable example
const observable = Observable.interval(200).pipe(take(6));
const observerA = new Observer(x => console.log(`A next ${x}`));
const observerB = new Observer(x => console.log(`B next ${x}`));
observable.subscribe(observerA);
setTimeout(() => observable.subscribe(observerB), 500);
// A receives 0‑5, B receives 0‑5 after its later subscription

// multicast using Subject
const subject = new Subject();
observable.subscribe(subject);
subject.subscribe(observerA);
setTimeout(() => subject.subscribe(observerB), 500);
// A receives 0‑5, B receives 2‑5 (missed first two values)

Implementation Principle

Subject extends Observable and implements the Observer callbacks (next, error, complete).

It maintains a subscribers array; when an observer subscribes, it is pushed into this array.

When the source Observable pushes data, Subject forwards the data to every stored observer.

class Subject extends Observable {
  subscribers = [];
  isStopped = false;
  // Called by subscribe(): register the observer, or complete it right away
  // when the subject has already stopped.
  publish(observer) {
    if (this.isStopped) { observer.complete(); return; }
    this.subscribers.push(observer);
  }
  next(value) {
    if (this.isStopped) return;
    this.subscribers.forEach(observer => observer.next(value));
  }
  error(err) {
    if (this.isStopped) return;
    this.subscribers.forEach(observer => observer.error(err));
    this.isStopped = true;
    this.subscribers = [];
  }
  complete() {
    if (this.isStopped) return;
    this.subscribers.forEach(observer => observer.complete());
    this.isStopped = true;
    this.subscribers = [];
  }
}
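
The forwarding behavior can be exercised without timers. The following is a minimal synchronous sketch, assuming trimmed-down stand-ins for Observable and Subject with plain-object observers; `executions` and `received` are illustrative names.

```javascript
// Trimmed-down stand-ins for the article's classes (no error or complete handling).
class Observable {
  constructor(publishFn) { if (publishFn) this.publish = publishFn; }
  subscribe(observer) { this.publish(observer); return observer; }
}
class Subject extends Observable {
  subscribers = [];
  publish(observer) { this.subscribers.push(observer); }
  next(value) { this.subscribers.forEach(o => o.next(value)); }
}

let executions = 0; // hypothetical counter for runs of the source's publish function
const source = new Observable(observer => {
  executions += 1;
  [1, 2].forEach(v => observer.next(v));
});

const received = { A: [], B: [] };
const subject = new Subject();

subject.subscribe({ next: v => received.A.push(v) });
subject.next(0);                                       // only A is listening
subject.subscribe({ next: v => received.B.push(v) });  // B joins late

// The subject now acts as the single observer of `source` and fans values out.
source.subscribe(subject);

console.log(received);   // { A: [ 0, 1, 2 ], B: [ 1, 2 ] }
console.log(executions); // 1
```

The source ran once even though two observers received its values, which is the difference from subscribing A and B to `source` directly.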

BehaviorSubject

Extends Subject and holds the latest value (lastValue). When a new observer subscribes, it immediately receives this latest value, preventing loss due to late subscription.

class BehaviorSubject extends Subject {
  constructor(value) { super(); this.lastValue = value; }
  publish(observer) {
    // Replay the latest value only while the subject is still active.
    if (!this.isStopped && !observer.isStopped) observer.next(this.lastValue);
    super.publish(observer);
  }
  next(value) { this.lastValue = value; super.next(value); }
}

ReplaySubject

Caches a configurable number of recent values (or values within a time window) and re‑emits them to new subscribers.

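class ReplaySubject extends Subject {
  // bufferSize: maximum number of values to replay (unbounded by default).
  // windowSize: maximum age of a replayed value in ms (0 means no age limit).
  constructor(bufferSize = Infinity, windowSize = 0) {
    super();
    this.bufferSize = Math.max(1, bufferSize);
    this.windowSize = windowSize;
    this.events = []; // [timestamp, value] pairs, oldest first
  }
  // Trim the cache: drop events older than the window, then those beyond bufferSize.
  getEvents() {
    let spliceIndex = 0;
    const len = this.events.length;
    if (this.windowSize > 0) {
      const beginTime = Date.now() - this.windowSize;
      while (spliceIndex < len && this.events[spliceIndex][0] <= beginTime) spliceIndex++;
    }
    spliceIndex = Math.max(spliceIndex, len - this.bufferSize);
    if (spliceIndex > 0) this.events.splice(0, spliceIndex);
  }
  publish(observer) {
    this.getEvents();
    // Replay the cached values to the new observer before registering it.
    this.events.forEach(event => !observer.isStopped && observer.next(event[1]));
    super.publish(observer);
  }
  next(value) {
    if (this.isStopped) return;
    this.events.push([Date.now(), value]);
    this.getEvents();
    super.next(value);
  }
}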

AsyncSubject

Only emits the final value when the source completes.

class AsyncSubject extends Subject {
  hasNext = false;
  hasComplete = false;
  value;
  publish(observer) {
    if (this.hasComplete && this.hasNext) observer.next(this.value);
    super.publish(observer);
  }
  next(value) { if (!this.hasComplete) { this.value = value; this.hasNext = true; } }
  error(err) { if (!this.hasComplete) super.error(err); }
  complete() {
    this.hasComplete = true;
    if (this.hasNext) super.next(this.value);
    super.complete();
  }
}

Comparison

Observing the data flow for Observer A (synchronous subscription) and Observer B (asynchronous subscription after 500 ms) reveals how unicast Observable, multicast Subject, and the various Subject subclasses differ in the values they receive.
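
The same comparison can be reproduced without timers by subscribing A first, emitting a few values, and only then subscribing B. The sketch below uses compact re-statements of the subject classes, not the full versions above: the time window, error handling, and late subscription after completion are omitted. The `run` and `collect` helpers are illustrative names.

```javascript
// Compact stand-ins for the subject variants, using plain-object observers.
class Subject {
  subscribers = [];
  isStopped = false;
  subscribe(observer) { this.subscribers.push(observer); }
  next(value) { if (!this.isStopped) this.subscribers.forEach(o => o.next(value)); }
  complete() {
    this.subscribers.forEach(o => o.complete());
    this.isStopped = true;
    this.subscribers = [];
  }
}
class BehaviorSubject extends Subject {
  constructor(value) { super(); this.lastValue = value; }
  subscribe(observer) { observer.next(this.lastValue); super.subscribe(observer); }
  next(value) { this.lastValue = value; super.next(value); }
}
class ReplaySubject extends Subject {
  constructor(bufferSize) { super(); this.bufferSize = bufferSize; this.events = []; }
  subscribe(observer) { this.events.forEach(v => observer.next(v)); super.subscribe(observer); }
  next(value) {
    this.events.push(value);
    if (this.events.length > this.bufferSize) this.events.shift();
    super.next(value);
  }
}
class AsyncSubject extends Subject {
  hasNext = false;
  next(value) { this.value = value; this.hasNext = true; } // store only, emit nothing
  complete() {
    if (this.hasNext) super.next(this.value); // emit the final value once
    super.complete();
  }
}

// Same scenario for every variant: A subscribes early, B subscribes after three values.
function run(subject) {
  const a = [], b = [];
  const collect = sink => ({ next: v => sink.push(v), complete: () => {} });
  subject.subscribe(collect(a));
  [1, 2, 3].forEach(v => subject.next(v));
  subject.subscribe(collect(b));
  subject.next(4);
  subject.complete();
  return { a, b };
}

const results = {
  Subject: run(new Subject()),
  BehaviorSubject: run(new BehaviorSubject(0)),
  ReplaySubject: run(new ReplaySubject(2)),
  AsyncSubject: run(new AsyncSubject())
};
console.log(results);
// Subject:         A [1, 2, 3, 4]     B [4]
// BehaviorSubject: A [0, 1, 2, 3, 4]  B [3, 4]
// ReplaySubject:   A [1, 2, 3, 4]     B [2, 3, 4]
// AsyncSubject:    A [4]              B [4]
```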

Operator

Operators are pure functions such as map, filter, concat, and flatMap that transform streams in a functional-programming style.

Basic Usage

The pipe method accepts operator functions to transform, filter, or otherwise process emitted values.

const observable = Observable.interval(200).pipe(
  take(6),
  map(item => item * 2)
);
observable.subscribe(value => console.log(value)); // 0 → 2 → 4 → 6 → 8 → 10

Implementation Principle

Operators like map return an operation function that receives a source Observable and returns a new Observable via source.lift. pipe reduces the list of operation functions, chaining them together.

When the resulting Observable is subscribed, the operator’s call method wraps the original observer, applying the transformation before forwarding values.

function map(mapFn) {
  return function mapOperation(source) {
    return source.lift(new MapOperator(mapFn));
  };
}
class MapOperator {
  constructor(mapFn) { this.mapFn = mapFn; }
  call(observer, source) {
    return source.subscribe(new MapObserver(observer, this.mapFn));
  }
}
class MapObserver extends Observer {
  constructor(destination, mapFn) { super(); this.destination = destination; this.mapFn = mapFn; }
  next(value) { this.destination.next(this.mapFn(value)); }
  // Forward errors as well; the inherited handler would swallow them.
  error(err) { this.destination.error(err); }
  complete() { this.destination.complete(); }
}
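
The map code relies on lift and pipe, which the article describes but does not list. One way they might look is sketched below, assuming plain-object observers and no teardown handling. The `source` and `operator` property names and the `filter` operator are assumptions added for illustration.

```javascript
// Hypothetical minimal Observable with pipe and lift.
class Observable {
  constructor(publishFn) { if (publishFn) this.publish = publishFn; }
  subscribe(observer) {
    if (this.operator) {
      // A lifted Observable delegates to its operator, which subscribes to the source.
      this.operator.call(observer, this.source);
    } else {
      this.publish(observer);
    }
    return observer;
  }
  // lift: create a new Observable that remembers its source and the operator to apply.
  lift(operator) {
    const lifted = new Observable();
    lifted.source = this;
    lifted.operator = operator;
    return lifted;
  }
  // pipe: feed the Observable through each operation function, left to right.
  pipe(...operations) {
    return operations.reduce((source, operation) => operation(source), this);
  }
}

// map and filter written against the lift contract: an object with call(observer, source).
function map(mapFn) {
  return source => source.lift({
    call(observer, src) {
      return src.subscribe({
        next: value => observer.next(mapFn(value)),
        error: err => observer.error(err),
        complete: () => observer.complete()
      });
    }
  });
}
function filter(predicate) {
  return source => source.lift({
    call(observer, src) {
      return src.subscribe({
        next: value => { if (predicate(value)) observer.next(value); },
        error: err => observer.error(err),
        complete: () => observer.complete()
      });
    }
  });
}

const source = new Observable(observer => {
  [1, 2, 3, 4].forEach(v => observer.next(v));
  observer.complete();
});

const out = [];
source.pipe(filter(v => v % 2 === 0), map(v => v * 10)).subscribe({
  next: v => out.push(v),
  error: err => out.push(`error: ${err}`),
  complete: () => out.push("done")
});
console.log(out); // [ 20, 40, 'done' ]
```

Subscribing to the last Observable in the chain walks back through each operator to the original source, so nothing runs until subscribe is called.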

takeUntil Example

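In this simplified version, takeUntil stops the source Observable when a notifier Observable emits or completes. Current RxJS differs slightly: it completes only when the notifier emits a value, and a notifier that completes without emitting lets all source values through.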

const notifier = Observable.fromEvent(document, "click");
const observable = Observable.interval(1000).pipe(takeUntil(notifier));

The implementation creates a notifier observer that, upon receiving a value or completion, signals the outer observer to complete.

function takeUntil(notifier) {
  return function takeUntilOperation(source) {
    return source.lift(new TakeUntilOperator(notifier));
  };
}
class TakeUntilOperator {
  constructor(notifier) { this.notifier = notifier; }
  call(observer, source) {
    const outer = new TakeUntilObserver(observer);
    const notifierObs = new NotifierObserver(outer);
    this.notifier.subscribe(notifierObs);
    if (!outer.seenValue) return source.subscribe(outer);
  }
}
class NotifierObserver extends Observer {
  constructor(outer) { super(); this.outer = outer; }
  next(value) { this.outer.notifyNext(value); }
  error(err) { this.outer.notifyError(err); this.unsubscribe(); }
  complete() { this.outer.notifyComplete(); this.unsubscribe(); }
}
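class TakeUntilObserver extends Observer {
  constructor(dest) { super(); this.destination = dest; this.seenValue = false; }
  notifyNext() { this.seenValue = true; this.destination.complete(); }
  notifyComplete() { this.seenValue = true; this.destination.complete(); }
  // Called by NotifierObserver when the notifier itself errors.
  notifyError(err) { this.seenValue = true; this.destination.error(err); }
  next(value) { if (!this.seenValue) this.destination.next(value); }
  // Forward the source's own termination to the destination.
  error(err) { if (!this.seenValue) this.destination.error(err); }
  complete() { if (!this.seenValue) this.destination.complete(); }
}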
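
The same idea can be tried out with a shorter, function-based variant. This is not the operator-class version above but a simplified sketch under the same rule (stop on notifier value or completion), using hypothetical minimal Observable and Subject stand-ins so it can run synchronously.

```javascript
// Hypothetical minimal pieces: plain-object observers, no teardown handling.
class Observable {
  constructor(publishFn) { if (publishFn) this.publish = publishFn; }
  subscribe(observer) { this.publish(observer); return observer; }
}
class Subject extends Observable {
  subscribers = [];
  publish(observer) { this.subscribers.push(observer); }
  next(value) { this.subscribers.forEach(o => o.next(value)); }
}

// takeUntil as a plain function: forward source values until the notifier fires.
function takeUntil(notifier) {
  return source => new Observable(observer => {
    let done = false;
    const finish = () => { if (!done) { done = true; observer.complete(); } };
    notifier.subscribe({ next: finish, complete: finish });
    if (done) return; // notifier fired synchronously: never subscribe to the source
    source.subscribe({
      next: value => { if (!done) observer.next(value); },
      complete: finish
    });
  });
}

const source = new Subject();
const notifier = new Subject();
const out = [];

takeUntil(notifier)(source).subscribe({
  next: v => out.push(v),
  complete: () => out.push("done")
});

source.next(1);
source.next(2);
notifier.next("stop"); // completes the output stream
source.next(3);        // ignored: the output has already completed

console.log(out); // [ 1, 2, 'done' ]
```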

The article concludes with references to the RxJS Chinese documentation.
