Custom Asynchronous Functions in Java: Solving Thread‑Pool Blocking with Priority Execution
This article explains how to implement custom asynchronous keywords in Java, addresses thread‑pool blocking when nested async tasks use a barrier, and presents a solution using a separate priority thread pool, atomic locks, and a linked‑blocking deque to prioritize high‑priority tasks.
In performance testing, asynchronous tasks are essential. Asynchronous programming in Java improves throughput and responsiveness by keeping threads from idling on blocking calls and by making better use of available resources. It does not rule out blocking by itself, though: nested asynchronous tasks can still stall a thread pool, which is the problem this article works through.
Origin
Taking inspiration from keywords in the Go language, the author created a custom fun keyword in Java and then ran into a problem: when an asynchronous task spawns further asynchronous tasks and waits for them on a barrier, the thread pool stalls and many tasks are left waiting.
Example scenario: 200 classes, each with a teacher assigning homework to 30 students. Every class is an asynchronous task, and every assignment is a nested asynchronous task that the class waits for on a barrier, which stalls the thread pool.
static void main(String[] args) {
    200.times {
        fun {
            sleep(1.0) // simulate business processing
            pushHomework() // assign homework
        }
    }
}
/**
 * Assign homework
 */
static void pushHomework() {
    FunPhaser phaser = new FunPhaser(); // create barrier
    30.times {
        fun {
            sleep(1.0) // simulate business processing
            output("布置作业") // prints "assign homework"
        }, phaser
    }
    phaser.await(); // wait for all homework assignments to finish
}
The result is that every thread the pool is allowed to create ends up blocked inside pushHomework(), while the asynchronous tasks it spawned wait in the thread-pool queue with no free thread to run them.
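The stall can be reproduced without the fun keyword. The following is a minimal sketch in plain Java, not the author's code: the class name NestedBarrierStarvation, the method parentsCompleted, and the use of CountDownLatch in place of FunPhaser are all assumptions made for illustration. It fills a fixed pool with parent tasks that each queue children on the same pool and then wait for them, with a timeout so that the demo terminates.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class NestedBarrierStarvation {

    /**
     * Occupies every thread of a fixed pool with a parent task. Each parent
     * submits innerTasks children to the SAME pool and waits on a barrier.
     * Returns how many parents saw all their children finish within waitMillis.
     */
    static int parentsCompleted(int poolSize, int innerTasks, long waitMillis)
            throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        CountDownLatch allRunning = new CountDownLatch(poolSize);
        CountDownLatch allFinished = new CountDownLatch(poolSize);
        AtomicInteger completed = new AtomicInteger();
        try {
            for (int i = 0; i < poolSize; i++) {
                pool.execute(() -> {
                    try {
                        // Wait until every pool thread is held by a parent.
                        allRunning.countDown();
                        allRunning.await();
                        CountDownLatch barrier = new CountDownLatch(innerTasks);
                        for (int j = 0; j < innerTasks; j++) {
                            pool.execute(barrier::countDown); // child is queued
                        }
                        // The article's barrier waits indefinitely; the timeout
                        // only keeps this demo finite.
                        if (barrier.await(waitMillis, TimeUnit.MILLISECONDS)) {
                            completed.incrementAndGet();
                        }
                        // Hold the thread until all parents have given up, so no
                        // parent is rescued by a thread another parent just freed.
                        allFinished.countDown();
                        allFinished.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            allFinished.await();
            return completed.get();
        } finally {
            pool.shutdownNow(); // discard any children still sitting in the queue
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("parents completed: " + parentsCompleted(2, 4, 300));
    }
}
```

With a pool of 2 threads, both parents time out and the method returns 0, because the children can only run on threads that the parents themselves are holding.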
Initial Attempt
The first idea was to use a priority strategy, employing java.util.concurrent.PriorityBlockingQueue as the implementation of java.util.concurrent.BlockingQueue for the async thread‑pool queue, giving higher‑priority tasks precedence.
This reduced but did not eliminate blocking, leading to the decision to use a separate asynchronous thread pool.
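A priority queue only reorders tasks that are still waiting; it cannot free a thread that is already blocked on a barrier, which is consistent with the partial improvement described above. The sketch below shows the reordering on its own. It is an illustration under assumptions, not the author's implementation: PrioritizedTask and PriorityQueueDemo are hypothetical names, and lower numbers are assumed to mean higher priority.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

final class PriorityQueueDemo {

    /** Hypothetical wrapper: a Runnable that can be ordered; lower number runs first. */
    static final class PrioritizedTask implements Runnable, Comparable<PrioritizedTask> {
        final int priority;
        final Runnable body;

        PrioritizedTask(int priority, Runnable body) {
            this.priority = priority;
            this.body = body;
        }

        @Override public void run() { body.run(); }

        @Override public int compareTo(PrioritizedTask other) {
            return Integer.compare(priority, other.priority);
        }
    }

    /** Queues two normal tasks and one urgent task behind a busy worker; returns the run order. */
    static List<String> runOrder() throws InterruptedException {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new PriorityBlockingQueue<>());
        List<String> order = new CopyOnWriteArrayList<>();
        CountDownLatch workerBusy = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        // Occupy the only worker so that the following tasks must queue.
        pool.execute(new PrioritizedTask(0, () -> {
            workerBusy.countDown();
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }));
        workerBusy.await();
        pool.execute(new PrioritizedTask(5, () -> order.add("normal-1")));
        pool.execute(new PrioritizedTask(5, () -> order.add("normal-2")));
        pool.execute(new PrioritizedTask(1, () -> order.add("urgent"))); // queued last
        release.countDown();
        pool.shutdown(); // queued tasks still run after shutdown()
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return order;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("first task to run: " + runOrder().get(0));
    }
}
```

The urgent task runs first even though it was queued last. Tasks have to be handed over with execute(): submit() wraps them in a FutureTask, which is not Comparable, so the queue would reject it with a ClassCastException.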
Two thread-pool options were considered:
1. Use a small thread pool as an auxiliary to prevent blocking, while the normal pool handles high-priority tasks first.
2. Use a large (cached) thread pool dedicated to high-priority tasks, with the normal pool also handling them when needed.
The author chose the second option for their project, as high‑priority tasks are infrequent and can be controlled via scripts.
Solution
Implementation of the thread pools:
priorityPool = createFixedPool(POOL_SIZE, "P")
The priority pool is created inside the lazy initializer of the normal thread pool:
static ThreadPoolExecutor getFunPool() {
    if (asyncPool == null) {
        synchronized (ThreadPoolUtil.class) {
            if (asyncPool == null) {
                // Create the priority pool inside the null check, and before asyncPool,
                // so it is built exactly once and is never null once asyncPool is visible
                priorityPool = createFixedPool(POOL_SIZE, "P");
                // priorityPool = createPool(1, POOL_MAX, ALIVE_TIME, new LinkedBlockingQueue<Runnable>(10), getFactory("P"), new ThreadPoolExecutor.DiscardOldestPolicy())
                asyncPool = createPool(POOL_SIZE, POOL_SIZE, ALIVE_TIME, new LinkedBlockingDeque<Runnable>(Constant.MAX_WAIT_TASK), getFactory("F"));
                daemon();
            }
        }
    }
    return asyncPool;
}
Method to execute high-priority asynchronous tasks:
static void executeSyncPriority(Runnable runnable) {
    if (priorityPool == null) getFunPool();
    priorityPool.execute(runnable);
}
Method to let the normal pool prioritize high-priority tasks:
static void executePriority() {
    def locked = priorityLock.compareAndSet(false, true); // acquire lock if free
    if (locked) {
        try {
            def queue = (LinkedBlockingDeque<Runnable>) getFunPool().getQueue();
            while (priorityPool.getQueue().size() > 0) {
                def poll = priorityPool.getQueue().poll();
                if (poll != null) {
                    queue.offerFirst(poll); // move to the head of the normal queue
                }
            }
        } finally {
            priorityLock.set(false); // release lock, even if an exception was thrown
        }
    }
}
An atomic boolean serves as a lightweight, non-blocking lock: a thread that fails the compare-and-set skips this step instead of waiting, which avoids excessive performance loss:
private static AtomicBoolean priorityLock = new AtomicBoolean(false)
The task queue implementation was changed to java.util.concurrent.LinkedBlockingDeque, allowing high-priority tasks to be inserted at the front of the queue.
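The combination of a compare-and-set guard and offerFirst can be tried out on plain queues, without any thread pool. The following sketch assumes hypothetical names (FrontOfQueuePromotion, promote) and generic element types; it also puts an item back when the deque is full, which is an addition for illustration rather than something the article's code does.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicBoolean;

final class FrontOfQueuePromotion {

    // Non-blocking guard: only one thread promotes at a time, others skip.
    private static final AtomicBoolean promoting = new AtomicBoolean(false);

    /** Moves waiting items from priority to the head of normal; returns how many were moved. */
    static <T> int promote(BlockingQueue<T> priority, LinkedBlockingDeque<T> normal) {
        if (!promoting.compareAndSet(false, true)) {
            return 0; // another thread is already promoting
        }
        int moved = 0;
        try {
            T item;
            while ((item = priority.poll()) != null) {
                if (!normal.offerFirst(item)) {
                    // Deque is full: try to put the item back rather than drop it, then stop.
                    priority.offer(item);
                    break;
                }
                moved++;
            }
        } finally {
            promoting.set(false); // always release the guard
        }
        return moved;
    }
}
```

Because each item is pushed to the head in turn, promoted items end up in reverse order: promoting p1 and p2 into a deque holding n1 and n2 yields p2, p1, n1, n2. That is harmless when the priority tasks are independent of each other, but worth knowing if their order matters.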
Custom asynchronous keyword implementation:
public static void fun(Closure f, FunPhaser phaser, boolean log) {
    if (phaser != null) phaser.register();
    ThreadPoolUtil.executeSync(() -> {
        try {
            ThreadPoolUtil.executePriority();
            f.call();
        } finally {
            if (phaser != null) {
                phaser.done();
                if (log) logger.info("async task {}", phaser.queryTaskNum());
            }
        }
    });
}
High-priority version (keyword funny) using the priority pool:
public static void funny(Closure f, FunPhaser phaser, boolean log) {
    if (phaser != null) phaser.register();
    ThreadPoolUtil.executeSyncPriority(() -> {
        try {
            f.call();
        } finally {
            if (phaser != null) {
                phaser.done();
                if (log) logger.info("priority async task {}", phaser.queryTaskNum());
            }
        }
    });
}
Verification
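Both fun and funny register with the barrier before handing the task to the pool and signal completion in a finally block. That pattern can be checked in isolation before running a full script. The sketch below uses the JDK's java.util.concurrent.Phaser as a stand-in for FunPhaser, whose internals the article does not show; PhaserFunSketch, its fun signature, and runAndWait are hypothetical names.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicInteger;

final class PhaserFunSketch {

    /** Runs task on pool; with a non-null phaser, registers first and arrives when done. */
    static void fun(ExecutorService pool, Runnable task, Phaser phaser) {
        if (phaser != null) phaser.register(); // register on the caller's thread
        pool.execute(() -> {
            try {
                task.run();
            } finally {
                // Signal completion even if the task throws.
                if (phaser != null) phaser.arriveAndDeregister();
            }
        });
    }

    /** Starts the given number of tasks and waits for all of them; returns how many ran. */
    static int runAndWait(int tasks) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        Phaser phaser = new Phaser(1); // the caller counts as the first party
        AtomicInteger done = new AtomicInteger();
        try {
            for (int i = 0; i < tasks; i++) {
                fun(pool, done::incrementAndGet, phaser);
            }
            phaser.arriveAndAwaitAdvance(); // blocks until every registered task has arrived
            return done.get();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("tasks finished before the barrier opened: " + runAndWait(10));
    }
}
```

Registering on the caller's thread, before the task is queued, matters: if registration happened inside the task, the waiting thread could pass the barrier before a queued task had even started.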
Modified script for testing:
static void main(String[] args) {
    setPoolMax(2);
    6.times {
        fun {
            sleep(1.0) // simulate business processing
            pushHomework(); // assign homework
        }
    }
}
/**
 * Assign homework
 */
static void pushHomework() {
    FunPhaser phaser = new FunPhaser(); // create barrier
    4.times {
        fun {
            sleep(1.0) // simulate business processing
            output("布置作业"); // prints "assign homework"
        }, phaser;
    }
    phaser.await(); // wait for all homework assignments to finish
}
Running this script as written leaves the F threads stuck in TIMED_WAITING. After changing pushHomework() to use the high-priority keyword funny, the problem is resolved and F threads execute high-priority tasks.
Console output demonstrates high‑priority tasks being processed:
22:47:17:160 P-1 布置作业
22:47:17:160 P-1 布置作业
22:47:17:160 P-1 priority async task 3
22:47:17:160 P-1 priority async task 4
22:47:18:178 F-2 布置作业
22:47:18:179 F-2 priority async task 3
22:47:19:183 F-2 布置作业
The output (布置作业 is the "assign homework" message) confirms that F threads are now executing high-priority tasks.
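The core of the fix, sending the nested tasks to a pool that the waiting parents do not occupy, can also be checked in plain Java. This is a simplified sketch under assumptions: SeparatePriorityPoolDemo and parentsCompleted are hypothetical names, two fixed pools stand in for the F and P pools, CountDownLatch replaces FunPhaser, and the step in which F threads pull work from the priority queue is left out.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class SeparatePriorityPoolDemo {

    /**
     * Holds every normal-pool thread with a parent task whose children run on a
     * separate priority pool. Returns how many parents saw their barrier open.
     */
    static int parentsCompleted(int poolSize, int innerTasks, long waitMillis)
            throws InterruptedException {
        ExecutorService normalPool = Executors.newFixedThreadPool(poolSize);   // stands in for the F pool
        ExecutorService priorityPool = Executors.newFixedThreadPool(poolSize); // stands in for the P pool
        CountDownLatch allRunning = new CountDownLatch(poolSize);
        CountDownLatch allFinished = new CountDownLatch(poolSize);
        AtomicInteger completed = new AtomicInteger();
        try {
            for (int i = 0; i < poolSize; i++) {
                normalPool.execute(() -> {
                    try {
                        allRunning.countDown();
                        allRunning.await(); // every normal-pool thread is now held by a parent
                        CountDownLatch barrier = new CountDownLatch(innerTasks);
                        for (int j = 0; j < innerTasks; j++) {
                            priorityPool.execute(barrier::countDown); // child avoids the saturated pool
                        }
                        if (barrier.await(waitMillis, TimeUnit.MILLISECONDS)) {
                            completed.incrementAndGet();
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        allFinished.countDown();
                    }
                });
            }
            allFinished.await();
            return completed.get();
        } finally {
            normalPool.shutdown();
            priorityPool.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("parents completed: " + parentsCompleted(2, 4, 5000));
    }
}
```

Even with every normal-pool thread held by a waiting parent, the children still find free threads, so each parent's barrier opens well before the timeout.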
Additional resources and original collections are listed at the end of the article.
FunTester