Implementing Multithreaded Test Execution with Thread Pools and Distributed User Locks in Java
This article describes how to accelerate test case execution by introducing a global thread pool with CountDownLatch synchronization, presents the full Java implementations of the custom thread pool and test‑run thread classes, and explains a combined local and distributed user‑lock mechanism that uses ConcurrentHashMap caching and transactional locks to ensure safe credential retrieval.
The author announces the completion and release of a testing framework project (https://github.com/JunManYuanLong/fun-svr) and highlights two noteworthy techniques: multithreaded test execution and a robust user‑lock implementation.
Multithreading is introduced to handle test case parameters and execution scenarios. By creating a global thread pool and wrapping each task as a Runnable, the solution reduces overall execution time and avoids the overhead of repeatedly creating threads. Task completion is coordinated using CountDownLatch , which blocks further processing until all threads finish.
The core thread‑pool code is:
package com.okay.family.common.threadpool;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
* Custom thread pool for batch test case execution.
*/
public class OkayThreadPool {
private static ThreadPoolExecutor executor = createPool();
public static void addSyncWork(Runnable runnable) {
executor.execute(runnable);
}
private static ThreadPoolExecutor createPool() {
return new ThreadPoolExecutor(16, 100, 10, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1000));
}
}The accompanying test‑run thread class, which processes each case and records results, is shown below:
package com.okay.family.common.threadpool;
import com.okay.family.common.basedata.OkayConstant;
import com.okay.family.common.bean.testcase.CaseRunRecord;
import com.okay.family.common.bean.testcase.request.CaseDataBean;
import com.okay.family.common.enums.CaseAvailableStatus;
import com.okay.family.common.enums.RunResult;
import com.okay.family.utils.RunCaseUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.CountDownLatch;
public class CaseRunThread implements Runnable {
private static Logger logger = LoggerFactory.getLogger(CaseRunThread.class);
int envId;
CaseDataBean bean;
CaseRunRecord record;
CountDownLatch countDownLatch;
public CaseRunRecord getRecord() { return record; }
private CaseRunThread() {}
public CaseRunThread(CaseDataBean bean, CountDownLatch countDownLatch, int runId, int envId) {
this.bean = bean;
this.envId = envId;
this.countDownLatch = countDownLatch;
this.record = new CaseRunRecord();
record.setRunId(runId);
record.setUid(bean.getUid());
record.setParams(bean.getParams());
record.setCaseId(bean.getId());
record.setMark(OkayConstant.RUN_MARK.getAndIncrement());
bean.getHeaders().put(OkayConstant.MARK_HEADER, record.getMark());
record.setHeaders(bean.getHeaders());
}
@Override
public void run() {
try {
if (bean.getAvailable() == RunResult.USER_ERROR.getCode()) {
record.fail(RunResult.USER_ERROR, bean);
} else if (bean.getEnvId() != envId || bean.getAvailable() != CaseAvailableStatus.OK.getCode()) {
record.fail(RunResult.UNRUN, bean);
} else {
RunCaseUtil.run(bean, record);
}
} catch (Exception e) {
logger.warn("用例运行出错,ID:" + bean.getId(), e);
record.fail(RunResult.UNRUN, bean);
} finally {
countDownLatch.countDown();
}
}
}User‑Lock Implementation
The article then discusses a two‑level locking strategy that combines a thread‑level synchronization lock with a distributed lock to safely obtain and update user credentials. A ConcurrentHashMap cache reduces database reads and prevents repeated failed lookups.
The essential method for retrieving a user certificate is:
@Override
@Transactional(isolation = Isolation.REPEATABLE_READ)
public String getCertificate(int id, ConcurrentHashMap
map) {
if (map.containsKey(id)) return map.get(id);
Object lockObj = UserLock.get(id);
synchronized (lockObj) {
if (map.containsKey(id)) return map.get(id);
logger.warn("非缓存读取用户数据{}", id);
TestUserCheckBean user = testUserMapper.findUser(id);
if (user == null) UserStatusException.fail("用户不存在,ID:" + id);
long create = Time.getTimestamp(user.getCreate_time());
long now = Time.getTimeStamp();
if (now - create < OkayConstant.CERTIFICATE_TIMEOUT && user.getStatus() == UserState.OK.getCode()) {
map.put(id, user.getCertificate());
return user.getCertificate();
}
boolean loginOk = UserUtil.checkUserLoginStatus(user);
logger.info("环境:{},用户:{},身份:{},登录状态验证:{}", user.getEnvId(), user.getId(), user.getRoleId(), loginOk);
if (!loginOk) {
updateUserStatus(user);
if (user.getStatus() != UserState.OK.getCode()) {
map.put(id, OkayConstant.EMPTY);
UserStatusException.fail("用户不可用,ID:" + id);
}
} else {
testUserMapper.updateUserStatus(user);
}
map.put(id, user.getCertificate());
return user.getCertificate();
}
}The corresponding method for updating user status demonstrates acquiring a distributed lock via commonService.lock , handling lock contention with a retry loop, and finally releasing the lock:
@Override
@Transactional(isolation = Isolation.REPEATABLE_READ)
public int updateUserStatus(TestUserCheckBean bean) {
int userLock = NodeLock.getUserLock(bean.getId());
int lock = commonService.lock(userLock);
if (lock == 0) {
logger.info("分布式锁竞争失败,ID:{}", bean.getId());
int i = 0;
while (true) {
SourceCode.sleep(OkayConstant.WAIT_INTERVAL);
TestUserCheckBean user = testUserMapper.findUser(bean.getId());
long create = Time.getTimestamp(user.getCreate_time());
long now = Time.getTimeStamp();
if (now - create < OkayConstant.CERTIFICATE_TIMEOUT && user.getStatus() == UserState.OK.getCode()) {
bean.copyFrom(user);
return testUserMapper.updateUserStatus(bean);
}
if (i++ > OkayConstant.WAIT_MAX_TIME) {
UserStatusException.fail("获取分布式锁超时,无法更新用户凭据:id:" + bean.getId());
}
}
} else {
logger.info("分布式锁竞争成功,ID:{}", bean.getId());
try {
TestUserCheckBean user = testUserMapper.findUser(bean.getId());
long create = Time.getTimestamp(user.getCreate_time());
long now = Time.getTimeStamp();
if (bean.same(user) && StringUtils.isNotBlank(user.getCertificate())) {
if (now - create < OkayConstant.CERTIFICATE_TIMEOUT && user.getStatus() == UserState.OK.getCode()) {
bean.copyFrom(user);
return testUserMapper.updateUserStatus(bean);
}
if (UserUtil.checkUserLoginStatus(user)) bean.copyFrom(user);
}
UserUtil.updateUserStatus(bean);
return testUserMapper.updateUserStatus(bean);
} catch (Exception e) {
logger.error("用户验证失败!ID:{}", bean.getId(), e);
bean.setStatus(UserState.CANNOT.getCode());
return testUserMapper.updateUserStatus(bean);
} finally {
commonService.unlock(userLock);
}
}
}Finally, the author invites readers to follow the FunTester public account for more original articles and reminds them not to repost without permission.
FunTester
10k followers, 1k articles | completely useless
How this landed with the community
Was this worth your time?
0 Comments
Thoughtful readers leave field notes, pushback, and hard-won operational detail here.