Understanding Java 8 CompletionStage and CompletableFuture with Practical Examples
This article explains Java 8's CompletionStage API and its standard library implementation CompletableFuture, demonstrating its behavior through a series of concise examples that cover creation, chaining, asynchronous execution, custom executors, exception handling, cancellation, and composition of multiple stages.
In Java asynchronous programming you don't have to rely on RxJava; the standard library's CompletableFuture (which implements CompletionStage) can handle most scenarios. The article introduces the CompletionStage contract, describing it as a pipeline stage that can be completed synchronously or asynchronously.
1. Creating a Completed CompletableFuture
The simplest way is to use CompletableFuture.completedFuture with a predefined result.
static void completedFutureExample() {
CompletableFuture cf = CompletableFuture.completedFuture("message");
assertTrue(cf.isDone());
assertEquals("message", cf.getNow(null));
}2. Running a Simple Asynchronous Stage
Using runAsync creates a stage that runs in a daemon thread from the ForkJoinPool.
static void runAsyncExample() {
CompletableFuture cf = CompletableFuture.runAsync(() -> {
assertTrue(Thread.currentThread().isDaemon());
randomSleep();
});
assertFalse(cf.isDone());
sleepEnough();
assertTrue(cf.isDone());
}3. Applying a Function to the Previous Stage
thenApplytransforms the result of the previous stage synchronously.
static void thenApplyExample() {
CompletableFuture cf = CompletableFuture.completedFuture("message").thenApply(s -> {
assertFalse(Thread.currentThread().isDaemon());
return s.toUpperCase();
});
assertEquals("MESSAGE", cf.getNow(null));
}4. Asynchronously Applying a Function
Appending Async runs the function in the common ForkJoinPool.
static void thenApplyAsyncExample() {
CompletableFuture cf = CompletableFuture.completedFuture("message").thenApplyAsync(s -> {
assertTrue(Thread.currentThread().isDaemon());
randomSleep();
return s.toUpperCase();
});
assertNull(cf.getNow(null));
assertEquals("MESSAGE", cf.join());
}5. Using a Custom Executor
You can supply an Executor to control the thread pool.
static ExecutorService executor = Executors.newFixedThreadPool(3, new ThreadFactory() {
int count = 1;
@Override
public Thread newThread(Runnable runnable) {
return new Thread(runnable, "custom-executor-" + count++);
}
});
static void thenApplyAsyncWithExecutorExample() {
CompletableFuture cf = CompletableFuture.completedFuture("message").thenApplyAsync(s -> {
assertTrue(Thread.currentThread().getName().startsWith("custom-executor-"));
assertFalse(Thread.currentThread().isDaemon());
randomSleep();
return s.toUpperCase();
}, executor);
assertNull(cf.getNow(null));
assertEquals("MESSAGE", cf.join());
}6. Consuming a Result
thenAcceptconsumes the previous result without returning a value.
static void thenAcceptExample() {
StringBuilder result = new StringBuilder();
CompletableFuture.completedFuture("thenAccept message")
.thenAccept(s -> result.append(s));
assertTrue("Result was empty", result.length() > 0);
}7. Asynchronously Consuming a Result
thenAcceptAsyncperforms the consumer asynchronously.
static void thenAcceptAsyncExample() {
StringBuilder result = new StringBuilder();
CompletableFuture cf = CompletableFuture.completedFuture("thenAcceptAsync message")
.thenAcceptAsync(s -> result.append(s));
cf.join();
assertTrue("Result was empty", result.length() > 0);
}8. Completing a Computation Exceptionally
Use completeExceptionally to signal failure and handle it with handle or exceptionally.
static void completeExceptionallyExample() {
CompletableFuture cf = CompletableFuture.completedFuture("message")
.thenApplyAsync(String::toUpperCase, CompletableFuture.delayedExecutor(1, TimeUnit.SECONDS));
CompletableFuture exceptionHandler = cf.handle((s, th) -> th != null ? "message upon cancel" : "");
cf.completeExceptionally(new RuntimeException("completed exceptionally"));
assertTrue("Was not completed exceptionally", cf.isCompletedExceptionally());
try {
cf.join();
fail("Should have thrown an exception");
} catch (CompletionException ex) {
assertEquals("completed exceptionally", ex.getCause().getMessage());
}
assertEquals("message upon cancel", exceptionHandler.join());
}9. Cancelling a Computation
Calling cancel is equivalent to completing exceptionally with a CancellationException.
static void cancelExample() {
CompletableFuture cf = CompletableFuture.completedFuture("message")
.thenApplyAsync(String::toUpperCase, CompletableFuture.delayedExecutor(1, TimeUnit.SECONDS));
CompletableFuture cf2 = cf.exceptionally(throwable -> "canceled message");
assertTrue("Was not canceled", cf.cancel(true));
assertTrue("Was not completed exceptionally", cf.isCompletedExceptionally());
assertEquals("canceled message", cf2.join());
}10. Applying a Function to Either of Two Completed Stages
applyToEitherruns the function on whichever stage finishes first.
static void applyToEitherExample() {
String original = "Message";
CompletableFuture cf1 = CompletableFuture.completedFuture(original)
.thenApplyAsync(s -> delayedUpperCase(s));
CompletableFuture cf2 = cf1.applyToEither(
CompletableFuture.completedFuture(original).thenApplyAsync(s -> delayedLowerCase(s)),
s -> s + " from applyToEither");
assertTrue(cf2.join().endsWith(" from applyToEither"));
}11. Consuming Either Result
acceptEitherconsumes the result of whichever stage finishes first.
static void acceptEitherExample() {
String original = "Message";
StringBuilder result = new StringBuilder();
CompletableFuture cf = CompletableFuture.completedFuture(original)
.thenApplyAsync(s -> delayedUpperCase(s))
.acceptEither(CompletableFuture.completedFuture(original).thenApplyAsync(s -> delayedLowerCase(s)),
s -> result.append(s).append("acceptEither"));
cf.join();
assertTrue("Result was empty", result.toString().endsWith("acceptEither"));
}12. Running a Runnable After Both Stages Complete
runAfterBothexecutes a Runnable after two stages finish.
static void runAfterBothExample() {
String original = "Message";
StringBuilder result = new StringBuilder();
CompletableFuture.completedFuture(original).thenApply(String::toUpperCase)
.runAfterBoth(CompletableFuture.completedFuture(original).thenApply(String::toLowerCase),
() -> result.append("done"));
assertTrue("Result was empty", result.length() > 0);
}13. Using BiConsumer to Process Two Results
thenAcceptBothcombines two results with a BiConsumer.
static void thenAcceptBothExample() {
String original = "Message";
StringBuilder result = new StringBuilder();
CompletableFuture.completedFuture(original).thenApply(String::toUpperCase)
.thenAcceptBoth(CompletableFuture.completedFuture(original).thenApply(String::toLowerCase),
(s1, s2) -> result.append(s1 + s2));
assertEquals("MESSAGEmessage", result.toString());
}14. Using BiFunction to Combine Two Results
thenCombinemerges two stages and returns a new result.
static void thenCombineExample() {
String original = "Message";
CompletableFuture cf = CompletableFuture.completedFuture(original)
.thenApply(s -> delayedUpperCase(s))
.thenCombine(CompletableFuture.completedFuture(original).thenApply(s -> delayedLowerCase(s)),
(s1, s2) -> s1 + s2);
assertEquals("MESSAGEmessage", cf.getNow(null));
}15. Asynchronously Combining with BiFunction
Even without an Async suffix, the combination can be asynchronous if preceding stages are async.
static void thenCombineAsyncExample() {
String original = "Message";
CompletableFuture cf = CompletableFuture.completedFuture(original)
.thenApplyAsync(s -> delayedUpperCase(s))
.thenCombine(CompletableFuture.completedFuture(original).thenApplyAsync(s -> delayedLowerCase(s)),
(s1, s2) -> s1 + s2);
assertEquals("MESSAGEmessage", cf.join());
}16. Composing CompletableFutures
thenComposeflattens nested futures, passing the result of the first stage to a function that returns another future.
static void thenComposeExample() {
String original = "Message";
CompletableFuture cf = CompletableFuture.completedFuture(original)
.thenApply(s -> delayedUpperCase(s))
.thenCompose(upper -> CompletableFuture.completedFuture(original)
.thenApply(s -> delayedLowerCase(s))
.thenApply(s -> upper + s));
assertEquals("MESSAGEmessage", cf.join());
}17. anyOf – Completing When Any Stage Finishes
anyOfcreates a future that completes when the first of the supplied futures completes.
static void anyOfExample() {
StringBuilder result = new StringBuilder();
List messages = Arrays.asList("a", "b", "c");
List futures = messages.stream()
.map(msg -> CompletableFuture.completedFuture(msg).thenApply(s -> delayedUpperCase(s)))
.collect(Collectors.toList());
CompletableFuture.anyOf(futures.toArray(new CompletableFuture[futures.size()]))
.whenComplete((res, th) -> {
if (th == null) {
assertTrue(isUpperCase((String) res));
result.append(res);
}
});
assertTrue("Result was empty", result.length() > 0);
}18‑19. allOf and allOfAsync – Completing After All Stages Finish
allOfwaits for every supplied future; the async variant runs the waiting logic in the common pool.
static void allOfExample() {
StringBuilder result = new StringBuilder();
List messages = Arrays.asList("a", "b", "c");
List futures = messages.stream()
.map(msg -> CompletableFuture.completedFuture(msg).thenApply(s -> delayedUpperCase(s)))
.collect(Collectors.toList());
CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]))
.whenComplete((v, th) -> {
futures.forEach(cf -> assertTrue(isUpperCase(cf.getNow(null))));
result.append("done");
});
assertTrue("Result was empty", result.length() > 0);
}
static void allOfAsyncExample() {
StringBuilder result = new StringBuilder();
List messages = Arrays.asList("a", "b", "c");
List futures = messages.stream()
.map(msg -> CompletableFuture.completedFuture(msg).thenApplyAsync(s -> delayedUpperCase(s)))
.collect(Collectors.toList());
CompletableFuture all = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()]))
.whenComplete((v, th) -> {
futures.forEach(cf -> assertTrue(isUpperCase(cf.getNow(null))));
result.append("done");
});
all.join();
assertTrue("Result was empty", result.length() > 0);
}20. Real‑World Example
The final example shows how to fetch a list of cars asynchronously, retrieve each car's rating with another async call, combine the results using allOf, and finally print each car with its rating.
cars().thenCompose(cars -> {
List updatedCars = cars.stream()
.map(car -> rating(car.manufacturerId).thenApply(r -> {
car.setRating(r);
return car;
}))
.collect(Collectors.toList());
CompletableFuture done = CompletableFuture.allOf(updatedCars.toArray(new CompletableFuture[updatedCars.size()]));
return done.thenApply(v -> updatedCars.stream()
.map(CompletionStage::toCompletableFuture)
.map(CompletableFuture::join)
.collect(Collectors.toList()));
}).whenComplete((cars, th) -> {
if (th == null) {
cars.forEach(System.out::println);
} else {
throw new RuntimeException(th);
}
}).toCompletableFuture().join();This comprehensive guide demonstrates how CompletableFuture can simplify asynchronous workflows, improve performance, and keep code readable.
Signed-in readers can open the original source through BestHub's protected redirect.
This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contactand we will review it promptly.
Code Ape Tech Column
Former Ant Group P8 engineer, pure technologist, sharing full‑stack Java, job interview and career advice through a column. Site: java-family.cn
How this landed with the community
Was this worth your time?
0 Comments
Thoughtful readers leave field notes, pushback, and hard-won operational detail here.
