Mastering Reactor: From Mono & Flux Basics to Advanced Async Patterns
This article explains the fundamentals of Reactor's reactive programming model—including Mono and Flux types, map and flatMap operators, asynchronous execution, scheduler choices, and error handling—while providing practical code examples to help developers efficiently use Spring WebFlux.
Reactor Reactive Programming Overview
Reactor is a fully non‑blocking reactive programming foundation for the JVM that efficiently manages back‑pressure and integrates directly with Java 8 functional APIs such as CompletableFuture, Stream, and Duration. It provides two composable asynchronous sequence APIs, Flux for N elements and Mono for 0 or 1 element, and implements the Reactive Streams specification.
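As a rough illustration of the Java 8 integration mentioned above, the sketch below bridges a CompletableFuture, a Stream, and a Duration into Reactor types. The class and method names (Java8Interop, viaFuture, viaStream) are made up for this example and do not come from the original project.

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Stream;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class Java8Interop {

    // CompletableFuture -> Mono -> CompletableFuture
    static String viaFuture() {
        CompletableFuture<String> source = CompletableFuture.supplyAsync(() -> "hello");
        CompletableFuture<String> result = Mono.fromFuture(source)
                .map(String::toUpperCase)
                .toFuture();
        return result.join();
    }

    // java.util.stream.Stream -> Flux; a Duration sets the delay between elements
    static List<Integer> viaStream() {
        return Flux.fromStream(Stream.of(1, 2, 3))
                .delayElements(Duration.ofMillis(10))
                .map(i -> i * 2)
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(viaFuture()); // HELLO
        System.out.println(viaStream()); // [2, 4, 6]
    }
}
```

block() and join() are used here only to surface the results in a small demo; inside a WebFlux request pipeline the Mono or Flux would normally be returned instead.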
Background
Our backend projects mainly use Spring WebFlux, whose core reactive library is Reactor. Because Reactor has a learning curve, newcomers often encounter bugs related to map, flatMap, asynchronous processing, and concurrency. This guide focuses on those confusing points to help developers get up to speed with Spring WebFlux.
Mono
Mono<T> is a specialized Publisher<T> that emits at most one item via onNext and then terminates with onComplete (success, with or without a value), or emits a single onError (failure). Some operators change the type: Mono.concatWith(Publisher) returns a Flux, while Mono.then(Mono) returns another Mono.
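The following minimal sketch shows these behaviours side by side: a Mono with a value, an empty Mono, and the two type-changing operators. The MonoBasics class and its method names are illustrative only.

```java
import java.util.List;
import java.util.Optional;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class MonoBasics {

    // A Mono with a value: onNext("a") followed by onComplete
    static Optional<String> valued() {
        return Mono.just("a").blockOptional();
    }

    // An empty Mono: onComplete only, no onNext
    static Optional<String> empty() {
        return Mono.<String>empty().blockOptional();
    }

    // concatWith widens the result type to Flux
    static List<String> concatenated() {
        Flux<String> flux = Mono.just("a").concatWith(Mono.just("b"));
        return flux.collectList().block();
    }

    // then(Mono) discards the first value and continues with another Mono
    static String chained() {
        Mono<String> next = Mono.just("a").then(Mono.just("done"));
        return next.block();
    }

    public static void main(String[] args) {
        System.out.println(valued());       // Optional[a]
        System.out.println(empty());        // Optional.empty
        System.out.println(concatenated()); // [a, b]
        System.out.println(chained());      // done
    }
}
```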
Flux
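Flux<T> is a standard Publisher<T> that emits 0 to N elements and terminates with either onComplete or onError. Each signal translates into a call on the downstream Subscriber's onNext, onComplete, or onError method. All signals, including the terminating ones, are optional: a sequence with onComplete but no onNext is an empty finite sequence, and a sequence with neither (for example Flux.never()) is an infinite empty sequence. Infinite sequences are not necessarily empty; Flux.interval(Duration) produces an unbounded series of Long ticks.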
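To make the finite, empty, and infinite cases concrete, here is a small sketch. FluxBasics and its methods are hypothetical names; take(3) and the 100 ms timeout are arbitrary choices made only so the demo terminates.

```java
import java.time.Duration;
import java.util.List;

import reactor.core.publisher.Flux;

public class FluxBasics {

    // Finite sequence: three onNext signals, then onComplete
    static List<Integer> finite() {
        return Flux.just(1, 2, 3).collectList().block();
    }

    // Empty finite sequence: onComplete without any onNext
    static List<Integer> emptyFinite() {
        return Flux.<Integer>empty().collectList().block();
    }

    // Flux.interval is unbounded; take(3) cuts it off after three ticks
    static List<Long> boundedInterval() {
        return Flux.interval(Duration.ofMillis(10)).take(3).collectList().block();
    }

    // Flux.never() emits no signal at all, so a blocking read has to time out
    static boolean neverTimesOut() {
        try {
            Flux.never().blockLast(Duration.ofMillis(100));
            return false;
        } catch (IllegalStateException e) {
            return true; // blockLast gave up waiting
        }
    }

    public static void main(String[] args) {
        System.out.println(finite());          // [1, 2, 3]
        System.out.println(emptyFinite());     // []
        System.out.println(boundedInterval()); // [0, 1, 2]
        System.out.println(neverTimesOut());   // true
    }
}
```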
map, flatMap and flatMapSequential
Method signatures
// map signature
<V> Flux<V> map(Function<? super T, ? extends V> mapper)
// flatMap signature
<R> Flux<R> flatMap(Function<? super T, ? extends Publisher<? extends R>> mapper)
map applies a synchronous function to each element, producing a one‑to‑one transformation. flatMap transforms each element into a Publisher and merges the inner publishers, resulting in a one‑to‑many transformation; the order of emitted items is not guaranteed. flatMapSequential behaves like flatMap but preserves the original order when merging.
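The ordering difference is easiest to see when the inner publishers finish at different times. In the sketch below, slowEcho is a hypothetical helper that delays each value by n * 100 ms; it stands in for any asynchronous call and is not part of the original article.

```java
import java.time.Duration;
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class FlatMapOrdering {

    // Hypothetical async call: the inner Mono emits n after n * 100 ms
    static Mono<Integer> slowEcho(int n) {
        return Mono.just(n).delayElement(Duration.ofMillis(n * 100L));
    }

    // map: synchronous, one-to-one, source order is kept
    static List<Integer> mapped() {
        return Flux.just(3, 1, 2).map(n -> n * 10).collectList().block();
    }

    // flatMap: inner results are merged as they arrive, so order follows completion time
    static List<Integer> merged() {
        return Flux.just(3, 1, 2).flatMap(FlatMapOrdering::slowEcho).collectList().block();
    }

    // flatMapSequential: inner publishers still run concurrently, results follow source order
    static List<Integer> sequential() {
        return Flux.just(3, 1, 2).flatMapSequential(FlatMapOrdering::slowEcho).collectList().block();
    }

    public static void main(String[] args) {
        System.out.println(mapped());     // [30, 10, 20]
        System.out.println(merged());     // arrival order, typically [1, 2, 3] with these delays
        System.out.println(sequential()); // [3, 1, 2]
    }
}
```

The flatMap result depends on timing, which is why it should never be relied on for ordering.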
Example 1
void demo() {
    // 1. Produce an infinite stream of Long values, one every 100 ms.
    //    Flux.interval emits on Schedulers.parallel() by default, not on the caller thread.
    final Flux<Long> flux = Flux.interval(Duration.of(100, ChronoUnit.MILLIS))
        .map(log())            // runs on the thread that emits the tick
        .flatMap(logOfFlux()); // runs on that same thread
    flux.subscribe();
}
Function<Long, Long> log() {
    return aLong -> {
        log.info("num is {}", aLong);
        return aLong;
    };
}
Function<Long, Flux<Long>> logOfFlux() {
    return aLong -> {
        log.info("num is {}", aLong);
        return Flux.just(aLong);
    };
}
In this example the flatMap operation is synchronous because Flux.just() emits the element immediately. The article later shows how to make flatMap asynchronous.
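One common way to make the inner publisher asynchronous is to wrap the work in Mono.fromCallable and subscribe to it on a scheduler. The sketch below assumes a made-up blocking helper, blockingLookup, and uses Schedulers.boundedElastic() as an example scheduler; neither is taken from the original code.

```java
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class AsyncFlatMap {

    // Hypothetical blocking call standing in for I/O; returns the id and the worker thread name
    static String blockingLookup(long id) {
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return id + "@" + Thread.currentThread().getName();
    }

    static List<String> run() {
        return Flux.just(1L, 2L, 3L)
                // each inner Mono is subscribed on its own boundedElastic worker,
                // so the three lookups overlap instead of running one after another
                .flatMap(id -> Mono.fromCallable(() -> blockingLookup(id))
                        .subscribeOn(Schedulers.boundedElastic()))
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        // Order of the results is not guaranteed because flatMap merges as results arrive
        run().forEach(System.out::println);
    }
}
```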
Empty Mono Handling
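void monoOfEmpty() {
    Mono.empty()
        .map(m -> func())
        .subscribe(message -> {
            responseObserver.onNext(message);
            responseObserver.onCompleted();
        });
}
An empty Mono completes without ever emitting onNext, so the consumer passed to subscribe never runs and responseObserver.onCompleted() is never called. A safer approach passes the completion handler to subscribe directly, so it runs on onComplete whether or not a value was emitted:
void monoOfEmpty() {
    Mono.empty()
        .map(m -> func())
        .subscribe(
            responseObserver::onNext,       // runs only when a value is emitted
            responseObserver::onError,      // runs when the Mono fails
            responseObserver::onCompleted); // runs on completion, after any onNext
}
Asynchronous Execution and Multithreading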
Reactor operators do not switch threads on their own: most operators run on whichever thread the previous operator emitted its signal on, unless a Scheduler is specified (time-based operators such as Flux.interval default to Schedulers.parallel()). The two main ways to change the execution context are publishOn and subscribeOn.
publishOn
void demo() {
    final Flux<Long> flux = Flux.interval(Duration.of(100, ChronoUnit.MILLIS))
        .map(log())                       // runs on the interval's timer thread (Schedulers.parallel() by default)
        .publishOn(Schedulers.parallel()) // subsequent operators run on a parallel worker
        .map(log())                       // runs on a parallel thread
        .publishOn(Schedulers.elastic())  // switch to the elastic pool
        .flatMap(logOfMono());            // runs on an elastic thread
    flux.subscribe();
}
publishOn affects the operators downstream of it until another publishOn appears.
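The effect of publishOn can be observed by recording the thread name at each stage. This is a minimal sketch with an illustrative class name; it uses Schedulers.boundedElastic() rather than elastic() as an assumption about which scheduler a current Reactor version would offer.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

public class PublishOnDemo {

    static List<String> threadNames() {
        List<String> names = new CopyOnWriteArrayList<>();
        Flux.just(1)
                // stage 0: no scheduler yet, so this runs on the subscribing thread
                .doOnNext(i -> names.add(Thread.currentThread().getName()))
                .publishOn(Schedulers.parallel())
                // stage 1: downstream of the first publishOn, a "parallel-N" thread
                .doOnNext(i -> names.add(Thread.currentThread().getName()))
                .publishOn(Schedulers.boundedElastic())
                // stage 2: downstream of the second publishOn, a "boundedElastic-N" thread
                .doOnNext(i -> names.add(Thread.currentThread().getName()))
                .blockLast();
        return names;
    }

    public static void main(String[] args) {
        threadNames().forEach(System.out::println);
    }
}
```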
subscribeOn
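void demo() {
    final Flux<Long> flux = Flux.interval(Duration.of(100, ChronoUnit.MILLIS))
        .map(log())
        .publishOn(Schedulers.parallel())
        .map(log())
        .subscribeOn(Schedulers.elastic()) // the subscription to the source happens on the elastic pool
        .flatMap(logOfMono());
    flux.subscribe();
}
subscribeOn determines the thread on which the source is subscribed to, wherever it is placed in the chain. Operators run on that thread until a publishOn switches the downstream context. In this example Flux.interval emits on its own timer scheduler, so subscribeOn changes only where the subscription happens; the flatMap still runs on the parallel worker selected by publishOn.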
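A source without its own scheduler makes the subscribeOn behaviour easier to see. In this sketch (illustrative names, Schedulers.boundedElastic() chosen as an example), subscribeOn sits in the middle of the chain yet still decides where the source emits, and a later publishOn takes over for everything below it.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

public class SubscribeOnDemo {

    static List<String> threadNames() {
        List<String> names = new CopyOnWriteArrayList<>();
        Flux.just(1)
                // stage 0: above subscribeOn, but still on its scheduler,
                // because the subscription travels upstream from there
                .doOnNext(i -> names.add(Thread.currentThread().getName()))
                .subscribeOn(Schedulers.boundedElastic())
                // stage 1: same boundedElastic thread, nothing has switched context yet
                .doOnNext(i -> names.add(Thread.currentThread().getName()))
                .publishOn(Schedulers.parallel())
                // stage 2: publishOn overrides the context for downstream operators
                .doOnNext(i -> names.add(Thread.currentThread().getName()))
                .blockLast();
        return names;
    }

    public static void main(String[] args) {
        threadNames().forEach(System.out::println);
    }
}
```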
Parallel Execution
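To process elements concurrently, convert a Flux to a ParallelFlux with parallel(), which splits the sequence into rails (by default one per CPU core), and use runOn to specify the scheduler the rails run on. sequential() then merges the rails back into a single Flux, without guaranteeing the original element order.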
void demo() {
    final Flux<Long> flux = Flux.fromIterable(Lists.newArrayList(3L, 1L, 2L))
        .parallel()
        .runOn(Schedulers.elastic())
        .flatMap(logOfMono())
        .sequential();
    flux.subscribe();
}
Custom Thread Pools
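final Executor executor = new ThreadPoolExecutor(...);
Mono.create(sink -> executor.execute(() -> {
    try {
        sink.success(blockFunction());
    } catch (Exception e) {
        sink.error(e); // signal failure instead of leaving the Mono pending forever
    }
}));
Creating a Mono or Flux on a custom executor gives fine‑grained control over the execution context, and callers do not need to know which thread the blocking work runs on.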
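An alternative worth considering is to wrap the executor in a Reactor Scheduler with Schedulers.fromExecutorService, so it can be used with subscribeOn or publishOn like any built-in scheduler. The pool size and the thread name "custom-worker" below are arbitrary choices for this sketch.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

public class CustomPoolDemo {

    static String runOnCustomPool() {
        // Illustrative pool: two daemon threads, all named "custom-worker"
        ExecutorService pool = Executors.newFixedThreadPool(2, runnable -> {
            Thread thread = new Thread(runnable, "custom-worker");
            thread.setDaemon(true);
            return thread;
        });
        Scheduler scheduler = Schedulers.fromExecutorService(pool);
        try {
            // The callable runs on a thread from the custom pool
            return Mono.fromCallable(() -> Thread.currentThread().getName())
                    .subscribeOn(scheduler)
                    .block();
        } finally {
            scheduler.dispose();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runOnCustomPool()); // custom-worker
    }
}
```

Compared with Mono.create plus executor.execute, this keeps error propagation inside Reactor: an exception thrown by the callable arrives downstream as onError.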
Scheduler Options in Reactor
Current thread – Schedulers.immediate()
Single reusable thread – Schedulers.single() (or Schedulers.newSingle() for a dedicated thread)
Unbounded elastic pool – Schedulers.elastic() (creates threads as needed with no upper limit, which can exhaust resources; deprecated since Reactor 3.4 in favor of boundedElastic())
Bounded elastic pool – Schedulers.boundedElastic() (caps the number of threads, suitable for blocking I/O)
Parallel pool – Schedulers.parallel() (a fixed pool with as many workers as CPU cores)
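A quick way to tell these schedulers apart in logs is by thread name. The sketch below runs the same callable on four of them and collects the names; SchedulerNames and its helper methods are illustrative, and elastic() is left out on the assumption that a current Reactor version is in use.

```java
import java.util.LinkedHashMap;
import java.util.Map;

import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

public class SchedulerNames {

    // Runs a trivial task on the given scheduler and reports the executing thread
    static String threadOn(Scheduler scheduler) {
        return Mono.fromCallable(() -> Thread.currentThread().getName())
                .subscribeOn(scheduler)
                .block();
    }

    static Map<String, String> names() {
        Map<String, String> result = new LinkedHashMap<>();
        result.put("immediate", threadOn(Schedulers.immediate()));           // the calling thread
        result.put("single", threadOn(Schedulers.single()));                 // single-N
        result.put("boundedElastic", threadOn(Schedulers.boundedElastic())); // boundedElastic-N
        result.put("parallel", threadOn(Schedulers.parallel()));             // parallel-N
        return result;
    }

    public static void main(String[] args) {
        names().forEach((label, thread) -> System.out.println(label + " -> " + thread));
    }
}
```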
Error Handling
In reactive streams, an error terminates the sequence and propagates downstream until a subscriber handles it. Reactor provides operators that correspond to traditional try/catch patterns:
onErrorReturn – return a static fallback value.
onErrorResume – switch to another Publisher based on the exception.
doOnError – execute side‑effects such as logging.
doFinally – run cleanup logic regardless of termination type.
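The four operators can be exercised in a self-contained sketch such as the one below. The function method here is a hypothetical stand-in that fails for odd input; the real function used in the article's fragments is not shown in the source.

```java
import java.util.ArrayList;
import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class ErrorHandlingDemo {

    // Hypothetical step that fails for odd input
    static Mono<String> function(int n) {
        return n % 2 == 0
                ? Mono.just("ok-" + n)
                : Mono.error(new IllegalArgumentException("odd: " + n));
    }

    // onErrorReturn: replace the error with a static value (like a catch returning a default)
    static String withStaticFallback() {
        return Flux.just(1)
                .flatMap(ErrorHandlingDemo::function)
                .onErrorReturn("Error")
                .blockLast();
    }

    // onErrorResume: continue with another Publisher chosen from the exception
    static String withFallbackPublisher() {
        return Flux.just(1)
                .flatMap(ErrorHandlingDemo::function)
                .onErrorResume(IllegalArgumentException.class,
                        e -> Mono.just("recovered from " + e.getMessage()))
                .blockLast();
    }

    // doOnError and doFinally: side effects only, the error itself still propagates
    static List<String> sideEffects() {
        List<String> events = new ArrayList<>();
        Flux.just(1)
                .flatMap(ErrorHandlingDemo::function)
                .doOnError(e -> events.add("error: " + e.getMessage()))
                .doFinally(signal -> events.add("finally: " + signal))
                .onErrorReturn("Error")
                .blockLast();
        return events;
    }

    public static void main(String[] args) {
        System.out.println(withStaticFallback());    // Error
        System.out.println(withFallbackPublisher()); // recovered from odd: 1
        System.out.println(sideEffects());
    }
}
```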
Flux.just(10)
    .flatMap(this::function)
    .onErrorReturn("Error");
Flux.just(10)
    .flatMap(this::function)
    .onErrorResume(e -> handleErr(e));
Flux.just(10)
    .flatMap(this::function)
    .doOnError(e -> log.error(e.getMessage(), e));
Choosing the Right Operator
Depending on the use case, select operators that preserve order (flatMapSequential), provide parallelism (parallel() + runOn), or handle back‑pressure (onBackpressureDrop, onBackpressureBuffer, etc.).
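For the back-pressure case, the sketch below shows onBackpressureDrop with a subscriber that only ever requests three items. The BaseSubscriber, the request size, and the range of ten values are illustrative choices, not something taken from the original project.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.reactivestreams.Subscription;

import reactor.core.publisher.BaseSubscriber;
import reactor.core.publisher.Flux;

public class BackpressureDropDemo {

    // Returns two lists: the items the subscriber received and the items that were dropped
    static List<List<Integer>> run() {
        List<Integer> received = new ArrayList<>();
        List<Integer> dropped = new ArrayList<>();

        Flux.range(1, 10)
                // requests everything from upstream and drops what downstream has not asked for
                .onBackpressureDrop(dropped::add)
                .subscribe(new BaseSubscriber<Integer>() {
                    @Override
                    protected void hookOnSubscribe(Subscription subscription) {
                        request(3); // the subscriber only ever asks for three items
                    }

                    @Override
                    protected void hookOnNext(Integer value) {
                        received.add(value);
                    }
                });

        return Arrays.asList(received, dropped);
    }

    public static void main(String[] args) {
        List<List<Integer>> result = run();
        System.out.println("received: " + result.get(0)); // [1, 2, 3]
        System.out.println("dropped:  " + result.get(1)); // [4, 5, 6, 7, 8, 9, 10]
    }
}
```

onBackpressureBuffer would keep the excess items in memory instead of discarding them, which trades data loss for memory use.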
GrowingIO Tech Team
The official technical account of GrowingIO, showcasing our tech innovations, experience summaries, and cutting‑edge black‑tech.
