Master RxJS: Build Your Own Observable, Observer, and Subject from Scratch

This article explains the core concepts of RxJS, including Observable, Observer, Subject, and operators, by walking through their principles, implementation details, and code examples that illustrate unicast and multicast data flows in reactive JavaScript programming.

ELab Team

Fundamental Concepts

RxJS is a JavaScript library that uses Observables to manage asynchronous events.

It implements reactive programming on top of the observer pattern: producers push data to the consumers that subscribed to them.

Values are produced by pure functions, which keeps application state isolated.

Observable, Subject, and Observer enable chainable subscriptions, and operators transform the data flowing through them.

Core Modules and Basic Principles

Below we implement a simple RxJS to illustrate the core principles.

Observable

Concept: an observable object, a collection of future values or events that can be subscribed to.

Basic Usage

// create observable
let observable = new Observable(function publish(observer) {
  observer.next("hello");
  var id = setTimeout(() => {
    observer.next("world");
    observer.complete();
  }, 1000);
});

// subscribe observable
observable.subscribe({
  next: value => console.log(value),
  error: err => console.log(err),
  complete: () => console.log("done")
});
// output: hello -> world -> done

Implementation Principle

Based on the basic usage, an Observable can perform synchronous or asynchronous tasks and push data to observers. Implementing the core functionality requires two steps:

Create: as a producer, the Observable defines a publish method that receives an observer and calls its callbacks to deliver values.

Subscribe: the publish method runs when subscribe is called, making Observable a lazy data source, independent for each observer.

class Observable {
  constructor(publishFn) {
    this.publish = publishFn;
  }
  subscribe(observer) {
    this.publish(observer);
    return observer;
  }
}
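
To see the lazy, per-observer behaviour in practice, the following minimal sketch subscribes twice to the same Observable. The `runs` counter, the `runsBeforeSubscribe` snapshot, and the `received` array are illustrative names that do not appear in the code above.

```javascript
// Same minimal Observable as above, repeated so the snippet runs on its own
class Observable {
  constructor(publishFn) {
    this.publish = publishFn;
  }
  subscribe(observer) {
    this.publish(observer);
    return observer;
  }
}

let runs = 0;          // illustrative counter: how many times publish has run
const received = [];   // illustrative log of delivered values

const observable = new Observable(observer => {
  runs++;
  observer.next(`run ${runs}`);
});

// Creating the Observable does not execute publish
const runsBeforeSubscribe = runs;

// Each subscribe call triggers its own, independent execution of publish
observable.subscribe({ next: value => received.push(`A got ${value}`) });
observable.subscribe({ next: value => received.push(`B got ${value}`) });

console.log(runsBeforeSubscribe); // 0
console.log(received);            // [ 'A got run 1', 'B got run 2' ]
```

Because publish ran once per subscriber, the two observers saw different values. This is the unicast behaviour that Subject later changes.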

Static Factory Methods

To simplify creation of common publish tasks, Observable provides static methods such as of, from, fromEvent, interval, etc.

// every 200ms emit an incrementing number starting from 0
const numbers = Observable.interval(200);
numbers.subscribe(value => console.log(value));
// output: 0 -> 1 -> 2 -> ...

// listen to document click events and emit the event object
const clicks = Observable.fromEvent(document, "click");
clicks.subscribe(event => console.log(event));
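
The article does not show how these factories are built, so here is one possible sketch of `of` and `interval` as static methods. It assumes a simplified `subscribe` that returns an object with an `unsubscribe` method (the article's version returns the observer instead), and it only accepts observer objects.

```javascript
class Observable {
  constructor(publishFn) { this.publish = publishFn; }
  // Simplified for this sketch: keep the teardown returned by publish
  // and hand back an object that can stop the task
  subscribe(observer) {
    const teardown = this.publish(observer);
    return { unsubscribe: () => teardown && teardown() };
  }
  // Emit the given values synchronously, then complete
  static of(...values) {
    return new Observable(observer => {
      values.forEach(value => observer.next(value));
      observer.complete();
    });
  }
  // Emit 0, 1, 2, ... every `period` milliseconds until unsubscribed
  static interval(period) {
    return new Observable(observer => {
      let count = 0;
      const id = setInterval(() => observer.next(count++), period);
      return () => clearInterval(id);
    });
  }
}

const log = [];
Observable.of(1, 2, 3).subscribe({
  next: value => log.push(value),
  complete: () => log.push("done")
});
console.log(log); // [ 1, 2, 3, 'done' ]

// The interval never completes by itself, so stop it explicitly
const subscription = Observable.interval(200).subscribe({
  next: value => console.log(value)
});
setTimeout(() => subscription.unsubscribe(), 700);
```

Each factory is just a predefined publish function, which is why the returned Observable stays lazy: the timer only starts when `subscribe` is called.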

Observer

Concept: an observer is a collection of callback functions that knows how to listen to values provided by an Observable.

Implementation Principle

An observer must implement next, error, and complete callbacks; the simplest observer is an object containing these methods.

A class can encapsulate observer state with an isStopped flag and expose an unsubscribe method that triggers a provided unsubscribeCb.

Calling unsubscribe sets isStopped to true, stops data push, and runs the cleanup callback.

class Observer {
  isStopped = false;
  unsubscribeCb;
  constructor(next, error, complete) {
    this._next = next || (() => {});
    this._error = error || (() => {});
    this._complete = complete || (() => {});
  }
  next(value) { if (!this.isStopped) this._next(value); }
  error(err) { if (!this.isStopped) { this._error(err); this.unsubscribe(); } }
  complete() { if (!this.isStopped) { this._complete(); this.unsubscribe(); } }
  onUnsubscribe(cb) { this.unsubscribeCb = cb; }
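  unsubscribe() {
    this.isStopped = true;
    // Clear the callback first so repeated unsubscribe calls run the cleanup only once
    const cb = this.unsubscribeCb;
    this.unsubscribeCb = undefined;
    cb && cb();
  }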
}
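
The effect of the isStopped flag can be checked with a short sketch. It uses a compact copy of the Observer class so that it runs on its own; the `seen` array is an illustrative name.

```javascript
// Compact copy of the Observer class described above
class Observer {
  isStopped = false;
  unsubscribeCb;
  constructor(next, error, complete) {
    this._next = next || (() => {});
    this._error = error || (() => {});
    this._complete = complete || (() => {});
  }
  next(value) { if (!this.isStopped) this._next(value); }
  error(err) { if (!this.isStopped) { this._error(err); this.unsubscribe(); } }
  complete() { if (!this.isStopped) { this._complete(); this.unsubscribe(); } }
  onUnsubscribe(cb) { this.unsubscribeCb = cb; }
  unsubscribe() { this.isStopped = true; this.unsubscribeCb && this.unsubscribeCb(); }
}

const seen = [];
const observer = new Observer(
  value => seen.push(value),
  err => seen.push(`error: ${err}`),
  () => seen.push("done")
);
observer.onUnsubscribe(() => seen.push("cleanup"));

observer.next(1);
observer.complete(); // runs the complete callback, then the cleanup callback
observer.next(2);    // ignored: isStopped is already true

console.log(seen);   // [ 1, 'done', 'cleanup' ]
```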

With the Observer class, Observable can be enhanced to accept either an observer object or callback functions.

class Observable {
  constructor(publishFn) { this.publish = publishFn; }
  subscribe(observerOrNext, error, complete) {
    let observer;
    if (observerOrNext instanceof Observer || observerOrNext instanceof Subject) {
      observer = observerOrNext;
    } else if (typeof observerOrNext === "function") {
      observer = new Observer(observerOrNext, error, complete);
    } else {
      observer = new Observer(observerOrNext.next, observerOrNext.error, observerOrNext.complete);
    }
    const unsubscribeCb = this.publish(observer);
    observer.onUnsubscribe(unsubscribeCb);
    return observer;
  }
}

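let observable = new Observable(function publish(observer) {
  var id = setTimeout(() => {
    observer.next("helloworld");
    observer.complete();
  }, 1000);
  // teardown: cancel the pending timer (clearTimeout matches setTimeout)
  return () => { console.log("clear"); clearTimeout(id); };
});
const observer = observable.subscribe(
  value => console.log(value),
  err => console.log(err),
  () => console.log("done")
);
// the stream has already completed by 2000ms, so only unsubscribe if it is still active
setTimeout(() => { if (!observer.isStopped) observer.unsubscribe(); }, 2000);
// output: helloworld -> done -> clear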

Subject

Concept: a Subject acts like an EventEmitter and is the sole way to multicast values or events to multiple observers.

Observable is unicast (each observer gets its own execution). A Subject bridges Observable and Observer, subscribing to an Observable and then distributing the data to multiple observers (multicast).

Subject diagram

Implementation Principle

Subject extends Observable and implements Observer callbacks (next/error/complete).

It maintains a subscribers array; when an observer subscribes, it is added to this array.

When the source Observable pushes data, Subject forwards it to every subscriber.

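class Subject extends Observable {
  subscribers = [];
  isStopped = false;
  constructor() {
    super();
    // Observable's constructor assigns this.publish (undefined here); remove that
    // own property so the publish method defined below is used
    delete this.publish;
  }
  publish(observer) {
    // A stopped Subject only signals completion to late subscribers
    if (this.isStopped) { observer.complete(); return; }
    this.subscribers.push(observer);
  }
  // Observable.subscribe registers a teardown on its observer, so a Subject
  // used as an observer needs the same hook
  onUnsubscribe(cb) { this.unsubscribeCb = cb; }
  unsubscribe() {
    this.isStopped = true;
    this.subscribers = [];
    this.unsubscribeCb && this.unsubscribeCb();
  }
  next(value) {
    if (this.isStopped) return;
    this.subscribers.forEach(observer => observer.next(value));
  }
  error(err) {
    if (this.isStopped) return;
    this.subscribers.forEach(observer => observer.error(err));
    this.isStopped = true;
    this.subscribers = [];
  }
  complete() {
    if (this.isStopped) return;
    this.subscribers.forEach(observer => observer.complete());
    this.isStopped = true;
    this.subscribers = [];
  }
}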
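
The multicast flow described above can be sketched as follows. To keep the snippet self-contained it uses compact stand-ins for Observable and Subject (no Observer class, no error handling); the `executions` counter and the `log` array are illustrative names.

```javascript
// Compact stand-ins for the classes in this article
class Observable {
  // Only store publishFn when one is given, so Subject's publish method is not shadowed
  constructor(publishFn) { if (publishFn) this.publish = publishFn; }
  subscribe(observer) { this.publish(observer); return observer; }
}
class Subject extends Observable {
  subscribers = [];
  publish(observer) { this.subscribers.push(observer); }
  next(value) { this.subscribers.forEach(observer => observer.next(value)); }
  complete() {
    this.subscribers.forEach(observer => observer.complete());
    this.subscribers = [];
  }
}

let executions = 0; // illustrative counter: how many times the source ran
const source = new Observable(observer => {
  executions++;
  observer.next("hello");
  observer.next("world");
  observer.complete();
});

const subject = new Subject();
const log = [];
subject.subscribe({ next: v => log.push(`A: ${v}`), complete: () => log.push("A done") });
subject.subscribe({ next: v => log.push(`B: ${v}`), complete: () => log.push("B done") });

// A single subscription to the source feeds both observers
source.subscribe(subject);

console.log(executions); // 1
console.log(log);
// [ 'A: hello', 'B: hello', 'A: world', 'B: world', 'A done', 'B done' ]
```

The source ran once, yet both observers received every value. Subscribing each observer directly to `source` would have run it twice.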

BehaviorSubject

Extends Subject, stores the latest value (lastValue), and immediately emits it to new subscribers, so late subscribers do not miss the current state.

class BehaviorSubject extends Subject {
  constructor(value) { super(); this.lastValue = value; }
  publish(observer) {
    if (!observer.isStopped) observer.next(this.lastValue);
    super.publish(observer);
  }
  next(value) { this.lastValue = value; super.next(value); }
}

ReplaySubject

Similar to BehaviorSubject but caches multiple recent values based on bufferSize and optional windowSize. When a new observer subscribes, it receives all cached values.

class ReplaySubject extends Subject {
  constructor(bufferSize, windowSize) {
    super();
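    // With no bufferSize, keep every value; Math.max(1, undefined) would yield NaN and disable trimming
    this.bufferSize = bufferSize === undefined ? Infinity : Math.max(1, bufferSize);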
    this.windowSize = windowSize || 0;
    this.events = [];
  }
  getEvents() {
    let spliceIndex = 0;
    const len = this.events.length;
    if (this.windowSize > 0) {
      const beginTime = Date.now() - this.windowSize;
      while (spliceIndex < len && this.events[spliceIndex][0] <= beginTime) spliceIndex++;
    }
    spliceIndex = Math.max(spliceIndex, len - this.bufferSize);
    if (spliceIndex > 0) this.events.splice(0, spliceIndex);
  }
  publish(observer) {
    this.getEvents();
    this.events.forEach(event => !observer.isStopped && observer.next(event[1]));
    super.publish(observer);
  }
  next(value) {
    this.events.push([Date.now(), value]);
    this.getEvents();
    super.next(value);
  }
}

AsyncSubject

Only emits the final value when the source completes.

class AsyncSubject extends Subject {
  hasNext = false;
  hasComplete = false;
  value;
  publish(observer) {
    if (this.hasComplete && this.hasNext) observer.next(this.value);
    super.publish(observer);
  }
  next(value) { if (!this.hasComplete) { this.value = value; this.hasNext = true; } }
  error(err) { if (!this.hasComplete) super.error(err); }
  complete() {
    this.hasComplete = true;
    if (this.hasNext) super.next(this.value);
    super.complete();
  }
}
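
The difference between the four Subject variants is easiest to see with a late subscriber. The sketch below uses simplified stand-ins (a standalone Subject with a `subscribe` method, a ReplaySubject without the time window, no error handling), and the `run` helper is an illustrative function, not part of the article's code.

```javascript
// Simplified stand-ins for the Subject variants described above
class Subject {
  subscribers = [];
  isStopped = false;
  subscribe(observer) {
    if (this.isStopped) { observer.complete(); return observer; }
    this.subscribers.push(observer);
    return observer;
  }
  next(value) { if (!this.isStopped) this.subscribers.forEach(o => o.next(value)); }
  complete() {
    this.subscribers.forEach(o => o.complete());
    this.isStopped = true;
    this.subscribers = [];
  }
}
class BehaviorSubject extends Subject {
  constructor(value) { super(); this.lastValue = value; }
  subscribe(observer) { observer.next(this.lastValue); return super.subscribe(observer); }
  next(value) { this.lastValue = value; super.next(value); }
}
class ReplaySubject extends Subject {
  constructor(bufferSize) { super(); this.bufferSize = bufferSize; this.events = []; }
  subscribe(observer) { this.events.forEach(v => observer.next(v)); return super.subscribe(observer); }
  next(value) {
    this.events.push(value);
    if (this.events.length > this.bufferSize) this.events.shift(); // keep the newest values only
    super.next(value);
  }
}
class AsyncSubject extends Subject {
  hasNext = false;
  next(value) { this.value = value; this.hasNext = true; } // hold the value, do not forward it
  complete() {
    if (this.hasNext) super.next(this.value); // deliver only the final value
    super.complete();
  }
}

// Push 1, 2, 3 to an early subscriber, add a late subscriber, then complete
function run(subject) {
  const log = [];
  const observer = name => ({ next: v => log.push(`${name}:${v}`), complete: () => {} });
  subject.subscribe(observer("early"));
  [1, 2, 3].forEach(v => subject.next(v));
  subject.subscribe(observer("late"));
  subject.complete();
  return log.join(" ");
}

console.log(run(new Subject()));          // early:1 early:2 early:3
console.log(run(new BehaviorSubject(0))); // early:0 early:1 early:2 early:3 late:3
console.log(run(new ReplaySubject(2)));   // early:1 early:2 early:3 late:2 late:3
console.log(run(new AsyncSubject()));     // early:3 late:3
```

A plain Subject gives the late subscriber nothing, BehaviorSubject gives it the current value, ReplaySubject gives it the last two values, and AsyncSubject gives every subscriber only the final value at completion.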

Operator

Operators are pure functions such as map, filter, concat, and flatMap that transform streams in a functional programming style.

Basic Usage

Operators are applied via the pipe method, which composes multiple operator functions.

const observable = Observable.interval(200).pipe(
  take(6),
  map(item => item * 2)
);
observable.subscribe(value => console.log(value));
// output: 0 -> 2 -> 4 -> 6 -> 8 -> 10

Implementation Principle

An operator like map returns an operation function that receives a source Observable and returns a new Observable via source.lift. pipe reduces the list of operation functions, chaining them to produce a final Observable.

When the resulting Observable is subscribed, the operator’s call method wraps the original observer, intercepting its next calls.

function map(mapFn) {
  return function mapOperation(source) {
    return source.lift(new MapOperator(mapFn));
  };
}
class MapOperator {
  constructor(mapFn) { this.mapFn = mapFn; }
  call(observer, source) {
    return source.subscribe(new MapObserver(observer, this.mapFn));
  }
}
class MapObserver extends Observer {
  constructor(destination, mapFn) { super(); this.destination = destination; this.mapFn = mapFn; }
  next(value) { const result = this.mapFn(value); this.destination.next(result); }
  // Forward errors as well; the inherited error() would swallow them
  error(err) { this.destination.error(err); }
  complete() { this.destination.complete(); }
}
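
The code above relies on `lift` and `pipe`, which the article describes but does not show. One way they could look, as a sketch under simplifying assumptions: operators here are plain objects with a `call` method and observers are plain objects, rather than the MapOperator and MapObserver classes. The `filter` operator and the `out` array are added for illustration.

```javascript
class Observable {
  constructor(publishFn) { this.publish = publishFn; }
  subscribe(observer) { this.publish(observer); return observer; }
  // lift: build a new Observable that, once subscribed, lets the operator
  // connect the downstream observer to this (source) Observable
  lift(operator) {
    const source = this;
    return new Observable(observer => operator.call(observer, source));
  }
  // pipe: pass the Observable through each operation function, left to right
  pipe(...operations) {
    return operations.reduce((source, operation) => operation(source), this);
  }
}

function map(mapFn) {
  return source => source.lift({
    call(observer, src) {
      return src.subscribe({
        next: value => observer.next(mapFn(value)),
        error: err => observer.error(err),
        complete: () => observer.complete()
      });
    }
  });
}

function filter(predicate) {
  return source => source.lift({
    call(observer, src) {
      return src.subscribe({
        next: value => { if (predicate(value)) observer.next(value); },
        error: err => observer.error(err),
        complete: () => observer.complete()
      });
    }
  });
}

const out = [];
new Observable(observer => {
  [1, 2, 3, 4].forEach(value => observer.next(value));
  observer.complete();
})
  .pipe(filter(value => value % 2 === 0), map(value => value * 10))
  .subscribe({
    next: value => out.push(value),
    error: err => out.push(`error: ${err}`),
    complete: () => out.push("done")
  });

console.log(out); // [ 20, 40, 'done' ]
```

Nothing runs until the final `subscribe` call. At that point the subscription travels up the chain from `map` to `filter` to the source, and values flow back down through the wrapped observers.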

takeUntil Example

Stops an interval when a notifier Observable emits (e.g., a click event).

const notifier = Observable.fromEvent(document, "click");
const observable = Observable.interval(1000).pipe(takeUntil(notifier));

The implementation adds a NotifierObserver that listens to the notifier and notifies the outer observer to complete.
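
The article does not include the takeUntil code, so the following is only a sketch of the idea. It assumes a simplified `subscribe` that returns the teardown function, writes the notifier observer as a plain object instead of a NotifierObserver class, and uses a hypothetical `createEmitter` helper in place of `interval` and `fromEvent` so that the example runs synchronously.

```javascript
class Observable {
  constructor(publishFn) { this.publish = publishFn; }
  // Simplified for this sketch: subscribe returns the teardown function
  subscribe(observer) { return this.publish(observer) || (() => {}); }
  pipe(...operations) { return operations.reduce((src, op) => op(src), this); }
}

function takeUntil(notifier) {
  return source => new Observable(observer => {
    let done = false;
    const teardowns = [];
    const stop = () => {
      done = true;
      teardowns.forEach(fn => fn());
      teardowns.length = 0;
    };
    // Plays the role of the NotifierObserver: the first value completes the outer observer
    teardowns.push(notifier.subscribe({
      next: () => { if (!done) { stop(); observer.complete(); } },
      error: err => { if (!done) { stop(); observer.error(err); } },
      complete: () => {}
    }));
    if (!done) {
      teardowns.push(source.subscribe({
        next: value => { if (!done) observer.next(value); },
        error: err => { if (!done) { stop(); observer.error(err); } },
        complete: () => { if (!done) { stop(); observer.complete(); } }
      }));
    }
    if (done) stop(); // release anything registered after a synchronous stop
    return stop;
  });
}

// Hypothetical helper: an Observable whose values are pushed by calling emit()
function createEmitter() {
  const listeners = new Set();
  const observable = new Observable(observer => {
    listeners.add(observer);
    return () => listeners.delete(observer);
  });
  const emit = value => [...listeners].forEach(listener => listener.next(value));
  return { observable, emit, count: () => listeners.size };
}

const ticks = createEmitter();  // stands in for Observable.interval(1000)
const clicks = createEmitter(); // stands in for Observable.fromEvent(document, "click")
const log = [];

ticks.observable.pipe(takeUntil(clicks.observable)).subscribe({
  next: value => log.push(value),
  error: err => log.push(`error: ${err}`),
  complete: () => log.push("done")
});

ticks.emit(0);
ticks.emit(1);
clicks.emit("click"); // the notifier fires: the stream completes and both sources are released
ticks.emit(2);        // no longer delivered

console.log(log);           // [ 0, 1, 'done' ]
console.log(ticks.count()); // 0
```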
