Mastering CompletableFuture: From Basics to RocketMQ Integration

This article explains the limitations of Java's Future, introduces CompletableFuture with its rich API for non‑blocking asynchronous programming, demonstrates practical usage including task creation, result retrieval, chaining, exception handling, and shows how RocketMQ leverages CompletableFuture to coordinate disk flush and replica sync tasks efficiently.

Su San Talks Tech
Su San Talks Tech
Su San Talks Tech
Mastering CompletableFuture: From Basics to RocketMQ Integration

Hello, I am Su San.

Future Interface and Its Limitations

We know thread creation via Thread or Runnable lacks return values, so JDK 1.5 introduced Callable, Future, and FutureTask to retrieve asynchronous results.

Example using FutureTask:

FutureTask<String> futureTask = new FutureTask<>(() -> "三友");
new Thread(futureTask).start();
System.out.println(futureTask.get());

Or using an executor service:

ExecutorService executorService = Executors.newSingleThreadExecutor();
Future<String> future = executorService.submit(() -> "三友");
System.out.println(future.get());
executorService.shutdown();

Limitations of Future

Future.get() blocks the main thread until the task completes, and polling with isDone() wastes CPU.

Therefore, JDK 1.8 introduced CompletableFuture, which adds callback chaining and non‑blocking composition.

What Is CompletableFuture?

CompletableFuture implements both Future and CompletionStage, providing observer‑style callbacks that trigger subsequent stages without blocking.

Common CompletableFuture APIs

1. Instantiating CompletableFuture

Constructor

CompletableFuture<String> cf = new CompletableFuture<>();
System.out.println(cf.get());
cf.complete("三友");

Static factory methods

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier);
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor);
public static CompletableFuture<Void> runAsync(Runnable runnable);
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor);

SupplyAsync returns a result, runAsync does not. If no executor is supplied, ForkJoinPool is used.

CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> "三友");
System.out.println(cf.get());

2. Getting results

public T get();
public T get(long timeout, TimeUnit unit);
public T getNow(T valueIfAbsent);
public T join();

get() blocks, get(timeout) throws TimeoutException, getNow returns a default if not completed, join() throws unchecked exceptions.

3. Completing tasks manually

public boolean complete(T value);
public boolean completeExceptionally(Throwable ex);

complete returns true if the task was not already completed; completeExceptionally signals an error.

4. Chaining callbacks

Callbacks for successful completion

public <U> CompletionStage<U> thenApply(Function<? super T, ? extends U> fn);
public CompletableFuture<Void> thenRun(Runnable action);
public CompletionStage<Void> thenAccept(Consumer<? super T> action);

thenApply transforms the result, thenRun runs a Runnable without the result, thenAccept consumes the result.

Examples:

CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> 10)
    .thenApply(v -> "Result: " + v);
System.out.println(cf.join());
CompletableFuture<Void> cf = CompletableFuture.supplyAsync(() -> 10)
    .thenRun(() -> System.out.println("Task finished"));
CompletableFuture<Void> cf = CompletableFuture.supplyAsync(() -> 10)
    .thenAccept(v -> System.out.println("Result: " + v));

Callbacks for exceptional completion

public <U> CompletionStage<U> exceptionally(Function<Throwable, ? extends U> fn);

exceptionally handles errors and can provide a fallback value.

Callbacks for both success and failure

public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);
public CompletionStage<T> whenComplete(BiConsumer<? super T, ? super Throwable> action);

handle is invoked in both cases, whenComplete receives the result or exception but does not swallow the exception.

5. Combining results

public <U,V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other,
    BiFunction<? super T, ? super U, ? extends V> fn);

thenCombine runs when both stages complete and combines their results.

6. Async‑suffixed methods

Methods ending with Async (e.g., thenAcceptAsync) execute the next stage in a separate thread, optionally using a supplied executor.

CompletableFuture in RocketMQ

RocketMQ uses CompletableFuture to run the disk‑flush and replica‑sync tasks concurrently and combine their results.

RocketMQ flow
RocketMQ flow

Code example:

PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);
CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, msg);
CompletableFuture<PutMessageStatus> replicaResultFuture = submitReplicaRequest(result, msg);
return flushResultFuture.thenCombine(replicaResultFuture, (flushStatus, replicaStatus) -> {
    if (flushStatus != PutMessageStatus.PUT_OK) {
        putMessageResult.setPutMessageStatus(flushStatus);
    }
    if (replicaStatus != PutMessageStatus.PUT_OK) {
        putMessageResult.setPutMessageStatus(replicaStatus);
    }
    return putMessageResult;
});

The combined future is then observed to measure storage latency:

long beginTime = this.getSystemClock().now();
CompletableFuture<PutMessageResult> putResultFuture = this.commitLog.asyncPutMessage(msg);
putResultFuture.thenAccept(result -> {
    long elapsedTime = this.getSystemClock().now() - beginTime;
    if (elapsedTime > 500) {
        log.warn("putMessage not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, msg.getBody().length);
    }
    this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);
    if (result == null || !result.isOk()) {
        this.storeStatsService.getPutMessageFailedTimes().add(1);
    }
});

Advantages of CompletableFuture

Enables functional asynchronous programming that is elegant and maintainable.

Provides built‑in exception handling for asynchronous tasks.

Offers powerful task orchestration capabilities.

Original Source

Signed-in readers can open the original source through BestHub's protected redirect.

Sign in to view source
Republication Notice

This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contactadmin@besthub.devand we will review it promptly.

JavaconcurrencyCompletableFutureRocketMQAsyncFuture
Su San Talks Tech
Written by

Su San Talks Tech

Su San, former staff at several leading tech companies, is a top creator on Juejin and a premium creator on CSDN, and runs the free coding practice site www.susan.net.cn.

0 followers
Reader feedback

How this landed with the community

Sign in to like

Rate this article

Was this worth your time?

Sign in to rate
Discussion

0 Comments

Thoughtful readers leave field notes, pushback, and hard-won operational detail here.