How to Ensure Transaction Consistency in Multithreaded Spring Applications
This article explains how to execute two dependent tasks in parallel using CompletableFuture, why @Async and @Transactional are insufficient for multithreaded transaction consistency, and provides a programmatic solution that copies Spring transaction resources between threads to guarantee atomic commit or rollback across asynchronous operations.
Problem Statement
We need to delete resources of a module and its sub‑modules in two steps, then delete the module itself, while executing steps 1 and 2 in parallel and committing the whole operation only after step 3 succeeds.
public void removeAuthorityModuleSeq(Integer authorityModuleId, IAuthorityService iAuthorityService, IRoleAuthorityService iRoleAuthorityService) {
    // 1. delete current module resources
    deleteAuthoritiesOfCurrentAuthorityModule(authorityModuleId, iAuthorityService, iRoleAuthorityService);
    // 2. delete sub‑modules recursively
    deleteSonAuthorityModuleUnderCurrentAuthorityModule(authorityModuleId, iAuthorityService, iRoleAuthorityService);
    // 3. delete the module itself
    removeById(authorityModuleId);
}
How to Solve Asynchronous Execution
Spring's @Async annotation is implemented by a MethodInterceptor that wraps the annotated method call in a task and submits it to a thread pool. On its own, however, it cannot express the dependency we need here: step 3 must run only after both parallel tasks have finished.
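The required dependency (two steps in parallel, a third step only after both have finished) can be sketched with the JDK alone. This is a minimal illustration, not the article's service code: the class name ParallelThenFinalStep and the logged step names are placeholders invented for the example.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo class; the "steps" only record their names.
class ParallelThenFinalStep {

    // Runs step1 and step2 in parallel, then step3 only after both succeeded.
    // Returns the order in which the steps were recorded.
    static List<String> run() {
        List<String> log = new CopyOnWriteArrayList<>();
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            CompletableFuture<Void> step1 =
                    CompletableFuture.runAsync(() -> log.add("step1"), executor);
            CompletableFuture<Void> step2 =
                    CompletableFuture.runAsync(() -> log.add("step2"), executor);
            // thenRun fires only if both futures completed normally;
            // join() blocks the caller and rethrows a failure as CompletionException.
            CompletableFuture.allOf(step1, step2)
                    .thenRun(() -> log.add("step3"))
                    .join();
        } finally {
            executor.shutdown();
        }
        return log;
    }

    public static void main(String[] args) {
        // step1 and step2 may appear in either order; step3 is always last.
        System.out.println(run());
    }
}
```

Calling join() on the combined future is a deliberate choice in this sketch: without it the caller returns before the final step has run and never sees an exception thrown by any of the steps.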
We first use CompletableFuture to run the two deletion tasks concurrently and wait for both to finish before invoking the final delete step:
public void removeAuthorityModuleSeq(Integer authorityModuleId, IAuthorityService iAuthorityService, IRoleAuthorityService iRoleAuthorityService) {
    CompletableFuture.runAsync(() -> {
        CompletableFuture<Void> future1 = CompletableFuture.runAsync(() ->
                deleteAuthoritiesOfCurrentAuthorityModule(authorityModuleId, iAuthorityService, iRoleAuthorityService), executor);
        CompletableFuture<Void> future2 = CompletableFuture.runAsync(() ->
                deleteSonAuthorityModuleUnderCurrentAuthorityModule(authorityModuleId, iAuthorityService, iRoleAuthorityService), executor);
        CompletableFuture.allOf(future1, future2).thenRun(() -> removeById(authorityModuleId));
    }, executor);
}
Ensuring Transaction Consistency in a Multithreaded Environment
Declarative transaction management with @Transactional fails here because Spring, via TransactionSynchronizationManager, binds transaction resources (the connection, isolation level, and so on) to the thread that starts the transaction. When the commit or rollback runs on a different thread, those resources cannot be found, which leads to errors such as "No value for key [HikariDataSource (HikariPool-1)] bound to thread [main]".
The core of Spring's transaction handling involves:
Creating a TransactionDefinition that describes isolation, propagation, timeout, read‑only flag, etc.
Using a PlatformTransactionManager (e.g., DataSourceTransactionManager) to obtain a TransactionStatus and manage the transaction lifecycle.
Storing resources in thread‑local maps inside TransactionSynchronizationManager (resources, synchronizations, transaction name, read‑only flag, isolation level, active flag).
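Why thread-bound storage breaks a cross-thread commit can be shown without Spring. The sketch below uses a plain ThreadLocal map as a much-simplified, hypothetical stand-in for TransactionSynchronizationManager; the class ThreadBoundResources, its methods, and the key and value strings are all invented for illustration and are not Spring's implementation.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for a thread-bound resource registry.
class ThreadBoundResources {

    // Each thread sees only its own map.
    private static final ThreadLocal<Map<Object, Object>> RESOURCES =
            ThreadLocal.withInitial(HashMap::new);

    static void bindResource(Object key, Object value) {
        RESOURCES.get().put(key, value);
    }

    static Object getResource(Object key) {
        Object value = RESOURCES.get().get(key);
        if (value == null) {
            throw new IllegalStateException("No value for key [" + key
                    + "] bound to thread [" + Thread.currentThread().getName() + "]");
        }
        return value;
    }

    // Binds a resource on a worker thread, then tries to read it on the caller's thread.
    static boolean visibleFromCaller() {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            CompletableFuture
                    .runAsync(() -> bindResource("dataSource", "connection-1"), worker)
                    .join();
        } finally {
            worker.shutdown();
        }
        try {
            getResource("dataSource");
            return true;
        } catch (IllegalStateException e) {
            return false; // the binding lives in the worker's map only
        }
    }

    public static void main(String[] args) {
        System.out.println("visible from caller: " + visibleFromCaller());
    }
}
```

The lookup on the calling thread fails even though the worker bound the resource successfully, which mirrors the "No value for key ... bound to thread" error described above.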
Programmatic Transaction Solution
To make the transaction span multiple threads we copy the thread‑local resources from the worker thread back to the main thread before committing or rolling back.
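import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.sql.DataSource;
import lombok.Builder;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.DefaultTransactionDefinition;
import org.springframework.transaction.support.TransactionSynchronization;
import org.springframework.transaction.support.TransactionSynchronizationManager;

public class MultiplyThreadTransactionManager {
    private final DataSource dataSource;

    public MultiplyThreadTransactionManager(DataSource dataSource) {
        this.dataSource = dataSource;
    }

    public void runAsyncButWaitUntilAllDown(List<Runnable> tasks, Executor executor) {
        if (executor == null) {
            throw new IllegalArgumentException("Executor must not be null");
        }
        DataSourceTransactionManager transactionManager = getTransactionManager();
        // Set to true as soon as any task fails
        AtomicBoolean ex = new AtomicBoolean();
        List<CompletableFuture<?>> taskFutureList = new ArrayList<>(tasks.size());
        List<TransactionStatus> transactionStatusList = new ArrayList<>(tasks.size());
        List<TransactionResource> transactionResources = new ArrayList<>(tasks.size());
        tasks.forEach(task -> {
            taskFutureList.add(CompletableFuture.runAsync(() -> {
                try {
                    // Open a transaction on this worker thread and capture its thread-bound state
                    TransactionStatus status = openNewTransaction(transactionManager);
                    TransactionResource resource = TransactionResource.copyTransactionResource();
                    // The lists are shared by all workers: add both entries under one lock
                    // so that index i refers to the same task in both lists
                    synchronized (transactionStatusList) {
                        transactionStatusList.add(status);
                        transactionResources.add(resource);
                    }
                    task.run();
                } catch (Throwable t) {
                    t.printStackTrace();
                    ex.set(Boolean.TRUE);
                    // Marks the other futures as cancelled; tasks already running are not interrupted
                    taskFutureList.forEach(f -> f.cancel(true));
                }
            }, executor));
        });
        try {
            CompletableFuture.allOf(taskFutureList.toArray(new CompletableFuture[0])).get();
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        }
        if (ex.get()) {
            System.out.println("An exception occurred; rolling back all transactions");
            // Iterate over the transactions actually opened, which may be fewer than tasks.size()
            for (int i = 0; i < transactionStatusList.size(); i++) {
                // Bind the worker's resources to this thread so the manager can find them
                transactionResources.get(i).autoWiredTransactionResource();
                transactionManager.rollback(transactionStatusList.get(i));
                transactionResources.get(i).removeTransactionResource();
            }
        } else {
            System.out.println("All tasks succeeded; committing all transactions");
            for (int i = 0; i < transactionStatusList.size(); i++) {
                transactionResources.get(i).autoWiredTransactionResource();
                transactionManager.commit(transactionStatusList.get(i));
                transactionResources.get(i).removeTransactionResource();
            }
        }
    }

    private TransactionStatus openNewTransaction(DataSourceTransactionManager tm) {
        DefaultTransactionDefinition def = new DefaultTransactionDefinition();
        return tm.getTransaction(def);
    }

    private DataSourceTransactionManager getTransactionManager() {
        return new DataSourceTransactionManager(dataSource);
    }

    // Snapshot of the thread-local state held by TransactionSynchronizationManager
    @Builder
    private static class TransactionResource {
        private Map<Object, Object> resources;
        private Set<TransactionSynchronization> synchronizations;
        private String currentTransactionName;
        private Boolean currentTransactionReadOnly;
        private Integer currentTransactionIsolationLevel;
        private Boolean actualTransactionActive;

        // Called on the worker thread, right after the transaction is opened
        public static TransactionResource copyTransactionResource() {
            return TransactionResource.builder()
                    // getResourceMap() returns a read-only view, so take a snapshot of it
                    .resources(new HashMap<>(TransactionSynchronizationManager.getResourceMap()))
                    .synchronizations(new LinkedHashSet<>())
                    .currentTransactionName(TransactionSynchronizationManager.getCurrentTransactionName())
                    .currentTransactionReadOnly(TransactionSynchronizationManager.isCurrentTransactionReadOnly())
                    .currentTransactionIsolationLevel(TransactionSynchronizationManager.getCurrentTransactionIsolationLevel())
                    .actualTransactionActive(TransactionSynchronizationManager.isActualTransactionActive())
                    .build();
        }

        // Called on the committing thread: rebind the captured state to the current thread
        public void autoWiredTransactionResource() {
            resources.forEach(TransactionSynchronizationManager::bindResource);
            TransactionSynchronizationManager.initSynchronization();
            TransactionSynchronizationManager.setActualTransactionActive(actualTransactionActive);
            TransactionSynchronizationManager.setCurrentTransactionName(currentTransactionName);
            TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(currentTransactionIsolationLevel);
            TransactionSynchronizationManager.setCurrentTransactionReadOnly(currentTransactionReadOnly);
        }

        // Unbind whatever is left after commit or rollback. DataSource keys are skipped
        // because the transaction manager's own cleanup already unbinds them.
        public void removeTransactionResource() {
            resources.keySet().forEach(key -> {
                if (!(key instanceof DataSource)) {
                    TransactionSynchronizationManager.unbindResource(key);
                }
            });
        }
    }
}
Tests show that when any task throws an exception, all transactions are rolled back, leaving the database unchanged.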
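The capture-and-rebind idea behind TransactionResource can be illustrated without Spring or a database. The following is a rough sketch under stated assumptions: the ThreadLocal map stands in for Spring's thread-bound resource map, and the class CaptureAndRebind, its methods, and the key and value strings are hypothetical names chosen for the example.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo of copying thread-bound state from a worker to the caller.
class CaptureAndRebind {

    // Stand-in for Spring's thread-bound resource map.
    private static final ThreadLocal<Map<Object, Object>> RESOURCES =
            ThreadLocal.withInitial(HashMap::new);

    // Snapshot of the current thread's bindings (compare copyTransactionResource).
    static Map<Object, Object> capture() {
        return new HashMap<>(RESOURCES.get());
    }

    // Binds a snapshot to the current thread (compare autoWiredTransactionResource).
    static void rebind(Map<Object, Object> snapshot) {
        RESOURCES.get().putAll(snapshot);
    }

    // Removes the snapshot's keys from the current thread (compare removeTransactionResource).
    static void unbind(Map<Object, Object> snapshot) {
        snapshot.keySet().forEach(key -> RESOURCES.get().remove(key));
    }

    // Returns what the caller sees under "dataSource" after rebinding the worker's snapshot.
    static Object demo() {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        Map<Object, Object> snapshot;
        try {
            // The worker binds a resource and hands a snapshot back to the caller.
            snapshot = CompletableFuture.supplyAsync(() -> {
                RESOURCES.get().put("dataSource", "connection-1");
                return capture();
            }, worker).join();
        } finally {
            worker.shutdown();
        }
        rebind(snapshot);                      // the caller now sees the worker's resource
        Object seen = RESOURCES.get().get("dataSource");
        unbind(snapshot);                      // leave the caller's thread clean
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In this sketch the snapshot is taken as a copy rather than a live view, so later changes on the worker thread do not leak into the caller's bindings.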
Conclusion
The presented method demonstrates one way to achieve transaction consistency across multiple threads by manually copying and restoring Spring's transaction resources. Alternative approaches include using JDBC directly for programmatic commit/rollback or adopting distributed‑transaction techniques. Understanding the underlying transaction infrastructure is essential to avoid relying solely on @Transactional in complex multithreaded scenarios.
Source: blog.csdn.net/m0_53157173/article/details/127423286
