Introduction to RxJava: Concepts, Operators, and Backpressure
This article introduces RxJava, a reactive functional programming framework built on the observer pattern. It explains RxJava's core concepts, stream types, a wide range of operators, and backpressure strategies, then demonstrates a practical use: migrating data from MongoDB to MySQL.
Preface
Improving development efficiency, simplifying logic, and standardizing development processes are top priorities for technical teams. RxJava helps with all three by providing a reactive programming model.
Using RxJava enables:
Simplified logic and decoupled module dependencies
A rich set of operators covering most scenarios
Built‑in parallel asynchronous operations for higher performance
Elegant error handling
The goal of this article is to give a basic understanding of RxJava and show how it can boost development efficiency while illustrating the power of reactive functional programming.
What Is RxJava?
RxJava is commonly described as a reactive functional programming framework implemented via the observer pattern.
Observer Pattern
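The observer pattern is the core principle behind RxJava: when an object (the subject; in RxJava, the Observable) changes, all objects that depend on it (the observers; in RxJava, the Subscribers) are notified automatically.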
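A minimal hand-rolled version of the pattern in plain Java may make this concrete. It does not use RxJava; the class names Subject and ObserverDemo and the methods subscribe and setState are illustrative assumptions. In RxJava the Observable takes the subject's role and each Subscriber is an observer, with a richer contract (onNext, onError, onCompleted).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical subject: holds a piece of state and notifies every registered observer when it changes.
class Subject<T> {
    private final List<Consumer<T>> observers = new ArrayList<>();
    private T state;

    // Register a dependent object; it is called on every future state change.
    void subscribe(Consumer<T> observer) {
        observers.add(observer);
    }

    // Update the state, then push the new value to all observers.
    void setState(T newState) {
        state = newState;
        for (Consumer<T> observer : observers) {
            observer.accept(state);
        }
    }
}

class ObserverDemo {
    static List<String> run() {
        List<String> log = new ArrayList<>();
        Subject<Integer> subject = new Subject<>();
        subject.subscribe(v -> log.add("A saw " + v));
        subject.subscribe(v -> log.add("B saw " + v));
        subject.setState(1);
        subject.setState(2);
        return log;
    }

    public static void main(String[] args) {
        // Prints: A saw 1, B saw 1, A saw 2, B saw 2 (one per line)
        run().forEach(System.out::println);
    }
}
```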
Reactive Functional Programming
Reactive functional programming consists of two concepts: reactive and functional. To illustrate, we compare them with traditional imperative programming.
Reactive vs. Imperative
Reactive: event‑driven processing based on a data stream
Imperative: a sequence of commands telling the computer what to do
Functional vs. Imperative
Functional: focuses on the mapping from input data to output data
Imperative: focuses on solving the problem step by step
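As a rough illustration of the difference, the sketch below computes the sum of the squares of the even numbers in a list twice: once imperatively with a loop, and once functionally with java.util.stream (plain JDK, not RxJava). The class and method names are hypothetical. RxJava applies the same functional style to asynchronous event streams rather than in-memory collections.

```java
import java.util.List;

class StyleComparison {
    // Imperative: spell out each step and mutate an accumulator.
    static int sumOfEvenSquaresImperative(List<Integer> numbers) {
        int sum = 0;
        for (int n : numbers) {
            if (n % 2 == 0) {
                sum += n * n;
            }
        }
        return sum;
    }

    // Functional: describe the mapping (filter -> map -> sum) and let the library run it.
    static int sumOfEvenSquaresFunctional(List<Integer> numbers) {
        return numbers.stream()
                .filter(n -> n % 2 == 0)
                .mapToInt(n -> n * n)
                .sum();
    }

    public static void main(String[] args) {
        List<Integer> numbers = List.of(1, 2, 3, 4, 5, 6);
        System.out.println(sumOfEvenSquaresImperative(numbers)); // 56
        System.out.println(sumOfEvenSquaresFunctional(numbers)); // 56
    }
}
```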
RxJava Detailed Usage
RxJava provides operators to manipulate data streams and offers backpressure strategies to prevent memory exhaustion when the consumer is slower than the producer.
The following sections detail streams, operators, and backpressure.
RxJava Streams
Stream Types
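Hot streams: emit events whether or not anyone is subscribed, so a late subscriber misses earlier events (e.g., ConnectableObservable).
Cold streams: start emitting only when a subscriber arrives, and each subscriber receives the whole sequence from the beginning (e.g., streams created with create or from).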
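The difference can be sketched without RxJava. In the hypothetical classes below, ColdSource produces its data separately for each subscriber, while HotSource pushes values only to whoever is subscribed at that moment. This is a simplified model of the two behaviors, not how RxJava implements either kind of stream.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class HotColdDemo {
    // Cold: values are produced per subscriber, so every subscriber sees the full sequence.
    static class ColdSource {
        private final List<Integer> data;

        ColdSource(List<Integer> data) {
            this.data = data;
        }

        void subscribe(Consumer<Integer> observer) {
            data.forEach(observer); // emission starts now, just for this subscriber
        }
    }

    // Hot: values are pushed when they happen, only to whoever is subscribed at that moment.
    static class HotSource {
        private final List<Consumer<Integer>> observers = new ArrayList<>();

        void subscribe(Consumer<Integer> observer) {
            observers.add(observer);
        }

        void emit(int value) {
            observers.forEach(o -> o.accept(value));
        }
    }

    static List<List<Integer>> run() {
        ColdSource cold = new ColdSource(List.of(1, 2, 3));
        List<Integer> coldFirst = new ArrayList<>();
        List<Integer> coldLate = new ArrayList<>();
        cold.subscribe(coldFirst::add);
        cold.subscribe(coldLate::add); // still receives 1, 2, 3

        HotSource hot = new HotSource();
        List<Integer> hotFirst = new ArrayList<>();
        List<Integer> hotLate = new ArrayList<>();
        hot.subscribe(hotFirst::add);
        hot.emit(1);
        hot.emit(2);
        hot.subscribe(hotLate::add); // joins after 1 and 2 were already emitted
        hot.emit(3);

        return List.of(coldFirst, coldLate, hotFirst, hotLate);
    }

    public static void main(String[] args) {
        System.out.println(run()); // [[1, 2, 3], [1, 2, 3], [1, 2, 3], [3]]
    }
}
```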
Creating Streams
Streams can be created manually with Observable.create or more conveniently with operators such as from, just, and range:
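import rx.Observable;
import rx.Subscriber;
import java.util.Arrays;
import java.util.List;

// create: emit items by hand; nothing runs until someone subscribes (cold stream).
// Later 1.x releases deprecate create(OnSubscribe) in favor of unsafeCreate.
Observable<Integer> createTest = Observable.create(new Observable.OnSubscribe<Integer>() {
    @Override
    public void call(Subscriber<? super Integer> subscriber) {
        Utils.log("before"); // Utils.log: a simple logging helper assumed by these examples
        subscriber.onNext(1);
        subscriber.onNext(2);
        subscriber.onNext(3);
        subscriber.onCompleted(); // signal the end of the stream
        Utils.log("after");
    }
});
// from: emit each element of an Iterable
List<Integer> numbers = Arrays.asList(1, 2, 3);
Observable.from(numbers);
// just: emit the given items
Observable.just(1, 2, 3, 4, 5, 6);
// range: emit `count` consecutive integers starting at `start`
Observable.range(5, 3); // emits 5, 6, 7
Subscribing to Streams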
Subscribing means calling subscribe on the Observable and passing an observer, here an Action1 callback or a lambda. For a cold stream, subscribing is what triggers emission.
import rx.Observable;
import rx.functions.Action1;

// Option 1: pass a named callback (Action1 receives each emitted item)
Action1<Integer> observer = x -> System.out.println(x);
Observable<Integer> observable = Observable.range(5, 3);
observable.subscribe(observer);

// Option 2: pass a lambda inline
Observable<Integer> observable2 = Observable.range(5, 3);
observable2.subscribe(x -> System.out.println(x));
RxJava Operators
Operators are RxJava's main strength and cover most everyday use cases. They fall into three groups: basic, advanced, and multi‑stream operators.
Basic Operators
map: transforms each item (A → B)
Observable<Integer> testObs = Observable.just(1, 2, 3, 4, 5);
Observable<String> testObsMap = testObs.map(s -> s + "test");
testObs.subscribe(Utils::log);
testObsMap.subscribe(Utils::log);
filter: keeps only the items that satisfy a predicate
Observable<Integer> testObs = Observable.just(1, 2, 3, 4, 5);
Observable<Integer> testObsFilter = testObs.filter(s -> s.equals(1));
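testObsFilter.subscribe(Utils::log);
flatMap: maps each item to an Observable and merges the results into one flat stream; when the inner Observables run on other schedulers, they execute concurrently and their items may interleave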
// Order and User: the article's domain classes (Lombok-style builders assumed)
List<Order> user1Orders = Arrays.asList(
        Order.builder().id("user1-order1").build(),
        Order.builder().id("user1-order2").build(),
        Order.builder().id("user1-order3").build());
List<Order> user2Orders = Arrays.asList(
        Order.builder().id("user2-order1").build(),
        Order.builder().id("user2-order2").build(),
        Order.builder().id("user2-order3").build());
User user1 = User.builder().name("user1").orders(user1Orders).build();
User user2 = User.builder().name("user2").orders(user2Orders).build();
Observable<User> testObs = Observable.just(user1, user2);
// map each user to its List<Order>, then flatten each list with Observable.from
Observable<Order> orderObs = testObs.map(User::getOrders).flatMap(orders -> Observable.from(orders));
// flatMap must return an Observable, so the List is wrapped with Observable.from
Observable<Order> orderObs2 = testObs.flatMap(user -> Observable.from(user.getOrders()));
orderObs.subscribe(o -> Utils.log("order stream 1: " + o.getId()));
orderObs2.subscribe(o -> Utils.log("order stream 2: " + o.getId()));
concatMap: like flatMap, but subscribes to the inner Observables one at a time, so the output keeps the source order
delay: delays the emission of items
// shift the whole stream by 5 seconds: each item arrives 5 s later than it otherwise would
Observable<Integer> testObs = Observable.just(1, 2, 3).delay(5000, TimeUnit.MILLISECONDS);
// per-item delay: item i is emitted when the timer returned for it (i seconds) fires
Observable<Integer> testObs2 = Observable.just(1, 2, 3).delay(i -> Observable.timer(i, TimeUnit.SECONDS));
doOnNext: runs a side effect (e.g., logging) for each emitted item without altering the data
Observable<Integer> testObs = Observable.just(1, 2, 3, 4, 5)
        .doOnNext(s -> Utils.log("first doOnNext: " + s))   // sees 1 to 5
        .filter(s -> s % 2 == 0)                             // keep even numbers only
        .doOnNext(s -> Utils.log("second doOnNext: " + s))  // sees 2, 4
        .map(s -> s + 10)
        .doOnNext(s -> Utils.log("third doOnNext: " + s));  // sees 12, 14
testObs.subscribe(Utils::log);
doOnError: runs a side effect (e.g., logging) when an error passes through, without handling the error
Advanced Operators
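The first two operators in this group, scan and reduce, apply the same accumulator and differ only in what they emit. The plain-Java helpers below (hypothetical, not RxJava code) sketch that difference over a list; like RxJava's seeded scan, the sketch emits the seed value first.

```java
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

class ScanVsReduce {
    // scan: emit the seed, then every intermediate accumulator value.
    static <T, R> List<R> scan(List<T> items, R seed, BiFunction<R, T, R> accumulator) {
        List<R> out = new ArrayList<>();
        R acc = seed;
        out.add(acc);
        for (T item : items) {
            acc = accumulator.apply(acc, item);
            out.add(acc);
        }
        return out;
    }

    // reduce: same accumulation, but only the final value is produced.
    static <T, R> R reduce(List<T> items, R seed, BiFunction<R, T, R> accumulator) {
        R acc = seed;
        for (T item : items) {
            acc = accumulator.apply(acc, item);
        }
        return acc;
    }

    public static void main(String[] args) {
        List<Integer> items = List.of(2, 3, 4, 5);
        BiFunction<BigInteger, Integer, BigInteger> multiply =
                (acc, n) -> acc.multiply(BigInteger.valueOf(n));
        System.out.println(scan(items, BigInteger.ONE, multiply));   // [1, 2, 6, 24, 120]
        System.out.println(reduce(items, BigInteger.ONE, multiply)); // 120
    }
}
```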
scan: accumulates a value and emits each intermediate result
// running product: range(2, 10) emits 2 to 11, so this emits 1, 2, 6, 24, ..., 39916800 (11!)
Observable<BigInteger> factorials = Observable.range(2, 10)
        .scan(BigInteger.ONE, (big, current) -> big.multiply(BigInteger.valueOf(current)));
factorials.subscribe(Utils::log);
reduce: aggregates all items into a single final result
// only the final product is emitted: 39916800 (11!)
Observable<BigInteger> factorial = Observable.range(2, 10)
        .reduce(BigInteger.ONE, (big, current) -> big.multiply(BigInteger.valueOf(current)));
factorial.subscribe(Utils::log);
collect: collects items into a mutable container (e.g., a List)
// collect into a List
Observable<List<Integer>> all = Observable.range(2, 10)
.collect(ArrayList::new, List::add);
all.subscribe(Utils::log);
distinct / distinctUntilChanged: distinct drops any item that was already emitted; distinctUntilChanged drops only consecutive duplicates
Observable<Integer> obs = Observable.just(1, 2, 3, 1, 2);
Observable<Integer> obs2 = obs.distinct(); // emits 1, 2, 3
obs2.subscribe(Utils::log);
groupBy: splits the stream into groups (GroupedObservable), one per key
Order order1 = Order.builder().id("1").build();
Order order2 = Order.builder().id("1").build();
Order order3 = Order.builder().id("2").build();
Observable<Order> obs = Observable.just(order1, order2, order3);
obs.groupBy(order -> {
    if (order.getId().equals("1")) return 1;
    else if (order.getId().equals("2")) return 2;
    else return 3;
}).subscribe(grouped -> grouped.subscribe(o ->
        Utils.log("key is " + grouped.getKey() + " value is " + o)));
publish, replay, refCount, share: turn a cold stream into a hot one and control when the shared upstream subscription starts and stops
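// publish: emits nothing until connect(); subscribe first, or the items are lost
ConnectableObservable<Integer> published = Observable.just(1, 2, 3, 4, 5).publish();
published.subscribe(Utils::log);
published.connect();
// refCount: connects on the first subscriber, disconnects after the last one leaves
Observable<Integer> refCounted = Observable.just(1, 2, 3, 4, 5).publish().refCount();
// share: shorthand for publish().refCount()
Observable<Integer> shared = Observable.just(1, 2, 3, 4, 5).share();
Multi‑Stream Operators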
merge: merges multiple Observables into one; items from different sources may interleave
Observable<Integer> testObs1 = Observable.just(1,3,5);
Observable<Integer> testObs2 = Observable.just(2,4,8,9,6);
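// with synchronous sources such as just, this prints 1, 3, 5, 2, 4, 8, 9, 6; interleaving shows up with asynchronous sources
Observable<Integer> mergeObs = Observable.merge(testObs1, testObs2);
mergeObs.subscribe(Utils::log);
concat: concatenates Observables sequentially, subscribing to the next source only after the previous one completes, so items never interleave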
zip: combines items from multiple Observables pairwise, by position, using a combiner function
Observable<Integer> testObs1 = Observable.just(1,2,3);
Observable<Integer> testObs2 = Observable.just(4,5,6);
Observable<Integer> testObsZip1 = Observable.zip(testObs1, testObs2, Integer::sum); // emits 5, 7, 9
testObsZip1.subscribe(Utils::log);
combineLatest: once every source has emitted at least once, emits a combined value whenever any source emits, using the latest value from each
// interval emits on a background scheduler; in a main method, keep the JVM alive (e.g., Thread.sleep) to see output
Observable.combineLatest(
        Observable.interval(17, TimeUnit.MILLISECONDS).map(x -> "S" + x),
        Observable.interval(10, TimeUnit.MILLISECONDS).map(x -> "F" + x),
        (x, y) -> x + ":" + y)
        .forEach(System.out::println);
withLatestFrom: combines each item of the main stream with the latest value of a secondary stream; it emits only when the main stream emits
Observable<String> slow = Observable.interval(17, TimeUnit.MILLISECONDS).map(x -> "S" + x);
Observable<String> fast = Observable.interval(10, TimeUnit.MILLISECONDS).map(x -> "F" + x);
slow.withLatestFrom(fast, (s, f) -> s + ":" + f).forEach(System.out::println);
amb: subscribes to all sources, mirrors whichever emits first, and unsubscribes from the others
Observable<String> slow = Observable.interval(17, TimeUnit.MILLISECONDS).map(x -> "S" + x)
        .doOnSubscribe(() -> Utils.log("subscribe to S"));
// fast ticks more often, but its items are delayed by 300 ms, so slow emits first and wins
Observable<String> fast = Observable.interval(10, TimeUnit.MILLISECONDS).map(x -> "F" + x)
        .delay(300, TimeUnit.MILLISECONDS)
        .doOnSubscribe(() -> Utils.log("subscribe to F"))
        .doOnUnsubscribe(() -> Utils.log("unsubscribe from F"));
slow.ambWith(fast).subscribe(Utils::log); // logs "unsubscribe from F" once S wins
RxJava Backpressure
When the upstream emits faster than the downstream can consume, unprocessed items pile up and can exhaust memory. RxJava offers backpressure mechanisms to mitigate this.
Backpressure Concepts
When the upstream is faster than the downstream, items accumulate and can eventually cause an OutOfMemoryError (OOM).
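In RxJava 1.x, this is handled by batching items (buffer, window), by the onBackpressure* strategies (drop or buffer), and by a "pull‑push" model in which the consumer requests items before the producer sends them.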
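The pull-push idea, where the consumer signals how many items it can take and the producer pushes no more than that, is also standardized in the Reactive Streams specification, which the JDK exposes as java.util.concurrent.Flow (Java 9+). The sketch below uses that JDK API rather than RxJava 1.x: the subscriber requests one item at a time, and SubmissionPublisher's bounded buffer blocks the producer when demand runs out. The class names and the buffer size of 4 are illustrative choices.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class PullPushDemo {
    // Consumer that signals demand for one item at a time (the "pull" half).
    static class OneAtATimeSubscriber implements Flow.Subscriber<Integer> {
        final List<Integer> received = Collections.synchronizedList(new ArrayList<>());
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1); // ask for the first item
        }

        @Override
        public void onNext(Integer item) {
            received.add(item);      // "process" the item
            subscription.request(1); // ask for the next item only after this one is handled
        }

        @Override
        public void onError(Throwable throwable) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    static List<Integer> run(int count) {
        OneAtATimeSubscriber subscriber = new OneAtATimeSubscriber();
        // Buffer capacity 4: once 4 items are waiting for demand, submit() blocks the producer.
        try (SubmissionPublisher<Integer> publisher =
                     new SubmissionPublisher<>(ForkJoinPool.commonPool(), 4)) {
            publisher.subscribe(subscriber);
            for (int i = 0; i < count; i++) {
                publisher.submit(i); // the "push" half, throttled by the subscriber's demand
            }
        } // close() asks the publisher to send onComplete after the buffered items
        try {
            subscriber.done.await(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(subscriber.received);
    }

    public static void main(String[] args) {
        System.out.println(run(10)); // [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
    }
}
```

Because the producer can never get more than the buffer capacity ahead of the consumer, memory use stays bounded no matter how slow onNext is.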
Buffering: buffer collects items into a List, so each whole batch sits in memory until it is emitted; window instead emits each batch as its own Observable that passes items through as they arrive.
// buffer into lists of size 3: [1, 2, 3], [4, 5, 6], [7]
Observable.range(1, 7).buffer(3).subscribe(Utils::log);
// window into Observables of size 3
Observable<Observable<Integer>> testObj = Observable.range(1, 7).window(3);
testObj.subscribe(window -> {
    System.out.println("------------------"); // one separator per window
    window.subscribe(y -> System.out.println(y));
});
Backpressure strategies: onBackpressureDrop discards items that arrive while the downstream has no outstanding request; onBackpressureBuffer keeps such items in a buffer, optionally bounded by a capacity with an overflow callback (in RxJava 1.x, exceeding that capacity signals an error by default).
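// A source that emits as fast as it can and ignores downstream requests (no backpressure support)
private Observable<Integer> myRange(int from, int count) {
    return Observable.unsafeCreate(subscriber -> {
        int i = from;
        while (i < from + count) {
            if (!subscriber.isUnsubscribed()) {
                subscriber.onNext(i++);
            } else {
                return; // stop producing once the subscriber is gone
            }
        }
        subscriber.onCompleted();
    });
}

// Dish and log are assumed to be defined elsewhere (an example class and an SLF4J-style logger).
// onBackpressureBuffer: hold up to 100 unrequested items; beyond that the callback runs and the stream fails.
myRange(1, 100000000)
        .map(Dish::new)
        .onBackpressureBuffer(100, () -> log.warn("buffer full"))
        .observeOn(Schedulers.io())
        .subscribe(
                x -> {
                    log.info("Washing: {}", x);
                    try { Thread.sleep(50); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
                },
                e -> log.error("stream failed", e)); // reached after the buffer overflows

// onBackpressureDrop: items arriving while the consumer has no outstanding demand are discarded
myRange(1, 100000000)
        .map(Dish::new)
        .onBackpressureDrop(dish -> log.info("Throw away {}", dish))
        .observeOn(Schedulers.io())
        .subscribe(x -> {
            log.info("Washing: {}", x);
            try { Thread.sleep(50); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        });
Pull‑push model: SyncOnSubscribe.createStateful produces a value only when the downstream requests one, so the producer runs at the consumer's pace (observeOn requests items in batches and asks for more as it drains them).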
// create a stateful pull‑push observable: the state starts at 0; each requested item emits the state and advances it
Observable.OnSubscribe<Long> onSubscribe = SyncOnSubscribe.createStateful(
        () -> 0L,
        (cur, observer) -> {
            observer.onNext(cur);
            return cur + 1;
        }
);
Observable<Long> naturals = Observable.unsafeCreate(onSubscribe);
Scheduler schedulerB = Schedulers.io(); // assumption: any Scheduler works here
naturals.observeOn(schedulerB).subscribe(x -> {
    log.info("Thread-{}-Washing: {}", Thread.currentThread().getName(), x);
    try { Thread.sleep(50); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
});
RxJava Application Example
The article closes with a simple scenario: migrating data from MongoDB to MySQL. The data in MongoDB is nested and must be flattened, and some fields need to be mapped to the target schema.
Analysis:
Three layers: extraction, transformation, and loading.
Use a pull‑push model to respect consumer speed.
Apply flatMap to flatten nested structures.
Apply map for field mapping.
Use doOnNext and doOnError for logging.
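A rough outline of such a pipeline, written with java.util.stream instead of RxJava so that it runs on the plain JDK (9+), might look like the sketch below. Everything in it is hypothetical: UserDoc, OrderDoc, and OrderRow stand in for the real MongoDB and MySQL schemas, fetchPage for a paged MongoDB query, and writer for a MySQL insert. Streams are pulled lazily, so the next page is fetched only after the previous page's rows have been handed to the writer, and peek plays the role of doOnNext. java.util.stream has no separate error channel, so the doOnError step has no direct counterpart here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Stream;

class MigrationSketch {
    // Hypothetical nested MongoDB document: a user with embedded orders.
    static final class UserDoc {
        final String name;
        final List<OrderDoc> orders;
        UserDoc(String name, List<OrderDoc> orders) { this.name = name; this.orders = orders; }
    }

    static final class OrderDoc {
        final String id;
        final long amountCents;
        OrderDoc(String id, long amountCents) { this.id = id; this.amountCents = amountCents; }
    }

    // Hypothetical flat MySQL row: one row per order, parent field copied in, amount reformatted.
    static final class OrderRow {
        final String userName;
        final String orderId;
        final String amount;
        OrderRow(String userName, String orderId, String amount) {
            this.userName = userName; this.orderId = orderId; this.amount = amount;
        }
    }

    // Field mapping: integer cents -> decimal string such as "12.50".
    static String formatCents(long cents) {
        return String.format(Locale.ROOT, "%d.%02d", cents / 100, cents % 100);
    }

    static void migrate(Function<Integer, List<UserDoc>> fetchPage,
                        Consumer<OrderRow> writer,
                        List<String> log) {
        Stream.iterate(0, page -> page + 1)
                .map(fetchPage)                          // extract: a page is fetched only when pulled
                .takeWhile(docs -> !docs.isEmpty())      // stop at the first empty page
                .flatMap(List::stream)                   // one UserDoc at a time
                .flatMap(user -> user.orders.stream()    // transform: flatten user -> orders
                        .map(order -> new OrderRow(user.name, order.id, formatCents(order.amountCents))))
                .peek(row -> log.add("mapped " + row.orderId)) // logging side effect, like doOnNext
                .forEach(writer);                        // load: hand each row to the writer
    }

    public static void main(String[] args) {
        List<List<UserDoc>> pages = List.of(
                List.of(new UserDoc("user1", List.of(new OrderDoc("user1-order1", 1250), new OrderDoc("user1-order2", 99)))),
                List.of(new UserDoc("user2", List.of(new OrderDoc("user2-order1", 500)))));
        List<String> log = new ArrayList<>();
        migrate(page -> page < pages.size() ? pages.get(page) : List.of(),
                row -> System.out.println(row.userName + " " + row.orderId + " " + row.amount),
                log);
        log.forEach(System.out::println);
    }
}
```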
By leveraging RxJava, the workflow becomes clear, modular, and loosely coupled compared to traditional loop‑based pagination approaches.
Other Notes
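The concepts and most operators described here are based on RxJava 1.x. The concepts carry over to RxJava 2.x and later, and most operators keep their names, but some APIs changed: for example, backpressure support moved from Observable to the new Flowable type, and onCompleted was renamed onComplete.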
The next article, "RxJava 2.x – Sharing", will detail operators and backpressure in RxJava 2.
New Oriental Technology