Understanding Java CompletionService: Prioritizing the First Completed Task
This article explains the purpose and implementation of Java's CompletionService, compares it with ExecutorService, walks through its source code and key methods, and demonstrates typical usage patterns for efficiently retrieving the earliest completed asynchronous results.
ExecutorService and CompletionService Comparison
Assume we have four tasks (A, B, C, D) whose execution times vary with input parameters. Submitting them to an ExecutorService is straightforward:
ExecutorService executorService = Executors.newFixedThreadPool(4);
List<Future> futures = new ArrayList<Future<Integer>>();
futures.add(executorService.submit(A));
futures.add(executorService.submit(B));
futures.add(executorService.submit(C));
futures.add(executorService.submit(D));
// Iterate the Future list and call get() to obtain each result
for (Future future : futures) {
Integer result = future.get();
// other business logic
}Using CompletionService for the same scenario looks almost identical:
ExecutorService executorService = Executors.newFixedThreadPool(4);
// ExecutorCompletionService is the only implementation of CompletionService
CompletionService<Integer> executorCompletionService = new ExecutorCompletionService<>(executorService);
List<Future> futures = new ArrayList<Future<Integer>>();
futures.add(executorCompletionService.submit(A));
futures.add(executorCompletionService.submit(B));
futures.add(executorCompletionService.submit(C));
futures.add(executorCompletionService.submit(D));
for (int i = 0; i < futures.size(); i++) {
Integer result = executorCompletionService.take().get();
// other business logic
}The two implementations are almost the same, but CompletionService solves a critical flaw of Future.get() : if a task has not finished, get() blocks the thread until the result is available, which can delay subsequent tasks.
"If the Future result is not completed, calling get() blocks the program until the result is returned."
When task A takes much longer than B, C, D, a plain Future approach blocks on A's get() , preventing the other tasks from proceeding. CompletionService eliminates this by returning results in the order they finish.
The service works like a message queue: completed tasks are placed into a blocking queue, and consumers take results from the queue, automatically receiving the earliest completed task.
"It is a service that decouples asynchronous task production from result consumption."
Key concepts:
Tasks are submitted as Runnable or Callable .
Results are represented by Future objects.
CompletionService Interface
The interface defines five methods:
Future<V> submit(Callable<V> task);
Future<V> submit(Runnable task, V result);
Future<V> take() throws InterruptedException;
Future<V> poll();
Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException;The last three methods retrieve and remove the head of the internal blocking queue, differing in blocking behavior.
take : blocks if the queue is empty.
poll : returns null immediately when empty.
poll‑timeout : waits up to the specified timeout, then returns null if still empty.
Thus the API essentially provides two functionalities: submitting asynchronous tasks and retrieving completed results.
ExecutorCompletionService Implementation
It has two constructors, both requiring an Executor (usually a thread pool) and optionally a custom BlockingQueue . If no queue is supplied, a LinkedBlockingQueue is used.
private final Executor executor;
private final AbstractExecutorService aes;
private final BlockingQueue<Future<V>> completionQueue;
public ExecutorCompletionService(Executor executor) {
if (executor == null) throw new NullPointerException();
this.executor = executor;
this.aes = (executor instanceof AbstractExecutorService) ? (AbstractExecutorService) executor : null;
this.completionQueue = new LinkedBlockingQueue<>();
}
public ExecutorCompletionService(Executor executor, BlockingQueue<Future<V>> completionQueue) {
if (executor == null || completionQueue == null) throw new NullPointerException();
this.executor = executor;
this.aes = (executor instanceof AbstractExecutorService) ? (AbstractExecutorService) executor : null;
this.completionQueue = completionQueue;
}When a task is submitted, it is wrapped in a QueueingFuture which extends FutureTask . The overridden done() method places the completed task into the completionQueue :
protected void done() {
completionQueue.add(this);
}Consumers then call take() or poll() to obtain the earliest completed result.
Typical Use Cases
Dubbo's Forking Cluster.
Downloading from multiple mirrors and stopping the slower ones once the fastest copy finishes.
Aggregating responses from several services (e.g., weather APIs) and using the first successful result.
These scenarios benefit from both fast‑first result retrieval and a lightweight load‑balancing effect.
Discussion Questions
When processing results asynchronously, what precautions should be taken?
Would you choose an unbounded queue for the completion queue? Why or why not?
Feel free to like, share, and follow the author if the article helped you.
Code Ape Tech Column
Former Ant Group P8 engineer, pure technologist, sharing full‑stack Java, job interview and career advice through a column. Site: java-family.cn
How this landed with the community
Was this worth your time?
0 Comments
Thoughtful readers leave field notes, pushback, and hard-won operational detail here.