Mastering Asynchronous Orchestration with Java CompletableFuture

This article explains how Java 8's CompletableFuture enables low‑overhead asynchronous composition, walks through its core methods, shows practical code examples for parallel data fetching, error handling, and task combination, and reveals implementation details from the source code.

Shepherd Advanced Notes
Shepherd Advanced Notes
Shepherd Advanced Notes
Mastering Asynchronous Orchestration with Java CompletableFuture

Background

When the number of business projects grows, system services face increasing pressure, causing overall throughput to drop and latency to rise. Core interfaces must maintain high throughput and low latency, which can be achieved by parallelising independent data‑fetching and processing steps to fully utilise CPU resources.

Synchronous execution

Typical interfaces perform multiple database queries or RPC calls sequentially, so the total response time is the sum of each step (T = T1 + T2 + … + Tn). The sequential model wastes CPU cycles when the steps have no data dependency.

Asynchronous execution

Parallel data acquisition dramatically reduces the time spent on fetching and processing, turning a linear latency chain into a concurrent one.

CompletableFuture usage

The following example shows how to assemble a product‑detail response asynchronously. A fixed thread pool is created based on the number of available processors, and each independent sub‑task (SKU, SPU, brand, category) is launched with CompletableFuture.supplyAsync. The dependent tasks use thenApplyAsync to keep the same executor. Finally, CompletableFuture.allOf(...).get() blocks until all futures complete.

private static ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
        .setNameFormat("product-pool-%d").build();

private static ExecutorService fixedThreadPool = new ThreadPoolExecutor(
        Runtime.getRuntime().availableProcessors() * 2,
        Runtime.getRuntime().availableProcessors() * 40,
        0L,
        TimeUnit.MILLISECONDS,
        new LinkedBlockingQueue<Runnable>(Runtime.getRuntime().availableProcessors() * 20),
        namedThreadFactory);

@Override
public SkuInfo getSkuDetail(Long skuId) {
    SkuInfo skuInfo = new SkuInfo();
    CompletableFuture<ProductSku> skuFuture = CompletableFuture.supplyAsync(() -> {
        ProductSku sku = productSkuDAO.selectById(skuId);
        skuInfo.setSku(sku);
        return sku;
    }, fixedThreadPool);
    CompletableFuture<ProductSpu> spuFuture = skuFuture.thenApplyAsync(sku -> {
        ProductSpu spu = productSpuDAO.selectById(sku.getSpuId());
        skuInfo.setSpu(spu);
        return spu;
    }, fixedThreadPool);
    CompletableFuture<BrandDTO> brandFuture = skuFuture.thenApplyAsync(sku -> {
        BrandDTO brandDTO = brandService.getBrandDetail(sku.getBrandId());
        skuInfo.setBrand(brandDTO);
        return brandDTO;
    }, fixedThreadPool);
    CompletableFuture<CategoryDTO> categoryFuture = skuFuture.thenApplyAsync(sku -> {
        CategoryDTO categoryDTO = categoryService.getCategoryDetail(sku.getCategoryId());
        skuInfo.setCategory(categoryDTO);
        return categoryDTO;
    }, fixedThreadPool);
    try {
        CompletableFuture.allOf(skuFuture, spuFuture, brandFuture, categoryFuture).get();
    } catch (Exception e) {
        log.error("<======= Error while waiting for all tasks ======>", e);
    }
    return skuInfo;
}

supplyAsync / runAsync

supplyAsync

creates a task with a return value (equivalent to ExecutorService.submit(Callable)); runAsync creates a task without a return value (equivalent to ExecutorService.submit(Runnable)). If no executor is supplied, the default is ForkJoinPool.commonPool(); on a single‑core machine the default becomes ThreadPerTaskExecutor, which creates a new thread for each task.

/** Test runAsync – no return value */
private static void testRunAsync() {
    CompletableFuture.runAsync(() -> {
        System.out.println("<======Current thread:" + Thread.currentThread().getName() + "=====Thread ID:" + Thread.currentThread().getId());
        System.out.println("supplyAsync is daemon thread " + Thread.currentThread().isDaemon());
        int result = 10 / 2;
        System.out.println("Result: " + result);
    }, fixedThreadPool);
}

/** Test supplyAsync – returns a value */
private static void testSupplyAsync() throws ExecutionException, InterruptedException {
    CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
        System.out.println("<======Current thread:" + Thread.currentThread().getName() + "=====Thread ID:" + Thread.currentThread().getId());
        int result = 10 / 2;
        return result;
    }, fixedThreadPool);
    Integer res = future.get();
    System.out.println("Returned result: " + res);
}

thenApply / thenApplyAsync

thenApply

registers a callback that receives the result of the preceding stage. The async variant ( thenApplyAsync) submits the callback to the provided executor, potentially running it on a different thread.

private static void testThenApplyAsync() throws ExecutionException, InterruptedException {
    CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
        System.out.println("<======Current thread:" + Thread.currentThread().getName() + "=====Thread ID:" + Thread.currentThread().getId());
        int i = 10 / 2;
        System.out.println("Result: " + i);
        try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); }
        return i;
    }, executor);
    CompletableFuture<String> future2 = future1.thenApplyAsync(res -> {
        System.out.println("======Task 2 started..." + res * 20);
        return "Hello" + res;
    }, executor);
    CompletableFuture<Void> future3 = CompletableFuture.runAsync(() -> {
        System.out.println("======Task 3 executed");
    }, executor);
    CompletableFuture.allOf(future1, future2, future3).get();
    System.out.println("=======Test finished");
}

exceptionally / whenComplete / handle

