Load Testing Dubbo Queue Service Using Java Concurrency and JMeter

This article describes how to conduct load testing of Dubbo queue service methods for adding and deleting messages using Java concurrency utilities and JMeter, including code examples for constructing test data, managing a thread‑safe LinkedBlockingQueue, and handling initialization challenges during repeated test runs.

FunTester

The author needed to performance‑test two Dubbo RPC methods of a message‑queue service: one that adds messages to the queue and another that retrieves (deletes) them. Existing team members used JMeter for Dubbo interface testing, but the author implemented a custom solution.

For the add‑operation, a simple message key is built from event type, user identifier and body. To generate unique payloads the author initially combined nanoseconds with a random number, later finding nanoseconds alone sufficient.
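The key format described above can be sketched roughly as follows. `System.nanoTime()` is not guaranteed to return distinct values when several threads call it at nearly the same moment, so this sketch also appends a process-wide `AtomicLong` sequence as a safeguard. That counter and the `MessageKeys` class name are assumptions added for illustration; the author's code relies on the nanosecond value alone.

```java
import java.util.concurrent.atomic.AtomicLong;

final class MessageKeys {
    // Process-wide sequence; an addition in this sketch, not part of the original code.
    private static final AtomicLong SEQ = new AtomicLong();

    /**
     * Builds "<eventType>:<userId>@<nanoTime>-<sequence><body>".
     * The sequence makes keys distinct even if two threads read the same nanoTime value.
     */
    static String build(String eventType, long userId, String body) {
        return eventType + ":" + userId + "@" + System.nanoTime()
                + "-" + SEQ.incrementAndGet() + body;
    }
}
```

A call such as `MessageKeys.build("wait_for_publish", 8888, "ZUOYE")` yields a payload shaped like the one built in `createQ()`, with the extra sequence number before the body.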

For the delete operation, JMeter did not provide a convenient way to feed pre-generated messages, so the author pre-created a large list of messages, stored them in a thread-safe LinkedBlockingQueue<String>, and let multiple threads poll from it during the test. After each test run the data is reset to avoid leftover state. The add operation is implemented as follows:

public int createQ() {
    String absolutePath = new File("").getAbsolutePath();
    List<String> strings = WriteRead.readTxtFileByLine(absolutePath + "/dubbo");
    new Concurrent(new ThreadBase(SourceCode.changeStringToInt(strings.get(0))) {
        @Override
        protected void before() {
        }

        @Override
        protected void doing() throws Exception {
            CreateQueueRequest createQueueRequest = new CreateQueueRequest();
            createQueueRequest.setReqId(TraceKeyHolder.getTraceKey());
            createQueueRequest.setDelayTime(System.currentTimeMillis() + 3600 * 1000);
            String msg = "wait_for_publish:8888" + "@" + System.nanoTime() + PublishType.ZUOYE;
            createQueueRequest.setMsg(msg);
            createQueueRequest.setTaskTypeEnum(TaskTypeEnum.PUBLISH_PROMU);
            createQueueRequest.setTtl(0L);
            CommonResponse<CreateQueueResultVo> queue = commonDelayQueueService.createQueue(createQueueRequest);
            logger.info("createQueue0  {}", JsonUtil.obj2Json(queue));
        }

        @Override
        protected void after() {
        }
    }, SourceCode.changeStringToInt(strings.get(1))).start();
    return 0;
}
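`Concurrent` and `ThreadBase` come from the author's test framework and their source is not shown here. As a rough stand-in built only on the JDK, the sketch below runs a task a fixed number of times on each of several threads and waits for all of them to finish. The class name `SimpleLoadRunner`, the meaning of the two numbers (calls per thread and thread count), and the failure counting are assumptions for illustration, not a description of the framework's behaviour.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

final class SimpleLoadRunner {
    /**
     * Runs task timesPerThread times on each of `threads` worker threads,
     * blocks until every worker is done, and returns how many calls threw.
     */
    static int run(int threads, int timesPerThread, Runnable task) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        CountDownLatch done = new CountDownLatch(threads);
        AtomicInteger failures = new AtomicInteger();
        for (int t = 0; t < threads; t++) {
            pool.execute(() -> {
                try {
                    for (int i = 0; i < timesPerThread; i++) {
                        try {
                            task.run(); // e.g. one createQueue call
                        } catch (RuntimeException e) {
                            failures.incrementAndGet(); // count the failure, keep going
                        }
                    }
                } finally {
                    done.countDown();
                }
            });
        }
        try {
            done.await(); // wait for all workers
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown();
        }
        return failures.get();
    }
}
```

With a runner like this, the body of `doing()` would be passed as the `Runnable`, and the two values read from the `dubbo` file would supply the two integer arguments.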

The delete‑operation retrieves a message from the shared queue and invokes the corresponding Dubbo method. If the queue becomes empty during a long test run, the code re‑initialises it.

public int deleteQ() throws InterruptedException {
    if (msgs.size() == 0) {
        logger.info("Queue is empty");
        msgs = addmsg();
    }
    String absolutePath = new File("").getAbsolutePath();
    List<String> strings = WriteRead.readTxtFileByLine(absolutePath + "/dubbo");

    new Concurrent(new ThreadBase(SourceCode.changeStringToInt(strings.get(0))) {
        @Override
        protected void before() {
        }

        @Override
        protected void doing() throws Exception {
            // poll() returns null if no message becomes available within 100 ms
            String msg = msgs.poll(100, TimeUnit.MILLISECONDS);
            if (msg == null) {
                logger.info("no message available, skipping deleteQueue call");
                return;
            }
            logger.info("msg:{}", msg);
            DeleteQueueRequest deleteQueueRequest0 = new DeleteQueueRequest();
            deleteQueueRequest0.setMsg(msg);
            deleteQueueRequest0.setTaskTypeEnum(TaskTypeEnum.PUBLISH_PROMU);
            CommonResponse<String> queue3 = commonDelayQueueService.deleteQueue(deleteQueueRequest0);
            logger.info("deleteQueue2 {}", JsonUtil.obj2Json(queue3));
        }

        @Override
        protected void after() {
        }
    }, SourceCode.changeStringToInt(strings.get(1))).start();
    return 0;
}
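The reason a LinkedBlockingQueue suits this job is that each element is handed to exactly one polling thread, so no message is sent to the delete method twice. The following self-contained sketch shows several workers draining a shared queue with the same timed `poll` call; the class name `QueueDrainDemo` is hypothetical, and the Dubbo call is replaced by recording the message in a list.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

final class QueueDrainDemo {
    /** Drains the queue with `workers` threads and returns every message that was polled. */
    static List<String> drain(LinkedBlockingQueue<String> queue, int workers) {
        List<String> consumed = Collections.synchronizedList(new ArrayList<>());
        List<Thread> threads = new ArrayList<>();
        for (int w = 0; w < workers; w++) {
            Thread t = new Thread(() -> {
                try {
                    String msg;
                    // A null result means nothing arrived within 100 ms: stop this worker.
                    while ((msg = queue.poll(100, TimeUnit.MILLISECONDS)) != null) {
                        consumed.add(msg); // a real test would call deleteQueue here
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads.add(t);
            t.start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return consumed;
    }
}
```

Because the timed `poll` returns `null` once the queue stays empty, each worker needs an explicit null check before building a request from the result.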

The shared queue and its initialisation are defined as follows:

public static LinkedBlockingQueue<String> msgs = addmsg();

public static LinkedBlockingQueue<String> addmsg() {
    String absolutePath = new File("").getAbsolutePath();
    List<String> strings = WriteRead.readTxtFileByLine(absolutePath + "/queue");
    LinkedBlockingQueue<String> ss = new LinkedBlockingQueue<>();
    ss.addAll(strings);
    logger.info("Reloading queue values");
    return ss;
}

A potential issue is that addmsg() may be invoked repeatedly during a long test, re-populating the queue from the same file at unexpected moments. For large data sets the author ignored this; for smaller loads the author recommends resetting msgs before the test starts or re-initialising it in the before() method of each thread.
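One possible way to make the reload behaviour explicit is to wrap the queue in a small holder that offers a `reset()` for the start of a run and reloads under a lock only when the queue is really exhausted. This is a sketch under assumptions, not the author's implementation: `ReloadableQueue`, the `Supplier`-based loader, and the reload counter are all hypothetical names introduced here.

```java
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

final class ReloadableQueue {
    private final LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();
    private final Supplier<List<String>> loader; // e.g. reads the "queue" file
    private final AtomicInteger reloads = new AtomicInteger();

    ReloadableQueue(Supplier<List<String>> loader) {
        this.loader = loader;
        reset();
    }

    /** Discards leftovers and loads fresh data; intended to be called before a test run. */
    synchronized void reset() {
        queue.clear();
        queue.addAll(loader.get());
        reloads.incrementAndGet();
    }

    /** Returns the next message, reloading once per exhaustion even with concurrent callers. */
    String next() {
        String msg = queue.poll();
        if (msg != null) {
            return msg;
        }
        synchronized (this) {
            msg = queue.poll(); // another thread may have reloaded already
            if (msg == null) {
                queue.addAll(loader.get());
                reloads.incrementAndGet();
                msg = queue.poll(); // still null if the loader returned nothing
            }
            return msg;
        }
    }

    /** Number of times data was loaded, including the initial load. */
    int reloadCount() {
        return reloads.get();
    }
}
```

Logging or asserting on `reloadCount()` after a run makes it visible whether the data set was large enough, which is the situation the author distinguishes between large and small loads.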

