Mastering CompletableFuture: Asynchronous Callbacks and Source Code Deep Dive

This article explains Java's CompletableFuture: its purpose, its key methods (thenCompose, thenCombine, thenAccept, thenRun, thenApply), and its internal implementation, with code examples and a walkthrough of how dependent tasks are handled.

Java High-Performance Architecture

1. Overview

CompletableFuture, introduced in JDK 1.8, implements Future and CompletionStage, allowing operations to be triggered when a task completes, essentially providing asynchronous callbacks.

2. Why CompletableFuture was introduced

JDK 1.5's Future required blocking or polling to obtain results, which is inelegant. Registering callbacks avoids blocking, following the observer pattern (e.g., Netty's ChannelFuture).

Netty's ChannelFuture (listener registration and notification, as implemented in DefaultPromise)

public Promise<V> addListener(GenericFutureListener<? extends Future<? super V>> listener) {
    checkNotNull(listener, "listener");
    synchronized (this) {
        addListener0(listener);
    }
    if (isDone()) {
        notifyListeners();
    }
    return this;
}

private boolean setValue0(Object objResult) {
    if (RESULT_UPDATER.compareAndSet(this, null, objResult) ||
        RESULT_UPDATER.compareAndSet(this, UNCANCELLABLE, objResult)) {
        if (checkNotifyWaiters()) {
            notifyListeners();
        }
        return true;
    }
    return false;
}

CompletableFuture implements Future and adds functional-style callbacks for handling results.
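To make the contrast concrete, here is a minimal sketch of the two styles side by side. The class and method names (BlockingVersusCallback, blockingStyle, callbackStyle) are made up for illustration; only the java.util.concurrent calls are real API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class, not part of the JDK or of the original article.
class BlockingVersusCallback {

    // Future style: the caller must block in get() before it can use the value.
    static String blockingStyle(ExecutorService pool) {
        Future<String> future = pool.submit(() -> "hello");
        try {
            return future.get(); // blocks the calling thread until the task is done
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    // CompletableFuture style: the follow-up step is registered up front
    // and runs when the first stage completes; nothing blocks here.
    static CompletableFuture<String> callbackStyle(ExecutorService pool) {
        return CompletableFuture
                .supplyAsync(() -> "hello", pool)
                .thenApply(s -> s + " world");
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            System.out.println(blockingStyle(pool));
            // join() is used only so the demo does not shut the pool down early.
            callbackStyle(pool).thenAccept(System.out::println).join();
        } finally {
            pool.shutdown();
        }
    }
}
```

In the callback version the calling thread is free as soon as the chain is built; the join() in main exists only to keep the demo alive until the output is printed.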

3. Features

CompletableFuture’s capabilities are exposed through CompletionStage methods:

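thenCompose (chain a function that itself returns a CompletionStage, flattening the result)

thenCombine (combine the results of two stages)

thenAccept (consume the result, no return value)

thenRun (run an action that ignores the result)

thenApply (transform the result and return a new value)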

The difference between accept and run is that accept uses the result of the previous stage, while run simply executes a task.

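CompletableFuture can chain calls via CompletionStage methods. Each method also has an Async variant (for example thenApplyAsync) that runs the callback on an executor instead of on the thread that completes or registers the stage.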
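The five methods can be compared in one short sketch. Every future here is already completed, so each callback runs immediately on the calling thread and the order of the log is deterministic. The class name StageMethodsDemo and the sample values are assumptions chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical demo class; the values are arbitrary.
class StageMethodsDemo {

    static List<String> run() {
        List<String> log = new ArrayList<>();
        CompletableFuture<Integer> price = CompletableFuture.completedFuture(100);

        // thenApply: Function<T, U>, maps the value to a new value.
        CompletableFuture<Integer> withTax = price.thenApply(p -> p + 20);

        // thenCompose: the function returns another stage, which is flattened.
        CompletableFuture<String> label =
                withTax.thenCompose(p -> CompletableFuture.completedFuture("total=" + p));

        // thenCombine: BiFunction over the results of two independent stages.
        CompletableFuture<Integer> qty = CompletableFuture.completedFuture(3);
        CompletableFuture<Integer> sum = withTax.thenCombine(qty, (p, q) -> p * q);

        // thenAccept: Consumer<T>, sees the result and yields CompletableFuture<Void>.
        label.thenAccept(log::add);
        sum.thenAccept(v -> log.add("sum=" + v));

        // thenRun: Runnable, does not see the result at all.
        sum.thenRun(() -> log.add("done"));
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [total=120, sum=360, done]
    }
}
```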

Simple example

public static void thenApply() {
    ExecutorService executorService = Executors.newFixedThreadPool(2);
    // Stage 1 (supplyAsync) and stage 2 (thenApplyAsync) both run on executorService.
    CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
        System.out.println("supplyAsync " + Thread.currentThread().getName());
        return "hello";
    }, executorService).thenApplyAsync(s -> {
        System.out.println(s + "world");
        return "hhh";
    }, executorService);
    // No executor argument: runs on the default async pool.
    cf.thenRunAsync(() -> {
        System.out.println("ddddd");
    });
    // Synchronous callbacks: no executor is involved.
    cf.thenRun(() -> {
        System.out.println("ddddsd");
    });
    cf.thenRun(() -> {
        System.out.println(Thread.currentThread());
        System.out.println("dddaewdd");
    });
    cf.join();                  // wait for both stages before releasing the pool
    executorService.shutdown(); // otherwise the non-daemon pool threads keep the JVM alive
}

Execution result (order may vary):

supplyAsync pool-1-thread-1
helloworld
ddddd
ddddsd
Thread[main,5,main]
dddaewdd

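A synchronous cf.thenRun has no executor of its own. If cf has already completed when thenRun is called, the action runs immediately on the calling thread (main in the output above). If cf is still pending, the action is queued and later runs on whichever thread completes cf. Which case applies depends on timing.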
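Both cases can be reproduced deterministically by completing the future by hand rather than racing against a pool. This is a sketch; the class name SyncCallbackThreadDemo and the thread name "completer" are made up for the demo.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class showing which thread runs a synchronous thenRun.
class SyncCallbackThreadDemo {

    // Case 1: the future is already complete, so thenRun executes the
    // action right away on the thread that registers it.
    static String alreadyCompleted() {
        AtomicReference<String> ranOn = new AtomicReference<>();
        CompletableFuture<String> cf = CompletableFuture.completedFuture("hello");
        cf.thenRun(() -> ranOn.set(Thread.currentThread().getName()));
        return ranOn.get();
    }

    // Case 2: the future is still pending at registration time, so the
    // action runs later on the thread that calls complete().
    static String completedLater() {
        AtomicReference<String> ranOn = new AtomicReference<>();
        CompletableFuture<String> cf = new CompletableFuture<>();
        CompletableFuture<Void> dependent =
                cf.thenRun(() -> ranOn.set(Thread.currentThread().getName()));
        new Thread(() -> cf.complete("hello"), "completer").start();
        dependent.join(); // wait until the callback has finished
        return ranOn.get();
    }

    public static void main(String[] args) {
        System.out.println("already completed -> " + alreadyCompleted());
        System.out.println("completed later   -> " + completedLater());
    }
}
```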

4. Source Code Trace

Creating a CompletableFuture

Various creation methods exist; the most common is supplyAsync:

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor) {
    return asyncSupplyStage(screenExecutor(executor), supplier);
}

static Executor screenExecutor(Executor e) {
    if (!useCommonPool && e == ForkJoinPool.commonPool())
        return asyncPool;
    if (e == null) throw new NullPointerException();
    return e;
}

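The Supplier runs on the executor passed in. The overload without an executor argument uses asyncPool, which is ForkJoinPool.commonPool() when its parallelism is greater than 1 and a thread-per-task executor otherwise. Common-pool workers are daemon threads, so the JVM can exit before the task finishes unless some non-daemon thread (for example main, blocked in join()) is still alive.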
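As a small illustration of the executor argument, the sketch below passes in a single-thread executor with a named thread and reports which thread ran the supplier. The class name ExecutorChoiceDemo and the thread name "demo-worker" are assumptions for the demo; the default-pool thread name printed by main depends on the JDK and machine, so no output is claimed for it.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo class; "demo-worker" is an arbitrary thread name.
class ExecutorChoiceDemo {

    // Runs the supplier on a caller-provided executor and returns the
    // name of the thread that executed it.
    static String threadUsed() {
        ExecutorService pool = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "demo-worker");
            t.setDaemon(true); // daemon, like common-pool workers
            return t;
        });
        try {
            // join() keeps the calling (non-daemon) thread alive until the result exists.
            return CompletableFuture
                    .supplyAsync(() -> Thread.currentThread().getName(), pool)
                    .join();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("custom executor: " + threadUsed());
        // Without an executor argument the default async pool is used.
        System.out.println("default pool:    "
                + CompletableFuture.supplyAsync(() -> Thread.currentThread().getName()).join());
    }
}
```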

asyncSupplyStage

static <U> CompletableFuture<U> asyncSupplyStage(Executor e, Supplier<U> f) {
    if (f == null) throw new NullPointerException();
    CompletableFuture<U> d = new CompletableFuture<>();
    e.execute(new AsyncSupply<U>(d, f));
    return d;
}

asyncSupplyStage creates a new CompletableFuture, hands the executor an AsyncSupply task that wraps that future and the supplier, and returns the future immediately.

AsyncSupply#run

public void run() {
    CompletableFuture<T> d; Supplier<T> f;
    if ((d = dep) != null && (f = fn) != null) {
        dep = null; fn = null;
        if (d.result == null) {
            try {
                d.completeValue(f.get());
            } catch (Throwable ex) {
                d.completeThrowable(ex);
            }
        }
        d.postComplete();
    }
}

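The run method skips the supplier if the future has already been completed by someone else (d.result != null). Otherwise it invokes the supplier and stores either the value or the thrown exception as the result. In both cases it then calls postComplete to notify dependent tasks.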
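The same shape can be approximated with public API only. This is a simplified sketch, not the JDK's code: supplyOn is a hypothetical helper, it uses complete and completeExceptionally in place of the internal completeValue and completeThrowable, and it relies on complete() itself to trigger dependents.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Supplier;

// Hypothetical helper that mimics asyncSupplyStage + AsyncSupply#run
// using only public CompletableFuture methods.
class HandRolledSupplyAsync {

    static <U> CompletableFuture<U> supplyOn(Executor executor, Supplier<U> supplier) {
        if (executor == null || supplier == null) throw new NullPointerException();
        CompletableFuture<U> d = new CompletableFuture<>();
        executor.execute(() -> {
            if (!d.isDone()) {                   // skip work if already completed elsewhere
                try {
                    d.complete(supplier.get());  // store the value, fire dependents
                } catch (Throwable ex) {
                    d.completeExceptionally(ex); // store the failure, fire dependents
                }
            }
        });
        return d;                                // returned before the task has run
    }

    public static void main(String[] args) {
        // Runnable::run is a direct executor: it runs the task on the calling thread.
        System.out.println(supplyOn(Runnable::run, () -> 42).join()); // prints 42
    }
}
```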

thenAcceptAsync

public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) {
    return uniAcceptStage(asyncPool, action);
}

private CompletableFuture<Void> uniAcceptStage(Executor e, Consumer<? super T> f) {
    if (f == null) throw new NullPointerException();
    CompletableFuture<Void> d = new CompletableFuture<>();
    if (e != null || !d.uniAccept(this, f, null)) {
        UniAccept<T> c = new UniAccept<T>(e, d, this, f);
        push(c);
        c.tryFire(SYNC);
    }
    return d;
}
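
thenAcceptAsync creates a dependent CompletableFuture that consumes the result of the previous stage. Because an executor is supplied, uniAcceptStage always wraps the action in a UniAccept, pushes it onto the source's stack, and calls tryFire(SYNC) once in case the source has already completed.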
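From the caller's side this looks as follows: the dependent future is a CompletableFuture<Void> that completes only after the consumer has run on the executor. A minimal sketch, with a hypothetical class name and a manually completed source so the sequence is deterministic:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class for thenAcceptAsync with an explicit executor.
class ThenAcceptAsyncDemo {

    static String consume() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            AtomicReference<String> seen = new AtomicReference<>();
            CompletableFuture<String> source = new CompletableFuture<>();

            // Registered while source is still pending: the action is queued.
            CompletableFuture<Void> dependent = source.thenAcceptAsync(seen::set, pool);

            source.complete("hello"); // triggers the queued action on the pool
            dependent.join();         // completes after the consumer has run
            return seen.get();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(consume()); // prints hello
    }
}
```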

postComplete

final void postComplete() {
    CompletableFuture<?> f = this; Completion h;
    while ((h = f.stack) != null || (f != this && (h = (f = this).stack) != null)) {
        CompletableFuture<?> d; Completion t;
        if (f.casStack(h, t = h.next)) {
            if (t != null) {
                if (f != this) {
                    pushStack(h);
                    continue;
                }
                h.next = null;
            }
            f = (d = h.tryFire(NESTED)) == null ? this : d;
        }
    }
}

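After the source task finishes, postComplete pops dependent Completion objects off the stack one at a time with CAS and calls tryFire(NESTED) on each. If firing a completion completes another future that has dependents of its own, tryFire returns that future and the loop continues with it, re-pushing its completions onto this future's stack (pushStack) instead of recursing. This keeps the call depth bounded no matter how long the dependency chain is.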
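The pop-and-fire loop is easier to see on a stripped-down stack. The sketch below is not the JDK implementation: CallbackStackSketch, Node, push and fireAll are hypothetical names, callbacks are plain Runnables, and the pushStack hand-off for nested dependents is left out. It shows only the CAS push, the CAS pop, and the resulting last-in, first-out firing order.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, simplified stand-in for the Completion stack.
class CallbackStackSketch {

    // One stack node holding a callback and a link to the node below it.
    static final class Node {
        final Runnable action;
        Node next;
        Node(Runnable action) { this.action = action; }
    }

    private final AtomicReference<Node> stack = new AtomicReference<>();

    // Lock-free push: retry until the head is swapped atomically.
    void push(Runnable action) {
        Node n = new Node(action);
        do {
            n.next = stack.get();
        } while (!stack.compareAndSet(n.next, n));
    }

    // Iterative drain, loosely modelled on postComplete: pop the head
    // with CAS, detach it, run it, and loop instead of recursing.
    void fireAll() {
        Node h;
        while ((h = stack.get()) != null) {
            if (stack.compareAndSet(h, h.next)) {
                h.next = null; // detach
                h.action.run();
            }
        }
    }

    static List<Integer> demo() {
        CallbackStackSketch s = new CallbackStackSketch();
        List<Integer> order = new ArrayList<>();
        for (int i = 1; i <= 3; i++) {
            int id = i;
            s.push(() -> order.add(id));
        }
        s.fireAll();
        return order;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [3, 2, 1]
    }
}
```

Because the structure is a stack, the callback registered last fires first. That ordering is a consequence of the data structure and should be treated as an implementation detail, not a guarantee to rely on.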

5. Summary

CompletableFuture creates asynchronous tasks that may have dependent stages. When the source completes, postComplete triggers the dependent tasks, which run either synchronously on the current thread or asynchronously on an executor, depending on how each stage was created. Internally, the class keeps a stack of Completion objects, updates it with CAS operations, and distinguishes synchronous, asynchronous, and nested firing modes so that each dependent runs exactly once and long chains do not cause unbounded recursion.
