How to Ensure Transaction Consistency When Async Threads Fail in Spring
This article explains how to guarantee transactional integrity when importing a 100,000‑row Excel file with asynchronous threads in Spring, covering double‑async optimization, Future and CompletableFuture handling, @Transactional pitfalls, manual transaction control, and a complete solution for cross‑thread transaction management.
1. Background
In the previous article we imported 100,000 rows of Excel using a double‑asynchronous approach. A reader asked how to guarantee transaction consistency when processing the data in batches.
Original Requirement: Read a 100,000‑row Excel
Serial reading of the Excel file took 191 seconds per file.
Optimization 1: Double Async
Read Excel and insert into the database using POI and EasyExcel.
Discussed core thread‑pool size settings.
After dozens of tests, using a thread pool to perform a single‑shot parallel insert achieved the best efficiency.
Optimization 2: Future for Data Consistency
By obtaining asynchronous return values with Future and comparing them with the Excel rows, we can determine data accuracy.
Analyzed FutureTask source code and drew its execution flow diagram.
Analyzed get() source code and drew its execution flow diagram.
Discovered that Future.get() blocks the main thread.
Optimization 3: CompletableFuture
Java 8 introduced CompletableFuture, which upgrades Future and allows callbacks to obtain asynchronous results.
The asynchronous execution uses ForkJoinPool, which employs daemon threads to run tasks. ForkJoinPool fully utilizes multi‑core CPUs by splitting a task into many subtasks, executing them in parallel, and then merging the results.
Optimized "obtain asynchronous return value via Future" with CompletableFuture.
Compared efficiency of CompletableFuture and Future.
Customized ForkJoinPool thread pool.
With the same core thread count, CompletableFuture inserts data about 4 seconds faster than Future for 100,000 rows.
Used CompletableFuture.allOf to avoid blocking the main thread.
Summarized the syntactic sugar of CompletableFuture.
2. Rolling Back All Async Threads When One Fails
To guarantee transaction, we must use @Transactional. The scenario involves importing several large Excel files, each requiring its own transaction. The main thread holds a transaction, and each async sub‑thread also has a transaction. If any sub‑thread fails, all sub‑transactions must roll back; otherwise, commit.
Adding @Transactional to both the main and sub‑threads does not work because a sub‑thread’s exception only rolls back its own transaction, leaving partial data in the database.
Example simulation: if the last batch throws an exception, 96,600 rows are already inserted, violating the single‑Excel transaction requirement.
if(end == sheet.getLastRowNum()){ logger.info("插入最后一批数据,模拟异常"); int a = 1/0;}3. @Transactional Annotation
Declarative transaction management is built on AOP. It intercepts method execution, creates or joins a transaction before the target method runs, and commits or rolls back after execution.
In short, @Transactional rolls back the transaction when an error occurs.
Add @EnableTransactionManagement on the startup class.
When placed on a class, all public methods inherit the transaction attribute; method‑level annotations can override it.
Using @Transactional(rollbackFor=Exception.class) makes the transaction roll back for both runtime and checked exceptions.
Without rollbackFor, only RuntimeException triggers a rollback.
3.1 @Transactional Behavior
If a runtime exception occurs without a try‑catch, the program aborts and the database rolls back.
If the exception is caught and re‑thrown, the rollback still occurs.
If the exception is caught but not re‑thrown, the program continues and the database does not roll back.
3.2 Common Pitfalls
Applying @Transactional to non‑public methods: Spring’s transaction interceptor only reads annotations on public methods.
Incorrect rollbackFor configuration.
Self‑invocation within the same class bypasses the proxy, causing the transaction annotation to be ignored.
Catching exceptions without re‑throwing prevents rollback.
4. Adding Transaction Manually
Manual transaction control can be achieved by obtaining a TransactionStatus from DataSourceTransactionManager, performing the async operations, and then committing or rolling back based on the aggregated result.
@Transactional(rollbackFor = Exception.class)
public void readXls(String filePath, String filename) throws Exception {
try {
// complex operations ...
List<Future<Integer>> futureList = new ArrayList<>();
for (int i = 0; i < times; i++) {
Future<Integer> sumFuture = readExcelDataAsyncFutureService.readXlsCacheAsyncMybatis();
futureList.add(sumFuture);
}
boolean futureFlag = getFutureResult(futureList, excelRow);
if (futureFlag) {
logger.info("readXlsCacheAsync---插入数据成功,提交事务");
} else {
TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
logger.info("readXlsCacheAsync---插入数据失败,回滚事务");
}
} catch (Exception e) {
TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
logger.error("readXlsCacheAsync---插入数据异常,回滚事务:", e);
}
}
@Async("async-executor")
@Override
public Integer readXlsCacheAsyncMybatis() {
try {
// complex operations ...
} catch (Exception e) {
throw new RuntimeException("插入数据库异常", e);
}
}5. @Async + @Transactional Transaction Failure
The requirement is to roll back all async thread transactions when any thread fails.
Both @Async and @Transactional are implemented via Spring AOP, each adding its own interceptor. Spring’s transaction propagation uses ThreadLocal, which cannot cross thread boundaries.
6. Transaction Propagation Cannot Cross Threads
Because ThreadLocal is thread‑private, a transaction started in one thread is invisible to another thread.
7. Solution: One Transaction per Async Thread with Transaction Copy
We create a separate transaction for each async thread, collect the TransactionStatus objects, and after the main thread gathers all Future results, we either commit or roll back each transaction.
List<TransactionStatus> transactionStatusList = Collections.synchronizedList(new ArrayList<>());
List<TransactionResource> transactionResourceList = Collections.synchronizedList(new ArrayList<>());
try {
List<Future<Integer>> futureList = new ArrayList<>();
for (int i = 0; i < times; i++) {
Future<Integer> sumFuture = readAsyncFutureTransactionDBService.readXlsCacheAsyncMybatis(sheet, row, start, end, insertBuilder, transactionStatusList, transactionResourceList);
futureList.add(sumFuture);
}
boolean futureFlag = getFutureResult(futureList, excelRow);
if (futureFlag) {
for (TransactionStatus ts : transactionStatusList) {
dataSourceTransactionManager.commit(ts);
}
logger.info("readXlsCacheAsync---插入数据成功,提交事务");
} else {
for (TransactionStatus ts : transactionStatusList) {
dataSourceTransactionManager.rollback(ts);
}
logger.info("readXlsCacheAsync---插入数据失败,事务回滚");
throw new RuntimeException("readXlsCacheAsync---插入数据异常,异常事务回滚");
}
} catch (Exception e) {
logger.error("readXlsCacheAsync---插入数据异常,事务回滚:", e);
for (TransactionStatus ts : transactionStatusList) {
dataSourceTransactionManager.rollback(ts);
}
throw new RuntimeException("readXlsCacheAsync---插入数据异常,异常事务回滚");
}7.1 Transaction Copy Class
static class TransactionResource {
private Map<Object, Object> resources;
private Set<TransactionSynchronization> synchronizations;
private String currentTransactionName;
private Boolean currentTransactionReadOnly;
private Integer currentTransactionIsolationLevel;
private Boolean actualTransactionActive;
public static TransactionResource copyTransactionResource() {
return TransactionResource.builder()
.resources(TransactionSynchronizationManager.getResourceMap())
.synchronizations(new LinkedHashSet<>())
.currentTransactionName(TransactionSynchronizationManager.getCurrentTransactionName())
.currentTransactionReadOnly(TransactionSynchronizationManager.isCurrentTransactionReadOnly())
.currentTransactionIsolationLevel(TransactionSynchronizationManager.getCurrentTransactionIsolationLevel())
.actualTransactionActive(TransactionSynchronizationManager.isActualTransactionActive())
.build();
}
public void autoWiredTransactionResource() {
resources.forEach(TransactionSynchronizationManager::bindResource);
TransactionSynchronizationManager.initSynchronization();
TransactionSynchronizationManager.setActualTransactionActive(actualTransactionActive);
TransactionSynchronizationManager.setCurrentTransactionName(currentTransactionName);
TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(currentTransactionIsolationLevel);
TransactionSynchronizationManager.setCurrentTransactionReadOnly(currentTransactionReadOnly);
}
public void removeTransactionResource() {
resources.keySet().forEach(key -> {
if (!(key instanceof DataSource)) {
TransactionSynchronizationManager.unbindResource(key);
}
});
}
}8. Summary
After extensive effort, we solved the problem of rolling back all async thread transactions when any thread fails, completing the double‑async import series.
Key points:
Use EasyExcel for asynchronous Excel reading.
Obtain async return values with Future and compare with Excel rows to ensure consistency.
Employ CompletableFuture with a custom ForkJoinPool to avoid main‑thread blocking.
Set the core thread count and rows per thread for optimal efficiency.
Implement manual transaction control, one transaction per thread, and copy transaction resources to achieve effective async transaction management.
Signed-in readers can open the original source through BestHub's protected redirect.
This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contactand we will review it promptly.
Su San Talks Tech
Su San, former staff at several leading tech companies, is a top creator on Juejin and a premium creator on CSDN, and runs the free coding practice site www.susan.net.cn.
How this landed with the community
Was this worth your time?
0 Comments
Thoughtful readers leave field notes, pushback, and hard-won operational detail here.