exceptionally

runs only when the stage completes exceptionally and receives the thrown exception. whenComplete runs after completion regardless of success or failure, receiving both the result (or null) and the exception (or null). handle combines both behaviours, allowing transformation of the result or provision of a fallback value.

private static void testWhenCompleteAndExceptionally() throws ExecutionException, InterruptedException {
    CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
        System.out.println("<======Current thread:" + Thread.currentThread().getName() + "=====Thread ID:" + Thread.currentThread().getId());
        int i = 10 / 2;
        System.out.println("Result: " + i);
        return i;
    }, executor)
    .whenComplete((res, ex) -> {
        System.out.println("<=====Task succeeded, result: " + res + "; exception: " + ex);
    })
    .exceptionally(throwable -> {
        System.out.println("<=====Task threw exception: " + throwable);
        return 10; // default value
    });
    Integer result = future.get();
    System.out.println("<=====Final result=" + result + "=====>");
}

private static void testHandle() {
    CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
        System.out.println("<======Current thread:" + Thread.currentThread().getName() + "=====Thread ID:" + Thread.currentThread().getId());
        int i = 10 / 2;
        System.out.println("Result: " + i);
        return i;
    }, executor).handle((result, thr) -> {
        if (result != null) {
            return result * 2;
        }
        if (thr != null) {
            System.out.println("Async task completed with exception: " + thr);
            return 0;
        }
        return 0;
    });
}

Combining futures – thenCombine / thenAcceptBoth / runAfterBoth

These methods wait for two CompletableFuture instances to finish. thenCombine passes both results to a function and returns a new value; thenAcceptBoth consumes both results without returning a value; runAfterBoth runs a runnable after both complete and receives no inputs. If either stage completes exceptionally, the exception propagates to the combined stage.

private static void thenCombine() throws Exception {
    CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> "hello1", fixedThreadPool);
    CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> "hello2", fixedThreadPool);
    CompletableFuture<String> result = f1.thenCombine(f2, (t, u) -> t + " " + u);
    System.out.println(result.get());
}

private static void thenAcceptBoth() throws Exception {
    CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(() -> {
        int t = new Random().nextInt(3);
        TimeUnit.SECONDS.sleep(t);
        System.out.println("f1=" + t);
        return t;
    }, fixedThreadPool);
    CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(() -> {
        int t = new Random().nextInt(3);
        TimeUnit.SECONDS.sleep(t);
        System.out.println("f2=" + t);
        return t;
    }, fixedThreadPool);
    f1.thenAcceptBothAsync(f2, (a, b) -> System.out.println("Both results: " + a + ", " + b), fixedThreadPool);
}

applyToEither / acceptEither / runAfterEither

These methods trigger when the first of two futures completes. applyToEither receives the winner's result and returns a new value; acceptEither receives the result but returns nothing; runAfterEither runs a runnable with no inputs. If the winning future completed exceptionally, the exception propagates.

private static void applyToEither() throws Exception {
    CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(() -> new Random().nextInt(3), fixedThreadPool);
    CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(() -> new Random().nextInt(3), fixedThreadPool);
    CompletableFuture<Integer> result = f1.applyToEither(f2, t -> {
        System.out.println("applyEither:" + t);
        return t * 2;
    });
}

allOf / anyOf

allOf

returns a new CompletableFuture that completes only when all supplied futures finish successfully; if any stage fails, the resulting future completes exceptionally. anyOf completes when the first supplied future completes, propagating its result or exception.

private static void testAllOf() throws ExecutionException, InterruptedException {
    CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(() -> {
        System.out.println("<======Current thread:" + Thread.currentThread().getName() + "=====Thread ID:" + Thread.currentThread().getId());
        int i = 10 / 2;
        System.out.println("Result: " + i);
        TimeUnit.SECONDS.sleep(5);
        return i;
    }, executor);
    CompletableFuture<String> f2 = f1.thenApplyAsync(res -> {
        System.out.println("======Task 2 started..." + res * 20);
        return "Hello" + res;
    }, executor);
    CompletableFuture<Void> f3 = CompletableFuture.runAsync(() -> {
        System.out.println("======Task 3 executed");
    }, executor);
    CompletableFuture.allOf(f1, f2, f3).get();
    System.out.println("=======Test finished");
}

A common pitfall is that CompletableFuture tasks run in daemon threads. If the main thread finishes without calling get() or otherwise waiting, the JVM may exit before daemon threads complete, causing some async tasks never to execute.

Implementation details

Reading the source code shows that CompletableFuture maintains two fields: result , which stores the outcome of the future, and stack , a Treiber‑stack of Completion objects representing dependent actions that must be triggered once the future completes. For a deeper dive, see the Meituan technical article "CompletableFuture原理与实践" (https://mp.weixin.qq.com/s?__biz=MjM5NjQ5MTI5OA==∣=2651768670&idx=2&sn=c9eaef84499409bb402b37f2e51d9d88&scene=21#wechat_redirect).

Original Source

Signed-in readers can open the original source through BestHub's protected redirect.

Sign in to view source
Republication Notice

This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contactadmin@besthub.devand we will review it promptly.

JavaconcurrencyThreadPoolAsynchronousCompletableFutureJava8
Shepherd Advanced Notes
Written by

Shepherd Advanced Notes

Dedicated to sharing advanced Java technical insights, daily work snippets, and the power of persistent effort.

0 followers
Reader feedback

How this landed with the community

Sign in to like

Rate this article

Was this worth your time?

Sign in to rate
Discussion

0 Comments

Thoughtful readers leave field notes, pushback, and hard-won operational detail here.