How to Retrieve Async Return Values with Java FutureTask?

This article explains how to obtain return values from asynchronous Java methods using FutureTask. It covers FutureTask's relationship to AQS, its execution flow, and the behavior of get(), and it walks through concrete source code examples.

Su San Talks Tech

1. Background

In the previous article we optimized a process from 191 seconds to 2 seconds using double async. A reader asked how to ensure data consistency after insertion.

The simple answer is to compare the number of rows in the Excel file with the number of rows inserted into the database.

That raises the question: how do we obtain the return value of an asynchronous thread?

2. Getting async return values with Future

We can obtain the result by adding a Future return value to the asynchronous method. FutureTask implements both Future and Runnable, so it can be submitted to an Executor or executed directly by calling FutureTask.run().
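As a minimal sketch of those two ways of running a FutureTask, the class below submits one task to an executor and runs another directly on the calling thread. The class and method names (FutureTaskBasics, viaExecutor, viaDirectRun, resultOf) are made up for illustration; only the java.util.concurrent types come from the JDK.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;

class FutureTaskBasics {

    // Waits for the task and converts checked exceptions to keep the demo short.
    private static int resultOf(FutureTask<Integer> task) {
        try {
            return task.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    // FutureTask used as a Runnable: an executor thread runs it.
    static int viaExecutor() {
        FutureTask<Integer> task = new FutureTask<>(() -> 1 + 2);
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            pool.execute(task);
            return resultOf(task);   // blocks until the executor thread finishes
        } finally {
            pool.shutdown();
        }
    }

    // FutureTask executed directly: run() computes on the calling thread.
    static int viaDirectRun() {
        FutureTask<Integer> task = new FutureTask<>(() -> 1 + 2);
        task.run();
        return resultOf(task);       // already done, so get() returns at once
    }

    public static void main(String[] args) {
        System.out.println(viaExecutor());
        System.out.println(viaDirectRun());
    }
}
```

Both paths produce the same value; the difference is only which thread executes the Callable.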

1. FutureTask and AbstractQueuedSynchronizer (AQS)

AbstractQueuedSynchronizer (AQS) is a synchronization framework that provides a generic mechanism to atomically manage synchronization state, block and wake up threads, and maintain a queue of blocked threads. Synchronizers built on AQS include ReentrantLock, Semaphore, ReentrantReadWriteLock and CountDownLatch. Early JDK versions (such as JDK 6) also built FutureTask on an inner AQS subclass; newer versions, including the source shown in section 3, replace it with a volatile state field, CAS updates, and a linked stack of waiting threads. The acquire/release model is the same in both designs.

AQS-based synchronizers support two operations:

acquire: blocks the calling thread until the synchronization state permits it to proceed; in FutureTask this is the get() method.

release: changes the synchronization state so that blocked threads can proceed; in FutureTask this is done via run() or cancel().
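To make the acquire/release pair concrete, here is a small one-shot result holder built directly on AQS. It is an illustrative sketch and not the JDK's FutureTask code; the names AqsResultHolder, Sync, set and demo are assumptions, and set() is assumed to be called by a single thread.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

class AqsResultHolder<V> {

    private static final class Sync extends AbstractQueuedSynchronizer {
        // acquire succeeds only after the state has become 1 (done)
        @Override
        protected int tryAcquireShared(int ignored) {
            return getState() == 1 ? 1 : -1;
        }

        // release flips the state from 0 (pending) to 1 (done) exactly once
        @Override
        protected boolean tryReleaseShared(int ignored) {
            return compareAndSetState(0, 1);
        }
    }

    private final Sync sync = new Sync();
    private V result;   // written before release, read after acquire

    // "release": publish the value, then wake every blocked thread
    void set(V value) {
        result = value;
        sync.releaseShared(0);
    }

    // "acquire": block until set() has been called
    V get() throws InterruptedException {
        sync.acquireSharedInterruptibly(0);
        return result;
    }

    // Small demo: another thread releases, the caller acquires.
    static String demo() {
        AqsResultHolder<String> holder = new AqsResultHolder<>();
        new Thread(() -> holder.set("done")).start();
        try {
            return holder.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```

Shared mode is used so that every waiting thread is released once the state changes, which mirrors several callers waiting on the same get().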

2. FutureTask execution flow

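Execute the @Async method.

The call is handed to the executor, where a worker thread (e.g., async-executor-X) runs the Runnable part of the FutureTask.

A thread that calls get() (e.g., async-executor-1) checks the internal state; if the task has not finished, it joins the wait queue and blocks.

When run() completes or cancel() is called, the state changes and the first waiting thread (async-executor-1) is awakened.

When async-executor-1 is awakened, it leaves the wait queue, and get() returns the result or throws an exception.

The next waiting thread repeats the same steps, as does any further thread (async-executor-N) that monitors the async method's state.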
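The wake-up step in this flow can be observed with a plain FutureTask and one waiting thread. The sketch below is illustrative only: WaiterWakeupDemo and waitFor are hypothetical names, and the task is completed or cancelled from the calling thread rather than by an executor.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

class WaiterWakeupDemo {

    // Reports how the waiting thread's get() ended.
    static String waitFor(boolean cancel) {
        FutureTask<Integer> task = new FutureTask<>(() -> 42);
        String[] outcome = new String[1];

        Thread waiter = new Thread(() -> {
            try {
                outcome[0] = "result:" + task.get();   // blocks while the task is unfinished
            } catch (CancellationException e) {
                outcome[0] = "cancelled";              // woken by cancel()
            } catch (InterruptedException | ExecutionException e) {
                outcome[0] = "failed";
            }
        }, "waiter-1");
        waiter.start();

        // Either call changes the state and wakes the waiter.
        if (cancel) {
            task.cancel(true);
        } else {
            task.run();
        }

        try {
            waiter.join();   // join() makes the waiter's write to outcome visible
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return outcome[0];
    }

    public static void main(String[] args) {
        System.out.println(waitFor(false));
        System.out.println(waitFor(true));
    }
}
```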

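3. get() method execution flow

The get() method checks the state; if the task has not finished, the calling thread waits inside awaitDone(), otherwise get() returns the result or throws an exception.

If state ≤ COMPLETING, the task has not finished; awaitDone() creates a wait node, adds it to the list of waiters with a CAS, and then parks the thread (for at most the remaining timeout when one is given).

If state > COMPLETING, the task has finished or been cancelled; awaitDone() clears the wait node's thread reference and returns the state.

Interruption, timeout, and cancellation are handled according to the state: an interrupted waiter removes its node and throws InterruptedException, a timed wait that expires throws TimeoutException, and a cancelled task makes get() throw CancellationException.

In short:

If the async task has not finished, the calling thread loops in awaitDone(): it enqueues itself with a CAS and then parks rather than busy-spinning.

The thread that completes or cancels the task sets the final state and wakes the threads in the wait queue.

Finally get() checks the state and returns the result or throws an exception.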
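The possible endings of a get() call can be seen from the caller's side with the timed overload. This is a small sketch using only standard java.util.concurrent calls; the class name TimedGetDemo and the method describe are invented for the example.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class TimedGetDemo {

    // Maps each way get(timeout) can end to a short description.
    static String describe(FutureTask<Integer> task, long timeoutMillis) {
        try {
            return "value=" + task.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return "timeout";                 // task still unfinished when the wait expired
        } catch (ExecutionException e) {
            return "failed: " + e.getCause().getMessage();   // task threw an exception
        } catch (CancellationException e) {
            return "cancelled";               // task was cancelled
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";             // the waiting thread was interrupted
        }
    }

    public static void main(String[] args) {
        FutureTask<Integer> task = new FutureTask<>(() -> 7);
        System.out.println(describe(task, 50));   // task has not been run yet
        task.run();
        System.out.println(describe(task, 50));   // task has completed
    }
}
```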

3. Detailed source analysis of FutureTask

1. FutureTask source

Key points: the class defines several integer state constants (NEW, COMPLETING, NORMAL, EXCEPTIONAL, CANCELLED, INTERRUPTING, INTERRUPTED) and implements the get(), awaitDone(), and report() methods that manage waiting, state transitions, and result retrieval.

public interface RunnableFuture<V> extends Runnable, Future<V> { /* ... */ }
public class FutureTask<V> implements RunnableFuture<V> {
    // state constants
    private volatile int state;
    private static final int NEW = 0;          // new state
    private static final int COMPLETING = 1;    // completing
    private static final int NORMAL = 2;       // normal completion
    private static final int EXCEPTIONAL = 3;   // exceptional completion
    private static final int CANCELLED = 4;     // cancelled
    private static final int INTERRUPTING = 5;  // interrupting
    private static final int INTERRUPTED = 6;   // interrupted
    // ...
    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);
    }
    private int awaitDone(boolean timed, long nanos) throws InterruptedException { /* ... */ }
    private V report(int s) throws ExecutionException { /* ... */ }
}
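The state-plus-wait-node idea behind get() and awaitDone() can be sketched in a much smaller class. The code below is a simplified illustration, not the JDK source: MiniFuture and its members are hypothetical, only normal completion is modeled, and unlike the real class an interrupted waiter does not remove its node.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;

class MiniFuture<V> {
    private static final int NEW = 0, COMPLETING = 1, NORMAL = 2;

    private static final class WaitNode {
        final Thread thread = Thread.currentThread();
        WaitNode next;
    }

    private final AtomicInteger state = new AtomicInteger(NEW);
    private final AtomicReference<WaitNode> waiters = new AtomicReference<>();
    private V outcome;   // written before NORMAL is published, read after it is seen

    // Completes the future once; later calls are ignored.
    void set(V value) {
        if (!state.compareAndSet(NEW, COMPLETING)) {
            return;
        }
        outcome = value;
        state.set(NORMAL);
        // Detach the whole stack of waiters and wake each one.
        for (WaitNode q = waiters.getAndSet(null); q != null; q = q.next) {
            LockSupport.unpark(q.thread);
        }
    }

    // Waits until set() has published a value.
    V get() throws InterruptedException {
        WaitNode node = null;
        boolean queued = false;
        int s;
        while ((s = state.get()) <= COMPLETING) {
            if (Thread.interrupted()) {
                throw new InterruptedException();
            }
            if (s == COMPLETING) {
                Thread.yield();                 // result is about to be published
            } else if (node == null) {
                node = new WaitNode();
            } else if (!queued) {
                node.next = waiters.get();      // CAS-push onto the waiter stack
                queued = waiters.compareAndSet(node.next, node);
            } else {
                LockSupport.park(this);         // sleep until unparked, then re-check state
            }
        }
        return outcome;
    }

    static int demo() {
        MiniFuture<Integer> future = new MiniFuture<>();
        new Thread(() -> future.set(99)).start();
        try {
            return future.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```

Because the loop re-reads the state after enqueuing and before parking, a completion that happens in between is not missed: either the loop exits, or the unpark issued by set() makes the following park() return immediately.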

2. Change async method return type to Future<Integer>

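@Async("async-executor")
public void readXls(String filePath, String filename) {
    try {
        // Collect one Future per async batch; `times` (the number of batches) is defined elsewhere
        List<Future<Integer>> futureList = new ArrayList<>();
        for (int time = 0; time < times; time++) {
            Future<Integer> sumFuture = readExcelDataAsyncFutureService.readXlsCacheAsync();
            futureList.add(sumFuture);
        }
        // futureList is then checked with getFutureResult(...), shown in the next step
    } catch (Exception e) {
        logger.error("readXlsCacheAsync---insert error:", e);
    }
}

@Async("async-executor")
public Future<Integer> readXlsCacheAsync() {
    try {
        // ... elided in the source: read and insert the rows; `sum` is presumably the inserted row count
        return new AsyncResult<>(sum);
    } catch (Exception e) {
        // Log the failure instead of silently reporting 0 rows
        logger.error("readXlsCacheAsync error:", e);
        return new AsyncResult<>(0);
    }
}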
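Outside a Spring context, the same "one Future per batch" pattern can be tried with a plain ExecutorService. The sketch below is a stand-in rather than the article's service: BatchSubmitSketch, submitBatches and totalInserted are hypothetical names, and each task simply returns a fixed row count instead of inserting anything.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class BatchSubmitSketch {

    // Submits `times` batches; each Future will hold that batch's row count.
    static List<Future<Integer>> submitBatches(ExecutorService pool, int times, int rowsPerBatch) {
        List<Future<Integer>> futureList = new ArrayList<>();
        for (int time = 0; time < times; time++) {
            Callable<Integer> batch = () -> rowsPerBatch;   // placeholder for the real insert
            futureList.add(pool.submit(batch));
        }
        return futureList;
    }

    // Waits for every batch and adds up the reported row counts.
    static int totalInserted(int times, int rowsPerBatch) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            int total = 0;
            for (Future<Integer> future : submitBatches(pool, times, rowsPerBatch)) {
                total += future.get();
            }
            return total;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(totalInserted(5, 100));
    }
}
```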

3. Retrieve the return value with Future<Integer>.get()

public static boolean getFutureResult(List<Future<Integer>> futureList, int excelRow) {
    int[] futureSumArr = new int[futureList.size()];
    for (int i = 0; i < futureList.size(); i++) {
        Future<Integer> future = futureList.get(i);
        try {
            while (true) {
                if (future.isCancelled()) {
                    // A cancelled Future never yields a value; stop polling it
                    logger.warn("Future was cancelled: " + future);
                    break;
                }
                if (future.isDone()) {
                    Integer futureSum = future.get(); // returns immediately once the task is done
                    logger.info("Got Future result: " + future + ", Result:" + futureSum);
                    futureSumArr[i] += futureSum;
                    break;
                }
                logger.info("Future is running, waiting 3 seconds");
                Thread.sleep(3000);
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag and stop waiting for the remaining Futures
            Thread.currentThread().interrupt();
            logger.error("Interrupted while waiting for Future result:", e);
            break;
        } catch (Exception e) {
            logger.error("Error getting Future result:", e);
        }
    }
    boolean insertFlag = getInsertSum(futureSumArr, excelRow);
    logger.info("All Future results retrieved, insert result=" + insertFlag);
    return insertFlag;
}
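The helper getInsertSum is called above but not shown in the article. A plausible minimal version, assuming it simply adds up the per-batch counts and compares the total with the Excel row count, might look like this; the class name InsertCheck is hypothetical and the real helper may differ.

```java
import java.util.Arrays;

class InsertCheck {

    // Assumed behavior: true when the inserted rows add up to the Excel row count.
    static boolean getInsertSum(int[] futureSumArr, int excelRow) {
        int insertSum = Arrays.stream(futureSumArr).sum();
        return insertSum == excelRow;
    }

    public static void main(String[] args) {
        System.out.println(getInsertSum(new int[] {100, 200, 50}, 350));
    }
}
```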

4. Alternative: new thread + Future

public static Future<Boolean> getFutureResultThreadFuture(List<Future<Integer>> futureList, int excelRow) {
    ExecutorService service = Executors.newSingleThreadExecutor();
    try {
        // submit() returns a Future that completes only when the task has finished,
        // so the caller receives the real result rather than the initial false
        return service.submit(() -> {
            try {
                return getFutureResult(futureList, excelRow);
            } catch (Exception e) {
                logger.error("New thread + Future error:", e);
                return false;
            }
        });
    } finally {
        // shutdown() lets the already submitted task finish, then releases the thread
        service.shutdown();
    }
}

After obtaining the async results, you can wrap the database insertion in a transaction to guarantee data consistency. However, Future.get() blocks the calling thread until the result is available, so waiting on the main thread can stall it; a non-blocking approach may be a better fit.
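One possible non-blocking direction, offered only as a sketch, is CompletableFuture from the JDK, which lets the row-count check run as a callback once every batch has finished. This is not the article's approach; NonBlockingCheck, allRowsInserted and demo are hypothetical names, and supplyAsync stands in for the real insert.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class NonBlockingCheck {

    // Completes with true when the batch counts add up to excelRow.
    static CompletableFuture<Boolean> allRowsInserted(List<CompletableFuture<Integer>> batches, int excelRow) {
        return CompletableFuture
                .allOf(batches.toArray(new CompletableFuture[0]))
                // Runs only after every batch is done, so join() does not block here
                .thenApply(ignored -> batches.stream()
                        .mapToInt(CompletableFuture::join)
                        .sum() == excelRow);
    }

    // Demo with placeholder batches that each report rowsPerBatch rows.
    static boolean demo(int batchCount, int rowsPerBatch, int excelRow) {
        List<CompletableFuture<Integer>> batches = IntStream.range(0, batchCount)
                .mapToObj(i -> CompletableFuture.supplyAsync(() -> rowsPerBatch))
                .collect(Collectors.toList());
        // join() is used here only so the demo can return a value
        return allRowsInserted(batches, excelRow).join();
    }

    public static void main(String[] args) {
        System.out.println(demo(4, 25, 100));
    }
}
```

In real code the caller would attach further callbacks (for example with thenAccept) instead of calling join(), so no thread sits idle waiting for the result.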

Republication Notice

This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contact admin@besthub.dev and we will review it promptly.

Written by

Su San Talks Tech

Su San, former staff at several leading tech companies, is a top creator on Juejin and a premium creator on CSDN, and runs the free coding practice site www.susan.net.cn.
