How to Implement Fixed QPS Asynchronous Tasks in Java with Semaphores and Thread Pools

This article explains a Java technique for limiting asynchronous task execution to a fixed queries‑per‑second rate using a Semaphore‑based thread pool, a LinkedBlockingQueue, and a daemon thread that manages task dispatch, shutdown, and idle waiting.

FunTester
FunTester
FunTester
How to Implement Fixed QPS Asynchronous Tasks in Java with Semaphores and Thread Pools

Entry Method

The utility method funer(Closure f) forwards a closure to ThreadPoolUtil.addQPSTask(f), which queues the task for execution under a fixed QPS limit.

/**
 * Execute a closure with fixed QPS, default 16. Adjust via SourceCode#setMaxQps(int).
 */
public static void funer(Closure f) {
    fun(JToG.toClosure(() -> {
        ThreadPoolUtil.addQPSTask(f);
        return null;
    }));
}

Implementation Details

A java.util.concurrent.LinkedBlockingQueue stores pending asynchronous tasks. New tasks are added with addQPSTask(Closure closure), which simply offers the closure to the queue.

/**
 * Add a fixed‑QPS asynchronous task.
 * The offer method is non‑blocking and returns a Boolean.
 */
static def addQPSTask(Closure closure) {
    asyncQueue.offer(closure)
}

A daemon thread continuously polls the queue, executes tasks, and ensures the thread pool shuts down cleanly.

static boolean daemon() {
    def thread = new Thread(new Runnable() {
        @Override
        void run() {
            SourceCode.noError {
                while (checkMain()) {
                    SourceCode.sleep(1.0)
                    ASYNC_QPS.times { executeCacheSync() }
                }
                waitAsyncIdle()
            }
            ThreadPoolUtil.shutPool()
        }
    })
    thread.setDaemon(true)
    thread.setName("Deamon")
    thread.start()
}

Task Execution Logic

The method executeCacheSync() pulls a closure from the queue and runs it if present.

/**
 * Execute a task from the cache queue using the thread pool.
 */
static def executeCacheSync() {
    def poll = asyncQueue.poll()
    if (poll != null) executeCacheSync({ poll() })
}

Shutdown and Idle Waiting

Two helper methods ensure the pool stops when the application ends and that the daemon waits until all queued tasks finish.

/** Wait for the asynchronous thread pool to become idle */
static void waitAsyncIdle() {
    if (asyncPool == null) return
    SourceCode.time({
        SourceCode.waitFor {
            ((int) (ASYNC_QPS / 5) + 1).times { executeCacheSync() }
            asyncPool.getQueue().size() == 0 &&
            asyncPool.getActiveCount() == 0 &&
            asyncQueue.size() == 0
        }
    }, "异步线程池等待")
}

/** Close the asynchronous thread pool to allow the JVM to exit */
static void shutPool() {
    if (!getFunPool().isShutdown()) {
        log.info(Output.rgb("异步线程池关闭!"))
        getFunPool().shutdown()
    }
    if (cachePool != null && !cachePool.isShutdown()) {
        cachePool.shutdown()
    }
}

Self‑Test Example

The following main method demonstrates setting a max QPS of 1, submitting ten tasks that print the current time, sleeping for five seconds, and then printing a final message.

static void main(String[] args) {
    setMaxQps(1)
    10.times {
        funer {
            output(Time.getDate())
        }
    }
    sleep(5.0)
    output("FunTester")
}

Running the program produces timestamped console output followed by a summary of the daemon’s waiting time and shutdown messages, confirming that tasks were throttled to the configured QPS and that the thread pool terminated cleanly.

Original Source

Signed-in readers can open the original source through BestHub's protected redirect.

Sign in to view source
Republication Notice

This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contactadmin@besthub.devand we will review it promptly.

JavaconcurrencyThreadPoolAsynchronoussemaphoreQPS
FunTester
Written by

FunTester

10k followers, 1k articles | completely useless

0 followers
Reader feedback

How this landed with the community

Sign in to like

Rate this article

Was this worth your time?

Sign in to rate
Discussion

0 Comments

Thoughtful readers leave field notes, pushback, and hard-won operational detail here.