Unlocking Java 22 Stream Gather API: Extending Stream Capabilities with Virtual Threads

This article examines the concept of streams, compares existing Java stream libraries, introduces Java 22's Stream Gather API, and demonstrates how virtual threads simplify implementing advanced operators like mapConcurrent, while outlining design goals, code examples, and future extensions.

Alibaba Cloud Developer

Background

Since its introduction in Java 8, the Stream API has become a staple for developers, but its fixed set of intermediate operators often pushes them toward alternative libraries. Java 22’s Stream Gather API (shipped as a preview feature under JEP 461, Stream Gatherers) addresses this shortcoming and, together with virtual threads, simplifies the implementation of advanced operators such as mapConcurrent.

What Is a Stream?

A stream is an abstract concept representing a continuous flow of data, similar to an assembly line. Streams can be finite or infinite and are typically modeled as a combination of Source, Flow, and Sink components.

The standard Java Stream API models the simplest case, a single‑direction flow: a source feeds a chain of intermediate operations that ends in one terminal operation.
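As a minimal sketch of that single-direction flow, the pipeline below uses only the classic Stream API. The class and method names (PipelineDemo, evenSquares) are made up for illustration.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class PipelineDemo {
    // Source -> flow (intermediate operations) -> sink (terminal operation).
    static List<String> evenSquares() {
        return Stream.of(1, 2, 3, 4, 5, 6)      // source: a finite stream
                .filter(n -> n % 2 == 0)         // flow: keep even numbers
                .map(n -> "sq=" + (n * n))       // flow: transform each element
                .collect(Collectors.toList());   // sink: gather results into a list
    }

    public static void main(String[] args) {
        System.out.println(evenSquares()); // prints [sq=4, sq=16, sq=36]
    }
}
```

Every stage here is one of the built-in operators; nothing in this pipeline can be replaced by a user-defined intermediate operation.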

Why a New Gather API?

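Existing stream libraries provide richer operators, but the standard API offers only a fixed set of intermediate operations, and its one extension point, Collector, applies to terminal operations only. The Stream Gather API introduces a new extension point for intermediate operations that supports a wide range of operator characteristics: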

Various data‑transformation patterns (1:1, 1:N, N:1, N:M)

Finite and infinite streams

Stateful and stateless operations

Early termination and full consumption

Sequential and parallel processing

End‑of‑stream detection, so fold‑like operators can emit a result once the input ends
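Several of these characteristics can be seen with the gatherers that ship in java.util.stream.Gatherers. The sketch below assumes JDK 24 or later, where the API is final (on JDK 22 or 23 it needs --enable-preview); the class and method names are illustrative only.

```java
import java.util.List;
import java.util.stream.Gatherers;
import java.util.stream.Stream;

final class GathererTraitsDemo {
    // N:M, stateful, and early-terminating on an infinite source:
    // windowFixed buffers elements into lists of three, limit(2) stops the stream.
    static List<List<Integer>> firstTwoWindows() {
        return Stream.iterate(1, i -> i + 1)
                .gather(Gatherers.windowFixed(3))
                .limit(2)
                .toList();
    }

    // 1:1 and stateful: scan emits the running result after every element.
    static List<Integer> runningSum() {
        return Stream.of(1, 2, 3, 4)
                .gather(Gatherers.scan(() -> 0, (sum, n) -> sum + n))
                .toList();
    }

    public static void main(String[] args) {
        System.out.println(firstTwoWindows()); // prints [[1, 2, 3], [4, 5, 6]]
        System.out.println(runningSum());      // prints [1, 3, 6, 10]
    }
}
```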

Collector vs. Gather

The traditional Collector<T,A,R> consists of a supplier, an accumulator, a combiner, and a finisher, and it always produces a single result (N:1). A Gatherer<T,A,R> has a similar shape, but the accumulator is replaced by an integrator that also receives a Downstream<? super R>, so each input element can push zero or more results downstream (0…N).
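For comparison, here is a minimal sketch of the Collector side, built with Collector.of. The joining helper is a simplified, hypothetical stand-in for Collectors.joining, written out only to make the four functions visible.

```java
import java.util.StringJoiner;
import java.util.stream.Collector;
import java.util.stream.Stream;

final class CollectorDemo {
    // A Collector always reduces N inputs to exactly one result.
    static Collector<String, StringJoiner, String> joining(String separator) {
        return Collector.of(
                () -> new StringJoiner(separator), // supplier: creates the mutable state
                StringJoiner::add,                 // accumulator: folds one element into the state
                StringJoiner::merge,               // combiner: merges two states (parallel streams)
                StringJoiner::toString);           // finisher: turns the state into the result
    }

    static String joinWords() {
        return Stream.of("a", "b", "c").collect(joining("-"));
    }

    public static void main(String[] args) {
        System.out.println(joinWords()); // prints a-b-c
    }
}
```

The accumulator has no way to emit anything before the stream ends, which is the gap the integrator and its downstream parameter fill.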

Core Components of Gather

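The API is built around four functions:

supplier (named initializer in the API): creates the initial, usually mutable, state

integrator + downstream: processes each element, updates the state, and pushes zero or more values downstream; returning false requests early termination

finisher: runs once after the last element and may push remaining values downstream

combiner: merges the states of two substreams in a parallel stream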
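The four functions above map directly onto the four-argument Gatherer.of factory. The following is a sketch under the assumption of JDK 24 or later (or JDK 22/23 with --enable-preview); the Sum state class and the sum gatherer are hypothetical names chosen for this example.

```java
import java.util.List;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

final class FourFunctionsDemo {
    // Mutable state shared by integrator, combiner, and finisher.
    static final class Sum { long total; }

    static Gatherer<Integer, Sum, Long> sum() {
        return Gatherer.of(
                Sum::new,                                                  // initializer: fresh state
                Gatherer.Integrator.<Sum, Integer, Long>ofGreedy(
                        (state, element, downstream) -> {                  // integrator: update state,
                            state.total += element;                        // push nothing yet
                            return true;                                   // keep consuming
                        }),
                (left, right) -> { left.total += right.total; return left; }, // combiner: merge substream states
                (state, downstream) -> downstream.push(state.total));      // finisher: emit the one result
    }

    static List<Long> sequentialSum() {
        return Stream.of(1, 2, 3, 4).gather(sum()).toList();
    }

    static List<Long> parallelSum() {
        return Stream.of(1, 2, 3, 4).parallel().gather(sum()).toList();
    }

    public static void main(String[] args) {
        System.out.println(sequentialSum()); // prints [10]
        System.out.println(parallelSum());   // prints [10]
    }
}
```

Because a combiner is supplied, the same gatherer can be evaluated in parallel; gatherers created with Gatherer.ofSequential have no combiner and always run sequentially.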

Simple Operator Implementations

Using the Gatherer, common operators can be expressed concisely:

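import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Gatherer;

// Stateless 1:1 operator: the state argument is unused.
public static <T,R> Gatherer<T,?,R> map(final Function<? super T,? extends R> mapper) {
    return Gatherer.of((unused, element, downstream) -> downstream.push(mapper.apply(element)));
}

// Stateless operator that drops rejected elements; returning true asks for more input.
public static <T> Gatherer<T,?,T> filter(final Predicate<? super T> predicate) {
    return Gatherer.of((unused, element, downstream) -> {
        if (predicate.test(element)) {
            return downstream.push(element);
        }
        return true;
    });
}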
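To show how such gatherers are applied, the sketch below repeats the two operators in a self-contained class and chains them with Gatherer.andThen before passing them to Stream.gather. It assumes JDK 24 or later (or JDK 22/23 with --enable-preview); MapFilterDemo and lengthsOfLongWords are illustrative names.

```java
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

final class MapFilterDemo {
    static <T, R> Gatherer<T, ?, R> map(Function<? super T, ? extends R> mapper) {
        return Gatherer.of((unused, element, downstream) ->
                downstream.push(mapper.apply(element)));
    }

    static <T> Gatherer<T, ?, T> filter(Predicate<? super T> predicate) {
        return Gatherer.of((unused, element, downstream) -> {
            if (predicate.test(element)) {
                return downstream.push(element);
            }
            return true; // element dropped, but keep consuming
        });
    }

    static List<Integer> lengthsOfLongWords() {
        Gatherer<String, ?, String> longWords = filter(w -> w.length() > 3);
        Gatherer<String, ?, Integer> lengths = map(String::length);
        return Stream.of("java", "jdk", "stream", "api")
                .gather(longWords.andThen(lengths)) // compose two gatherers into one
                .toList();
    }

    public static void main(String[] args) {
        System.out.println(lengthsOfLongWords()); // prints [4, 6]
    }
}
```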

Comparison with Other Libraries

Similar concepts exist in Reactor Core, Mutiny, and Pekko Streams. For example, the statefulMap operator in Pekko Streams (also known as mapAccumulate) mirrors the Gather integrator pattern.

mapConcurrent Implementation

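The following Gatherer runs each mapper call on its own virtual thread, uses a semaphore to cap the number of in‑flight tasks at maxConcurrency, and keeps the tasks in a queue so that results can be pushed downstream in encounter order: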

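import java.util.ArrayDeque;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.Semaphore;
import java.util.function.Function;
import java.util.stream.Gatherer;
import java.util.stream.Gatherer.Downstream;
import java.util.stream.Gatherer.Integrator;

public static <T,R> Gatherer<T,?,R> mapConcurrent(final int maxConcurrency, final Function<? super T,? extends R> mapper) {
    if (maxConcurrency < 1) throw new IllegalArgumentException("'maxConcurrency' must be greater than 0");
    class State {
        // Tasks in encounter order, so results can be pushed in the order the elements arrived
        final ArrayDeque<Future<R>> window = new ArrayDeque<>(Math.min(maxConcurrency, 16));
        // One permit per in-flight task
        final Semaphore windowLock = new Semaphore(maxConcurrency);
        boolean integrate(T element, Downstream<? super R> downstream) {
            if (!downstream.isRejecting()) createTaskFor(element);
            return flush(0, downstream);
        }
        void createTaskFor(T element) {
            windowLock.acquireUninterruptibly(); // blocks while maxConcurrency tasks are running
            // Explicit <R> so the task is a Future<R> and fits into the window
            var task = new FutureTask<R>(() -> {
                try { return mapper.apply(element); }
                finally { windowLock.release(); }
            });
            window.add(task);
            Thread.startVirtualThread(task);
        }
        boolean flush(long atLeastN, Downstream<? super R> downstream) {
            // omitted: push completed results downstream
            return true;
        }
    }
    return Gatherer.ofSequential(State::new,
        Integrator.<State,T,R>ofGreedy(State::integrate),
        (state, downstream) -> state.flush(Long.MAX_VALUE, downstream));
}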
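The JDK ships an operator of this kind as Gatherers.mapConcurrent, so in practice it can be used without writing the state machine by hand. The sketch below assumes JDK 24 or later (or JDK 22/23 with --enable-preview); the fetch method is a hypothetical stand-in for a blocking call such as an HTTP request.

```java
import java.util.List;
import java.util.stream.Gatherers;
import java.util.stream.Stream;

final class MapConcurrentDemo {
    // Hypothetical blocking operation; the sleep simulates I/O latency.
    static String fetch(int id) {
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return "item-" + id;
    }

    static List<String> fetchAll() {
        return Stream.of(1, 2, 3, 4, 5)
                // At most two mapper calls run at once, each on its own virtual thread.
                .gather(Gatherers.mapConcurrent(2, id -> fetch(id)))
                .toList();
    }

    public static void main(String[] args) {
        // Results keep the encounter order of the input.
        System.out.println(fetchAll()); // prints [item-1, item-2, item-3, item-4, item-5]
    }
}
```

Because virtual threads are cheap to block, the mapper can be written as plain blocking code rather than in a callback or reactive style.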

fold Implementation

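The Gather fold operator is the N:1 case: the integrator only updates the state, and the finisher pushes the single accumulated value once the input ends. Because folding has no combiner, the gatherer is created with Gatherer.ofSequential and always runs sequentially: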

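import java.util.function.BiFunction;
import java.util.function.Supplier;
import java.util.stream.Gatherer;
import java.util.stream.Gatherer.Integrator;

public static <T,R> Gatherer<T,?,R> fold(Supplier<R> initial, BiFunction<? super R,? super T,? extends R> folder) {
    class State { R value = initial.get(); } // running result, seeded from initial
    return Gatherer.ofSequential(State::new,
        // Integrator: update the state and push nothing while elements are arriving
        Integrator.ofGreedy((state, element, downstream) -> {
            state.value = folder.apply(state.value, element);
            return true;
        }),
        // Finisher: push the single folded value once the input ends
        (state, downstream) -> downstream.push(state.value));
}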
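A fold of this shape is also available as Gatherers.fold. The usage sketch below assumes JDK 24 or later (or JDK 22/23 with --enable-preview); FoldDemo and digits are illustrative names. Since a gatherer is an intermediate operation, the single result still has to be read with a terminal operation such as findFirst.

```java
import java.util.Optional;
import java.util.stream.Gatherers;
import java.util.stream.Stream;

final class FoldDemo {
    // Concatenates the numbers in encounter order into one string.
    static Optional<String> digits() {
        return Stream.of(1, 2, 3, 4, 5)
                .gather(Gatherers.fold(() -> "", (text, number) -> text + number))
                .findFirst();
    }

    public static void main(String[] args) {
        System.out.println(digits().orElseThrow()); // prints 12345
    }
}
```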

Future Outlook

With Stream Gather and virtual threads, Java’s stream extensibility gap narrows, paving the way for new gatherers such as zipWithIndex, zipWithNext, mapConcat, and throttle. A remaining limitation is Java’s lack of extension‑method syntax, which rules out a DSL such as stream.zipWithIndex() and forces the current stream.gather(MyGathers.zipWithIndex()) style.
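As one possible sketch of such a gatherer, zipWithIndex can be written as a sequential, stateful 1:1 operator. Nothing here is part of the JDK: the Indexed record, the Counter state class, and the ZipWithIndexDemo holder are hypothetical names, and the code assumes JDK 24 or later (or JDK 22/23 with --enable-preview).

```java
import java.util.List;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

final class ZipWithIndexDemo {
    // Hypothetical pair type carrying an element and its position.
    record Indexed<T>(long index, T value) {}

    static <T> Gatherer<T, ?, Indexed<T>> zipWithIndex() {
        class Counter { long next; } // mutable state: index of the next element
        return Gatherer.ofSequential(
                Counter::new,
                Gatherer.Integrator.<Counter, T, Indexed<T>>ofGreedy(
                        (counter, element, downstream) ->
                                downstream.push(new Indexed<T>(counter.next++, element))));
    }

    static List<Indexed<String>> demo() {
        return Stream.of("a", "b", "c")
                .gather(zipWithIndex()) // the call style the article describes
                .toList();
    }

    public static void main(String[] args) {
        // prints [Indexed[index=0, value=a], Indexed[index=1, value=b], Indexed[index=2, value=c]]
        System.out.println(demo());
    }
}
```

The index depends on encounter order, which is why this sketch uses Gatherer.ofSequential rather than supplying a combiner.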

Conclusion

The article analyzed the notion of streams, compared several Java stream libraries, introduced Java 22’s Stream Gather API, and demonstrated how virtual threads simplify implementing operators like mapConcurrent. Upgrading to the latest JDK can empower developers with these new capabilities.
