
Detailed Source‑Code Analysis of RxJava Observable Subscription Flow

This article provides a comprehensive source‑code walkthrough of RxJava’s Observable subscription flow, detailing object creation, reverse subscription, thread scheduling, and operator handling with step‑by‑step code analysis.

Sohu Tech Products

In this article we dissect the complete RxJava Observable subscription process in three main stages: object creation, reverse subscription, and task execution. The analysis follows a sample chain that creates an Observable, applies map, doOnNext, subscribeOn, and observeOn, and finally subscribes with Consumer callbacks.

1. Object Creation

The chain starts with Observable.create(new ObservableOnSubscribe<Integer>() { ... }). Inside subscribe(), the emitter calls onNext(1). The create operator is implemented as:

public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
    ObjectHelper.requireNonNull(source, "source is null");
    return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}

The ObservableCreate constructor simply stores the source:

public ObservableCreate(ObservableOnSubscribe<T> source) {
    this.source = source;
}
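The point of this stage is that assembly only stores the source; nothing runs until someone subscribes. A minimal sketch of that behaviour in plain Java, with no RxJava dependency, follows. MiniOnSubscribe, MiniCreate, and LazyAssemblyDemo are hypothetical names invented for illustration, and the emitter is reduced to a java.util.function.Consumer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for ObservableOnSubscribe: receives a sink to push values into.
interface MiniOnSubscribe<T> {
    void subscribe(Consumer<T> emitter);
}

// Hypothetical stand-in for ObservableCreate: assembly only stores the source.
final class MiniCreate<T> {
    private final MiniOnSubscribe<T> source;

    private MiniCreate(MiniOnSubscribe<T> source) {
        this.source = source;
    }

    static <T> MiniCreate<T> create(MiniOnSubscribe<T> source) {
        if (source == null) {
            throw new NullPointerException("source is null");
        }
        return new MiniCreate<>(source);
    }

    // Only here does the stored source actually run.
    void subscribe(Consumer<T> observer) {
        source.subscribe(observer);
    }
}

final class LazyAssemblyDemo {
    // Returns the events in the order they occurred.
    static List<String> run() {
        List<String> log = new ArrayList<>();
        MiniCreate<Integer> observable = MiniCreate.create(emitter -> {
            log.add("source runs");
            emitter.accept(1);
        });
        log.add("assembled");
        observable.subscribe(item -> log.add("received " + item));
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [assembled, source runs, received 1]
    }
}
```

The log shows "assembled" before "source runs", which is the same laziness the real create operator has: the ObservableOnSubscribe body is deferred until subscription.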

2. Operator map

The map operator wraps the upstream source with ObservableMap:

public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
    ObjectHelper.requireNonNull(mapper, "mapper is null");
    return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}

ObservableMap stores the mapping function:

public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
    super(source);
    this.function = function;
}

The actual mapping occurs in MapObserver.onNext(). ObservableMap hands its stored function to MapObserver, which keeps it as mapper, calls mapper.apply(t), and forwards the result downstream:

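public void onNext(T t) {
    // Ignore items that arrive after a terminal event.
    if (done) return;
    // In fused mode the downstream polls values itself; null is only a signal.
    if (sourceMode != NONE) { actual.onNext(null); return; }
    U v;
    try {
        // Apply the user function; a null result is rejected.
        v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
    } catch (Throwable ex) {
        fail(ex);
        return;
    }
    // Forward the mapped value downstream.
    actual.onNext(v);
}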
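The wrapping idea can be tried without RxJava. The sketch below is a simplified model, not the library's code: SketchObserver, SketchMapObserver, and MapSketchDemo are hypothetical names, fusion (sourceMode) is left out, and a failing mapper is routed straight to onError without disposing an upstream.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical, trimmed-down observer: only onNext and onError.
interface SketchObserver<T> {
    void onNext(T item);
    void onError(Throwable error);
}

// Hypothetical stand-in for MapObserver: wraps the downstream observer.
final class SketchMapObserver<T, U> implements SketchObserver<T> {
    private final SketchObserver<? super U> actual;
    private final Function<? super T, ? extends U> mapper;
    private boolean done;

    SketchMapObserver(SketchObserver<? super U> actual,
                      Function<? super T, ? extends U> mapper) {
        this.actual = actual;
        this.mapper = mapper;
    }

    @Override
    public void onNext(T item) {
        if (done) {
            return;
        }
        U value;
        try {
            value = mapper.apply(item);
            if (value == null) {
                throw new NullPointerException("The mapper function returned a null value.");
            }
        } catch (Throwable ex) {
            onError(ex); // a failing mapper ends the sequence
            return;
        }
        actual.onNext(value);
    }

    @Override
    public void onError(Throwable error) {
        if (done) {
            return;
        }
        done = true;
        actual.onError(error);
    }
}

final class MapSketchDemo {
    static List<String> run() {
        List<String> received = new ArrayList<>();
        SketchObserver<String> downstream = new SketchObserver<String>() {
            @Override public void onNext(String item) { received.add(item); }
            @Override public void onError(Throwable error) {
                received.add("error: " + error.getMessage());
            }
        };
        SketchObserver<Integer> upstreamFacing = new SketchMapObserver<Integer, String>(
                downstream, i -> i == 2 ? null : i + " is String!!!");
        upstreamFacing.onNext(1);
        upstreamFacing.onNext(2); // mapper returns null, so onError fires
        upstreamFacing.onNext(3); // ignored: the observer is done
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The downstream sees one mapped string and then a single error; the third item is dropped because the done flag is already set.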

3. Operator doOnNext

doOnNext creates an ObservableDoOnEach that invokes a side‑effect consumer before passing the item downstream:

public final Observable<T> doOnNext(Consumer<? super T> onNext) {
    return doOnEach(onNext, Functions.emptyConsumer(), Functions.EMPTY_ACTION, Functions.EMPTY_ACTION);
}

Its observer implementation runs the consumer and then forwards the item:

public void onNext(T t) {
    if (done) return;
    try {
        onNext.accept(t);
    } catch (Throwable e) {
        Exceptions.throwIfFatal(e);
        s.dispose();
        onError(e);
        return;
    }
    actual.onNext(t);
}
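To see the ordering (side effect first, then downstream) and the error path in isolation, here is a small plain-Java sketch. PeekObserver, SketchDoOnNextObserver, and DoOnNextSketchDemo are hypothetical names; unlike the real observer, this model has no upstream Disposable to dispose and does not distinguish fatal errors.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, trimmed-down observer: only onNext and onError.
interface PeekObserver<T> {
    void onNext(T item);
    void onError(Throwable error);
}

// Hypothetical stand-in for the doOnEach observer, restricted to the onNext hook.
final class SketchDoOnNextObserver<T> implements PeekObserver<T> {
    private final PeekObserver<? super T> actual;
    private final Consumer<? super T> sideEffect;
    private boolean done;

    SketchDoOnNextObserver(PeekObserver<? super T> actual, Consumer<? super T> sideEffect) {
        this.actual = actual;
        this.sideEffect = sideEffect;
    }

    @Override
    public void onNext(T item) {
        if (done) {
            return;
        }
        try {
            sideEffect.accept(item); // runs before the item moves on
        } catch (Throwable e) {
            onError(e);              // the item is not forwarded
            return;
        }
        actual.onNext(item);
    }

    @Override
    public void onError(Throwable error) {
        if (done) {
            return;
        }
        done = true;
        actual.onError(error);
    }
}

final class DoOnNextSketchDemo {
    static List<String> run() {
        List<String> log = new ArrayList<>();
        PeekObserver<Integer> downstream = new PeekObserver<Integer>() {
            @Override public void onNext(Integer item) { log.add("downstream " + item); }
            @Override public void onError(Throwable error) { log.add("error " + error.getMessage()); }
        };
        PeekObserver<Integer> observer = new SketchDoOnNextObserver<Integer>(downstream, item -> {
            if (item == 2) {
                throw new IllegalStateException("boom");
            }
            log.add("side effect " + item);
        });
        observer.onNext(1);
        observer.onNext(2); // the side effect throws, so onError fires
        observer.onNext(3); // ignored: the observer is done
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```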

4. Thread Scheduling – subscribeOn and observeOn

subscribeOn schedules the subscription side on the given Scheduler:

public final Observable<T> subscribeOn(Scheduler scheduler) {
    ObjectHelper.requireNonNull(scheduler, "scheduler is null");
    return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}

Its subscribeActual creates a SubscribeOnObserver, calls s.onSubscribe(parent), and schedules the upstream source.subscribe(parent) on the scheduler’s worker.

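public void subscribeActual(final Observer<? super T> s) {
    // Wrap the downstream observer.
    final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
    // Hand the downstream its Disposable on the calling thread.
    s.onSubscribe(parent);
    // Run SubscribeTask, which calls source.subscribe(parent), on the scheduler.
    parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}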
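The effect of moving the upstream subscribe call onto another thread can be modelled with a plain ExecutorService standing in for the Scheduler. This is a sketch under that assumption; SubscribeOnSketch, its nested Source interface, and the thread name "sketch-io" are all hypothetical, and disposal is not modelled.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

final class SubscribeOnSketch {
    // Hypothetical source abstraction: subscribing runs the emission logic.
    interface Source<T> {
        void subscribe(Consumer<T> observer);
    }

    // Mirrors the idea of ObservableSubscribeOn: the upstream subscribe call
    // is handed to another thread instead of running on the caller's thread.
    static <T> Source<T> subscribeOn(Source<T> upstream, ExecutorService executor) {
        return observer -> executor.execute(() -> upstream.subscribe(observer));
    }

    // Returns the name of the thread on which the source emitted.
    static String run() {
        ExecutorService executor =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "sketch-io"));
        try {
            AtomicReference<String> emittingThread = new AtomicReference<>();
            CountDownLatch received = new CountDownLatch(1);
            Source<Integer> source = observer -> {
                emittingThread.set(Thread.currentThread().getName());
                observer.accept(1);
            };
            subscribeOn(source, executor).subscribe(item -> received.countDown());
            if (!received.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for the item");
            }
            return emittingThread.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // sketch-io
    }
}
```

Because the source's subscribe body runs inside the executor task, the emission happens on the executor's thread even though the caller subscribed from the main thread.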

observeOn moves downstream emissions to another scheduler:

public final Observable<T> observeOn(Scheduler scheduler) {
    return observeOn(scheduler, false, bufferSize());
}

Its observer stores a Scheduler.Worker and queues items; the run() method drains the queue and calls actual.onNext() on the target thread.

@Override
public void run() {
    if (outputFused) {
        drainFused();
    } else {
        drainNormal();
    }
}
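The queue-and-drain idea can be sketched with a ConcurrentLinkedQueue, an AtomicInteger used as a work-in-progress counter, and a single-threaded ExecutorService standing in for the Scheduler.Worker. This is a simplified model of the non-fused path only, not the library's drainNormal(); SketchObserveOn, ObserveOnSketchDemo, and the thread name "sketch-ui" are hypothetical, and terminal events and disposal are omitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical stand-in for the observeOn observer:
// enqueue on the emitting thread, drain on the worker thread.
final class SketchObserveOn<T> implements Runnable {
    private final Queue<T> queue = new ConcurrentLinkedQueue<>();
    private final AtomicInteger wip = new AtomicInteger(); // work-in-progress counter
    private final ExecutorService worker;
    private final Consumer<? super T> actual;

    SketchObserveOn(ExecutorService worker, Consumer<? super T> actual) {
        this.worker = worker;
        this.actual = actual;
    }

    // Called by the upstream on whatever thread it emits on.
    void onNext(T item) {
        queue.offer(item);
        // Only the call that moves the counter away from 0 schedules a drain.
        if (wip.getAndIncrement() == 0) {
            worker.execute(this);
        }
    }

    @Override
    public void run() {
        int missed = 1;
        for (;;) {
            T item;
            while ((item = queue.poll()) != null) {
                actual.accept(item); // delivered on the worker thread
            }
            // Exit only if no onNext call arrived while draining.
            missed = wip.addAndGet(-missed);
            if (missed == 0) {
                return;
            }
        }
    }
}

final class ObserveOnSketchDemo {
    static List<String> run() {
        ExecutorService worker =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "sketch-ui"));
        try {
            List<String> delivered = new CopyOnWriteArrayList<>();
            CountDownLatch latch = new CountDownLatch(3);
            SketchObserveOn<Integer> observer = new SketchObserveOn<Integer>(worker, item -> {
                delivered.add(item + "@" + Thread.currentThread().getName());
                latch.countDown();
            });
            for (int i = 1; i <= 3; i++) {
                observer.onNext(i); // emitted from the calling thread
            }
            if (!latch.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for items");
            }
            return new ArrayList<>(delivered);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            worker.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The counter guarantees that at most one drain runs at a time, so items keep their emission order even when onNext and the drain overlap.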

5. Final Subscription

The terminal subscribe creates a LambdaObserver that forwards onNext, onError, and onComplete to the user‑provided callbacks:

public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) {
    ObjectHelper.requireNonNull(onNext, "onNext is null");
    ObjectHelper.requireNonNull(onError, "onError is null");
    ObjectHelper.requireNonNull(onComplete, "onComplete is null");
    ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");
    LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);
    subscribe(ls);
    return ls;
}
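The role of the callback-adapting observer can be sketched in plain Java as follows. SketchLambdaObserver and LambdaObserverDemo are hypothetical names; the model uses an AtomicBoolean as its disposed flag and leaves out the onSubscribe callback, so it only approximates how the real class behaves.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Hypothetical stand-in for LambdaObserver: adapts callbacks to observer
// methods and doubles as the handle used to cancel the subscription.
final class SketchLambdaObserver<T> {
    private final Consumer<? super T> nextCallback;
    private final Consumer<? super Throwable> errorCallback;
    private final Runnable completeCallback;
    private final AtomicBoolean disposed = new AtomicBoolean();

    SketchLambdaObserver(Consumer<? super T> nextCallback,
                         Consumer<? super Throwable> errorCallback,
                         Runnable completeCallback) {
        this.nextCallback = nextCallback;
        this.errorCallback = errorCallback;
        this.completeCallback = completeCallback;
    }

    void onNext(T item) {
        if (isDisposed()) {
            return;
        }
        try {
            nextCallback.accept(item);
        } catch (Throwable e) {
            onError(e); // a throwing callback terminates the observer
        }
    }

    void onError(Throwable error) {
        // Terminal events also mark the observer as disposed.
        if (disposed.compareAndSet(false, true)) {
            errorCallback.accept(error);
        }
    }

    void onComplete() {
        if (disposed.compareAndSet(false, true)) {
            completeCallback.run();
        }
    }

    void dispose() {
        disposed.set(true);
    }

    boolean isDisposed() {
        return disposed.get();
    }
}

final class LambdaObserverDemo {
    static List<String> run() {
        List<String> log = new ArrayList<>();
        SketchLambdaObserver<String> observer = new SketchLambdaObserver<String>(
                item -> log.add("next " + item),
                error -> log.add("error " + error.getMessage()),
                () -> log.add("complete"));
        observer.onNext("1 is String!!!");
        observer.onComplete();
        observer.onNext("late"); // ignored: the observer is already disposed
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```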

When the upstream finally emits the integer 1, the chain transforms it to the string "1 is String!!!" via map, logs it in doOnNext, and delivers it to the subscriber’s onNext callback, after which onComplete is called.

6. Overall Flow Diagram

The article concludes with a flowchart (image) that visualises the reverse‑order subscription from downstream observers back to the upstream source, illustrating how each operator wraps the previous one and how thread scheduling points are inserted.
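The three phases can also be traced in text. The sketch below models a two-step chain (a source plus map) in plain Java and records when each step is assembled, subscribed, and emits. TraceSource and ReverseSubscriptionSketch are hypothetical names, and thread scheduling is left out to keep the trace deterministic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical source abstraction: subscribing triggers the upstream.
interface TraceSource<T> {
    void subscribe(Consumer<? super T> observer);
}

final class ReverseSubscriptionSketch {
    // Assembly of the source: only records itself and returns a wrapper.
    static <T> TraceSource<T> just(List<String> log, T value) {
        log.add("assemble: source");
        return observer -> {
            log.add("subscribe: source");
            log.add("emit: source");
            observer.accept(value);
        };
    }

    // Assembly of map: wraps the upstream; subscribing travels back up to it.
    static <T, R> TraceSource<R> map(List<String> log, TraceSource<T> upstream,
                                     Function<? super T, ? extends R> mapper) {
        log.add("assemble: map");
        return observer -> {
            log.add("subscribe: map");
            upstream.subscribe(item -> {
                log.add("emit: map");
                observer.accept(mapper.apply(item));
            });
        };
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();
        TraceSource<Integer> source = just(log, 1);
        TraceSource<String> mapped = map(log, source, i -> i + " is String!!!");
        mapped.subscribe(item -> log.add("received: " + item));
        return log;
    }

    public static void main(String[] args) {
        for (String line : run()) {
            System.out.println(line);
        }
    }
}
```

The trace reads: assemble source, assemble map, subscribe map, subscribe source, emit source, emit map, received. Assembly and emission run from upstream to downstream, while subscription runs in the opposite direction.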
