Mastering CompletableFuture: From Basics to RocketMQ Integration

Explore how Java's CompletableFuture enhances asynchronous programming by overcoming Future's limitations, learn its core APIs, practical code examples, and see a real-world application in RocketMQ where combined futures streamline message persistence and replication.

Su San Talks Tech

Future Interface and Its Limitations

Before Java 8, asynchronous tasks were handled with Future and FutureTask, which allow retrieving results but block the calling thread when using get(). Polling with isDone() also wastes CPU resources.

FutureTask<String> futureTask = new FutureTask<>(() -> "三友");
new Thread(futureTask).start();
System.out.println(futureTask.get());

Using a thread pool:

ExecutorService executorService = Executors.newSingleThreadExecutor();
Future<String> future = executorService.submit(() -> "三友");
System.out.println(future.get());
executorService.shutdown();

Future Interface Limitations

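Future has two main drawbacks: get() blocks the calling thread until the task finishes, and polling isDone() in a loop burns CPU cycles. It also offers no way to register a callback or to chain one task onto another. These drawbacks motivated the introduction of CompletableFuture in JDK 1.8.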
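To make the contrast concrete, here is a minimal sketch that runs the same task in both styles. The class and method names (FutureVsCallbackDemo, blockingStyle, callbackStyle) and the string values are illustrative assumptions, not part of the original examples.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class FutureVsCallbackDemo {

    // Plain Future: the caller has to block in get() to obtain the value.
    static String blockingStyle() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<String> future = pool.submit(() -> "San You");
            return future.get(); // the calling thread waits here
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    // CompletableFuture: the follow-up step is registered as a callback
    // and runs once the first stage has produced its value.
    static CompletableFuture<String> callbackStyle() {
        return CompletableFuture.supplyAsync(() -> "San You")
                .thenApply(name -> "Hello, " + name);
    }

    public static void main(String[] args) {
        System.out.println(blockingStyle());
        // join() is used only so the demo prints before the JVM exits.
        System.out.println(callbackStyle().join());
    }
}
```

In the second method nothing waits between the two steps; the caller decides if and when to wait on the returned future.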

What Is CompletableFuture?

CompletableFuture implements both Future and CompletionStage. It provides callback‑style composition, allowing the next stage to run automatically after the previous one completes, without blocking.

Common CompletableFuture APIs

1. Instantiating CompletableFuture

Constructor

CompletableFuture<String> cf = new CompletableFuture<>();
System.out.println(cf.get()); // blocks until completed

Complete it from another thread:

cf.complete("三友");
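Put together, the two fragments above might look like the following sketch. The worker thread, the 100 ms sleep, and the class name ManualCompleteDemo are assumptions made for illustration.

```java
import java.util.concurrent.CompletableFuture;

class ManualCompleteDemo {

    static String awaitManualCompletion() {
        CompletableFuture<String> cf = new CompletableFuture<>();

        // A hypothetical worker thread that produces the value later.
        Thread worker = new Thread(() -> {
            try {
                Thread.sleep(100); // simulate some work
                cf.complete("San You");
            } catch (InterruptedException e) {
                cf.completeExceptionally(e);
            }
        });
        worker.start();

        // join() blocks the caller until the worker completes the future.
        return cf.join();
    }

    public static void main(String[] args) {
        System.out.println(awaitManualCompletion());
    }
}
```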

Static Factory Methods

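The static factory methods submit the task for asynchronous execution as soon as they are called. supplyAsync is for tasks that return a value and runAsync is for tasks that do not. Without an explicit Executor, both normally run on ForkJoinPool.commonPool().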

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier);
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor);
public static CompletableFuture<Void> runAsync(Runnable runnable);
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor);

Example:

CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> "三友");
System.out.println(cf.get());
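The sketch below exercises both factory families, one with a caller-supplied executor and one with the default. The pool size and the names FactoryMethodsDemo, supplyOnCustomPool, and runOnDefaultExecutor are assumptions for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

class FactoryMethodsDemo {

    // supplyAsync with an explicit executor: the task produces a value.
    static int supplyOnCustomPool() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            return CompletableFuture.supplyAsync(() -> 21 * 2, pool).join();
        } finally {
            pool.shutdown();
        }
    }

    // runAsync without an executor: no result, so the future's type is Void.
    static boolean runOnDefaultExecutor() {
        AtomicBoolean ran = new AtomicBoolean(false);
        CompletableFuture<Void> cf = CompletableFuture.runAsync(() -> ran.set(true));
        cf.join(); // wait until the task has run
        return ran.get();
    }

    public static void main(String[] args) {
        System.out.println(supplyOnCustomPool());
        System.out.println(runOnDefaultExecutor());
    }
}
```

Passing your own executor is usually the safer choice for blocking or long-running work, since the default pool is shared across the whole JVM.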

2. Getting Task Results

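public T get() throws InterruptedException, ExecutionException;
public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
public T getNow(T valueIfAbsent);
public T join();

get() blocks until the task completes, while get(long, TimeUnit) gives up with a TimeoutException once the timeout elapses. getNow returns immediately: the result if the task has finished, otherwise the supplied valueIfAbsent. join() waits like get() but declares no checked exceptions; a failed task surfaces as an unchecked CompletionException.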
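A small sketch of how the non-blocking and failure cases behave. The futures are completed (or left pending) by hand so the outcome is deterministic; the class and method names are illustrative assumptions.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class GetResultDemo {

    // getNow does not wait: a pending future yields the fallback value.
    static String getNowOnPending() {
        CompletableFuture<String> pending = new CompletableFuture<>();
        return pending.getNow("fallback");
    }

    // get(timeout, unit) throws TimeoutException if no result arrives in time.
    static boolean timedGetTimesOut() {
        CompletableFuture<String> pending = new CompletableFuture<>();
        try {
            pending.get(50, TimeUnit.MILLISECONDS);
            return false;
        } catch (TimeoutException e) {
            return true;
        } catch (InterruptedException | ExecutionException e) {
            return false;
        }
    }

    // join() reports a failure as an unchecked CompletionException
    // whose cause is the original exception.
    static String joinFailureCause() {
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("boom"));
        try {
            failed.join();
            return "no exception";
        } catch (CompletionException e) {
            return e.getCause().getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(getNowOnPending());
        System.out.println(timedGetTimesOut());
        System.out.println(joinFailureCause());
    }
}
```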

3. Manually Completing a Task

public boolean complete(T value);
public boolean completeExceptionally(Throwable ex);
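complete marks the task as successfully finished with the given value; completeExceptionally finishes it with an exception. Both return true only if the call itself moved the future into a completed state, so a future that is already complete ignores later calls.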
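The "first completion wins" behavior can be checked with a short sketch like this one. The names CompleteOnceDemo, completeTwice, and failsExceptionally are assumptions for illustration.

```java
import java.util.concurrent.CompletableFuture;

class CompleteOnceDemo {

    // Returns {first call result, second call result, value is still "first"}.
    static boolean[] completeTwice() {
        CompletableFuture<String> cf = new CompletableFuture<>();
        boolean first = cf.complete("first");   // transitions the future
        boolean second = cf.complete("second"); // ignored, already complete
        return new boolean[] {first, second, "first".equals(cf.join())};
    }

    // completeExceptionally puts the future into the failed state.
    static boolean failsExceptionally() {
        CompletableFuture<String> cf = new CompletableFuture<>();
        cf.completeExceptionally(new RuntimeException("failed"));
        return cf.isCompletedExceptionally();
    }

    public static void main(String[] args) {
        boolean[] r = completeTwice();
        System.out.println(r[0] + " " + r[1] + " " + r[2]);
        System.out.println(failsExceptionally());
    }
}
```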

4. Processing Results in the Next Stage

Callbacks for Successful Completion

public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn);
public CompletionStage<Void> thenRun(Runnable action);
public CompletionStage<Void> thenAccept(Consumer<? super T> action);

Examples:

// thenApply: transform the result into a new value
CompletableFuture<String> applied = CompletableFuture.supplyAsync(() -> 10)
    .thenApply(v -> "Result: " + v);
System.out.println(applied.join());

// thenRun: run an action that ignores the result
CompletableFuture<Void> ran = CompletableFuture.supplyAsync(() -> 10)
    .thenRun(() -> System.out.println("Task finished"));

// thenAccept: consume the result without returning a value
CompletableFuture<Void> accepted = CompletableFuture.supplyAsync(() -> 10)
    .thenAccept(v -> System.out.println("Result: " + v));

Callbacks for Exceptional Completion

public CompletionStage<T> exceptionally(Function<Throwable,? extends T> fn);

When an exception occurs, the supplied function provides a fallback value.

CompletableFuture<Integer> cf = CompletableFuture.supplyAsync(() -> {
    int i = 1 / 0; // throws
    return 100;
}).exceptionally(e -> {
    System.out.println("Exception caught, returning default");
    return 110;
});
System.out.println(cf.join());

Callbacks for Both Success and Failure

public <U> CompletionStage<U> handle(BiFunction<? super T,Throwable,? extends U> fn);
public CompletionStage<T> whenComplete(BiConsumer<? super T,? super Throwable> action);
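handle receives both the result and the exception (the exception is null on success) and returns a new value, so it can recover from a failure. whenComplete also runs after the stage regardless of outcome, but it returns nothing: the original value or exception is passed on to the next stage.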
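The difference is easiest to see when the upstream task fails. In this sketch the failing supplier, the "recovered" fallback, and the class name HandleVsWhenCompleteDemo are assumptions for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicBoolean;

class HandleVsWhenCompleteDemo {

    // handle maps either outcome to a new value, so the failure is recovered.
    static String handleFailure() {
        return CompletableFuture.<String>supplyAsync(() -> {
                    throw new IllegalStateException("boom");
                })
                .handle((value, ex) -> ex == null ? value : "recovered")
                .join();
    }

    // whenComplete only observes the outcome; the failure still propagates.
    static boolean whenCompleteKeepsFailure() {
        AtomicBoolean observed = new AtomicBoolean(false);
        CompletableFuture<String> cf = CompletableFuture.<String>supplyAsync(() -> {
                    throw new IllegalStateException("boom");
                })
                .whenComplete((value, ex) -> observed.set(ex != null));
        try {
            cf.join();
            return false; // not reached: the stage is still failed
        } catch (CompletionException e) {
            return observed.get();
        }
    }

    public static void main(String[] args) {
        System.out.println(handleFailure());
        System.out.println(whenCompleteKeepsFailure());
    }
}
```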

5. Combining Results of Multiple Tasks

public <U,V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other,
    BiFunction<? super T,? super U,? extends V> fn);

The two stages run independently; when both finish, fn combines their results.
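As a minimal sketch, two independent lookups can be merged like this. The price and shipping values and the class name ThenCombineDemo are made up for illustration.

```java
import java.util.concurrent.CompletableFuture;

class ThenCombineDemo {

    static int totalPrice() {
        // Two independent asynchronous tasks.
        CompletableFuture<Integer> price = CompletableFuture.supplyAsync(() -> 100);
        CompletableFuture<Integer> shipping = CompletableFuture.supplyAsync(() -> 15);

        // The combiner runs only after both results are available.
        return price.thenCombine(shipping, Integer::sum).join();
    }

    public static void main(String[] args) {
        System.out.println(totalPrice());
    }
}
```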

6. Async‑Suffixed Variants

public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor);

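These variants submit the callback to an executor instead of running it on whichever thread completes the previous stage. The single-argument form uses the default asynchronous executor (normally ForkJoinPool.commonPool()); the two-argument form uses the Executor you pass in.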
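One way to observe this is to name the executor's thread and record where the callback ran. The thread name "callback-pool" and the class name AsyncVariantDemo are assumptions for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

class AsyncVariantDemo {

    static String callbackThreadName() {
        // A single-thread executor whose thread carries a recognizable name.
        ExecutorService callbackPool = Executors.newSingleThreadExecutor(
                r -> new Thread(r, "callback-pool"));
        try {
            AtomicReference<String> seen = new AtomicReference<>();
            CompletableFuture.supplyAsync(() -> 10)
                    // The callback is handed to callbackPool, not run inline.
                    .thenAcceptAsync(v -> seen.set(Thread.currentThread().getName()),
                            callbackPool)
                    .join();
            return seen.get();
        } finally {
            callbackPool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(callbackThreadName());
    }
}
```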

CompletableFuture in RocketMQ

RocketMQ uses two independent asynchronous tasks for persisting a message to disk and replicating it to a slave broker. The overall storage time is measured after both tasks finish.

[Figure: RocketMQ CompletableFuture flow]

Code for the two asynchronous tasks:

PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);
// Submit flush request
CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, msg);
// Submit replica request
CompletableFuture<PutMessageStatus> replicaResultFuture = submitReplicaRequest(result, msg);

// Combine both futures
return flushResultFuture.thenCombine(replicaResultFuture, (flushStatus, replicaStatus) -> {
    if (flushStatus != PutMessageStatus.PUT_OK) {
        putMessageResult.setPutMessageStatus(flushStatus);
    }
    if (replicaStatus != PutMessageStatus.PUT_OK) {
        putMessageResult.setPutMessageStatus(replicaStatus);
    }
    return putMessageResult;
});
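The merge rule in that callback can be tried outside RocketMQ with a self-contained sketch. The Status enum below is a hypothetical stand-in for PutMessageStatus, and the store method with its pre-decided outcomes is an assumption for illustration, not RocketMQ code.

```java
import java.util.concurrent.CompletableFuture;

class CombineStatusDemo {

    // Hypothetical stand-in for RocketMQ's PutMessageStatus.
    enum Status { PUT_OK, FLUSH_TIMEOUT, REPLICA_TIMEOUT }

    // Simulates the flush and replica tasks with pre-decided outcomes.
    static Status store(Status flushOutcome, Status replicaOutcome) {
        CompletableFuture<Status> flushFuture =
                CompletableFuture.supplyAsync(() -> flushOutcome);
        CompletableFuture<Status> replicaFuture =
                CompletableFuture.supplyAsync(() -> replicaOutcome);

        // Same rule as the snippet above: start from PUT_OK, let a flush
        // failure override it, then let a replica failure override that.
        return flushFuture.thenCombine(replicaFuture, (flushStatus, replicaStatus) -> {
            Status merged = Status.PUT_OK;
            if (flushStatus != Status.PUT_OK) {
                merged = flushStatus;
            }
            if (replicaStatus != Status.PUT_OK) {
                merged = replicaStatus;
            }
            return merged;
        }).join();
    }

    public static void main(String[] args) {
        System.out.println(store(Status.PUT_OK, Status.PUT_OK));
        System.out.println(store(Status.FLUSH_TIMEOUT, Status.REPLICA_TIMEOUT));
    }
}
```

Note that when both tasks fail, the replica status is the one reported, because it is checked last.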

Listening to the combined result to record latency:

long beginTime = this.getSystemClock().now();
CompletableFuture<PutMessageResult> putResultFuture = this.commitLog.asyncPutMessage(msg);
putResultFuture.thenAccept(result -> {
    long elapsedTime = this.getSystemClock().now() - beginTime;
    if (elapsedTime > 500) {
        log.warn("putMessage not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, msg.getBody().length);
    }
    this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);
    if (result == null || !result.isOk()) {
        this.storeStatsService.getPutMessageFailedTimes().add(1);
    }
});

Advantages of CompletableFuture

Enables elegant functional asynchronous programming that is easier to maintain.

Provides built‑in exception handling, allowing you to capture and manage errors in asynchronous flows.

Offers powerful task composition, letting you define complex execution order, rules, and parallelism with minimal boilerplate.

Reference:

https://zhuanlan.zhihu.com/p/344431341

Written by

Su San Talks Tech

Su San, former staff at several leading tech companies, is a top creator on Juejin and a premium creator on CSDN, and runs the free coding practice site www.susan.net.cn.
