How to Implement Fixed QPS Asynchronous Tasks in Java with Semaphores and Thread Pools
This article explains a Java technique for limiting asynchronous task execution to a fixed queries‑per‑second rate using a Semaphore‑based thread pool, a LinkedBlockingQueue, and a daemon thread that manages task dispatch, shutdown, and idle waiting.
Entry Method
The utility method funer(Closure f) forwards a closure to ThreadPoolUtil.addQPSTask(f), which queues the task for execution under a fixed QPS limit.
/**
* Execute a closure with fixed QPS, default 16. Adjust via SourceCode#setMaxQps(int).
*/
public static void funer(Closure f) {
fun(JToG.toClosure(() -> {
ThreadPoolUtil.addQPSTask(f);
return null;
}));
}Implementation Details
A java.util.concurrent.LinkedBlockingQueue stores pending asynchronous tasks. New tasks are added with addQPSTask(Closure closure), which simply offers the closure to the queue.
/**
* Add a fixed‑QPS asynchronous task.
* The offer method is non‑blocking and returns a Boolean.
*/
static def addQPSTask(Closure closure) {
asyncQueue.offer(closure)
}A daemon thread continuously polls the queue, executes tasks, and ensures the thread pool shuts down cleanly.
static boolean daemon() {
def thread = new Thread(new Runnable() {
@Override
void run() {
SourceCode.noError {
while (checkMain()) {
SourceCode.sleep(1.0)
ASYNC_QPS.times { executeCacheSync() }
}
waitAsyncIdle()
}
ThreadPoolUtil.shutPool()
}
})
thread.setDaemon(true)
thread.setName("Deamon")
thread.start()
}Task Execution Logic
The method executeCacheSync() pulls a closure from the queue and runs it if present.
/**
* Execute a task from the cache queue using the thread pool.
*/
static def executeCacheSync() {
def poll = asyncQueue.poll()
if (poll != null) executeCacheSync({ poll() })
}Shutdown and Idle Waiting
Two helper methods ensure the pool stops when the application ends and that the daemon waits until all queued tasks finish.
/** Wait for the asynchronous thread pool to become idle */
static void waitAsyncIdle() {
if (asyncPool == null) return
SourceCode.time({
SourceCode.waitFor {
((int) (ASYNC_QPS / 5) + 1).times { executeCacheSync() }
asyncPool.getQueue().size() == 0 &&
asyncPool.getActiveCount() == 0 &&
asyncQueue.size() == 0
}
}, "异步线程池等待")
}
/** Close the asynchronous thread pool to allow the JVM to exit */
static void shutPool() {
if (!getFunPool().isShutdown()) {
log.info(Output.rgb("异步线程池关闭!"))
getFunPool().shutdown()
}
if (cachePool != null && !cachePool.isShutdown()) {
cachePool.shutdown()
}
}Self‑Test Example
The following main method demonstrates setting a max QPS of 1, submitting ten tasks that print the current time, sleeping for five seconds, and then printing a final message.
static void main(String[] args) {
setMaxQps(1)
10.times {
funer {
output(Time.getDate())
}
}
sleep(5.0)
output("FunTester")
}Running the program produces timestamped console output followed by a summary of the daemon’s waiting time and shutdown messages, confirming that tasks were throttled to the configured QPS and that the thread pool terminated cleanly.
Signed-in readers can open the original source through BestHub's protected redirect.
This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contactand we will review it promptly.
How this landed with the community
Was this worth your time?
0 Comments
Thoughtful readers leave field notes, pushback, and hard-won operational detail here.
