What Is Reactive Programming and Its Application in Backend Development
This article explains the fundamentals of reactive programming as a declarative, data‑stream‑based paradigm, describes its core concepts such as streams, events, and operators, and demonstrates how to replace traditional imperative Java code with RxJava‑based reactive implementations for a backend service that processes MQ messages, fetches data, and updates a cache while handling errors, request deduplication, and rate limiting.
What Is Reactive Programming
Reactive Programming is a declarative programming paradigm based on data streams (Stream) and propagation of change. It treats streams of events as first‑class citizens, allowing developers to react to asynchronous data flows.
Data Stream (Stream)
A Stream (or Observable) is a sequence of events on a timeline. Events have three basic types:
Value
Error
Complete
By subscribing to a Stream, one can continuously capture these events and process them with functions until a terminal Complete or Error event occurs.
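The three event types can be observed with nothing but the JDK. The following is a minimal sketch that uses `java.util.concurrent.Flow` and `SubmissionPublisher` (JDK 9+) rather than RxJava, so it runs without extra dependencies; the class and method names are illustrative assumptions, not part of the article's service. In RxJava the same three callbacks are the ones passed to `subscribe(onNext, onError, onComplete)`.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: records every event one subscriber receives.
final class StreamEventsSketch {

    /** Publishes the given values, closes the stream, and returns the events that were observed. */
    static List<String> collectEvents(List<Integer> values) {
        List<String> seen = new CopyOnWriteArrayList<>();
        CountDownLatch terminated = new CountDownLatch(1);

        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(new Flow.Subscriber<Integer>() {
                @Override public void onSubscribe(Flow.Subscription s) {
                    s.request(Long.MAX_VALUE);              // accept everything, no back-pressure
                }
                @Override public void onNext(Integer v) {   // Value event
                    seen.add("Value(" + v + ")");
                }
                @Override public void onError(Throwable t) { // Error event (terminal)
                    seen.add("Error(" + t.getMessage() + ")");
                    terminated.countDown();
                }
                @Override public void onComplete() {        // Complete event (terminal)
                    seen.add("Complete");
                    terminated.countDown();
                }
            });
            values.forEach(publisher::submit);
        } // close() signals Complete once the submitted values have been delivered

        try {
            terminated.await(5, TimeUnit.SECONDS);          // delivery is asynchronous
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(collectEvents(List.of(1, 2, 3)));
    }
}
```

Values arrive in submission order, and no further event is delivered after a terminal one, which is why the subscriber can release its latch in both `onError` and `onComplete`.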
Operators
Reactive libraries provide a rich set of operators, grouped into categories such as Creating, Transforming, Filtering, Combining, Error Handling, Utility, Conditional/Boolean, Mathematical and Aggregate, and Others.
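If timing is ignored, the value transformations drawn in the marble diagrams below can be reproduced with plain JDK collections. This is only a model of what each operator does to the values, under the assumption that Buffer groups by a count of 3; it is not how RxJava implements them, and a real FlatMap may interleave inner streams. `buffer` and `zip` are hypothetical helpers written for this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

// Hypothetical demo class: list-based stand-ins for a few operators.
final class OperatorValuesSketch {

    /** Buffer: groups consecutive values into chunks of at most {@code size} elements. */
    static <T> List<List<T>> buffer(List<T> source, int size) {
        List<List<T>> chunks = new ArrayList<>();
        for (int i = 0; i < source.size(); i += size) {
            chunks.add(new ArrayList<>(source.subList(i, Math.min(i + size, source.size()))));
        }
        return chunks;
    }

    /** Zip: combines the i-th values of two sources, stopping at the shorter one. */
    static <A, B, R> List<R> zip(List<A> left, List<B> right, BiFunction<A, B, R> combiner) {
        return IntStream.range(0, Math.min(left.size(), right.size()))
                .mapToObj(i -> combiner.apply(left.get(i), right.get(i)))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> source = List.of(1, 2, 3);

        // Buffer: prints [[1, 2, 3], [4, 5, 6]]
        System.out.println(buffer(List.of(1, 2, 3, 4, 5, 6), 3));

        // FlatMap(x -> x, x): prints [1, 1, 2, 2, 3, 3]
        System.out.println(source.stream().flatMap(x -> Stream.of(x, x)).collect(Collectors.toList()));

        // Map(x -> x * 10): prints [10, 20, 30]
        System.out.println(source.stream().map(x -> x * 10).collect(Collectors.toList()));

        // Filter(x -> x > 1): prints [2, 3]
        System.out.println(source.stream().filter(x -> x > 1).collect(Collectors.toList()));

        // Zip: prints [1a, 2b, 3c]
        System.out.println(zip(source, List.of("a", "b", "c"), (x, y) -> x + y));
    }
}
```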
Create
[ Create{onNext(1);onNext(2);onComplete} ]
--1--2--|->
Buffer
--1--2--3--4--5--6----|->
[ Buffer ]
--[1,2,3]--[4,5,6]----|->
FlatMap
--1--2--3----|->
[ FlatMap(x-> --x--x--|->) ]
--1--1--2--2--3--3----|->
Map
--1--2--3----|->
[ Map(x->x*10) ]
--10--20--30----|->
Filter
--1--2--3----|->
[ Filter(x->x>1) ]
-----2--3----|->
Zip
--1--2--3----|->
--a--b--c----|->
[ Zip((x,y)->[x,y]) ]
--[1,a]--[2,b]--[3,c]----|->
Applying Reactive Programming in Business
Consider a backend service (Service A) that subscribes to an MQ topic "object-update-message-topic" to receive change notifications, then pulls the corresponding data object from an upstream service and updates a local cache.
MQ messages form an infinite Stream.
Service A subscribes to this Stream.
Each message is an Event.
The fetch‑and‑cache logic is a Function.
Errors are represented as Error events.
Imperative Implementation (Traditional)
public void startObjectUpdateMessageSubscribe() {
    this.subscribe("object-update-message-topic", new MessageConsumer<ObjectUpdateMessage>() {
        @Override
        public void onMessage(@NotNull ObjectUpdateMessage message) {
            // Ignore messages that do not carry a valid object ID.
            if (message.getObjectId() <= 0L) { return; }
            long objectId = message.getObjectId();
            Object obj;
            // Step 1: pull the object from the upstream service.
            try {
                obj = objectProduceService.getObject(objectId);
                log.info("Load object success, objectId={}, obj is {}", objectId, obj == null ? "null" : "nonNull");
            } catch (Throwable e) {
                log.error(String.format("Load object error, objectId=%s, error=%s", objectId, e.getMessage()), e);
                return;
            }
            // Step 2: write the object (possibly null) to the local cache.
            try {
                boolean isSetOk = objectCacheService.set(CacheEntry.builder().id(objectId).obj(obj).build());
                log.info("Cache object {}, objectId={}", isSetOk ? "success" : "failed", objectId);
            } catch (Throwable e) {
                log.error(String.format("Cache object error, objectId=%s, error=%s", objectId, e.getMessage()), e);
            }
        }
    });
}
Reactive Implementation with RxJava
public void startObjectUpdateMessageSubscribeWithReactive() {
    // Bridge the callback-based MQ consumer into a Flowable of messages.
    Flowable.<ObjectUpdateMessage>create(emitter -> this.subscribe(
            "object-update-message-topic",
            (MessageConsumer<ObjectUpdateMessage>) message -> {
                try { emitter.onNext(message); }
                catch (Throwable e) { emitter.onError(e); }
            }), BackpressureStrategy.BUFFER)
        .map(message -> message.getObjectId())
        .filter(objectId -> objectId > 0L)
        // Load the object; a null result makes the Maybe complete empty.
        .flatMapMaybe(objectId ->
            Maybe.fromCallable(() -> objectProduceService.getObject(objectId))
                .doOnSuccess(obj -> log.info("Load object success, objectId={}, obj is nonNull", objectId))
                .map(nonNullObject -> CacheEntry.builder().id(objectId).obj(nonNullObject).build())
                .switchIfEmpty(Maybe.fromCallable(() -> {
                    log.info("Load object success, objectId={}, obj is null", objectId);
                    return CacheEntry.builder().id(objectId).build();
                }))
                // A failed load is logged and dropped, so the outer stream keeps running.
                .onErrorResumeNext(e -> {
                    log.error(String.format("Load object error, objectId=%s, error=%s", objectId, e.getMessage()), e);
                    return Maybe.empty();
                })
        )
        // Write each entry to the cache; errors are handled per entry.
        .doOnNext(cacheEntry ->
            Single.fromCallable(() -> objectCacheService.set(cacheEntry))
                .subscribe(isSetOk -> {
                    log.info("Cache object {}, objectId={}", isSetOk ? "success" : "failed", cacheEntry.getId());
                }, e -> {
                    log.error(String.format("Cache object error, objectId=%s, error=%s", cacheEntry.getId(), e.getMessage()), e);
                })
        )
        .subscribe();
}
Why Use Reactive Programming
Although the reactive version appears longer, it abstracts away boilerplate error handling, threading, and request‑deduplication logic, allowing developers to focus on business rules. By treating every request as a Stream, complex scenarios such as merging duplicate requests, rate limiting, and concurrent calls to multiple upstream services become concise compositions of operators like groupBy, buffer, zip, and subscribeOn.
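To make two of these scenarios concrete without any library, here is a small sketch using only the JDK: duplicate IDs seen in one time window are merged, and each remaining ID is fetched from two upstream sources concurrently before the results are combined. `CompletableFuture.thenCombine` is swapped in here as a stand-in for what `zip` with `subscribeOn` expresses in RxJava; the two loader functions and the string results are hypothetical placeholders, and rate limiting is not covered.

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.LongFunction;

// Hypothetical demo class: dedupe one window of IDs, then fetch in parallel.
final class DedupAndCombineSketch {

    /** Merges duplicate IDs seen in one time window, keeping first-seen order. */
    static Set<Long> dedupe(List<Long> idsInWindow) {
        return new LinkedHashSet<>(idsInWindow);
    }

    /** Calls both (hypothetical) upstream lookups concurrently and joins their results. */
    static CompletableFuture<String> fetchCombined(long id,
                                                   LongFunction<String> loadObject,
                                                   LongFunction<String> loadExtend) {
        CompletableFuture<String> obj = CompletableFuture.supplyAsync(() -> loadObject.apply(id));
        CompletableFuture<String> ext = CompletableFuture.supplyAsync(() -> loadExtend.apply(id));
        return obj.thenCombine(ext, (o, e) -> o + "+" + e);
    }

    /** Processes one window: exactly one combined fetch per distinct ID. */
    static Map<Long, String> processWindow(List<Long> idsInWindow,
                                           LongFunction<String> loadObject,
                                           LongFunction<String> loadExtend) {
        // Start all fetches first so that they can overlap.
        Map<Long, CompletableFuture<String>> inFlight = new LinkedHashMap<>();
        for (long id : dedupe(idsInWindow)) {
            inFlight.put(id, fetchCombined(id, loadObject, loadExtend));
        }
        // Then wait for each result in first-seen order.
        Map<Long, String> results = new LinkedHashMap<>();
        inFlight.forEach((id, future) -> results.put(id, future.join()));
        return results;
    }

    public static void main(String[] args) {
        // Three requests arrive in the window, two of them for the same ID.
        Map<Long, String> results =
                processWindow(List.of(1L, 2L, 1L), id -> "obj" + id, id -> "ext" + id);
        System.out.println(results); // {1=obj1+ext1, 2=obj2+ext2}
    }
}
```

The reactive pipeline expresses the same idea declaratively: the window becomes a time-based `buffer`, and the pair of futures becomes a `zip` of two sources that each run on their own scheduler.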
Advanced Reactive Scenario
When an object must be assembled from two upstream services, the following reactive pipeline achieves deduplication, rate limiting, parallel fetching, and cache updating:
--m1--m2--m1--...->
[ map(m->m.getObjectId) ]
--id1--id2--id1--...->
[ filter(id->id>0L) ]
--id1--id2--...->
[ groupBy(id) ]
// The pipeline continues from the groupBy(id) step above.
// Per ID, keep at most one request every 100 ms (deduplication and rate limiting).
.flatMap(grouped -> grouped.buffer(100L, TimeUnit.MILLISECONDS)
    .mapOptional(arr -> arr.stream().findFirst()))
// Fetch from both upstream services in parallel and combine the results.
.flatMapMaybe(objectId ->
    Maybe.zip(
            Maybe.fromCallable(() -> objectProduceService.getObject(objectId)).subscribeOn(Schedulers.io()),
            Maybe.fromCallable(() -> extendProduceService.getExtend(objectId)).subscribeOn(Schedulers.io()),
            (obj, ext) -> new _Object(obj, ext))
        .doOnSuccess(o -> log.info("Load object success, objectId={}, obj is nonNull", objectId))
        .map(nonNullObject -> CacheEntry.builder().id(objectId).obj(nonNullObject).build())
        .switchIfEmpty(Maybe.fromCallable(() -> {
            log.info("Load object success, objectId={}, obj is null", objectId);
            return CacheEntry.builder().id(objectId).build();
        }))
        .onErrorResumeNext(e -> {
            log.error(String.format("Load object error, objectId=%s, error=%s", objectId, e.getMessage()), e);
            return Maybe.empty();
        })
)
// Write each entry to the cache; errors are handled per entry.
.doOnNext(cacheEntry ->
    Single.fromCallable(() -> objectCacheService.set(cacheEntry))
        .subscribe(isSetOk -> {
            log.info("Cache object {}, objectId={}", isSetOk ? "success" : "failed", cacheEntry.getId());
        }, e -> {
            log.error(String.format("Cache object error, objectId=%s, error=%s", cacheEntry.getId(), e.getMessage()), e);
        })
)
.subscribe();
Conclusion
Reactive programming raises the abstraction level of backend code, enabling developers to concentrate on business logic while the framework handles asynchronous flow, error propagation, request merging, and back‑pressure. Properly modeling business processes as Streams and applying the right operators can dramatically simplify complex server‑side implementations.
Sohu Tech Products
A knowledge-sharing platform for Sohu's technology products. As a leading Chinese internet brand with media, video, search, and gaming services and over 700 million users, Sohu continuously drives tech innovation and practice. We’ll share practical insights and tech news here.