Advanced Java Multithreading: From Fundamentals to High‑Performance Transactional Scenarios
This article explores Java multithreading concepts, JVM safety tools, thread creation methods, coordination mechanisms, and practical scenarios such as parallel data aggregation, for‑loop conversion, map‑based processing, and multi‑threaded transaction handling, providing code examples and performance tips for backend developers.
Multithreading is a familiar concept, but many developers only use simple constructs like @Async or new Thread() and rarely build custom thread pools; this article demonstrates how to uncover performance‑critical multithreading scenarios in real projects and make code run "as fast as lightning".
What multithreading solves and the problems it introduces
CPU caches introduce visibility problems, OS time-slicing introduces atomicity problems, and instruction reordering (by the compiler, the JIT, or the CPU) introduces ordering problems. The article explains each problem with an example bug.
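As a minimal sketch of the atomicity problem (not code from the original article), the example below has several threads increment a plain int and an AtomicInteger side by side. The class name CounterRaceDemo and its fields are hypothetical names chosen for illustration. The plain counter usually loses updates because counter++ is a read-modify-write sequence, while the atomic counter does not.

```java
import java.util.concurrent.atomic.AtomicInteger;

class CounterRaceDemo {
    static int plainCounter;                                  // unsynchronized: increments may be lost
    static final AtomicInteger atomicCounter = new AtomicInteger();

    /** Starts `threads` workers that each increment both counters `perThread` times. */
    static void run(int threads, int perThread) {
        plainCounter = 0;
        atomicCounter.set(0);
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    plainCounter++;                           // read, add, write: not atomic
                    atomicCounter.incrementAndGet();          // a single atomic update
                }
            });
            workers[t].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();                                // join() also makes the workers' writes visible here
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
    }

    public static void main(String[] args) {
        run(4, 100_000);
        System.out.println("plain  = " + plainCounter);       // frequently below 400000
        System.out.println("atomic = " + atomicCounter.get());
    }
}
```

The plain counter's final value varies from run to run, which is why such bugs tend to surface only under load.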
JVM tools for thread‑safety
The JVM provides synchronized, volatile, final and the Java Memory Model (JMM) with its happens‑before rules. Java code can address safety through synchronization, CAS, and thread‑local storage.
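The three approaches named above (synchronization, CAS, and thread-local storage) can be sketched roughly as follows. SafetyToolsDemo and its members are hypothetical names used only for this illustration; the article itself does not show this code.

```java
import java.util.concurrent.atomic.AtomicInteger;

class SafetyToolsDemo {
    private int guarded;                                          // protected by the instance monitor
    private final AtomicInteger casCounter = new AtomicInteger(); // updated with compare-and-set
    private static final ThreadLocal<StringBuilder> BUFFER =
            ThreadLocal.withInitial(StringBuilder::new);          // one buffer per thread

    // 1. Mutual exclusion: only one thread at a time runs a synchronized method on this instance
    synchronized void incrementGuarded() { guarded++; }
    synchronized int guarded() { return guarded; }

    // 2. CAS: retry until no other thread changed the value in between
    int incrementCas() {
        int current;
        do {
            current = casCounter.get();
        } while (!casCounter.compareAndSet(current, current + 1));
        return current + 1;
    }
    int casValue() { return casCounter.get(); }

    // 3. Thread confinement: each thread reuses its own StringBuilder, so nothing is shared
    static String bracket(String text) {
        StringBuilder sb = BUFFER.get();
        sb.setLength(0);
        return sb.append('[').append(text).append(']').toString();
    }

    /** Runs both increment styles from several threads and returns the demo instance. */
    static SafetyToolsDemo hammer(int threads, int perThread) {
        SafetyToolsDemo demo = new SafetyToolsDemo();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    demo.incrementGuarded();
                    demo.incrementCas();
                }
            });
            workers[t].start();
        }
        for (Thread worker : workers) {
            try {
                worker.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return demo;
    }
}
```

In practice AtomicInteger.incrementAndGet() already does the retry internally; the explicit loop is written out here only to show the CAS pattern.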
How to start threads
Basic options include Thread, Runnable, Callable; advanced options cover ThreadPoolExecutor, Future, and the JDK 8 powerhouse CompletableFuture.
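For comparison, here is a small sketch of the basic and advanced options side by side. The class StartThreadsDemo and the pool parameters (2 core threads, 4 maximum, a queue of 100, CallerRunsPolicy) are illustrative assumptions, not values taken from the article.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class StartThreadsDemo {
    /** Produces one result with each style: Thread+Runnable, Callable+Future, CompletableFuture. */
    static int[] runAll() {
        // A hand-built pool: bounded queue, and the caller runs the task when the pool is saturated
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 4, 30, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(100),
                new ThreadPoolExecutor.CallerRunsPolicy());
        try {
            int[] results = new int[3];

            // 1. Thread + Runnable: no return value, so the result goes through shared state
            Thread thread = new Thread(() -> results[0] = 1);
            thread.start();
            thread.join();

            // 2. Callable + Future: the task returns a value, get() blocks until it is ready
            Future<Integer> future = pool.submit(() -> 2);
            results[1] = future.get();

            // 3. CompletableFuture: asynchronous stages can be chained without blocking in between
            results[2] = CompletableFuture.supplyAsync(() -> 1, pool)
                    .thenApply(x -> x + 2)
                    .join();
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }
}
```

Passing the pool explicitly to supplyAsync keeps the work off ForkJoinPool.commonPool(), which matters for the I/O-bound scenarios later in the article.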
Thread coordination mechanisms
Key mechanisms: volatile and synchronized for visibility and exclusivity, wait/notify for signaling, piped streams for data transfer, join() for ordering, and ThreadLocal for per‑thread storage. Advanced tools such as Semaphore, CyclicBarrier, CountDownLatch, and CompletableFuture are also mentioned.
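Two of the advanced tools can be combined in a short sketch: a Semaphore caps how many tasks run at once, and a CountDownLatch lets the caller wait for all of them. CoordinationDemo, runBounded, and the 10 ms sleep that stands in for real work are assumptions made for this example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

class CoordinationDemo {
    /** Runs `tasks` jobs with at most `permits` running at once; returns the peak concurrency seen. */
    static int runBounded(int tasks, int permits) {
        ExecutorService pool = Executors.newFixedThreadPool(tasks);
        Semaphore gate = new Semaphore(permits);          // limits concurrent access
        CountDownLatch done = new CountDownLatch(tasks);  // lets the caller wait for every task
        AtomicInteger active = new AtomicInteger();
        AtomicInteger maxActive = new AtomicInteger();
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    try {
                        gate.acquire();
                        try {
                            // Record how many tasks are inside the guarded section right now
                            maxActive.accumulateAndGet(active.incrementAndGet(), Math::max);
                            Thread.sleep(10);             // stand-in for real work
                        } finally {
                            active.decrementAndGet();
                            gate.release();
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        done.countDown();                 // always signal completion
                    }
                });
            }
            done.await();                                 // block until every task has counted down
            return maxActive.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }
}
```

Calling runBounded(8, 3) should never report more than 3 concurrent tasks, even though the pool has 8 threads.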
Practical scenarios
Parallel aggregation with CompletableFuture
AtomicReference<List<SellOrderList>> orderLists = new AtomicReference<>();
AtomicReference<List<InternationalSalesList>> salesLists = new AtomicReference<>();
AtomicReference<Map<String, BigDecimal>> productMap = new AtomicReference<>();
// Logic A: load the orders created within the date range
CompletableFuture<Void> orderListCom = CompletableFuture.runAsync(() -> {
    orderLists.set(sellOrderListService.lambdaQuery()
            .ge(SellOrderList::getOrderCreateDate, startDate)
            .le(SellOrderList::getOrderCreateDate, endDate)
            .eq(SellOrderList::getIsDelete, 0).list());
});
CompletableFuture<Void> productCom = CompletableFuture.runAsync(() -> {
    // Logic B
});
CompletableFuture<Void> euLineCom = CompletableFuture.runAsync(() -> {
    // Logic C
});
// Aggregate only after all three tasks have finished
CompletableFuture.allOf(orderListCom, productCom, euLineCom).handle((res, e) -> {
    if (e != null) {
        log.error("Customer order scheduled task: data aggregation failed", e);
    } else {
        try {
            aggregateData(...);
        } catch (Exception ex) {
            log.error("Customer order data processing failed", ex);
        }
    }
    return null;
});
Converting a for‑loop to parallel streams
int logicCpus = Runtime.getRuntime().availableProcessors();
ForkJoinPool forkJoinPool = new ForkJoinPool(logicCpus * 80);
List<RedundantErpSl> slAddList = new ArrayList<>(50000);
// Split the input into chunks of 1000 and process each chunk as its own task
List<List<SlErpDTO>> partition = Lists.partition(slErpList, 1000);
CompletableFuture<Void> handle = CompletableFuture.allOf(
        partition.stream().map(addPartitionList ->
                CompletableFuture.runAsync(() -> {
                    for (SlErpDTO slErp : addPartitionList) {
                        // TODO logic processing (builds the RedundantErpSl instance "sl" from slErp)
                        // ArrayList is not thread-safe, so guard the shared list
                        synchronized (slAddList) {
                            slAddList.add(sl);
                        }
                    }
                }, forkJoinPool)).toArray(CompletableFuture[]::new)
).whenComplete((res, e) -> {
    if (e != null) {
        log.error("Multithreaded data assembly failed", e);
    } else {
        try {
            slService.batchSchedule(versionNum, slAddList);
        } catch (Exception ex) {
            log.error("Batch insert failed", ex);
        }
    }
});
handle.join();
Replacing nested loops with a map lookup
public Map<String, SafeRule> list2Map() {
    List<SafeRule> ruleList = lambdaQuery().eq(SafeRule::getIsDelete, 0).list();
    return ruleList.stream().collect(Collectors.toMap(e -> e.getBigClass() + e.getSmallClass(), Function.identity()));
}
// Build the lookup map once, outside the loop; calling list2Map() per iteration would re-query the database every time
Map<String, SafeRule> map = list2Map();
for (X x : dataToAddOrUpdate) {
    String xKey = x.getKey();
    if (map.get(xKey) != null) {
        // update
    } else {
        // add
    }
}
Parallel map traversal
public static <K, V> List<Map<K, V>> mapSplit(Map<K, V> splitMap, int splitNum) {
    if (splitMap == null || splitNum <= 0) {
        List<Map<K, V>> list = new ArrayList<>();
        list.add(splitMap);
        return list;
    }
    Set<K> keySet = splitMap.keySet();
    Iterator<K> iterator = keySet.iterator();
    int i = 1;
    List<Map<K, V>> total = new ArrayList<>();
    Map<K, V> tem = new HashMap<>();
    while (iterator.hasNext()) {
        K next = iterator.next();
        tem.put(next, splitMap.get(next));
        if (i == splitNum) {
            total.add(tem);
            tem = new HashMap<>();
            i = 0;
        }
        i++;
    }
    if (!CollectionUtils.isEmpty(tem)) {
        total.add(tem);
    }
    return total;
}
// Group the materials by a composite key, then split the map into chunks of 50 entries
Map<String, List<BudgetErpDTO>> materialMap = materialList.parallelStream()
        .collect(Collectors.groupingBy(e -> e.getInvOrgId() + "-" + e.getItemId() + "-" + e.getVendorId()));
List<Map<String, List<BudgetErpDTO>>> mapList = MapUtil.mapSplit(materialMap, 50);
// Process each chunk as its own task on the ioDense executor
CompletableFuture<Void> handle = CompletableFuture.allOf(
        mapList.stream().map(splitMap -> CompletableFuture.runAsync(() -> {
            splitMap.forEach((identity, list) -> {
                // business logic
            });
        }, ioDense)).toArray(CompletableFuture[]::new)
).exceptionally(e -> {
    log.error("Multithreaded data assembly failed", e);
    return null;
});
Multi‑threaded batch insert utility
// Batch method name
String methodName = "batchSchedule";
String paramName = "addList";
String serviceName = "baseInfoService";
Class<?> entity = BudgetBase.class;
printMapper(entity.getSimpleName(), methodName, paramName);
printXml(entity, methodName, paramName);
printSave(entity.getSimpleName(), serviceName, paramName, 1000);
printAddTransaction(entity.getSimpleName(), paramName, 1000);
The actual batch insert uses partitioned lists and CompletableFuture.allOf to run inserts in parallel, handling exceptions with exceptionally and logging errors.
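The partition-and-allOf pattern described here might look roughly like the sketch below. It is not the code the utility generates: ParallelBatchInsert, partition, and insertAll are hypothetical names, the insertBatch callback stands in for a mapper's batch method, and a plain subList loop replaces Guava's Lists.partition to keep the example dependency-free.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Consumer;

class ParallelBatchInsert {
    /** Splits `source` into consecutive sublists of at most `size` elements. */
    static <T> List<List<T>> partition(List<T> source, int size) {
        List<List<T>> parts = new ArrayList<>();
        for (int from = 0; from < source.size(); from += size) {
            parts.add(source.subList(from, Math.min(from + size, source.size())));
        }
        return parts;
    }

    /**
     * Runs `insertBatch` once per partition on `executor` and waits for all of them.
     * Returns true only if every batch completed without an exception.
     */
    static <T> boolean insertAll(List<T> rows, int batchSize,
                                 Consumer<List<T>> insertBatch, Executor executor) {
        CompletableFuture<?>[] futures = partition(rows, batchSize).stream()
                .map(batch -> CompletableFuture.runAsync(() -> insertBatch.accept(batch), executor))
                .toArray(CompletableFuture[]::new);
        return CompletableFuture.allOf(futures)
                .thenApply(ignored -> true)
                .exceptionally(e -> {
                    // Log and report failure; batches that already succeeded are NOT undone here
                    System.err.println("Batch insert failed: " + e);
                    return false;
                })
                .join();
    }
}
```

Note that this version has no shared transaction: if one batch fails, the others stay committed.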
Multi‑threaded transaction handling
The article explains why Spring’s @Transactional cannot be used directly in multithreaded contexts because Spring stores transaction state in a ThreadLocal. It shows a faulty implementation that creates a new transaction per thread but attempts to commit/rollback from the main thread, leading to missing ThreadLocal keys and lock time‑outs.
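The ThreadLocal limitation can be reproduced without Spring. In the sketch below, CURRENT_TX is a hypothetical stand-in for the per-thread transaction state; a value set on the calling thread is simply not there when a pool thread looks for it.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ThreadLocalScopeDemo {
    // Hypothetical stand-in for the per-thread transaction context
    static final ThreadLocal<String> CURRENT_TX = new ThreadLocal<>();

    /** Sets a value on the calling thread, then reads the same ThreadLocal from a pool thread. */
    static String readFromWorker() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            CURRENT_TX.set("tx-main");
            // The worker has its own (empty) slot, so get() returns null there
            return CompletableFuture
                    .supplyAsync(() -> String.valueOf(CURRENT_TX.get()), pool)
                    .join();
        } finally {
            CURRENT_TX.remove();
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(readFromWorker());
    }
}
```

For the same reason, a transaction started on one thread cannot be committed or rolled back from another: the committing thread has no bound resources to work with.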
Solution: each thread opens its own transaction, performs its work, and then parks itself with LockSupport.park(). Once all threads have finished, or as soon as one of them fails, the last thread to arrive (or the failing thread) wakes the parked threads with LockSupport.unpark(), and each thread then commits or rolls back its own transaction based on a shared success flag.
@Autowired
private PlatformTransactionManager transactionManager;
@Autowired
@Qualifier("ioDenseExecutor")
private ThreadPoolTaskExecutor ioDense;
private void batchSchedule(List<BudgetBase> addList) {
    if (!CollectionUtils.isEmpty(addList)) {
        AtomicBoolean isSuccess = new AtomicBoolean(true);
        AtomicInteger cur = new AtomicInteger(1);
        List<Thread> unfinishedList = new ArrayList<>();
        List<List<BudgetBase>> partition = Lists.partition(addList, 1000);
        int totalSize = partition.size();
        CompletableFuture<Void> future = CompletableFuture.allOf(
                partition.stream().map(addPartitionList -> CompletableFuture.runAsync(() -> {
                    // Each worker opens its own transaction, bound to its own thread
                    DefaultTransactionDefinition defGo = new DefaultTransactionDefinition();
                    defGo.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
                    TransactionStatus statusGo = transactionManager.getTransaction(defGo);
                    int curInt = cur.getAndIncrement();
                    try {
                        log.info("Thread #{} starting, thread name={}", curInt, Thread.currentThread().getName());
                        baseInfoService.getBaseMapper().batchSchedule(addPartitionList);
                        log.info("Thread #{} finished its batch insert and is joining the wait queue, thread name={}", curInt, Thread.currentThread().getName());
                        synchronized (unfinishedList) {
                            unfinishedList.add(Thread.currentThread());
                        }
                        log.info("Thread #{} joined the queue and is about to park, thread name={}", curInt, Thread.currentThread().getName());
                        // Force the wake-up if another worker has already failed; otherwise a
                        // worker arriving after the failure would park forever
                        notifyAllThread(unfinishedList, totalSize, !isSuccess.get());
                        // park() may return spuriously, so re-check the wake-up condition in a loop
                        boolean allArrived;
                        do {
                            LockSupport.park();
                            synchronized (unfinishedList) {
                                allArrived = unfinishedList.size() >= totalSize;
                            }
                        } while (isSuccess.get() && !allArrived);
                        if (isSuccess.get()) {
                            log.info("Thread #{} committing, thread name={}", curInt, Thread.currentThread().getName());
                            transactionManager.commit(statusGo);
                        } else {
                            log.info("Thread #{} rolling back, thread name={}", curInt, Thread.currentThread().getName());
                            transactionManager.rollback(statusGo);
                        }
                    } catch (Exception e) {
                        log.error("Thread #{} hit an exception, thread name={}", curInt, Thread.currentThread().getName(), e);
                        transactionManager.rollback(statusGo);
                        isSuccess.set(false);
                        // Wake every parked worker so that it rolls back as well
                        notifyAllThread(unfinishedList, totalSize, true);
                    }
                }, ioDense)).toArray(CompletableFuture[]::new));
        future.join();
    }
}
private void notifyAllThread(List<Thread> unfinishedList, int totalSize, boolean isForce) {
    // Hold the list's monitor so the size check and the iteration cannot race with add()
    synchronized (unfinishedList) {
        if (isForce || unfinishedList.size() >= totalSize) {
            log.info("Waking all parked threads, thread count={}, total threads={}, forced={}", unfinishedList.size(), totalSize, isForce);
            for (Thread thread : unfinishedList) {
                log.info("Thread {} woken up", thread.getName());
                LockSupport.unpark(thread);
            }
        }
    }
}
The article also discusses why LockSupport.park() / unpark() are preferred over Thread.sleep() or Object.wait(), and why a custom ThreadPoolTaskExecutor is needed instead of the default ForkJoinPool.commonPool() for I/O‑bound transactional work.
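One property that makes park/unpark convenient is its permit: an unpark() issued before the matching park() is not lost, and it targets one specific thread without requiring a monitor. The sketch below illustrates both points; ParkPermitDemo and its method names are hypothetical, and parkNanos with a 5-second cap is used only so the demo cannot hang.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;

class ParkPermitDemo {
    /** unpark() first, park() second: the stored permit makes park return right away. */
    static boolean unparkBeforeParkReturnsImmediately() {
        LockSupport.unpark(Thread.currentThread());          // hand out the permit in advance
        long start = System.nanoTime();
        LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(5));  // consumes the permit instead of waiting
        long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        return elapsedMs < 1000;
    }

    /** Wakes one specific worker; the worker re-checks its condition because park may return spuriously. */
    static boolean wakeSpecificThread() {
        AtomicBoolean go = new AtomicBoolean(false);
        Thread worker = new Thread(() -> {
            while (!go.get()) {
                LockSupport.park();
            }
        });
        worker.start();
        go.set(true);                 // publish the condition first
        LockSupport.unpark(worker);   // then wake exactly this thread
        try {
            worker.join(2000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return !worker.isAlive();
    }
}
```

With Object.wait()/notify(), a notify that arrives before the wait is lost, and both calls must be made while holding the object's monitor.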
Finally, it warns about database connection pool limits: if the pool size is smaller than the number of parallel threads, threads may block indefinitely, so developers must size the pool and the batch size appropriately.
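Because every parked worker keeps its database connection open until all workers have arrived, the number of partitions should not exceed what the connection pool (and the executor) can serve at once. One possible way to enforce this, shown as a sketch with the hypothetical helper BatchSizing, is to grow the batch size until the partition count fits the limit.

```java
class BatchSizing {
    /**
     * Returns a batch size that is at least `preferredBatchSize` and large enough
     * that `totalRows` splits into at most `maxParallelTransactions` partitions.
     */
    static int batchSizeFor(int totalRows, int preferredBatchSize, int maxParallelTransactions) {
        if (preferredBatchSize <= 0 || maxParallelTransactions <= 0) {
            throw new IllegalArgumentException("sizes must be positive");
        }
        if (totalRows <= 0) {
            return preferredBatchSize;
        }
        // Ceiling division: the smallest batch size that keeps the partition count within the limit
        int minBatch = (totalRows + maxParallelTransactions - 1) / maxParallelTransactions;
        return Math.max(preferredBatchSize, minBatch);
    }

    /** Number of partitions produced when `totalRows` is split into batches of `batchSize`. */
    static int partitionCount(int totalRows, int batchSize) {
        return (totalRows + batchSize - 1) / batchSize;
    }
}
```

For example, with 50,000 rows, a preferred batch size of 1,000, and room for 10 parallel transactions, the batch size becomes 5,000, which yields exactly 10 partitions. The limit passed in should be the smaller of the connection pool size and the executor's thread count, minus whatever the rest of the application needs.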
Source: juejin.cn/post/7139700932018700319
This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contact us and we will review it promptly.
Code Ape Tech Column
Former Ant Group P8 engineer, pure technologist, sharing full‑stack Java, job interview and career advice through a column. Site: java-family.cn