
Designing Stateful and Resource‑Safe Reactive Stream Operators with statefulMap

The article shows how a unified abstraction called statefulMap, together with a resource-aware mapWithResource primitive, lets developers implement a wide range of reactive-stream operators (buffering, indexing, deduplication, safe DB access) in a concise, composable, thread-safe manner, replacing operator implementations that would otherwise run to hundreds of lines each.

DaTaobao Tech

By using abstraction and composition we can write code that is more concise, understandable and robust. The article uses a contribution to an Akka project as a starting point to share insights on building custom reactive‑stream operators.

The article lists typical scenarios in which existing libraries (Java 8 Stream, Kotlin/Scala collections, reactive-stream libraries) fall short, for example:

In a live‑streaming case we need to buffer and aggregate likes, merging several likes into one and sending the batch after N messages.

In an IoT scenario we need to emit the current value together with the previous one, or the last three values, to compute a trend, using operators such as zipWithNext, zipWithPrevious, zipWithPreviousAndNext or sliding.

In a chatroom we want to close the connection when the user types bye, which can be expressed with takeWhile.

When executing a sequence of SQL statements we need to open a connection, run each statement and close the connection at the end – a use‑case for mapWithResource (resource safety).

Other common needs include batch processing (batch(3)), indexing (zipWithIndex), conditional buffering (bufferUntil(predicate), bufferWhile(predicate)), deduplication (distinct, distinctUntilChanged) and limiting/skipping elements (limit(N), take(N), skip(N), dropWhile).

Implementing these operators from scratch is non‑trivial because they must fully comply with the Reactive‑Streams specification, be composable, thread‑safe and well‑tested. The article analyses existing Reactor‑core implementations (e.g. FluxIndexFuseable – 370 lines, FluxBuffer – 575 lines, FluxDistinct – 609 lines) to illustrate the amount of code involved.

Unified abstraction

All the above operators can be expressed as a state transition:

state + input → (newState, output)

with lifecycle hooks onCreate and onComplete. Adding a takeWhile step also supports early termination.

Based on this idea the article introduces a generic operator statefulMap:
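To make the transition model concrete, here is a minimal plain-Java sketch of it, run over an in-memory list rather than a real stream. The helper `runTransitions`, the class name and the `batch` method are hypothetical names chosen for illustration; the sketch ignores backpressure, asynchrony and error handling, which a real operator must handle. It uses the model to batch likes from the live-streaming scenario, with onComplete flushing the last partial batch.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;

final class StateTransitionSketch {

    // Hypothetical helper (not a library API): applies
    // state + input -> (newState, output) to every input in order,
    // then lets onComplete emit one optional final element.
    static <S, In, Out> List<Out> runTransitions(
            List<In> inputs,
            Supplier<S> onCreate,
            BiFunction<S, In, Map.Entry<S, Out>> transition,
            Function<S, Optional<Out>> onComplete) {
        List<Out> outputs = new ArrayList<>();
        S state = onCreate.get();
        for (In input : inputs) {
            Map.Entry<S, Out> next = transition.apply(state, input);
            state = next.getKey();          // newState
            outputs.add(next.getValue());   // output
        }
        onComplete.apply(state).ifPresent(outputs::add);
        return outputs;
    }

    // batch(n) expressed with the model: the state is the buffer being filled.
    static <T> List<List<T>> batch(List<T> inputs, int n) {
        List<List<T>> emitted = StateTransitionSketch.<List<T>, T, List<T>>runTransitions(
            inputs,
            ArrayList::new,
            (buffer, element) -> {
                buffer.add(element);
                if (buffer.size() < n) {
                    // Not full yet: keep the buffer, emit an empty placeholder.
                    return new SimpleImmutableEntry<>(buffer, Collections.emptyList());
                }
                // Full: start a fresh buffer and emit the filled one.
                return new SimpleImmutableEntry<>(new ArrayList<>(), buffer);
            },
            // On completion, flush whatever is left in the buffer.
            buffer -> buffer.isEmpty() ? Optional.empty() : Optional.of(buffer));
        emitted.removeIf(List::isEmpty);    // drop the empty placeholders
        return emitted;
    }

    public static void main(String[] args) {
        List<String> likes = Arrays.asList(
            "like-1", "like-2", "like-3", "like-4", "like-5", "like-6", "like-7");
        System.out.println(batch(likes, 3));
        // prints [[like-1, like-2, like-3], [like-4, like-5, like-6], [like-7]]
    }
}
```

The empty placeholder exists because every transition in this model must produce an output; filtering it out afterwards keeps the transition function itself simple.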

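// Simplified sketch of the signature; the return type is shown as a Flow for illustration.
// S is the state type, In the upstream element type, Out the emitted element type.
public <S, In, Out> Flow<In, Out, NotUsed> statefulMap(
    Supplier<S> create,                    // creates the initial state
    BiFunction<S, In, Pair<S, Out>> f,     // state + input -> (newState, output)
    Function<S, Optional<Out>> onComplete) // may emit one last element on completion
    { ... }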

Example – implementing zipWithIndex :

Source.from(Arrays.asList("A", "B", "C", "D"))
    .statefulMap(
        () -> 0L,
        (index, element) -> Pair.create(index + 1, Pair.create(element, index)),
        idx -> Optional.empty())
    .runForeach(System.out::println, system);
// prints
// Pair(A,0)
// Pair(B,1)
// Pair(C,2)
// Pair(D,3)

Implementing bufferUntilChanged :

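Source.from(Arrays.asList("A", "B", "B", "C", "C", "C", "D"))
    .statefulMap(
        // Widen the state type to List<String> so both branches below agree on it.
        () -> (List<String>) new LinkedList<String>(),
        (buffer, element) -> {
            if (!buffer.isEmpty() && !buffer.get(0).equals(element)) {
                // Element changed: start a new buffer and emit the old one.
                return Pair.create(new LinkedList<>(Collections.singletonList(element)),
                                   Collections.unmodifiableList(buffer));
            } else {
                // Same element (or first element): keep buffering, emit a placeholder.
                buffer.add(element);
                return Pair.create(buffer, Collections.<String>emptyList());
            }
        },
        // On completion, emit whatever is still buffered.
        Optional::ofNullable)
    .filterNot(List::isEmpty) // drop the empty placeholders
    .runForeach(System.out::println, system);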
// prints
// [A]
// [B, B]
// [C, C, C]
// [D]

Using the same pattern we can build distinctUntilChanged, buffer, etc.
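As an illustration of that claim, the following plain-Java sketch writes distinctUntilChanged as a single transition function whose state is the previously seen element. The class and method names are hypothetical, and the loop in `distinctUntilChanged` merely stands in for the stream; with a statefulMap-style operator, the `step` function would play the role of `f`, followed by a filter that drops the empty outputs. Elements are assumed to be non-null, as Reactive Streams requires.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;

final class DistinctUntilChangedSketch {

    // Transition: (previous element, input) -> (new previous element, optional output).
    // The output is empty when the input repeats the previous element.
    static <T> Map.Entry<Optional<T>, Optional<T>> step(Optional<T> previous, T element) {
        Optional<T> current = Optional.of(element);
        Optional<T> output = previous.equals(current) ? Optional.<T>empty() : current;
        return new SimpleImmutableEntry<>(current, output);
    }

    // Drives the transition over a list; a stand-in for running it inside a stream.
    static <T> List<T> distinctUntilChanged(List<T> inputs) {
        List<T> outputs = new ArrayList<>();
        Optional<T> state = Optional.empty();   // no previous element yet
        for (T input : inputs) {
            Map.Entry<Optional<T>, Optional<T>> next = step(state, input);
            state = next.getKey();
            next.getValue().ifPresent(outputs::add);
        }
        return outputs;
    }

    public static void main(String[] args) {
        System.out.println(
            distinctUntilChanged(Arrays.asList("A", "B", "B", "C", "C", "C", "D")));
        // prints [A, B, C, D]
    }
}
```

Unlike the buffering example, nothing needs to be flushed on completion here, so the equivalent onComplete would simply return an empty Optional.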

Resource‑aware operators

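When a stream needs to create and close resources (files, DB connections), the resource can be modeled as state that is created once, stays the same for every element, and is released on completion, optionally emitting one final element. The generic signature is:

// Simplified sketch of the signature; the return type is shown as a Flow for illustration.
// R is the resource type, In the upstream element type, Out the emitted element type.
public <R, In, Out> Flow<In, Out, NotUsed> mapWithResource(
    Supplier<? extends R> create,                                  // opens the resource once
    BiFunction<? super R, ? super In, ? extends Out> function,     // uses it for each element
    Function<? super R, ? extends Optional<? extends Out>> close)  // releases it at the end
    { ... }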

Example – querying multiple SQL statements safely:

Source.from(Arrays.asList(
        "SELECT * FROM shop ORDER BY article-0000 DESC LIMIT 100;",
        "SELECT * FROM shop ORDER BY article-0001 DESC LIMIT 100;"))
    .mapWithResource(
        () -> dbDriver.create(url, user, pwd),
        (conn, sql) -> db.doQuery(conn, sql).toList(),
        conn -> { conn.close(); return Optional.empty(); })
    .mapConcat(elems -> elems)
    .runForeach(System.out::println, system);
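The lifecycle behind this example (create once, use per element, always close) can be sketched in plain Java. Everything below is hypothetical and for illustration only: `FakeConnection` stands in for a real DB connection, and the list-based `mapWithResource` helper mimics the operator's contract without any of the stream machinery. The sketch assumes that the resource should also be released when processing an element fails.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.function.Supplier;

final class MapWithResourceSketch {

    // Hypothetical stand-in for a DB connection; records what happens to it.
    static final class FakeConnection {
        final List<String> log = new ArrayList<>();

        String query(String sql) {
            if (sql.isEmpty()) {
                throw new IllegalArgumentException("empty statement");
            }
            log.add("query: " + sql);
            return "rows for " + sql;
        }

        void close() {
            log.add("close");
        }
    }

    // List-based model of the operator: open once, map every input, close at the end.
    static <R, In, Out> List<Out> mapWithResource(
            List<In> inputs,
            Supplier<? extends R> create,
            BiFunction<? super R, ? super In, ? extends Out> function,
            Function<? super R, ? extends Optional<? extends Out>> close) {
        List<Out> outputs = new ArrayList<>();
        R resource = create.get();
        try {
            for (In input : inputs) {
                outputs.add(function.apply(resource, input));
            }
        } catch (RuntimeException e) {
            close.apply(resource);  // release on failure; any final element is discarded
            throw e;
        }
        close.apply(resource).ifPresent(outputs::add);  // release and emit the optional last element
        return outputs;
    }

    static List<String> runQueries(FakeConnection conn, List<String> statements) {
        return MapWithResourceSketch.<FakeConnection, String, String>mapWithResource(
            statements,
            () -> conn,
            (c, sql) -> c.query(sql),
            c -> { c.close(); return Optional.empty(); });
    }

    public static void main(String[] args) {
        FakeConnection conn = new FakeConnection();
        System.out.println(runQueries(conn, Arrays.asList("SELECT 1", "SELECT 2")));
        System.out.println(conn.log);
        // prints [rows for SELECT 1, rows for SELECT 2]
        // prints [query: SELECT 1, query: SELECT 2, close]
    }
}
```

Because the close function receives the same resource instance that create produced, user code never has to track the connection itself.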

The article concludes that by building a small set of well-designed primitives such as statefulMap and mapWithResource, many complex operators become trivial to compose, reducing code size, improving testability and easing maintenance.
