How to Load Test Dubbo Queue APIs with JMeter and Java Concurrency
This article explains a practical approach to performance‑testing Dubbo add and delete methods of a message queue by pre‑generating payloads, using a thread‑safe LinkedBlockingQueue, and driving the calls with JMeter‑compatible Java code.
Benchmarking Dubbo Queue Add/Delete Operations
The performance test targets two Dubbo methods of a message‑queue service: one that adds a message to the queue and another that removes (deletes) a message. Because the built‑in JMeter Dubbo sampler was discontinued, a custom Java‑based load generator was created.
Message format and uniqueness
Each message is a string composed of an event type, a user identifier, and a payload. Uniqueness is achieved by appending System.nanoTime() (the nanosecond timestamp) to the payload; a random component is unnecessary.
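The message format described above can be sketched as a small helper. This is a minimal illustration, not the original project's code: the class name UniqueMessages, the method next, and the string "ZUOYE" standing in for PublishType.ZUOYE are all assumptions. The sequence counter is also an addition. The article relies on System.nanoTime() alone, but two threads could in principle read the same value, so the sketch appends a counter as a cheap safeguard.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical helper; names are illustrative and not taken from the original project. */
final class UniqueMessages {
    // Assumption: a process-wide counter guards against two threads
    // observing the same System.nanoTime() value.
    private static final AtomicLong SEQ = new AtomicLong();

    /** Builds "eventType:userId@nanoTime-seq" followed by the payload. */
    static String next(String eventType, long userId, String payload) {
        return eventType + ":" + userId + "@"
                + System.nanoTime() + "-" + SEQ.incrementAndGet()
                + payload;
    }

    public static void main(String[] args) {
        // Prints one message in the same shape as the article's listing.
        System.out.println(next("wait_for_publish", 8888L, "ZUOYE"));
    }
}
```

If the timestamp alone proves sufficient on the target hardware, the counter can be dropped without changing the rest of the test.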
Pre‑generation of messages
Before the test starts, a large set of distinct message strings is generated and written to a file (named queue in this example), one message per line. At runtime the file is read and the strings are loaded into a thread-safe LinkedBlockingQueue<String>. Because each poll removes the element it returns, no two test threads receive the same message.
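The pre-generation step can be sketched with the standard java.nio.file API. The class MessageFile and its methods generate and load are hypothetical names used only for illustration. The original project reads the file through its own WriteRead.readTxtFileByLine helper, which is not reproduced here.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

/** Hypothetical helper; not part of the original project. */
final class MessageFile {
    /** Writes `count` distinct messages to `file`, one per line. */
    static void generate(Path file, int count) {
        List<String> lines = new ArrayList<>(count);
        for (int i = 0; i < count; i++) {
            // The loop index is appended so lines stay distinct even if
            // two consecutive nanoTime() reads return the same value.
            lines.add("wait_for_publish:8888@" + System.nanoTime() + "-" + i);
        }
        try {
            Files.write(file, lines, StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Reads every line of `file` into a thread-safe queue. */
    static LinkedBlockingQueue<String> load(Path file) {
        try {
            return new LinkedBlockingQueue<>(
                    Files.readAllLines(file, StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Generating the file ahead of time keeps string construction and disk I/O out of the measured interval, so the timings reflect the Dubbo calls rather than payload creation.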
Adding messages
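Each thread invokes the Dubbo createQueue method with a unique message. The request includes a trace ID, a delay time (current time + 1 hour), the message, a task type enum, and a TTL of zero. In the listing below, the message is built inline by appending System.nanoTime() rather than being taken from the shared queue, and the two integers passed to ThreadBase and Concurrent are read from the first two lines of a file named dubbo in the working directory.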
public int createQ() {
    String absolutePath = new File("").getAbsolutePath();
    List<String> strings = WriteRead.readTxtFileByLine(absolutePath + "/dubbo");
    new Concurrent(new ThreadBase(SourceCode.changeStringToInt(strings.get(0))) {
        @Override
        protected void before() {}
        @Override
        protected void doing() throws Exception {
            CreateQueueRequest req = new CreateQueueRequest();
            req.setReqId(TraceKeyHolder.getTraceKey());
            req.setDelayTime(System.currentTimeMillis() + 3600 * 1000);
            String msg = "wait_for_publish:8888" + "@" + System.nanoTime() + PublishType.ZUOYE;
            req.setMsg(msg);
            req.setTaskTypeEnum(TaskTypeEnum.PUBLISH_PROMU);
            req.setTtl(0L);
            CommonResponse<CreateQueueResultVo> resp = commonDelayQueueService.createQueue(req);
            logger.info("createQueue0 {}", JsonUtil.obj2Json(resp));
        }
        @Override
        protected void after() {}
    }, SourceCode.changeStringToInt(strings.get(1))).start();
    return 0;
}
Deleting messages
The delete test follows the same pattern. If the shared queue becomes empty, it is repopulated by calling addmsg(). Each thread polls a message (with a 100 ms timeout) and calls the Dubbo deleteQueue method.
public int deleteQ() throws InterruptedException {
    // Reload the pre-generated messages if an earlier run drained the queue.
    if (msgs.isEmpty()) {
        logger.info("Queue is empty");
        msgs = addmsg();
    }
    String absolutePath = new File("").getAbsolutePath();
    List<String> strings = WriteRead.readTxtFileByLine(absolutePath + "/dubbo");
    new Concurrent(new ThreadBase(SourceCode.changeStringToInt(strings.get(0))) {
        @Override
        protected void before() {}
        @Override
        protected void doing() throws Exception {
            String msg = msgs.poll(100, TimeUnit.MILLISECONDS);
            // poll returns null on timeout; skip the call rather than send a null message.
            if (msg == null) {
                return;
            }
            logger.info("msg:{}", msg);
            DeleteQueueRequest req = new DeleteQueueRequest();
            req.setMsg(msg);
            req.setTaskTypeEnum(TaskTypeEnum.PUBLISH_PROMU);
            CommonResponse<String> resp = commonDelayQueueService.deleteQueue(req);
            logger.info("deleteQueue2 {}", JsonUtil.obj2Json(resp));
        }
        @Override
        protected void after() {}
    }, SourceCode.changeStringToInt(strings.get(1))).start();
    return 0;
}
Queue initialization
The static queue msgs is populated once, when the enclosing class is initialized, by reading the queue file. The helper method addmsg() returns a new LinkedBlockingQueue<String> containing all pre-generated messages.
public static LinkedBlockingQueue<String> msgs = addmsg();
public static LinkedBlockingQueue<String> addmsg() {
    String absolutePath = new File("").getAbsolutePath();
    // One pre-generated message per line in the "queue" file.
    List<String> strings = WriteRead.readTxtFileByLine(absolutePath + "/queue");
    LinkedBlockingQueue<String> q = new LinkedBlockingQueue<>();
    q.addAll(strings);
    logger.info("Re-reading queue values");
    return q;
}
Potential race condition and mitigation
If addmsg() is invoked while a test is still running, the shared queue may be repopulated mid‑test, leading to inconsistent load distribution. For small data sets the author recommends initializing msgs before the test begins or performing per‑thread initialization inside the before() hook of each thread.
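One way to realize the first recommendation is to fill the queue once before any worker starts and never reassign it. The sketch below is an assumption-laden illustration, not the author's code: it uses a plain ExecutorService in place of the project's Concurrent and ThreadBase classes, and the class DrainOnce, the method run, and the call parameter (standing in for the Dubbo deleteQueue invocation) are hypothetical names.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

/** Hypothetical sketch; stands in for the project's Concurrent/ThreadBase classes. */
final class DrainOnce {
    /** Runs `threads` workers that drain `source`; returns how many messages were handled. */
    static int run(List<String> source, int threads, Consumer<String> call) {
        // Populated before any worker starts and never reassigned afterwards,
        // so the queue cannot be refilled in the middle of a run.
        final LinkedBlockingQueue<String> msgs = new LinkedBlockingQueue<>(source);
        final AtomicInteger handled = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.execute(() -> {
                try {
                    String msg;
                    // A null result means the poll timed out: the queue is
                    // drained, so the worker stops instead of sending null.
                    while ((msg = msgs.poll(100, TimeUnit.MILLISECONDS)) != null) {
                        call.accept(msg); // placeholder for the Dubbo call
                        handled.incrementAndGet();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return handled.get();
    }
}
```

Because each worker treats an empty queue as the end of its work, every message is consumed exactly once and the total number of calls equals the number of pre-generated messages, which makes throughput figures easier to compare between runs.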
