Mastering Asynchronous Orchestration with Java CompletableFuture
This article explains how Java 8's CompletableFuture enables low‑overhead asynchronous composition, walks through its core methods, shows practical code examples for parallel data fetching, error handling, and task combination, and reveals implementation details from the source code.
Background
When the number of business projects grows, system services face increasing pressure, causing overall throughput to drop and latency to rise. Core interfaces must maintain high throughput and low latency, which can be achieved by parallelising independent data‑fetching and processing steps to fully utilise CPU resources.
Synchronous execution
Typical interfaces perform multiple database queries or RPC calls sequentially, so the total response time is the sum of each step (T = T1 + T2 + … + Tn). The sequential model wastes CPU cycles when the steps have no data dependency.
Asynchronous execution
Parallel data acquisition dramatically reduces the time spent on fetching and processing, turning a linear latency chain into a concurrent one.
CompletableFuture usage
The following example shows how to assemble a product‑detail response asynchronously. A fixed thread pool is created based on the number of available processors, and each independent sub‑task (SKU, SPU, brand, category) is launched with CompletableFuture.supplyAsync. The dependent tasks use thenApplyAsync to keep the same executor. Finally, CompletableFuture.allOf(...).get() blocks until all futures complete.
private static ThreadFactory namedThreadFactory = new ThreadFactoryBuilder()
.setNameFormat("product-pool-%d").build();
private static ExecutorService fixedThreadPool = new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors() * 2,
Runtime.getRuntime().availableProcessors() * 40,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(Runtime.getRuntime().availableProcessors() * 20),
namedThreadFactory);
@Override
public SkuInfo getSkuDetail(Long skuId) {
SkuInfo skuInfo = new SkuInfo();
CompletableFuture<ProductSku> skuFuture = CompletableFuture.supplyAsync(() -> {
ProductSku sku = productSkuDAO.selectById(skuId);
skuInfo.setSku(sku);
return sku;
}, fixedThreadPool);
CompletableFuture<ProductSpu> spuFuture = skuFuture.thenApplyAsync(sku -> {
ProductSpu spu = productSpuDAO.selectById(sku.getSpuId());
skuInfo.setSpu(spu);
return spu;
}, fixedThreadPool);
CompletableFuture<BrandDTO> brandFuture = skuFuture.thenApplyAsync(sku -> {
BrandDTO brandDTO = brandService.getBrandDetail(sku.getBrandId());
skuInfo.setBrand(brandDTO);
return brandDTO;
}, fixedThreadPool);
CompletableFuture<CategoryDTO> categoryFuture = skuFuture.thenApplyAsync(sku -> {
CategoryDTO categoryDTO = categoryService.getCategoryDetail(sku.getCategoryId());
skuInfo.setCategory(categoryDTO);
return categoryDTO;
}, fixedThreadPool);
try {
CompletableFuture.allOf(skuFuture, spuFuture, brandFuture, categoryFuture).get();
} catch (Exception e) {
log.error("<======= Error while waiting for all tasks ======>", e);
}
return skuInfo;
}supplyAsync / runAsync
supplyAsynccreates a task with a return value (equivalent to ExecutorService.submit(Callable)); runAsync creates a task without a return value (equivalent to ExecutorService.submit(Runnable)). If no executor is supplied, the default is ForkJoinPool.commonPool(); on a single‑core machine the default becomes ThreadPerTaskExecutor, which creates a new thread for each task.
/** Test runAsync – no return value */
private static void testRunAsync() {
CompletableFuture.runAsync(() -> {
System.out.println("<======Current thread:" + Thread.currentThread().getName() + "=====Thread ID:" + Thread.currentThread().getId());
System.out.println("supplyAsync is daemon thread " + Thread.currentThread().isDaemon());
int result = 10 / 2;
System.out.println("Result: " + result);
}, fixedThreadPool);
}
/** Test supplyAsync – returns a value */
private static void testSupplyAsync() throws ExecutionException, InterruptedException {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
System.out.println("<======Current thread:" + Thread.currentThread().getName() + "=====Thread ID:" + Thread.currentThread().getId());
int result = 10 / 2;
return result;
}, fixedThreadPool);
Integer res = future.get();
System.out.println("Returned result: " + res);
}thenApply / thenApplyAsync
thenApplyregisters a callback that receives the result of the preceding stage. The async variant ( thenApplyAsync) submits the callback to the provided executor, potentially running it on a different thread.
private static void testThenApplyAsync() throws ExecutionException, InterruptedException {
CompletableFuture<Integer> future1 = CompletableFuture.supplyAsync(() -> {
System.out.println("<======Current thread:" + Thread.currentThread().getName() + "=====Thread ID:" + Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("Result: " + i);
try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); }
return i;
}, executor);
CompletableFuture<String> future2 = future1.thenApplyAsync(res -> {
System.out.println("======Task 2 started..." + res * 20);
return "Hello" + res;
}, executor);
CompletableFuture<Void> future3 = CompletableFuture.runAsync(() -> {
System.out.println("======Task 3 executed");
}, executor);
CompletableFuture.allOf(future1, future2, future3).get();
System.out.println("=======Test finished");
}exceptionally / whenComplete / handle
exceptionallyruns only when the stage completes exceptionally and receives the thrown exception. whenComplete runs after completion regardless of success or failure, receiving both the result (or null) and the exception (or null). handle combines both behaviours, allowing transformation of the result or provision of a fallback value.
private static void testWhenCompleteAndExceptionally() throws ExecutionException, InterruptedException {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
System.out.println("<======Current thread:" + Thread.currentThread().getName() + "=====Thread ID:" + Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("Result: " + i);
return i;
}, executor)
.whenComplete((res, ex) -> {
System.out.println("<=====Task succeeded, result: " + res + "; exception: " + ex);
})
.exceptionally(throwable -> {
System.out.println("<=====Task threw exception: " + throwable);
return 10; // default value
});
Integer result = future.get();
System.out.println("<=====Final result=" + result + "=====>");
}
private static void testHandle() {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
System.out.println("<======Current thread:" + Thread.currentThread().getName() + "=====Thread ID:" + Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("Result: " + i);
return i;
}, executor).handle((result, thr) -> {
if (result != null) {
return result * 2;
}
if (thr != null) {
System.out.println("Async task completed with exception: " + thr);
return 0;
}
return 0;
});
}Combining futures – thenCombine / thenAcceptBoth / runAfterBoth
These methods wait for two CompletableFuture instances to finish. thenCombine passes both results to a function and returns a new value; thenAcceptBoth consumes both results without returning a value; runAfterBoth runs a runnable after both complete and receives no inputs. If either stage completes exceptionally, the exception propagates to the combined stage.
private static void thenCombine() throws Exception {
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> "hello1", fixedThreadPool);
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> "hello2", fixedThreadPool);
CompletableFuture<String> result = f1.thenCombine(f2, (t, u) -> t + " " + u);
System.out.println(result.get());
}
private static void thenAcceptBoth() throws Exception {
CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(() -> {
int t = new Random().nextInt(3);
TimeUnit.SECONDS.sleep(t);
System.out.println("f1=" + t);
return t;
}, fixedThreadPool);
CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(() -> {
int t = new Random().nextInt(3);
TimeUnit.SECONDS.sleep(t);
System.out.println("f2=" + t);
return t;
}, fixedThreadPool);
f1.thenAcceptBothAsync(f2, (a, b) -> System.out.println("Both results: " + a + ", " + b), fixedThreadPool);
}applyToEither / acceptEither / runAfterEither
These methods trigger when the first of two futures completes. applyToEither receives the winner's result and returns a new value; acceptEither receives the result but returns nothing; runAfterEither runs a runnable with no inputs. If the winning future completed exceptionally, the exception propagates.
private static void applyToEither() throws Exception {
CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(() -> new Random().nextInt(3), fixedThreadPool);
CompletableFuture<Integer> f2 = CompletableFuture.supplyAsync(() -> new Random().nextInt(3), fixedThreadPool);
CompletableFuture<Integer> result = f1.applyToEither(f2, t -> {
System.out.println("applyEither:" + t);
return t * 2;
});
}allOf / anyOf
allOfreturns a new CompletableFuture that completes only when all supplied futures finish successfully; if any stage fails, the resulting future completes exceptionally. anyOf completes when the first supplied future completes, propagating its result or exception.
private static void testAllOf() throws ExecutionException, InterruptedException {
CompletableFuture<Integer> f1 = CompletableFuture.supplyAsync(() -> {
System.out.println("<======Current thread:" + Thread.currentThread().getName() + "=====Thread ID:" + Thread.currentThread().getId());
int i = 10 / 2;
System.out.println("Result: " + i);
TimeUnit.SECONDS.sleep(5);
return i;
}, executor);
CompletableFuture<String> f2 = f1.thenApplyAsync(res -> {
System.out.println("======Task 2 started..." + res * 20);
return "Hello" + res;
}, executor);
CompletableFuture<Void> f3 = CompletableFuture.runAsync(() -> {
System.out.println("======Task 3 executed");
}, executor);
CompletableFuture.allOf(f1, f2, f3).get();
System.out.println("=======Test finished");
}A common pitfall is that CompletableFuture tasks run in daemon threads. If the main thread finishes without calling get() or otherwise waiting, the JVM may exit before daemon threads complete, causing some async tasks never to execute.
Implementation details
Reading the source code shows that CompletableFuture maintains two fields: result , which stores the outcome of the future, and stack , a Treiber‑stack of Completion objects representing dependent actions that must be triggered once the future completes. For a deeper dive, see the Meituan technical article "CompletableFuture原理与实践" (https://mp.weixin.qq.com/s?__biz=MjM5NjQ5MTI5OA==∣=2651768670&idx=2&sn=c9eaef84499409bb402b37f2e51d9d88&scene=21#wechat_redirect).
Signed-in readers can open the original source through BestHub's protected redirect.
This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contactand we will review it promptly.
Shepherd Advanced Notes
Dedicated to sharing advanced Java technical insights, daily work snippets, and the power of persistent effort.
How this landed with the community
Was this worth your time?
0 Comments
Thoughtful readers leave field notes, pushback, and hard-won operational detail here.
