Mastering Project Reactor: Mono & Flux Basics with Practical Code
Learn the fundamentals of Project Reactor’s Mono and Flux, including creation, subscription, error handling, and common operators, with detailed code examples and visual diagrams, enabling you to effectively implement reactive, asynchronous streams in WebFlux-based Java backend applications.
Environment: Project Reactor 2020.0.14
1. Introduction
For an introduction to the WebFlux framework itself, see the related article. Project Reactor is built around two core types, Mono and Flux. Both implement the Reactive Streams Publisher interface: they emit data to whoever subscribes to them, and WebFlux uses them throughout to provide asynchronous, non‑blocking processing.
This article covers the basic usage of Mono and Flux (creation, subscription, and handling of error and completion signals) so that you can understand WebFlux’s reactive model and apply both types in your own projects.
2. Environment Dependencies
<code><dependency>
  <groupId>io.projectreactor</groupId>
  <artifactId>reactor-core</artifactId>
</dependency>
<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>io.projectreactor</groupId>
      <artifactId>reactor-bom</artifactId>
      <version>${reactor.version}</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
  </dependencies>
</dependencyManagement></code>
3. Mono & Flux Overview
Flux
Flux<T> is a standard Publisher<T> representing an asynchronous sequence of 0 to N items, optionally terminated by a completion or error signal. In the Reactive Streams specification these correspond to onNext, onComplete, and onError calls.
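Flux is a general-purpose reactive type, and all of these signals, including the terminating ones, are optional: a sequence with an onComplete but no onNext is empty but finite, while a sequence with no terminal signal is infinite. For example, Flux.interval(Duration) creates an infinite Flux<Long> that emits incrementing values (0, 1, 2, ...) at a fixed interval.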
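The signal model described above can be observed directly. The following is a minimal sketch, not part of the original article: the class name FluxSignalsDemo and the helper recordSignals are made up for illustration, and the helper uses blockLast() only to make the demo wait for the terminal signal.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;

public class FluxSignalsDemo {

    // Hypothetical helper: records every signal the Flux delivers, in order.
    static <T> List<String> recordSignals(Flux<T> source) {
        List<String> signals = new ArrayList<>();
        source.doOnNext(v -> signals.add("onNext(" + v + ")"))
              .doOnComplete(() -> signals.add("onComplete"))
              .doOnError(e -> signals.add("onError(" + e.getMessage() + ")"))
              // Swallow the error after recording it so blockLast() returns normally
              .onErrorResume(e -> Flux.empty())
              // Subscribe and wait for the terminal signal
              .blockLast();
        return signals;
    }

    public static void main(String[] args) {
        // take(3) turns the infinite interval into a finite sequence: ticks 0, 1, 2
        System.out.println(recordSignals(Flux.interval(Duration.ofMillis(10)).take(3)));
        // A Flux that terminates with onError without emitting any element
        System.out.println(recordSignals(Flux.error(new IllegalStateException("boom"))));
    }
}
```

The first call records three onNext signals followed by onComplete; the second records only an onError, which shows that onNext is optional and that a sequence ends with either completion or an error, never both.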
Mono
Mono<T> represents a sequence of 0 or 1 element. It emits at most one onNext signal followed by onComplete (success) or a single onError (failure). Mono.never() never emits any signal.
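Mono offers only a subset of the operators available on Flux, and operators that combine a Mono with another Publisher may change the resulting type: for example, Mono#concatWith(Publisher) returns a Flux, while Mono#then(Mono) returns another Mono.
Mono can also represent an asynchronous process that produces no value and only signals completion; use Mono<Void> for that.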
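As a small illustration of Mono<Void>, the sketch below wraps a side effect with Mono.fromRunnable. The class MonoVoidDemo and the method markDone are hypothetical names chosen for this example; block() is used only so the demo waits for completion.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import reactor.core.publisher.Mono;

public class MonoVoidDemo {

    // Hypothetical task: flips a flag when subscribed to and produces no value.
    static Mono<Void> markDone(AtomicBoolean flag) {
        return Mono.fromRunnable(() -> flag.set(true));
    }

    public static void main(String[] args) {
        AtomicBoolean flag = new AtomicBoolean(false);
        Mono<Void> task = markDone(flag);

        // Assembling the Mono does not run the Runnable
        System.out.println("before subscribe: " + flag.get());

        // block() subscribes and waits for onComplete; an empty Mono yields null
        Void result = task.block();
        System.out.println("after block: " + flag.get() + ", value: " + result);
    }
}
```

Nothing happens until the Mono is subscribed to, and the only information the subscriber receives is that the work finished (or failed).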
4. Common Mono & Flux Operations
Mono Operations
Creating elements
Mono.just(T value) creates a Mono emitting the given value.
<code>Mono.just(10).subscribe(System.out::println);</code>
Mono.empty() creates a Mono that completes without emitting any element.
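An empty Mono is typically paired with a fallback. The following sketch assumes a hypothetical lookup method findName that returns Mono.empty() for unknown ids, and uses the defaultIfEmpty operator to supply a value in that case.

```java
import reactor.core.publisher.Mono;

public class MonoEmptyDemo {

    // Hypothetical lookup: only id 1 is known, everything else yields an empty Mono.
    static Mono<String> findName(int id) {
        return id == 1 ? Mono.just("alice") : Mono.empty();
    }

    // Falls back to a default when the lookup completes without an element.
    static String nameOrDefault(int id) {
        return findName(id).defaultIfEmpty("unknown").block();
    }

    public static void main(String[] args) {
        System.out.println(nameOrDefault(1)); // alice
        System.out.println(nameOrDefault(2)); // unknown
    }
}
```

Returning an empty Mono instead of null keeps the "no result" case inside the reactive pipeline, where operators such as defaultIfEmpty or switchIfEmpty can handle it.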
Mono.justOrEmpty(T value) creates a Mono with the value if non‑null, otherwise an empty Mono.
<code>// Prints 10
Mono.justOrEmpty(10).subscribe(System.out::println);
// Prints nothing: a null value yields an empty Mono
Mono.justOrEmpty(null).subscribe(System.out::println);
</code>
Mono.fromCallable(Callable<? extends T> supplier) creates a Mono whose value is produced by invoking the Callable at subscription time.
<code>Mono.fromCallable(() -> 666).subscribe(System.out::println);
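</code>
Mono.fromSupplier(Supplier<? extends T> supplier) creates a Mono from a Supplier. Like fromCallable, the value is computed lazily at subscription time; unlike a Callable, a Supplier cannot throw checked exceptions.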
<code>Mono.fromSupplier(() -> 666).subscribe(System.out::println);
</code>
Mono.fromFuture(CompletableFuture<? extends T> future) creates a Mono from a CompletableFuture.
<code>Mono.fromFuture(CompletableFuture.supplyAsync(() -> 666))
.subscribe(System.out::println);
</code>
Error handling
The following example raises an error with Mono.error, logs it with doOnError, and substitutes a fallback value with onErrorReturn:
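<code>// Users is assumed to be a simple class with a name field, getter/setter, and toString()
public static Mono<Users> invoke(Mono<Users> user) {
  return user.flatMap(u -> {
    if ("admin".equals(u.getName())) {
      // Signal an error instead of emitting the element
      return Mono.error(new RuntimeException("Unauthorized access"));
    }
    u.setName(u.getName() + " - ");
    return Mono.just(u);
  });
}
public static void main(String[] args) {
  invoke(Mono.just(new Users("admin")))
    .doOnNext(System.out::println)                       // skipped: the source fails
    .doOnError(e -> System.out.println(e.getMessage()))  // prints the error message
    .onErrorReturn(new Users("return"))                  // replaces the error with a fallback value
    .doOnNext(System.out::println)                       // prints the fallback value
    .subscribe();
}
</code>
Result:
<code>Unauthorized access
Users[name=return]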
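</code>
Other operators
concatWith appends another Publisher after the Mono and therefore returns a Flux; this prints 10 and then 20: Mono.just(10).concatWith(Mono.just(20)).subscribe(System.out::println);
then waits for the first Mono to complete, discards its element, and continues with the given Mono; this prints 10 (from the first doOnNext) and then 666: Mono.just(10).doOnNext(System.out::println).then(Mono.just(666)).doOnNext(System.out::println).subscribe();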
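To make the difference in result types explicit, here is a minimal sketch that captures what each operator produces. The class and method names are made up for this example, and block() is used only to extract the result in a demo.

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class MonoCombineDemo {

    // concatWith changes the type: Mono + Publisher becomes a Flux with both elements.
    static List<Integer> concatResult() {
        Flux<Integer> both = Mono.just(10).concatWith(Mono.just(20));
        return both.collectList().block();
    }

    // then keeps the type a Mono: the first element is dropped, the second is emitted.
    static Integer thenResult() {
        Mono<Integer> second = Mono.just(10).then(Mono.just(666));
        return second.block();
    }

    public static void main(String[] args) {
        System.out.println(concatResult()); // [10, 20]
        System.out.println(thenResult());   // 666
    }
}
```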
Flux Operations
Creating elements
Flux.just(...) creates a Flux from given items.
<code>Flux.just(1, 2, 3);
</code>
Flux.fromArray, Flux.fromIterable, and Flux.fromStream create a Flux from an array, an Iterable, or a Stream.
<code>Flux.fromArray(new String[]{"1","2","3"});
Flux.fromIterable(List.of("a","b","c"));
Flux.fromStream(List.of("a","b","c").stream());
</code>
Flux.empty() creates a Flux that emits no items and completes immediately.
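None of the factory calls above produces output until something subscribes. As a sketch for checking what each one emits, the hypothetical helper drain below collects a Flux into a List; collectList() and block() are real Reactor operators, while the class and helper names are assumptions for this example.

```java
import java.util.List;
import java.util.stream.Stream;
import reactor.core.publisher.Flux;

public class FluxCreationDemo {

    // Hypothetical helper: subscribes, gathers all elements, and waits for completion.
    static <T> List<T> drain(Flux<T> flux) {
        return flux.collectList().block();
    }

    public static void main(String[] args) {
        System.out.println(drain(Flux.just(1, 2, 3)));                          // [1, 2, 3]
        System.out.println(drain(Flux.fromArray(new String[] {"1", "2", "3"}))); // [1, 2, 3]
        System.out.println(drain(Flux.fromIterable(List.of("a", "b", "c"))));    // [a, b, c]
        System.out.println(drain(Flux.fromStream(Stream.of("a", "b", "c"))));    // [a, b, c]
        System.out.println(drain(Flux.empty()));                                // []
    }
}
```

Blocking like this is convenient in a demo or a test; in a WebFlux handler you would normally return the Flux and let the framework subscribe.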
Flux.range(int start, int count) emits count consecutive integers, beginning at start.
<code>Flux.range(1, 10); // emits 1 through 10
</code>
Error handling
<code>Flux.error(new RuntimeException("error"))
  // Switch to a fallback Publisher when the source fails
  .onErrorResume(ex -> Mono.just("An exception occurred: " + ex.getMessage()))
  .subscribe(System.out::println);
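</code>
when operator
Mono.when(...) aggregates the given Publishers into a Mono<Void> that completes once all of them have completed. The values they emit are discarded; only the completion (or the first error) is propagated.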
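<code>Flux.just(1, 3, 6).flatMap(id -> {
  // Simulated query whose result is delayed by 2 seconds
  Mono<Integer> query = Mono.fromSupplier(() -> {
    System.out.println("Querying data...");
    return id * 10;
  }).delayElement(Duration.ofSeconds(2));
  // Simulated save that completes immediately
  Mono<String> save = Mono.fromSupplier(() -> {
    System.out.println("Saving data...");
    return "success - " + id;
  });
  // Completes once both query and save have completed; their values are discarded
  return Mono.when(query, save);
}).doOnComplete(() -> System.out.println("Execution complete..."))
  // In a standalone main method, replace subscribe() with blockLast() so the JVM waits for the delayed query
  .subscribe();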
</code>
Overall, Flux and Mono are the core components of Reactor. They offer a rich set of operators for handling asynchronous data streams, which makes them essential for developers using WebFlux.
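As a closing sketch that ties together creation, subscription, and error handling, the example below subscribes with separate callbacks for values, errors, and completion. The class name and the run helper are hypothetical; the three-argument subscribe overload and onErrorResume(Class, Function) are existing Reactor methods.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;

public class SubscribeCallbacksDemo {

    // Divides 100 by each element; a division by zero is replaced by the fallback value -1.
    static List<String> run(Flux<Integer> source) {
        List<String> log = new ArrayList<>();
        source.map(i -> 100 / i)
              // Handle only ArithmeticException; the failed source is replaced, not resumed
              .onErrorResume(ArithmeticException.class, e -> Flux.just(-1))
              .subscribe(
                  v -> log.add("value " + v),  // onNext
                  e -> log.add("error " + e),  // onError (not reached here)
                  () -> log.add("done"));      // onComplete
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run(Flux.just(4, 5)));    // [value 25, value 20, done]
        System.out.println(run(Flux.just(1, 0, 5))); // [value 100, value -1, done]
    }
}
```

In the second call the element 5 is never processed: an error terminates the original sequence, and onErrorResume continues with the fallback Publisher rather than with the remaining source elements.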
Finished!
Spring Full-Stack Practical Cases
Full-stack Java development with Vue 2/3 front-end suite; hands-on examples and source code analysis for Spring, Spring Boot 2/3, and Spring Cloud.