Understanding RxJava Scheduler: Source Code Analysis of subscribeOn() and observeOn()
This article explains RxJava's Scheduler abstraction, lists the built‑in schedulers, demonstrates thread control with subscribeOn() and observeOn() through a detailed code example, and walks through the underlying source code to show how tasks are scheduled and executed on different threads.
RxJava defines Scheduler as an abstraction for thread control and provides several built‑in implementations: single, newThread, computation, io, trampoline, and Schedulers.from, which wraps a caller-supplied Executor. Each scheduler has a specific purpose; for example, io is backed by an unbounded, cached thread pool suited to blocking I/O operations.
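The article first introduces thread scheduling in RxJava. subscribeOn() controls the thread on which the upstream is subscribed, and therefore the thread on which the source emits; when it is called several times, only the first call, the one closest to the source, decides where the source runs. observeOn() controls the thread for everything downstream of it, and every call takes effect.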
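The rule above can be reproduced without RxJava. The following is a minimal sketch in plain Java, not RxJava's implementation: Source, subscribeOn, observeOn, and the thread names are hypothetical stand-ins invented for illustration. It shows why the subscribeOn closest to the source decides the emitting thread, while observeOn moves delivery for everything after it.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

final class ThreadSwitchModel {
    /** Simplified stand-in for an Observable: subscribing starts the emission. */
    interface Source<T> {
        void subscribe(Consumer<T> downstream);
    }

    /** Hypothetical helper: a single-thread executor with a recognisable thread name. */
    static ExecutorService named(String name) {
        return Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, name);
            t.setDaemon(true);
            return t;
        });
    }

    /** Models subscribeOn: the upstream subscription is moved onto the executor. */
    static <T> Source<T> subscribeOn(Source<T> upstream, ExecutorService executor) {
        return downstream -> executor.execute(() -> upstream.subscribe(downstream));
    }

    /** Models observeOn: every item is re-delivered to downstream on the executor. */
    static <T> Source<T> observeOn(Source<T> upstream, ExecutorService executor) {
        return downstream ->
                upstream.subscribe(item -> executor.execute(() -> downstream.accept(item)));
    }

    /** Returns {thread the source emitted on, thread the final consumer ran on}. */
    static String[] run() {
        ExecutorService first = named("first");
        ExecutorService second = named("second");
        ExecutorService observer = named("observer");
        CompletableFuture<String> emitThread = new CompletableFuture<>();
        CompletableFuture<String> consumeThread = new CompletableFuture<>();

        Source<Integer> source = downstream -> {
            emitThread.complete(Thread.currentThread().getName());
            downstream.accept(1);
        };
        // Equivalent in shape to source.subscribeOn(first).subscribeOn(second).observeOn(observer)
        Source<Integer> chain =
                observeOn(subscribeOn(subscribeOn(source, first), second), observer);
        chain.subscribe(item -> consumeThread.complete(Thread.currentThread().getName()));

        String[] result = { emitThread.join(), consumeThread.join() };
        first.shutdown();
        second.shutdown();
        observer.shutdown();
        return result;
    }

    public static void main(String[] args) {
        String[] threads = run();
        System.out.println("source emitted on: " + threads[0]); // first
        System.out.println("consumer ran on:   " + threads[1]); // observer
    }
}
```

The outer subscribeOn does run: it hops to the "second" thread, but the inner one immediately hops again to "first" before the source is reached, so only the inner hop is visible at the source.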
An example Observable chain is presented:
private void schedulerTest() {
    Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> observableEmitter) throws Exception {
            Log.i(TAG, "create:" + Thread.currentThread().getName());
            observableEmitter.onNext(1);
            observableEmitter.onComplete();
        }
    });
    observable.subscribeOn(Schedulers.newThread())
            .subscribeOn(Schedulers.io())
            .observeOn(Schedulers.io())
            .map(new Function() { ... })
            .observeOn(Schedulers.newThread())
            .map(new Function() { ... })
            .observeOn(AndroidSchedulers.mainThread())
            .doOnSubscribe(new Consumer<Disposable>() { ... })
            .subscribeOn(Schedulers.single())
            .subscribe(new Consumer() { ... });
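}

The source analysis of subscribeOn() starts from Schedulers.single(), which creates a SingleScheduler backed by a single‑threaded ScheduledExecutorService. The subscribeActual method wraps the downstream observer in a SubscribeOnObserver, calls the downstream onSubscribe, and schedules a SubscribeTask via scheduleDirect. In the base Scheduler class, scheduleDirect creates a Worker, wraps the task in a DisposeTask, and schedules it on that worker, which submits it to the executor. When the task runs, it subscribes the SubscribeOnObserver to the upstream source on the scheduler's thread.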
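As a rough illustration of that flow, here is a plain-Java sketch under stated assumptions: MiniObserver, MiniSource, the SINGLE executor, and the "model-single" thread name are all hypothetical, and a Future stands in for the Disposable that RxJava returns. It only mirrors the ordering: onSubscribe is delivered synchronously on the calling thread, and the real subscription is deferred to the scheduler's thread.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;

final class SubscribeOnFlowModel {
    /** Minimal observer with only the callbacks this sketch needs. */
    interface MiniObserver {
        void onSubscribe();
        void onNext(int value);
    }

    /** Minimal source: subscribing triggers emission. */
    interface MiniSource {
        void subscribe(MiniObserver observer);
    }

    /** Stand-in for a single-threaded scheduler (the thread name is an assumption). */
    static final ScheduledExecutorService SINGLE =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "model-single");
                t.setDaemon(true);
                return t;
            });

    /** Stand-in for scheduleDirect: submit the task, return a handle that can cancel it. */
    static Future<?> scheduleDirect(Runnable task) {
        return SINGLE.submit(task);
    }

    /** Same ordering as the excerpt: notify downstream first, then defer the subscription. */
    static Future<?> subscribeActual(MiniSource source, MiniObserver observer) {
        observer.onSubscribe();                                   // caller's thread
        return scheduleDirect(() -> source.subscribe(observer)); // scheduler's thread
    }

    static String threadName() {
        return Thread.currentThread().getName();
    }

    /** Runs the flow once and returns the recorded events in order. */
    static List<String> run() {
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        MiniSource source = observer -> {
            log.add("subscribe on " + threadName());
            observer.onNext(1);
        };
        MiniObserver observer = new MiniObserver() {
            @Override public void onSubscribe() { log.add("onSubscribe on " + threadName()); }
            @Override public void onNext(int value) { log.add("onNext on " + threadName()); }
        };
        Future<?> handle = subscribeActual(source, observer);
        try {
            handle.get(); // wait until the deferred subscription has finished
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
        return log;
    }

    public static void main(String[] args) {
        // [onSubscribe on main, subscribe on model-single, onNext on model-single]
        System.out.println(run());
    }
}
```

Cancelling the returned handle before the task starts would prevent the subscription from happening at all, which is the role the Disposable plays in the real operator.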
@Override
public void subscribeActual(final Observer<? super T> s) {
    final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
    s.onSubscribe(parent);
    parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}

The analysis of observeOn() reveals that its subscribeActual creates a Worker and wraps the downstream observer in an ObserveOnObserver. When onNext() is called, the item is queued and schedule() is invoked, which posts the observer itself, as a Runnable, to the worker. Its run() method then executes on the worker's thread and drains the queue, delivering items to the downstream observer on the chosen thread.
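The queue-and-drain idea can be sketched in plain Java. This is a simplified model rather than the RxJava class: DrainLoopModel, its wip field, and the "model-worker" thread name are assumptions made for illustration, and error, completion, and disposal handling are left out. The counter guarantees that at most one drain task is in flight, and that an item queued while draining is never missed.

```java
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

final class DrainLoopModel<T> implements Runnable {
    private final Queue<T> queue = new ConcurrentLinkedQueue<>();
    private final AtomicInteger wip = new AtomicInteger(); // pending schedule() calls
    private final Executor worker;
    private final Consumer<T> downstream;

    DrainLoopModel(Executor worker, Consumer<T> downstream) {
        this.worker = worker;
        this.downstream = downstream;
    }

    /** Called on the upstream thread: enqueue, then make sure a drain is scheduled. */
    void onNext(T item) {
        queue.offer(item);
        schedule();
    }

    /** Only the caller that moves the counter from 0 to 1 posts the drain task. */
    void schedule() {
        if (wip.getAndIncrement() == 0) {
            worker.execute(this);
        }
    }

    /** Runs on the worker thread: drain, then re-check for signals that arrived meanwhile. */
    @Override
    public void run() {
        int missed = 1;
        for (;;) {
            T item;
            while ((item = queue.poll()) != null) {
                downstream.accept(item);
            }
            missed = wip.addAndGet(-missed);
            if (missed == 0) {
                return; // nothing new was scheduled while draining
            }
        }
    }

    /** Pushes 1..5 from the calling thread and records where each item is delivered. */
    static List<String> demo() {
        ExecutorService worker =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "model-worker"));
        List<String> delivered = new CopyOnWriteArrayList<>();
        DrainLoopModel<Integer> observer = new DrainLoopModel<Integer>(
                worker, item -> delivered.add(item + "@" + Thread.currentThread().getName()));
        for (int i = 1; i <= 5; i++) {
            observer.onNext(i);
        }
        worker.shutdown(); // already submitted drain tasks still run
        try {
            if (!worker.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("worker did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return delivered;
    }

    public static void main(String[] args) {
        // [1@model-worker, 2@model-worker, 3@model-worker, 4@model-worker, 5@model-worker]
        System.out.println(demo());
    }
}
```

Every item is delivered on the worker thread in emission order, regardless of how many drain tasks were actually posted.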
@Override
public void onNext(T t) {
    if (done) return;
    if (sourceMode != QueueDisposable.ASYNC) {
        queue.offer(t);
    }
    schedule();
}

void schedule() {
    if (getAndIncrement() == 0) {
        worker.schedule(this);
    }
}

The article then walks through the execution order of the example: the source runs on a new thread, the first map runs on an io thread, the second map runs on another new thread, and the final subscribe callback runs on the main thread, with log output illustrating each transition.
In summary, subscribeOn() sets the thread for upstream subscription and emission (only the first call, the one closest to the source, determines where the source runs), while observeOn() sets the thread for downstream processing (each call takes effect), allowing fine‑grained control of concurrency in RxJava pipelines.
Sohu Tech Products
A knowledge-sharing platform for Sohu's technology products. As a leading Chinese internet brand with media, video, search, and gaming services and over 700 million users, Sohu continuously drives tech innovation and practice. We’ll share practical insights and tech news here.