Understanding RxJava: Basics, Usage, and Thread Scheduling
RxJava brings reactive, event-driven programming to Java through Observables and Observers. It offers a rich set of operators, schedulers for thread control, and asynchronous, non-blocking execution that avoids callback hell. It fits UI events, I/O, RPC, and complex service orchestration, as demonstrated by Alibaba's Xianyu.
RxJava is a Java implementation of reactive programming, offering an event-driven, asynchronous programming model. Since 2018, Alibaba's Xianyu (Idle Fish) has used RxJava to upgrade its architecture, improving overall performance and resource utilization, reducing network latency, and giving agile support to rapid business innovation.
Traditional callback code suffers from “callback hell”, tangled error handling, and unreadable execution order. RxJava solves these problems by applying the Observer pattern: Observable acts as the data source, while Observer consumes the emitted items.
Basic Components
Observable – the producer of data; Observer – the consumer. An Observable can register multiple Observers. By default, each subscription triggers the Observable's production logic again; Observable.cache() can be used to produce the data only once and replay it to later subscribers.
Variants such as Single (single‑item source) and Flowable (supports back‑pressure) extend the core concept.
Scheduler provides thread-pool support for asynchronous execution. Schedulers.io(), Schedulers.computation(), Schedulers.newThread(), etc., can be assigned to the upstream via subscribeOn and to the downstream via observeOn.
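The components above can be sketched in a few lines. This is a minimal illustration, assuming RxJava 3 package names (io.reactivex.rxjava3; RxJava 2 uses io.reactivex instead). The class ComponentsDemo and its method names are hypothetical and not part of the original article.

```java
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.Single;
import io.reactivex.rxjava3.schedulers.Schedulers;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

class ComponentsDemo {
    // Subscribes twice and reports how often the production logic ran.
    static int productionRuns(boolean useCache) {
        AtomicInteger runs = new AtomicInteger();
        Observable<Integer> source = Observable.fromCallable(() -> runs.incrementAndGet());
        // cache() runs the source once and replays the result to later subscribers.
        Observable<Integer> shared = useCache ? source.cache() : source;
        shared.subscribe(v -> System.out.println("sub1 = " + v));
        shared.subscribe(v -> System.out.println("sub2 = " + v));
        return runs.get();
    }

    // Single: a source that emits exactly one item (or an error).
    static String singleValue() {
        return Single.fromCallable(() -> "user-42")
                .map(String::toUpperCase)
                .blockingGet();
    }

    // Flowable: the upstream only produces what the downstream has requested.
    static List<Integer> boundedFlow() {
        return Flowable.range(1, 100)
                .doOnRequest(n -> System.out.println("downstream requested " + n))
                .observeOn(Schedulers.computation(), false, 16) // request in batches of at most 16
                .map(i -> i * 2)
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println("runs without cache = " + productionRuns(false));
        System.out.println("runs with cache    = " + productionRuns(true));
        System.out.println(singleValue());
        System.out.println(boundedFlow().size());
    }
}
```

The blocking calls (blockingGet) are used here only to keep the sketch short and observable from main; production code would normally stay within the reactive chain.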
Creating an Observable
Observable<UserDo> observable = Observable.create(emitter -> {
    System.out.println(Thread.currentThread().getName() + "----TestRx.subscribe");
    List<UserDo> result = userService.getAllUser();
    for (UserDo u : result) {
        emitter.onNext(u);
    }
    // Signal completion; without this call the "finish" callback below never runs
    emitter.onComplete();
});
Observable<String> map = observable.map(Object::toString);
map.subscribe(o -> System.out.println(Thread.currentThread().getName() + "----sub1 = " + o),
        e -> System.out.println("e = " + e),
        () -> System.out.println("finish"));

The code above shows that userService.getAllUser() is called every time a subscription is created, and that emission is synchronous by default: everything runs on the thread that calls subscribe. To achieve truly asynchronous, non-blocking behavior, subscribeOn and observeOn are used.
Thread Scheduling Example
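Observable.fromCallable(() -> {
            System.out.println(Thread.currentThread().getName() + "----observable fromCallable");
            Thread.sleep(1000);
            return "event";
        })
        .subscribeOn(Schedulers.io())        // upstream work runs on an I/O thread
        .observeOn(Schedulers.single())      // map below runs on the single-thread scheduler
        .map(i -> {
            System.out.println(Thread.currentThread().getName() + "----observable map");
            return i;
        })
        .observeOn(Schedulers.newThread())   // the subscriber runs on a new thread
        .subscribe(str -> System.out.println(Thread.currentThread().getName() + "----inputStr=" + str));
System.out.println(Thread.currentThread().getName() + "----end");
// Keep the calling thread alive long enough for the asynchronous chain to finish
Thread.sleep(2000);

Here fromCallable creates a lazy Observable that runs only when subscribed. subscribeOn(Schedulers.io()) moves the upstream work to an I/O thread pool, while each observeOn switches the operators below it to another scheduler: map runs on the single-thread scheduler and the subscriber runs on a new thread. The calling thread is not blocked, so it prints "----end" before the event arrives, which is what asynchronous, non-blocking execution looks like in practice.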
Common Operators
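map – one-to-one transformation (e.g., observable.map(Object::toString)).
flatMap – one-to-many transformation: each item is mapped to an inner stream and the results are merged. Inner streams can run concurrently, so the output order is not guaranteed.
merge – merges multiple streams into one, emitting items in the order in which they arrive.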
zip – combines items from multiple streams pairwise.
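The four operators above can be compared side by side. The following is a small sketch, assuming RxJava 3 package names; the class OperatorDemo is hypothetical. All sources here are synchronous, so the results are deterministic; with asynchronous inner streams, flatMap and merge would interleave items as they arrive.

```java
import io.reactivex.rxjava3.core.Observable;
import java.util.List;

class OperatorDemo {
    // map: one input item becomes exactly one output item.
    static List<String> mapped() {
        return Observable.just(1, 2, 3)
                .map(i -> "item-" + i)
                .toList()
                .blockingGet();
    }

    // flatMap: each input item expands into an inner stream of two items.
    static List<Integer> flatMapped() {
        return Observable.just(1, 2, 3)
                .flatMap(i -> Observable.just(i, i * 10))
                .toList()
                .blockingGet();
    }

    // merge: several streams are combined into one stream.
    static List<Integer> merged() {
        return Observable.merge(Observable.just(1, 2), Observable.just(3, 4))
                .toList()
                .blockingGet();
    }

    // zip: items are paired by position; the shorter stream ends the result.
    static List<String> zipped() {
        Observable<String> pairs = Observable.zip(
                Observable.just("a", "b", "c"),
                Observable.just(1, 2),
                (s, n) -> s + n);
        return pairs.toList().blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(mapped());
        System.out.println(flatMapped());
        System.out.println(merged());
        System.out.println(zipped());
    }
}
```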
Usage Tips
Be aware that without explicit scheduling, RxJava runs synchronously and may call upstream methods multiple times (e.g., userService.getAllUser() is invoked per subscription).
Set timeouts with timeout and fallback logic with onErrorReturn to avoid thread leakage and OOM.
Prefer Schedulers.from(customExecutor) over default schedulers to control thread‑pool size and prevent long‑tail tasks from blocking shared pools.
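The timeout, fallback, and custom-executor tips can be combined as in the sketch below. It assumes RxJava 3 package names. The class TimeoutFallbackDemo, the slowCall method, the pool size of 4, and the "fallback" value are hypothetical placeholders chosen for illustration, not values from Xianyu's setup.

```java
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.core.Scheduler;
import io.reactivex.rxjava3.schedulers.Schedulers;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class TimeoutFallbackDemo {
    // A dedicated, bounded pool so slow calls cannot exhaust the shared schedulers.
    // Daemon threads are used so the pool never keeps the JVM alive.
    static final ExecutorService RPC_POOL = Executors.newFixedThreadPool(4, r -> {
        Thread t = new Thread(r, "rpc-pool");
        t.setDaemon(true);
        return t;
    });
    static final Scheduler RPC_SCHEDULER = Schedulers.from(RPC_POOL);

    // Hypothetical stand-in for a slow remote call.
    static String slowCall(long delayMs) {
        try {
            Thread.sleep(delayMs);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
        }
        return "remote-result";
    }

    static String callWithTimeout(long delayMs, long timeoutMs) {
        return Observable.fromCallable(() -> slowCall(delayMs))
                .subscribeOn(RPC_SCHEDULER)                 // run on the custom pool
                .timeout(timeoutMs, TimeUnit.MILLISECONDS)  // fail if no item arrives in time
                .onErrorReturn(e -> "fallback")             // replace any error with a default
                .blockingFirst();
    }

    public static void main(String[] args) {
        System.out.println(callWithTimeout(10, 2000));  // fast call
        System.out.println(callWithTimeout(2000, 50));  // slow call, falls back
    }
}
```

Note that the timeout only stops the subscriber from waiting. By default a scheduler created with Schedulers.from does not interrupt a task that is already running, so the pool thread stays occupied until slowCall returns. That is one reason to size a dedicated pool explicitly rather than share it.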
Suitable Scenarios
Handling UI events in Android or desktop applications.
Asynchronous I/O and RPC calls where responses need to be combined.
Processing streams of events from uncontrolled producers.
Complex dependency graphs of multiple service calls with timeout and fallback requirements.
In Xianyu’s batch‑processing of product data, RxJava orchestrates multiple dependent service calls, applies per‑call and overall timeouts, and merges results efficiently, dramatically reducing development effort and improving reliability.
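The orchestration pattern described here, several service calls with per-call timeouts, an overall timeout, and merged results, might look roughly like the following. This is only an illustrative sketch under stated assumptions, not Xianyu's implementation: the class OrchestrationDemo, the service names "price" and "stock", and all timeout values are hypothetical. RxJava 3 package names are assumed.

```java
import io.reactivex.rxjava3.core.Observable;
import io.reactivex.rxjava3.schedulers.Schedulers;
import java.util.concurrent.TimeUnit;

class OrchestrationDemo {
    // Hypothetical stand-in for a remote service that answers after delayMs.
    static String simulate(String name, long delayMs) {
        try {
            Thread.sleep(delayMs);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
        }
        return name + "-ok";
    }

    // One service call with its own timeout and fallback value.
    static Observable<String> call(String name, long delayMs, long timeoutMs) {
        return Observable.fromCallable(() -> simulate(name, delayMs))
                .subscribeOn(Schedulers.io())
                .timeout(timeoutMs, TimeUnit.MILLISECONDS)
                .onErrorReturn(e -> name + "-fallback");
    }

    // Runs both calls concurrently and combines their results.
    static String assemble(long priceDelayMs, long stockDelayMs) {
        Observable<String> price = call("price", priceDelayMs, 200);
        Observable<String> stock = call("stock", stockDelayMs, 200);
        Observable<String> combined = Observable.zip(price, stock, (p, s) -> p + "," + s);
        return combined
                .timeout(1, TimeUnit.SECONDS)       // overall deadline for the whole graph
                .onErrorReturnItem("unavailable")   // overall fallback
                .blockingFirst();
    }

    public static void main(String[] args) {
        System.out.println(assemble(10, 10));
        System.out.println(assemble(10, 2000));
    }
}
```

Because each call subscribes on its own I/O thread, the total latency is bounded by the slowest call (or its timeout) rather than by the sum of both.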
Xianyu Technology
Official account of the Xianyu technology team