Must‑Know CompletableFuture: Usage, Best Practices, and Real‑World Scenarios
This article explains why CompletableFuture replaces traditional Future in high‑concurrency Java applications, demonstrates core APIs such as runAsync, supplyAsync, thenApply, allOf, anyOf, thenCombine, and exception handling, and provides detailed code examples—including serial, parallel, aggregation, and an e‑commerce order‑processing case—to guide robust asynchronous programming.
Why CompletableFuture?
Before CompletableFuture, Java asynchronous code relied on Thread, ThreadPoolExecutor and the Future interface. The traditional Future suffers from four major drawbacks: blocking get() calls that waste CPU, lack of callbacks, weak task orchestration that forces developers to use CountDownLatch or CyclicBarrier, and cumbersome exception handling that can only be done via try‑catch around get(). CompletableFuture implements both Future and CompletionStage, providing non‑blocking callbacks and rich composition methods, which the article shows improves response time by 15‑20% and increases throughput up to 1.5× in IO‑intensive benchmarks.
Basic API Usage
The core APIs are illustrated with a custom thread pool (CPU cores × 2) to avoid the pitfalls of the shared ForkJoinPool.commonPool(). Two creation methods are covered:
import java.util.concurrent.*;
public class CompletableFutureCreateDemo {
// Custom thread pool for IO‑bound tasks
private static final ThreadPoolExecutor CUSTOM_THREAD_POOL = new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors() * 2,
30,
60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1000),
r -> new Thread(r, "completable-future-thread-"),
new ThreadPoolExecutor.CallerRunsPolicy());
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 1. runAsync – no return value
CompletableFuture<Void> runFuture = CompletableFuture.runAsync(() -> {
try { TimeUnit.MILLISECONDS.sleep(300); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RuntimeException(e); }
System.out.println("No‑return async task completed");
}, CUSTOM_THREAD_POOL);
runFuture.get(); // avoid in production, use callbacks instead
// 2. supplyAsync – returns a value
CompletableFuture<String> supplyFuture = CompletableFuture.supplyAsync(() -> {
try { TimeUnit.MILLISECONDS.sleep(500); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RuntimeException(e); }
String userId = "u1001";
return "User[" + userId + "] name: ZhangSan";
}, CUSTOM_THREAD_POOL);
String result = supplyFuture.get();
System.out.println("Result with return: " + result);
CUSTOM_THREAD_POOL.shutdown();
}
}Task Composition
CompletableFuture excels at orchestrating dependent, parallel, and aggregating tasks.
Serial Composition (thenApply, thenAccept, thenRun)
public class CompletableFutureSerialDemo {
private static final ThreadPoolExecutor CUSTOM_THREAD_POOL = new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors() * 2, 30, 60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1000), r -> new Thread(r, "serial-thread-"), new ThreadPoolExecutor.CallerRunsPolicy());
private static CompletableFuture<User> queryUser(String userId) {
return CompletableFuture.supplyAsync(() -> {
try { TimeUnit.MILLISECONDS.sleep(400); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RuntimeException(e); }
return new User(userId, "LiSi", 100);
}, CUSTOM_THREAD_POOL);
}
private static CompletableFuture<Integer> updateUserPoints(User user, int addPoints) {
return CompletableFuture.supplyAsync(() -> {
try { TimeUnit.MILLISECONDS.sleep(300); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RuntimeException(e); }
int newPoints = user.getPoints() + addPoints;
System.out.println("User[" + user.getUserId() + "] points: " + user.getPoints() + " → " + newPoints);
return newPoints;
}, CUSTOM_THREAD_POOL);
}
private static CompletableFuture<Void> recordLog(String userId, int newPoints) {
return CompletableFuture.runAsync(() -> {
try { TimeUnit.MILLISECONDS.sleep(200); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RuntimeException(e); }
System.out.println("Log: User[" + userId + "] points updated to " + newPoints);
}, CUSTOM_THREAD_POOL);
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
String userId = "u1002";
CompletableFuture<Void> serialFuture = queryUser(userId)
.thenApply(user -> updateUserPoints(user, 30).join())
.thenAccept(newPoints -> recordLog(userId, newPoints).join())
.thenRun(() -> System.out.println("User points update flow completed"));
serialFuture.get();
CUSTOM_THREAD_POOL.shutdown();
}
static class User {
private final String userId;
private final String userName;
private final int points;
User(String userId, String userName, int points) { this.userId = userId; this.userName = userName; this.points = points; }
String getUserId() { return userId; }
int getPoints() { return points; }
}
}Parallel Composition (allOf, anyOf)
public class CompletableFutureParallelDemo {
private static final ThreadPoolExecutor CUSTOM_THREAD_POOL = new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors() * 2, 30, 60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1000), r -> new Thread(r, "parallel-thread-"), new ThreadPoolExecutor.CallerRunsPolicy());
private static CompletableFuture<String> queryUserInfo(String userId) {
return CompletableFuture.supplyAsync(() -> { try { TimeUnit.MILLISECONDS.sleep(400); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RuntimeException(e); } return "UserInfo:userId=" + userId + ",userName=WangWu"; }, CUSTOM_THREAD_POOL);
}
private static CompletableFuture<String> queryProductInfo(String productId) {
return CompletableFuture.supplyAsync(() -> { try { TimeUnit.MILLISECONDS.sleep(300); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RuntimeException(e); } return "ProductInfo:productId=" + productId + ",productName=Notebook,price=4999"; }, CUSTOM_THREAD_POOL);
}
private static CompletableFuture<String> queryCouponInfo(String userId) {
return CompletableFuture.supplyAsync(() -> { try { TimeUnit.MILLISECONDS.sleep(500); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RuntimeException(e); } return "CouponInfo:userId=" + userId + ",couponAmount=300,valid=true"; }, CUSTOM_THREAD_POOL);
}
private static CompletableFuture<String> multiSourceQuery() {
CompletableFuture<String> source1 = CompletableFuture.supplyAsync(() -> { try { TimeUnit.MILLISECONDS.sleep(200); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RuntimeException(e); } return "Source1 result"; }, CUSTOM_THREAD_POOL);
CompletableFuture<String> source2 = CompletableFuture.supplyAsync(() -> { try { TimeUnit.MILLISECONDS.sleep(150); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RuntimeException(e); } return "Source2 result"; }, CUSTOM_THREAD_POOL);
return CompletableFuture.anyOf(source1, source2).thenApply(r -> (String) r);
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
String userId = "u1003"; String productId = "p2001";
// allOf – wait for all parallel tasks
CompletableFuture<String> userFuture = queryUserInfo(userId);
CompletableFuture<String> productFuture = queryProductInfo(productId);
CompletableFuture<String> couponFuture = queryCouponInfo(userId);
CompletableFuture<Void> allFuture = CompletableFuture.allOf(userFuture, productFuture, couponFuture);
allFuture.thenRun(() -> {
try {
System.out.println("Parallel results:");
System.out.println(userFuture.get());
System.out.println(productFuture.get());
System.out.println(couponFuture.get());
} catch (Exception e) { e.printStackTrace(); }
}).get();
// anyOf – fastest source wins
String fastResult = multiSourceQuery().get();
System.out.println("Fastest multi‑source result: " + fastResult);
CUSTOM_THREAD_POOL.shutdown();
}
}Dependency Aggregation (thenCombine)
public class CompletableFutureCombineDemo {
private static final ThreadPoolExecutor CUSTOM_THREAD_POOL = new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors() * 2, 30, 60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1000), r -> new Thread(r, "combine-thread-"), new ThreadPoolExecutor.CallerRunsPolicy());
private static CompletableFuture<Double> calculateOrderAmount(int count, double price) {
return CompletableFuture.supplyAsync(() -> { try { TimeUnit.MILLISECONDS.sleep(300); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RuntimeException(e); } double amt = count * price; System.out.println("Order amount: " + amt); return amt; }, CUSTOM_THREAD_POOL);
}
private static CompletableFuture<Double> calculateDiscountAmount(double orderAmt, double rate) {
return CompletableFuture.supplyAsync(() -> { try { TimeUnit.MILLISECONDS.sleep(200); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RuntimeException(e); } double disc = Math.min(orderAmt * rate, 200); System.out.println("Discount: " + disc); return disc; }, CUSTOM_THREAD_POOL);
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
int count = 3; double price = 199.0; double rate = 0.15;
CompletableFuture<Double> payFuture = calculateOrderAmount(count, price)
.thenCombine(calculateDiscountAmount(count * price, rate), (order, disc) -> order - disc);
double pay = payFuture.get();
System.out.println("Final payment: " + pay);
CUSTOM_THREAD_POOL.shutdown();
}
}Exception Handling
Three patterns are demonstrated: exceptionally for fallback values, handle for unified result‑or‑exception processing, and whenComplete for side‑effects without altering the outcome.
public class CompletableFutureExceptionDemo {
private static final ThreadPoolExecutor CUSTOM_THREAD_POOL = new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors() * 2, 30, 60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1000), r -> new Thread(r, "exception-thread-"), new ThreadPoolExecutor.CallerRunsPolicy());
private static CompletableFuture<Integer> queryUserPoints(String userId) {
return CompletableFuture.supplyAsync(() -> {
try { TimeUnit.MILLISECONDS.sleep(300); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RuntimeException(e); }
if ("u9999".equals(userId)) { throw new RuntimeException("User[" + userId + "] not found"); }
return 150;
}, CUSTOM_THREAD_POOL);
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
// exceptionally – fallback to 0
CompletableFuture<Integer> exFuture = queryUserPoints("u9999").exceptionally(ex -> { System.err.println("Caught: " + ex.getMessage()); return 0; });
System.out.println("exceptionally result: " + exFuture.get());
// handle – custom message on error
CompletableFuture<String> handleFuture = queryUserPoints("u1004").handle((pts, ex) -> {
if (ex != null) { System.err.println("handle caught: " + ex.getMessage()); return "Query failed, default 0"; }
return "Points: " + pts;
});
System.out.println("handle result: " + handleFuture.get());
// whenComplete – log only
CompletableFuture<Integer> whenFuture = queryUserPoints("u1004").whenComplete((pts, ex) -> {
if (ex != null) System.err.println("whenComplete error: " + ex.getMessage());
else System.out.println("whenComplete success: " + pts);
});
System.out.println("whenComplete result: " + whenFuture.get());
CUSTOM_THREAD_POOL.shutdown();
}
}Real‑World E‑Commerce Order Processing Case
The article assembles all previously discussed techniques into a complete order‑processing pipeline. Four logical steps are required after an order is placed: order insertion, stock deduction (depends on insertion), payment result listening, and notification (depends on payment). Insertion and payment are executed in parallel, followed by serial dependencies. Custom thread pools isolate order‑related work from other services.
public class OrderAsyncProcessDemo {
private static final ThreadPoolExecutor ORDER_THREAD_POOL = new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors() * 2, 30, 60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1000), r -> new Thread(r, "order-process-thread-"), new ThreadPoolExecutor.CallerRunsPolicy());
// DAO stubs
static class OrderDAO { String insertOrder(String oid, String uid, String pid, int cnt) { try { TimeUnit.MILLISECONDS.sleep(300); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RuntimeException(e); } return "Order["+oid+"] inserted"; } }
static class StockDAO { String deductStock(String pid, int cnt) { try { TimeUnit.MILLISECONDS.sleep(200); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RuntimeException(e); } if ("p9999".equals(pid)) throw new RuntimeException("Product["+pid+"] out of stock"); return "Stock["+pid+"] deducted"; } }
static class PaymentService { String listenPaymentResult(String oid) { try { TimeUnit.MILLISECONDS.sleep(500); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RuntimeException(e); } if ("o8888".equals(oid)) return "Order["+oid+"] payment failed"; return "Order["+oid+"] payment success"; } }
static class MessageService { void sendNotification(String uid, String msg) { try { TimeUnit.MILLISECONDS.sleep(150); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RuntimeException(e); } System.out.println("Notify user["+uid+"]: " + msg); } }
static class OrderAsyncService {
private final OrderDAO orderDAO = new OrderDAO();
private final StockDAO stockDAO = new StockDAO();
private final PaymentService paymentService = new PaymentService();
private final MessageService messageService = new MessageService();
CompletableFuture<String> processOrder(String oid, String uid, String pid, int cnt) {
// Parallel: insert order & listen payment
CompletableFuture<String> insertFuture = CompletableFuture.supplyAsync(() -> orderDAO.insertOrder(oid, uid, pid, cnt), ORDER_THREAD_POOL);
CompletableFuture<String> paymentFuture = CompletableFuture.supplyAsync(() -> paymentService.listenPaymentResult(oid), ORDER_THREAD_POOL);
// Serial after insertion: stock deduction
CompletableFuture<String> stockFuture = insertFuture.thenApply(r -> stockDAO.deductStock(pid, cnt))
.exceptionally(ex -> { System.err.println("Stock error: " + ex.getMessage()); return "Stock error: " + ex.getMessage(); });
// Serial after payment: notification
CompletableFuture<Void> notifyFuture = paymentFuture.thenAccept(res -> messageService.sendNotification(uid, res))
.exceptionally(ex -> { System.err.println("Notify error: " + ex.getMessage()); return null; });
// Aggregate results
return CompletableFuture.allOf(stockFuture, notifyFuture)
.thenApply(v -> {
try {
String stockRes = stockFuture.get();
String payRes = paymentFuture.get();
if (payRes.contains("success") && stockRes.contains("deducted"))
return "Order["+oid+"] completed successfully";
else
return "Order["+oid+"] failed – " + stockRes + "; " + payRes;
} catch (Exception e) { return "Order["+oid+"] exception: " + e.getMessage(); }
});
}
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
OrderAsyncService svc = new OrderAsyncService();
// Normal case
System.out.println(svc.processOrder("o1001", "u1005", "p2002", 3).get());
// Stock shortage
System.out.println(svc.processOrder("o1002", "u1005", "p9999", 3).get());
// Payment failure
System.out.println(svc.processOrder("o8888", "u1005", "p2002", 3).get());
ORDER_THREAD_POOL.shutdown();
}
}Best Practices & Common Pitfalls
Prefer a custom ThreadPoolExecutor over the default ForkJoinPool; size it according to IO‑ or CPU‑bound characteristics.
Choose the right composition primitive: thenApply/thenAccept/thenRun for sequential steps, allOf/anyOf for parallel aggregation, thenCombine for two‑way dependency.
Always attach exception handling ( exceptionally, handle or whenComplete) to prevent silent failures.
Avoid blocking get() in production code; use callbacks or set explicit timeouts.
Manage thread pools as global singletons to avoid creation‑destruction overhead.
Conclusion
CompletableFuture resolves the blocking and orchestration shortcomings of the classic Future API, offering non‑blocking callbacks, flexible task composition, and comprehensive exception handling. When combined with properly sized custom thread pools and disciplined best‑practice guidelines, it dramatically improves throughput and latency in IO‑intensive, high‑concurrency Java services.
Architecture & Thinking
🍭 Frontline tech director and chief architect at top-tier companies 🥝 Years of deep experience in internet, e‑commerce, social, and finance sectors 🌾 Committed to publishing high‑quality articles covering core technologies of leading internet firms, application architecture, and AI breakthroughs.
How this landed with the community
Was this worth your time?
0 Comments
Thoughtful readers leave field notes, pushback, and hard-won operational detail here.
