Mastering CompletableFuture: From Basics to RocketMQ Integration

This article explains Java's CompletableFuture, its advantages over the traditional Future API, demonstrates common methods with code examples, and shows how RocketMQ leverages CompletableFuture to coordinate asynchronous disk flush and replica synchronization tasks.

Su San Talks Tech
Su San Talks Tech
Su San Talks Tech
Mastering CompletableFuture: From Basics to RocketMQ Integration

Hello, I am Su San.

Recently I have been exploring the RocketMQ source code and noticed extensive use of CompletableFuture, so I will introduce JDK 1.8's asynchronous tool CompletableFuture and finally analyze its usage in RocketMQ.

Future Interface and Its Limitations

In Java, threads are usually created by extending Thread or implementing Runnable, both of which cannot return a result. Since JDK 1.5, Callable and Future (with its implementation FutureTask) were introduced to obtain asynchronous results.

Example using FutureTask:

FutureTask<String> futureTask = new FutureTask<>(() -> "三友");
new Thread(futureTask).start();
System.out.println(futureTask.get());

Example using a thread pool:

ExecutorService executorService = Executors.newSingleThreadExecutor();
Future<String> future = executorService.submit(() -> "三友");
System.out.println(future.get());
executorService.shutdown();

The thread pool also wraps the submitted Callable into a FutureTask and executes it asynchronously.

Limitations of Future

Calling Future.get() blocks the main thread until the task completes. Polling with isDone() consumes CPU resources, and handling complex asynchronous workflows often requires additional synchronization components.

To overcome these drawbacks, JDK 1.8 introduced the more powerful CompletableFuture class.

What Is CompletableFuture?

CompletableFuture

implements both Future and CompletionStage. It adds observer‑style callbacks, allowing a subsequent stage to run automatically after the previous one finishes without blocking.

Common CompletableFuture APIs

1. Instantiating CompletableFuture

Constructor

CompletableFuture<String> completableFuture = new CompletableFuture<>();
System.out.println(completableFuture.get());

Another thread can complete it with:

completableFuture.complete("三友");

Static factory methods

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier);
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor);
public static CompletableFuture<Void> runAsync(Runnable runnable);
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor);
supplyAsync

returns a result, runAsync does not. If no executor is supplied, the default ForkJoinPool is used.

CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> "三友");
System.out.println(cf.get());

2. Obtaining Results

public T get();
public T get(long timeout, TimeUnit unit);
public T getNow(T valueIfAbsent);
public T join();
get()

blocks indefinitely, get(timeout, …) throws TimeoutException after the timeout, getNow returns a default value if not completed, and join() behaves like get() but wraps checked exceptions in CompletionException.

3. Manually Completing a Task

public boolean complete(T value);
public boolean completeExceptionally(Throwable ex);
complete

marks the task as successfully finished; completeExceptionally finishes it with an exception.

4. Processing the Result

Callbacks for successful completion

public <U> CompletionStage<U> thenApply(Function<? super T, ? extends U> fn);
public CompletableFuture<Void> thenRun(Runnable action);
public CompletionStage<Void> thenAccept(Consumer<? super T> action);

Examples:

CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> 10)
    .thenApply(v -> "Result: " + v);
System.out.println(cf.join());
CompletableFuture<Void> cf = CompletableFuture.supplyAsync(() -> 10)
    .thenRun(() -> System.out.println("Task finished"));
CompletableFuture<Void> cf = CompletableFuture.supplyAsync(() -> 10)
    .thenAccept(v -> System.out.println("Result: " + v));

Exception handling

public CompletionStage<T> exceptionally(Function<Throwable, ? extends T> fn);

When an exception occurs, exceptionally provides a fallback value.

CompletableFuture<Integer> cf = CompletableFuture.supplyAsync(() -> 100)
    .exceptionally(e -> { System.out.println("Exception, use default"); return 110; });
System.out.println(cf.join());

Callbacks for both success and failure

public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);
public CompletionStage<T> whenComplete(BiConsumer<? super T, ? super Throwable> action);
handle

runs regardless of success or failure and can transform the result; whenComplete observes the outcome without altering it.

5. Combining Tasks

public <U,V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other,
    BiFunction<? super T, ? super U, ? extends V> fn);

The two stages run independently; when both finish, the provided function combines their results.

6. Async‑suffixed Methods

public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor);
public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action);

These methods execute the next stage in a separate thread (default ForkJoinPool or a supplied executor).

CompletableFuture in RocketMQ

RocketMQ uses CompletableFuture to coordinate the disk‑flush task and the master‑slave replication task when storing a message.

RocketMQ CompletableFuture diagram
RocketMQ CompletableFuture diagram

Implementation:

PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);
// Submit flush request
CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, msg);
// Submit replica request
CompletableFuture<PutMessageStatus> replicaResultFuture = submitReplicaRequest(result, msg);
// Combine both asynchronous tasks
return flushResultFuture.thenCombine(replicaResultFuture, (flushStatus, replicaStatus) -> {
    if (flushStatus != PutMessageStatus.PUT_OK) {
        putMessageResult.setPutMessageStatus(flushStatus);
    }
    if (replicaStatus != PutMessageStatus.PUT_OK) {
        putMessageResult.setPutMessageStatus(replicaStatus);
    }
    return putMessageResult;
});

Listening to the combined result to measure storage latency:

long beginTime = this.getSystemClock().now();
CompletableFuture<PutMessageResult> putResultFuture = this.commitLog.asyncPutMessage(msg);
putResultFuture.thenAccept(result -> {
    long elapsedTime = this.getSystemClock().now() - beginTime;
    if (elapsedTime > 500) {
        log.warn("putMessage not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, msg.getBody().length);
    }
    this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);
    if (result == null || !result.isOk()) {
        this.storeStatsService.getPutMessageFailedTimes().add(1);
    }
});

Advantages of CompletableFuture

Enables elegant asynchronous functional programming that is easy to maintain.

Provides built‑in exception management for asynchronous tasks.

Offers powerful task orchestration capabilities, allowing flexible composition of complex workflows.

Original Source

Signed-in readers can open the original source through BestHub's protected redirect.

Sign in to view source
Republication Notice

This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contactadmin@besthub.devand we will review it promptly.

JavaconcurrencyCompletableFutureRocketMQasynchronous programmingFuture
Su San Talks Tech
Written by

Su San Talks Tech

Su San, former staff at several leading tech companies, is a top creator on Juejin and a premium creator on CSDN, and runs the free coding practice site www.susan.net.cn.

0 followers
Reader feedback

How this landed with the community

Sign in to like

Rate this article

Was this worth your time?

Sign in to rate
Discussion

0 Comments

Thoughtful readers leave field notes, pushback, and hard-won operational detail here.