Mastering CompletableFuture: From Basics to RocketMQ Integration

This article introduces Java's CompletableFuture, compares it with Future, explains its core APIs, demonstrates common usage patterns—including creation, result retrieval, chaining, exception handling, and task combination—and shows a practical integration example within RocketMQ's message storage workflow.

Sanyou's Java Diary

Future Interface and Its Limitations

In Java, a task started by extending Thread or implementing Runnable cannot return a result. JDK 1.5 introduced Callable and Future (with FutureTask as an implementation) so that a caller can obtain the result of an asynchronous task.

FutureTask<String> futureTask = new FutureTask<>(() -> "三友");
new Thread(futureTask).start();
System.out.println(futureTask.get());

Using a thread pool:

ExecutorService executorService = Executors.newSingleThreadExecutor();
Future<String> future = executorService.submit(() -> "三友");
System.out.println(future.get());
executorService.shutdown();

Future has two drawbacks: get() blocks the calling thread until the result is ready, and polling isDone() in a loop wastes CPU. It also offers no way to register a callback that runs when the task finishes.

What Is CompletableFuture?

Introduced in JDK 1.8, CompletableFuture implements both Future and CompletionStage, providing callback‑style composition without blocking the main thread.
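To make the callback style concrete, here is a minimal sketch. The class name CallbackDemo and the event strings are made up for illustration. It registers a callback on an incomplete future, keeps working, and only then completes the future.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

final class CallbackDemo {

    // Records the order in which things happen on a single thread.
    static List<String> registerThenComplete() {
        List<String> events = new ArrayList<>();
        CompletableFuture<String> future = new CompletableFuture<>();

        // Registering the callback returns immediately; nothing blocks here.
        future.thenAccept(value -> events.add("callback got " + value));
        events.add("caller kept running");

        // Completing the future runs the already-registered callback.
        future.complete("result");
        return events;
    }

    public static void main(String[] args) {
        System.out.println(registerThenComplete());
    }
}
```

The returned list holds "caller kept running" first and "callback got result" second, which shows that registering the callback did not block the caller.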

Common CompletableFuture APIs

1. Instantiating CompletableFuture

Constructor

CompletableFuture<String> completableFuture = new CompletableFuture<>();
System.out.println(completableFuture.get());

A future created this way starts out incomplete, so get() blocks until another thread completes it:

completableFuture.complete("三友");
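The two fragments above can be put together into one runnable sketch. The class name ManualCompletionDemo and the use of a plain Thread as the completing thread are assumptions made for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

final class ManualCompletionDemo {

    // Creates an incomplete future, lets a second thread complete it,
    // and blocks in get() until that happens.
    static String awaitValueFromOtherThread(String value) {
        CompletableFuture<String> completableFuture = new CompletableFuture<>();

        // The completing thread supplies the value.
        new Thread(() -> completableFuture.complete(value)).start();

        try {
            return completableFuture.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("future failed", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(awaitValueFromOtherThread("三友"));
    }
}
```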

Static factory methods

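The two common factories are supplyAsync (the task returns a value) and runAsync (the task returns nothing). If no executor is supplied, the task runs on the shared ForkJoinPool.commonPool(), or on a new thread per task when the common pool's parallelism is below two.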

CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> "三友");
System.out.println(cf.get());
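Both factories also have an overload that takes an executor. The sketch below assumes a hypothetical class FactoryMethodsDemo and a thread named "demo-worker" so that the executing thread is visible in the result.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class FactoryMethodsDemo {

    // supplyAsync with an explicit executor: the supplier runs on a pool thread.
    static String supplyOnPool(ExecutorService pool) {
        return CompletableFuture
                .supplyAsync(() -> "computed on " + Thread.currentThread().getName(), pool)
                .join();
    }

    // runAsync produces no value, so the future is typed Void and join() returns null.
    static Void runOnPool(ExecutorService pool, Runnable task) {
        return CompletableFuture.runAsync(task, pool).join();
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newSingleThreadExecutor(r -> new Thread(r, "demo-worker"));
        try {
            System.out.println(supplyOnPool(pool));
            runOnPool(pool, () -> System.out.println("side effect only"));
        } finally {
            pool.shutdown();
        }
    }
}
```

Passing a dedicated executor is a common choice when the tasks block on I/O, since it keeps them off the shared common pool.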

2. Retrieving Results

public T get();
public T get(long timeout, TimeUnit unit);
public T getNow(T valueIfAbsent);
public T join();
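get() blocks until the task completes. get(timeout, unit) waits at most the given time and throws TimeoutException if the task has not completed by then. getNow returns the result if it is already available and valueIfAbsent otherwise, without blocking. join() blocks like get() but reports failures as an unchecked CompletionException instead of the checked ExecutionException.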
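The differences between these methods are easiest to see side by side. This is a minimal sketch; the class RetrievalDemo, the 50 ms timeout, and the "not ready" fallback are illustrative choices.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class RetrievalDemo {

    // getNow never blocks: an incomplete future yields the supplied fallback.
    static String readWithoutBlocking(CompletableFuture<String> future) {
        return future.getNow("not ready");
    }

    // get(timeout, unit) gives up with TimeoutException if the future is still incomplete.
    static boolean timesOut(CompletableFuture<String> future) {
        try {
            future.get(50, TimeUnit.MILLISECONDS);
            return false;
        } catch (TimeoutException e) {
            return true;
        } catch (InterruptedException | ExecutionException e) {
            return false;
        }
    }

    // join() reports a failure as an unchecked CompletionException whose cause
    // is the original exception.
    static Throwable causeSeenByJoin(CompletableFuture<String> failed) {
        try {
            failed.join();
            return null;
        } catch (CompletionException e) {
            return e.getCause();
        }
    }

    public static void main(String[] args) {
        CompletableFuture<String> pending = new CompletableFuture<>();
        System.out.println(readWithoutBlocking(pending));
        System.out.println("timed out: " + timesOut(pending));
    }
}
```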

3. Manually Completing a Task

public boolean complete(T value);
public boolean completeExceptionally(Throwable ex);
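complete marks the task as successfully finished with the given value; completeExceptionally marks it as failed with the given exception. Each returns true only if the call actually changed the future's state, that is, if the future was not already completed.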
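A future can be completed only once, which the boolean return values make visible. The following sketch assumes a hypothetical class FirstCompletionWinsDemo.

```java
import java.util.concurrent.CompletableFuture;

final class FirstCompletionWinsDemo {

    // Tries to complete the same future twice and returns what each call reported.
    static boolean[] completeTwice(CompletableFuture<String> future) {
        boolean first = future.complete("first");
        // The future is already completed, so this attempt has no effect.
        boolean second = future.completeExceptionally(new IllegalStateException("too late"));
        return new boolean[] {first, second};
    }

    public static void main(String[] args) {
        CompletableFuture<String> future = new CompletableFuture<>();
        boolean[] outcome = completeTwice(future);
        System.out.println(outcome[0] + ", " + outcome[1] + ", value=" + future.join());
    }
}
```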

4. Processing the Result

Callbacks for successful completion

public <U> CompletionStage<U> thenApply(Function<? super T, ? extends U> fn);
public CompletionStage<Void> thenRun(Runnable action);
public CompletionStage<Void> thenAccept(Consumer<? super T> action);

Examples:

// thenApply: transform the result into a new value
CompletableFuture<String> applied = CompletableFuture.supplyAsync(() -> 10)
    .thenApply(v -> "Result: " + v);
System.out.println(applied.join());

// thenRun: run an action that does not need the result
CompletableFuture<Void> ran = CompletableFuture.supplyAsync(() -> 10)
    .thenRun(() -> System.out.println("Task finished"));

// thenAccept: consume the result without returning a value
CompletableFuture<Void> accepted = CompletableFuture.supplyAsync(() -> 10)
    .thenAccept(v -> System.out.println("Result: " + v));

Callback for exceptional completion

public CompletionStage<T> exceptionally(Function<Throwable, ? extends T> fn);

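If the previous stage fails, fn receives the exception and its return value becomes the result. If the previous stage succeeds, the result passes through unchanged.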
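As an illustration, the sketch below parses a number asynchronously and falls back to a default when parsing fails. The class ExceptionallyDemo and the method parseOrDefault are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;

final class ExceptionallyDemo {

    // Parses text on another thread; a parse failure is replaced by the fallback.
    static int parseOrDefault(String text, int fallback) {
        return CompletableFuture
                .supplyAsync(() -> Integer.parseInt(text))
                // Runs only if the parsing stage threw an exception.
                .exceptionally(ex -> fallback)
                .join();
    }

    public static void main(String[] args) {
        System.out.println(parseOrDefault("42", -1));
        System.out.println(parseOrDefault("not a number", -1));
    }
}
```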

Callbacks for both success and failure

public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);
public CompletionStage<T> whenComplete(BiConsumer<? super T, ? super Throwable> action);
handle receives both the result and the exception (the exception is null on success) and returns a new value; whenComplete observes the outcome without altering the original result.
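The practical difference shows up on failure: handle can turn it into a normal value, while the stage returned by whenComplete still fails. The class and method names in this sketch are made up for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

final class HandleVsWhenCompleteDemo {

    // handle's return value becomes the new result, so a failure can be recovered.
    static String describeWithHandle(CompletableFuture<Integer> source) {
        return source
                .handle((value, ex) -> ex == null ? "ok: " + value : "recovered")
                .join();
    }

    // whenComplete only observes; the returned stage keeps the original outcome.
    static boolean stillFailsAfterWhenComplete(CompletableFuture<Integer> source) {
        CompletableFuture<Integer> observed =
                source.whenComplete((value, ex) -> { /* e.g. log the outcome */ });
        try {
            observed.join();
            return false;
        } catch (CompletionException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        CompletableFuture<Integer> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("boom"));
        System.out.println(describeWithHandle(failed));
        System.out.println(stillFailsAfterWhenComplete(failed));
    }
}
```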

5. Combining Tasks

public <U,V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other,
    BiFunction<? super T,? super U,? extends V> fn);

The two tasks run independently; when both finish, the provided function combines their results.
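A small sketch of thenCombine, assuming a hypothetical price lookup and quantity lookup that run as separate tasks:

```java
import java.util.concurrent.CompletableFuture;

final class ThenCombineDemo {

    // Two independent tasks; the combining function runs once both have finished.
    static int totalPrice(int unitPrice, int quantity) {
        CompletableFuture<Integer> priceFuture = CompletableFuture.supplyAsync(() -> unitPrice);
        CompletableFuture<Integer> quantityFuture = CompletableFuture.supplyAsync(() -> quantity);
        return priceFuture
                .thenCombine(quantityFuture, (price, count) -> price * count)
                .join();
    }

    public static void main(String[] args) {
        System.out.println(totalPrice(3, 4));
    }
}
```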

6. Async‑suffixed Variants

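Methods ending with Async (e.g., thenAcceptAsync) submit the next stage to an executor instead of running it on the thread that completed the previous stage. Without an executor argument they use the default asynchronous pool (normally ForkJoinPool.commonPool()); an overload accepts a custom executor.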
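The effect of the executor argument can be observed by returning the name of the thread that ran the callback. This sketch uses thenApplyAsync rather than thenAcceptAsync only so that the thread name can be returned; the class name and the "callback-worker" thread name are assumptions.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class AsyncVariantDemo {

    // Returns the name of the thread that executed the Async callback.
    static String callbackThreadName(Executor callbackExecutor) {
        return CompletableFuture
                .completedFuture("payload")
                // The callback is submitted to callbackExecutor, not run by the caller.
                .thenApplyAsync(value -> Thread.currentThread().getName(), callbackExecutor)
                .join();
    }

    public static void main(String[] args) {
        ExecutorService pool =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "callback-worker"));
        try {
            System.out.println(callbackThreadName(pool));
        } finally {
            pool.shutdown();
        }
    }
}
```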

CompletableFuture in RocketMQ

RocketMQ persists a message to disk and replicates it to a slave node as two independent asynchronous tasks. Their results are combined into a single PutMessageResult, and a callback on the combined future records the total storage latency.

RocketMQ CompletableFuture diagram

Implementation:

PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);
// Submit flush request
CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, msg);
// Submit replica request
CompletableFuture<PutMessageStatus> replicaResultFuture = submitReplicaRequest(result, msg);
// Combine both async tasks
return flushResultFuture.thenCombine(replicaResultFuture, (flushStatus, replicaStatus) -> {
    if (flushStatus != PutMessageStatus.PUT_OK) {
        putMessageResult.setPutMessageStatus(flushStatus);
    }
    if (replicaStatus != PutMessageStatus.PUT_OK) {
        putMessageResult.setPutMessageStatus(replicaStatus);
    }
    return putMessageResult;
});

Listening to the combined result to record latency:

long beginTime = this.getSystemClock().now();
CompletableFuture<PutMessageResult> putResultFuture = this.commitLog.asyncPutMessage(msg);
putResultFuture.thenAccept(result -> {
    long elapsedTime = this.getSystemClock().now() - beginTime;
    if (elapsedTime > 500) {
        log.warn("putMessage not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, msg.getBody().length);
    }
    this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);
    if (result == null || !result.isOk()) {
        this.storeStatsService.getPutMessageFailedTimes().add(1);
    }
});
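The same pattern can be tried outside RocketMQ with a self-contained sketch. Everything here is a hypothetical stand-in: the Status enum, its values, and the class StoreLatencySketch are not RocketMQ types, and the two tasks simply return a status instead of flushing or replicating anything.

```java
import java.util.concurrent.CompletableFuture;

final class StoreLatencySketch {

    // Hypothetical stand-in for a store status; not RocketMQ's PutMessageStatus.
    enum Status { PUT_OK, FLUSH_FAILED, REPLICA_FAILED }

    // Any non-OK status overrides PUT_OK; the replica status is checked last,
    // so it wins when both tasks fail (same order as the excerpt above).
    static CompletableFuture<Status> combine(CompletableFuture<Status> flush,
                                             CompletableFuture<Status> replica) {
        return flush.thenCombine(replica, (flushStatus, replicaStatus) -> {
            Status result = Status.PUT_OK;
            if (flushStatus != Status.PUT_OK) {
                result = flushStatus;
            }
            if (replicaStatus != Status.PUT_OK) {
                result = replicaStatus;
            }
            return result;
        });
    }

    public static void main(String[] args) {
        long beginTime = System.currentTimeMillis();
        CompletableFuture<Status> flush = CompletableFuture.supplyAsync(() -> Status.PUT_OK);
        CompletableFuture<Status> replica = CompletableFuture.supplyAsync(() -> Status.PUT_OK);

        // The callback runs after both tasks finish, so it sees the total latency.
        combine(flush, replica)
                .thenAccept(status -> System.out.println(
                        status + " after " + (System.currentTimeMillis() - beginTime) + " ms"))
                .join();
    }
}
```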

Advantages of CompletableFuture

Enables elegant asynchronous functional programming that is easy to maintain.

Provides built‑in exception management for asynchronous tasks.

Offers powerful task orchestration capabilities, allowing flexible composition of complex workflows.
