Understanding Java Stream Gather API and the mapConcurrent Operator
This article explains the Stream Gather API, introduced as a preview feature in Java 22 (JEP 461, Stream Gatherers) and finalized in Java 24. It extends the traditional Stream model with custom Gatherers that can emit zero to many elements per input, enabling stateful and concurrent operations such as mapConcurrent, a concurrency-limited operator built on virtual threads. It also compares the approach with Reactor‑core and Pekko‑Stream.
Since its introduction in Java 8 (1.8), the Stream API has become a staple for data processing, but its fixed set of intermediate operators often forces developers to turn to third‑party libraries. The Stream Gather API is a new extension point that addresses this shortcoming and enables richer stateful and concurrent operations.
A stream can be viewed as a pipeline composed of a Source, a series of Flow stages and a Sink. The standard Java Stream represents a simple, single‑direction flow, while the Gather API allows custom Gatherer implementations that can emit zero, one or many elements per input.
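To make the "zero, one or many elements per input" idea concrete, here is a minimal sketch, assuming JDK 24 or later (on JDK 22 or 23 it would need --enable-preview). The class EmitCountDemo and the gatherer repeatByValue are hypothetical names chosen for illustration, not part of the JDK or the original article.

```java
import java.util.List;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

class EmitCountDemo {
    // Hypothetical gatherer: each non-negative n is emitted n times,
    // so 0 yields nothing, 1 yields one element and 3 yields three.
    static Gatherer<Integer, ?, Integer> repeatByValue() {
        return Gatherer.of((unused, element, downstream) -> {
            for (int i = 0; i < element; i++) {
                if (!downstream.push(element)) {
                    return false; // downstream wants no more elements
                }
            }
            return true; // keep consuming upstream
        });
    }

    static List<Integer> demo() {
        return Stream.of(0, 1, 3).gather(repeatByValue()).toList();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [1, 3, 3, 3]
    }
}
```

The Source here is Stream.of, the Flow stage is the gatherer, and the Sink is toList; the gatherer is the only stage that changes how many elements pass through.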
The article compares several Java stream libraries (standard Stream, Reactor‑core, Pekko‑Stream, etc.) and highlights the need for operators that support:
1:1, 1:N, N:1 and N:M data transformations;
finite and infinite streams;
stateful and stateless processing;
early termination and full consumption;
single‑threaded and multi‑threaded execution;
awareness of stream termination (e.g., fold).
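Several of the requirements listed above can be seen in the gatherers that ship in java.util.stream.Gatherers. The sketch below assumes JDK 24 or later (or JDK 22/23 with --enable-preview); the class and method names are illustrative only.

```java
import java.util.List;
import java.util.stream.Gatherers;
import java.util.stream.Stream;

class CardinalityDemo {
    // N:M, stateful: groups elements into lists of two; the last group may be shorter.
    static List<List<Integer>> fixedWindows() {
        return Stream.of(1, 2, 3, 4, 5).gather(Gatherers.windowFixed(2)).toList();
    }

    // 1:1, stateful: emits a running sum for every input element.
    static List<Integer> runningSums() {
        return Stream.of(1, 2, 3, 4).gather(Gatherers.scan(() -> 0, (sum, x) -> sum + x)).toList();
    }

    // N:1, termination-aware: emits a single total once the upstream is exhausted.
    static List<Integer> total() {
        return Stream.of(1, 2, 3, 4).gather(Gatherers.fold(() -> 0, (sum, x) -> sum + x)).toList();
    }

    // Infinite source with early termination: limit(2) stops the pipeline after two windows.
    static List<List<Integer>> firstWindowsOfInfinite() {
        return Stream.iterate(1, i -> i + 1).gather(Gatherers.windowFixed(3)).limit(2).toList();
    }

    public static void main(String[] args) {
        System.out.println(fixedWindows());           // prints [[1, 2], [3, 4], [5]]
        System.out.println(runningSums());            // prints [1, 3, 6, 10]
        System.out.println(total());                  // prints [10]
        System.out.println(firstWindowsOfInfinite()); // prints [[1, 2, 3], [4, 5, 6]]
    }
}
```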
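Java’s existing extension point is Collector<T,A,R>, a terminal operation whose accumulator folds elements into a container and whose finisher produces exactly one result at the end of the stream. The Gather API adds an intermediate counterpart, Gatherer<T,A,R>, built around an integrator and a downstream handle: the integrator receives each element and may push 0…N results downstream, which makes operators such as mapConcurrent and zipWithIndex possible.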
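As an illustration of an integrator pushing results downstream while keeping state, here is one possible zipWithIndex, assuming JDK 24 or later (or JDK 22/23 with --enable-preview). The Indexed record and the class name are hypothetical helpers; the article names the operator but does not show its implementation.

```java
import java.util.List;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

class ZipWithIndexDemo {
    // Hypothetical pair type for this sketch.
    record Indexed<T>(long index, T value) {}

    static <T> Gatherer<T, ?, Indexed<T>> zipWithIndex() {
        // Mutable per-stream state: the index to assign to the next element.
        class Counter { long next = 0; }
        return Gatherer.ofSequential(
                Counter::new,
                Gatherer.Integrator.ofGreedy((counter, element, downstream) ->
                        downstream.push(new Indexed<T>(counter.next++, element))));
    }

    static List<Indexed<String>> demo() {
        return Stream.of("a", "b", "c").gather(ZipWithIndexDemo.<String>zipWithIndex()).toList();
    }

    public static void main(String[] args) {
        // prints [Indexed[index=0, value=a], Indexed[index=1, value=b], Indexed[index=2, value=c]]
        System.out.println(demo());
    }
}
```

A Collector could only hand back the whole indexed list at the end; the gatherer emits each pair as soon as its element arrives, so later stages can keep streaming.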
Two basic operators are shown:
    import java.util.function.Function;
    import java.util.function.Predicate;
    import java.util.stream.Gatherer;

    // 1:1 transformation: push exactly one mapped element per input.
    public static <T,R> Gatherer<T,?,R> map(final Function<T,R> mapper) {
        return Gatherer.of((notused, element, downstream) -> downstream.push(mapper.apply(element)));
    }

    // 1:0..1 transformation: push the element only if it matches the predicate.
    public static <T> Gatherer<T,?,T> filter(final Predicate<T> predicate) {
        return Gatherer.of((notused, element, downstream) -> {
            if (predicate.test(element)) {
                return downstream.push(element);
            }
            return true; // nothing emitted, but keep consuming upstream
        });
    }

The mapConcurrent operator demonstrates how virtual threads and a semaphore can limit concurrency while preserving order:
    import java.util.ArrayDeque;
    import java.util.concurrent.Future;
    import java.util.concurrent.FutureTask;
    import java.util.concurrent.Semaphore;
    import java.util.function.Function;
    import java.util.stream.Gatherer;
    import java.util.stream.Gatherer.Downstream;
    import java.util.stream.Gatherer.Integrator;

    public static <T,R> Gatherer<T,?,R> mapConcurrent(final int maxConcurrency,
            final Function<? super T, ? extends R> mapper) {
        class State {
            // In-flight tasks in encounter order; results are taken from the head to preserve order.
            final ArrayDeque<Future<R>> window = new ArrayDeque<>(Math.min(maxConcurrency, 16));
            // One permit per task that may run at the same time.
            final Semaphore windowLock = new Semaphore(maxConcurrency);

            boolean integrate(T element, Downstream<? super R> downstream) {
                if (!downstream.isRejecting()) createTaskFor(element);
                return flush(0, downstream);
            }

            void createTaskFor(T element) {
                windowLock.acquireUninterruptibly(); // blocks while maxConcurrency tasks are running
                var task = new FutureTask<R>(() -> {
                    try { return mapper.apply(element); }
                    finally { windowLock.release(); }
                });
                window.add(task);
                Thread.startVirtualThread(task);
            }

            boolean flush(long atLeastN, Downstream<? super R> downstream) {
                // omitted in the source: poll completed futures and push results
                return true;
            }
        }
        return Gatherer.ofSequential(State::new,
                Integrator.<State,T,R>ofGreedy(State::integrate),
                (state, downstream) -> state.flush(Long.MAX_VALUE, downstream));
    }

The fold operator illustrates a simple stateful reduction:
    import java.util.function.BiFunction;
    import java.util.function.Supplier;
    import java.util.stream.Gatherer;
    import java.util.stream.Gatherer.Integrator;

    public static <T,R> Gatherer<T,?,R> fold(Supplier<R> initial,
            BiFunction<? super R, ? super T, ? extends R> folder) {
        class State { R value = initial.get(); }
        return Gatherer.ofSequential(State::new,
                // Integrator: update the accumulated value, emit nothing yet.
                Integrator.ofGreedy((state, element, downstream) -> {
                    state.value = folder.apply(state.value, element);
                    return true;
                }),
                // Finisher: emit the single result once the upstream is exhausted.
                (state, downstream) -> downstream.push(state.value));
    }

Compared with Reactor‑core’s flatMap and Pekko‑Stream’s mapAsync, the Gather implementations are concise and rely only on JDK primitives: virtual threads (final since Java 21) and a semaphore for controlled parallelism.
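To see the concurrency limit and the ordering guarantee in action, the following sketch uses the JDK's built-in Gatherers.mapConcurrent rather than the hand-written version. It assumes JDK 24 or later (or JDK 22/23 with --enable-preview); slowSquare is a hypothetical stand-in for a blocking call, and the counters exist only to observe how many mapper calls overlap.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Gatherers;
import java.util.stream.Stream;

class MapConcurrentDemo {
    static final AtomicInteger inFlight = new AtomicInteger();
    static final AtomicInteger peak = new AtomicInteger();

    // Hypothetical blocking operation; latency varies so completion order differs from input order.
    static int slowSquare(int x) {
        int now = inFlight.incrementAndGet();
        peak.accumulateAndGet(now, Math::max); // remember the highest overlap seen
        try {
            Thread.sleep(50L * (5 - x % 5));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            inFlight.decrementAndGet();
        }
        return x * x;
    }

    static List<Integer> run() {
        return Stream.of(1, 2, 3, 4, 5, 6, 7, 8)
                .gather(Gatherers.mapConcurrent(3, MapConcurrentDemo::slowSquare))
                .toList();
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [1, 4, 9, 16, 25, 36, 49, 64]
        System.out.println("peak concurrency: " + peak.get()); // never more than 3
    }
}
```

Results come out in encounter order even though the individual calls finish at different times, and the observed peak stays within the configured limit.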
Future work includes adding more gatherers (e.g., zipWithIndex, zipWithNext, mapConcat, throttle) and possibly introducing extension methods to make the DSL more ergonomic.
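As a rough idea of what one of these future gatherers might look like, here is a possible zipWithNext that pairs each element with its successor. This is a sketch under assumptions, not the article's implementation: the Pair record and all names are hypothetical, and it assumes JDK 24 or later (or JDK 22/23 with --enable-preview).

```java
import java.util.List;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

class ZipWithNextDemo {
    // Hypothetical pair type for this sketch.
    record Pair<T>(T first, T second) {}

    static <T> Gatherer<T, ?, Pair<T>> zipWithNext() {
        // State: the previously seen element, if any.
        class State { T previous; boolean hasPrevious; }
        return Gatherer.ofSequential(
                State::new,
                Gatherer.Integrator.ofGreedy((state, element, downstream) -> {
                    boolean proceed = true;
                    if (state.hasPrevious) {
                        // Emit (previous, current); the first element emits nothing.
                        proceed = downstream.push(new Pair<T>(state.previous, element));
                    }
                    state.previous = element;
                    state.hasPrevious = true;
                    return proceed;
                }));
    }

    static List<Pair<Integer>> demo() {
        return Stream.of(1, 2, 3, 4).gather(ZipWithNextDemo.<Integer>zipWithNext()).toList();
    }

    public static void main(String[] args) {
        // prints [Pair[first=1, second=2], Pair[first=2, second=3], Pair[first=3, second=4]]
        System.out.println(demo());
    }
}
```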
