
Custom Asynchronous Functions in Java: Solving Thread‑Pool Blocking with Priority Execution

This article explains how to implement custom asynchronous keywords in Java, addresses thread‑pool blocking when nested async tasks use a barrier, and presents a solution using a separate priority thread pool, atomic locks, and a linked‑blocking deque to prioritize high‑priority tasks.

FunTester

In performance testing, asynchronous tasks are essential. Asynchronous programming in Java can improve throughput and responsiveness by keeping threads from sitting idle while they wait, which raises resource utilization. It does not remove concurrency hazards, though: as the case below shows, nested asynchronous tasks can still stall a thread pool.

Origin

Taking a cue from Go's keywords, the author created a custom fun keyword for Java (the snippets in this article use Groovy syntax). It ran into a problem: when an asynchronous task spawns further asynchronous tasks and waits for them at a barrier, the thread pool becomes blocked and many tasks are left waiting.

Example scenario: 200 classes, each with a teacher assigning homework to 30 students. Each class is an asynchronous task, and each assignment is a nested asynchronous task that the class task waits for at a barrier. The result is a blocked thread pool.

static void main(String[] args) {
    200.times {
        fun {
            sleep(1.0) // simulate business processing
            pushHomework() // assign homework
        }
    }
}
/**
 * Assign homework
 */
static void pushHomework() {
    FunPhaser phaser = new FunPhaser(); // create barrier
    30.times {
        fun({
            sleep(1.0) // simulate business processing
            output("Assign homework")
        }, phaser)
    }
    phaser.await(); // wait for all homework assignments to finish
}

As a result, every thread in the pool ends up blocked inside pushHomework(), while the asynchronous tasks it spawned sit in the thread-pool queue with no free thread to run them.
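
The same effect can be reproduced in plain Java. The sketch below is not the author's fun implementation: it assumes a fixed pool from java.util.concurrent.Executors and uses a CountDownLatch with a timeout in place of FunPhaser, and the class and method names are made up for illustration. Every worker runs an outer task that queues inner tasks on the same pool and then waits for them, so the inner tasks cannot start until a worker is released.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class NestedBarrierStarvation {

    /** Returns how many outer tasks gave up waiting for their own inner tasks. */
    static int countStarvedOuterTasks(int poolSize, int innerPerOuter, long timeoutMillis) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        AtomicInteger starved = new AtomicInteger();
        CountDownLatch allReported = new CountDownLatch(poolSize);
        CountDownLatch releaseWorkers = new CountDownLatch(1);

        for (int i = 0; i < poolSize; i++) {
            pool.execute(() -> {
                // Stand-in for the barrier: one count per nested task.
                CountDownLatch barrier = new CountDownLatch(innerPerOuter);
                for (int j = 0; j < innerPerOuter; j++) {
                    pool.execute(barrier::countDown); // queued behind the busy workers
                }
                try {
                    if (!barrier.await(timeoutMillis, TimeUnit.MILLISECONDS)) {
                        starved.incrementAndGet(); // no worker was free to run the inner tasks
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    allReported.countDown();
                }
                // Keep this worker occupied until every outer task has reported,
                // so the measurement does not depend on timing.
                awaitQuietly(releaseWorkers);
            });
        }
        awaitQuietly(allReported);
        releaseWorkers.countDown();
        pool.shutdown(); // queued inner tasks still run before the workers exit
        return starved.get();
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        // Two workers, two outer tasks, four inner tasks each.
        System.out.println("starved outer tasks: " + countStarvedOuterTasks(2, 4, 200));
    }
}
```

With two workers and two outer tasks, both outer tasks time out. Without the timeout, which exists here only so the demo terminates, they would wait indefinitely.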

Initial Attempt

The first idea was a priority strategy: use java.util.concurrent.PriorityBlockingQueue as the java.util.concurrent.BlockingQueue backing the async thread pool, so that higher-priority tasks are dequeued first.

This reduced the blocking but did not eliminate it, so the author decided to add a separate asynchronous thread pool.
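
For reference, a priority queue can be wired into a ThreadPoolExecutor roughly as follows. This is a minimal sketch, not the author's code: PrioritizedTask and its "lower number runs first" convention are assumptions made for the example. Tasks must be passed to execute() rather than submit(), because submit() wraps them in a FutureTask that is not Comparable.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PriorityQueuePoolSketch {

    /** Hypothetical task wrapper: a lower priority number is dequeued first. */
    static final class PrioritizedTask implements Runnable, Comparable<PrioritizedTask> {
        final int priority;
        final Runnable body;

        PrioritizedTask(int priority, Runnable body) {
            this.priority = priority;
            this.body = body;
        }

        @Override
        public void run() {
            body.run();
        }

        @Override
        public int compareTo(PrioritizedTask other) {
            return Integer.compare(priority, other.priority);
        }
    }

    /** Queues three tasks behind a busy worker and returns the order they ran in. */
    static List<String> runOrder() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new PriorityBlockingQueue<Runnable>());
        List<String> order = new CopyOnWriteArrayList<>();
        CountDownLatch gate = new CountDownLatch(1);

        // The first task goes straight to the single worker and holds it.
        pool.execute(new PrioritizedTask(0, () -> awaitQuietly(gate)));
        // These three wait in the priority queue while the worker is busy.
        pool.execute(new PrioritizedTask(5, () -> order.add("normal-1")));
        pool.execute(new PrioritizedTask(6, () -> order.add("normal-2")));
        pool.execute(new PrioritizedTask(1, () -> order.add("urgent")));

        gate.countDown(); // free the worker; it now drains the queue by priority
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return order;
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println(runOrder()); // [urgent, normal-1, normal-2]
    }
}
```

The queue only decides which waiting task is picked next. It cannot help while every worker is parked at a barrier, which is consistent with the observation that this approach reduced the blocking without removing it.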

Two thread‑pool options were considered:

1. Use a small thread pool as an auxiliary that prevents blocking, while the normal pool still handles high-priority tasks first.

2. Use a large (cached) thread pool dedicated to high-priority tasks, with the normal pool also handling them when needed.

The author chose the second option for their project, as high‑priority tasks are infrequent and can be controlled via scripts.
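
The reason a second pool helps can be shown with a small Java sketch. It is an illustration under assumptions, not the article's ThreadPoolUtil: both pools are single-threaded fixed pools, a CountDownLatch stands in for the barrier, and all names are hypothetical. The outer task waits at the barrier either for inner tasks on its own pool or for inner tasks on a separate pool.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class SeparatePoolSketch {

    /** Runs one outer task that spawns inner tasks and waits for them at a barrier. */
    static boolean barrierReleased(ExecutorService outerPool, ExecutorService innerPool,
                                   int innerTasks, long timeoutMillis) {
        CountDownLatch barrier = new CountDownLatch(innerTasks);
        Future<Boolean> outer = outerPool.submit(() -> {
            for (int i = 0; i < innerTasks; i++) {
                innerPool.execute(barrier::countDown);
            }
            // true only if all inner tasks finished before the timeout
            return barrier.await(timeoutMillis, TimeUnit.MILLISECONDS);
        });
        try {
            return outer.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    /** With separatePool == false the inner tasks share the outer task's only worker. */
    static boolean demo(boolean separatePool) {
        ExecutorService normal = Executors.newFixedThreadPool(1);
        ExecutorService priority = separatePool ? Executors.newFixedThreadPool(1) : normal;
        try {
            return barrierReleased(normal, priority, 3, 300);
        } finally {
            normal.shutdown();
            priority.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("same pool:     " + demo(false)); // false: the barrier times out
        System.out.println("separate pool: " + demo(true));  // true: inner tasks get a thread
    }
}
```

A dedicated pool guarantees the nested tasks a thread of their own, at the cost of extra threads that sit idle when no high-priority work exists.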

Solution

Implementation of the thread pools:

priorityPool = createFixedPool(POOL_SIZE, "P")

The priority pool is created inside getFunPool(), the lazy initializer of the normal thread pool:

static ThreadPoolExecutor getFunPool() {
    if (asyncPool == null) { // double-checked locking: asyncPool must be declared volatile
        synchronized (ThreadPoolUtil.class) {
            if (asyncPool == null) {
                // create the priority pool inside the null check so it is built exactly once,
                // and before asyncPool becomes visible to other threads
                priorityPool = createFixedPool(POOL_SIZE, "P");
                // priorityPool = createPool(1, POOL_MAX, ALIVE_TIME, new LinkedBlockingQueue<Runnable>(10), getFactory("P"), new ThreadPoolExecutor.DiscardOldestPolicy())
                asyncPool = createPool(POOL_SIZE, POOL_SIZE, ALIVE_TIME, new LinkedBlockingDeque<Runnable>(Constant.MAX_WAIT_TASK), getFactory("F"));
                daemon();
            }
        }
    }
    return asyncPool;
}

Method to execute high‑priority asynchronous tasks:

static void executeSyncPriority(Runnable runnable) {
    if (priorityPool == null) getFunPool();
    priorityPool.execute(runnable);
}

Method to let the normal pool prioritize high‑priority tasks:

static void executePriority() {
    def locked = priorityLock.compareAndSet(false, true); // acquire lock if free
    if (locked) {
        try {
            def queue = (LinkedBlockingDeque<Runnable>) getFunPool().getQueue();
            while (priorityPool.getQueue().size() > 0) {
                def poll = priorityPool.getQueue().poll();
                if (poll != null) {
                    queue.offerFirst(poll); // move the task to the head of the normal queue
                }
            }
        } finally {
            priorityLock.set(false); // release lock, even if an exception was thrown
        }
    }
}

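An atomic boolean serves as a lightweight lock: only one thread moves tasks at a time, and the others skip the step instead of blocking on it, which keeps the performance cost low: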

private static AtomicBoolean priorityLock = new AtomicBoolean(false)

The task queue implementation was changed to java.util.concurrent.LinkedBlockingDeque, which allows high-priority tasks to be inserted at the front of the queue.
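
The drain step can be sketched in Java without any thread pool, which makes its ordering behavior easy to see. The class and method names below are hypothetical, and putting a task back when the target deque is full is an assumption of this sketch rather than something the article's code does.

```java
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicBoolean;

class PriorityDrainSketch {

    // Try-lock flag: a thread that loses the race skips the drain instead of waiting.
    private static final AtomicBoolean draining = new AtomicBoolean(false);

    /** Moves waiting priority tasks to the head of the normal queue; returns how many moved. */
    static <T> int drainToFront(Queue<T> priorityQueue, LinkedBlockingDeque<T> normalQueue) {
        if (!draining.compareAndSet(false, true)) {
            return 0; // another thread is already draining
        }
        int moved = 0;
        try {
            T task;
            while ((task = priorityQueue.poll()) != null) {
                if (normalQueue.offerFirst(task)) {
                    moved++;
                } else {
                    // The bounded deque is full: return the task so it is not lost.
                    priorityQueue.offer(task);
                    break;
                }
            }
        } finally {
            draining.set(false); // always release the flag
        }
        return moved;
    }

    public static void main(String[] args) {
        Queue<String> priority = new ArrayDeque<>(List.of("p1", "p2"));
        LinkedBlockingDeque<String> normal = new LinkedBlockingDeque<>(List.of("n1", "n2"));
        drainToFront(priority, normal);
        System.out.println(normal); // [p2, p1, n1, n2]
    }
}
```

Because each task is pushed to the head in turn, the moved tasks end up in reverse order relative to each other. That is harmless when high-priority tasks are independent, but it is worth knowing if their order matters.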

Custom asynchronous keyword implementation:

public static void fun(Closure f, FunPhaser phaser, boolean log) {
    if (phaser != null) phaser.register();
    ThreadPoolUtil.executeSync(() -> {
        try {
            ThreadPoolUtil.executePriority();
            f.call();
        } finally {
            if (phaser != null) {
                phaser.done();
                if (log) logger.info("async task {}", phaser.queryTaskNum());
            }
        }
    });
}

High-priority version (keyword funny), which uses the priority pool:

public static void funny(Closure f, FunPhaser phaser, boolean log) {
    if (phaser != null) phaser.register();
    ThreadPoolUtil.executeSyncPriority(() -> {
        try {
            f.call();
        } finally {
            if (phaser != null) {
                phaser.done();
                if (log) logger.info("priority async task {}", phaser.queryTaskNum());
            }
        }
    });
}
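
The register-then-release pattern shared by both keywords can be written in plain Java as well. The sketch below substitutes java.util.concurrent.Phaser for the author's FunPhaser, whose internals are not shown in this article, and the pool and helper names are assumptions for illustration. Registration happens on the calling thread before the task is submitted, and the release sits in a finally block so that a failing task cannot leave the barrier waiting.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicInteger;

class FunSketch {

    /** Runs body asynchronously; if a phaser is given, the task counts toward its barrier. */
    static void fun(ExecutorService pool, Phaser phaser, Runnable body) {
        if (phaser != null) {
            phaser.register(); // register before submitting, on the caller's thread
        }
        pool.execute(() -> {
            try {
                body.run();
            } finally {
                if (phaser != null) {
                    phaser.arriveAndDeregister(); // signal completion even if body throws
                }
            }
        });
    }

    /** Starts the given number of tasks, waits for all of them, and returns how many ran. */
    static int runAndWait(int tasks) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        Phaser phaser = new Phaser(1); // the waiting caller is the first registered party
        AtomicInteger done = new AtomicInteger();
        for (int i = 0; i < tasks; i++) {
            fun(pool, phaser, done::incrementAndGet);
        }
        phaser.arriveAndAwaitAdvance(); // returns once every registered task has arrived
        pool.shutdown();
        return done.get();
    }

    public static void main(String[] args) {
        System.out.println("completed: " + runAndWait(5)); // completed: 5
    }
}
```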

Verification

Modified script for testing:

static void main(String[] args) {
    setPoolMax(2);
    6.times {
        fun {
            sleep(1.0) // simulate business processing
            pushHomework(); // assign homework
        }
    }
}
/**
 * Assign homework
 */
static void pushHomework() {
    FunPhaser phaser = new FunPhaser(); // create barrier
    4.times {
        fun({
            sleep(1.0) // simulate business processing
            output("Assign homework");
        }, phaser);
    }
    phaser.await(); // wait for all homework assignments to finish
}

Running this, the F threads initially stay in TIMED_WAITING. After changing pushHomework() to use the high-priority keyword funny, the problem is resolved, and the F threads also execute high-priority tasks.
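
One way to check such thread states from inside the JVM, instead of taking a thread dump, is to filter live threads by name prefix. The helper below is a hypothetical sketch: it assumes the pool threads are named with an "F-" prefix, as the console output in this article suggests, and the demo thread waits on a timed CountDownLatch as a stand-in for the barrier. A barrier that waits without a timeout would report WAITING instead.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class ThreadStateProbe {

    /** Returns the current state of every live thread whose name starts with prefix. */
    static Map<String, Thread.State> statesWithPrefix(String prefix) {
        Map<String, Thread.State> states = new TreeMap<>();
        for (Thread t : Thread.getAllStackTraces().keySet()) {
            if (t.getName().startsWith(prefix)) {
                states.put(t.getName(), t.getState());
            }
        }
        return states;
    }

    /** Parks a thread named "F-demo" in a timed wait and reports the state observed for it. */
    static Thread.State demo() {
        CountDownLatch barrier = new CountDownLatch(1);
        Thread worker = new Thread(() -> {
            try {
                barrier.await(5, TimeUnit.SECONDS); // timed wait, like a barrier with a timeout
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "F-demo");
        worker.setDaemon(true);
        worker.start();

        Thread.State seen = null;
        long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(2);
        try {
            // Poll briefly: the thread needs a moment to reach the wait.
            while (System.nanoTime() < deadline) {
                seen = statesWithPrefix("F-").get("F-demo");
                if (seen == Thread.State.TIMED_WAITING) {
                    break;
                }
                Thread.sleep(10);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            barrier.countDown(); // let the demo thread finish
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println("F-demo state: " + demo());
    }
}
```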

Console output demonstrates high‑priority tasks being processed:

22:47:17:160 P-1  Assign homework
22:47:17:160 P-1  Assign homework
22:47:17:160 P-1  priority async task 3
22:47:17:160 P-1  priority async task 4
22:47:18:178 F-2  Assign homework
22:47:18:179 F-2  priority async task 3
22:47:19:183 F-2  Assign homework

The output confirms that F threads are now executing high‑priority tasks.

