Ensuring Transaction Rollback in Multi‑Threaded Spring Batch Inserts
This article explains why the @Transactional annotation fails in multithreaded Spring batch insert scenarios, demonstrates the problem with code examples, and shows how to use SqlSession for manual commit and rollback to guarantee atomicity across all threads.
Background Introduction
1. A business scenario requires inserting a large amount of data after performing other modifications; the insert is split into multiple threads to improve response time, and the whole operation must roll back if any thread fails.
2. In Spring, the @Transactional annotation controls transaction rollback, but it does not take effect in multithreaded contexts; when a child thread throws an exception, the main thread’s database changes are not rolled back.
3. The following simple example demonstrates multithreaded transaction handling.
Common Classes and Methods
/**
* Average split list method.
*/
public static <T> List<List<T>> averageAssign(List<T> source, int n) {
List<List<T>> result = new ArrayList<>();
int remainder = source.size() % n;
int number = source.size() / n;
int offset = 0;
for (int i = 0; i < n; i++) {
List<T> value;
if (remainder > 0) {
value = source.subList(i * number + offset, (i + 1) * number + offset + 1);
remainder--;
offset++;
} else {
value = source.subList(i * number + offset, (i + 1) * number + offset);
}
result.add(value);
}
return result;
}
/** Thread pool configuration */
public class ExecutorConfig {
private static int maxPoolSize = Runtime.getRuntime().availableProcessors();
private volatile static ExecutorService executorService;
public static ExecutorService getThreadPool() {
if (executorService == null) {
synchronized (ExecutorConfig.class) {
if (executorService == null) {
executorService = newThreadPool();
}
}
}
return executorService;
}
private static ExecutorService newThreadPool() {
int queueSize = 500;
int corePool = Math.min(5, maxPoolSize);
return new ThreadPoolExecutor(corePool, maxPoolSize, 10000L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(queueSize), new ThreadPoolExecutor.AbortPolicy());
}
private ExecutorConfig() {}
}
/** SqlSession provider */
@Component
public class SqlContext {
@Resource
private SqlSessionTemplate sqlSessionTemplate;
public SqlSession getSqlSession() {
SqlSessionFactory sqlSessionFactory = sqlSessionTemplate.getSqlSessionFactory();
return sqlSessionFactory.openSession();
}
}Example of Transaction Failure
@Override
@Transactional
public void saveThread(List<EmployeeDO> employeeDOList) {
try {
// delete first; this will not roll back if a child thread fails
this.getBaseMapper().delete(null);
ExecutorService service = ExecutorConfig.getThreadPool();
List<List<EmployeeDO>> lists = averageAssign(employeeDOList, 5);
Thread[] threadArray = new Thread[lists.size()];
CountDownLatch countDownLatch = new CountDownLatch(lists.size());
AtomicBoolean atomicBoolean = new AtomicBoolean(true);
for (int i = 0; i < lists.size(); i++) {
if (i == lists.size() - 1) {
atomicBoolean.set(false);
}
List<EmployeeDO> list = lists.get(i);
threadArray[i] = new Thread(() -> {
try {
if (!atomicBoolean.get()) {
throw new ServiceException("001", "出现异常");
}
this.saveBatch(list);
} finally {
countDownLatch.countDown();
}
});
}
for (int i = 0; i < lists.size(); i++) {
service.execute(threadArray[i]);
}
countDownLatch.await();
System.out.println("添加完毕");
} catch (Exception e) {
log.info("error", e);
throw new ServiceException("002", "出现异常");
} finally {
connection.close();
}
}Result: When one child thread throws an exception, all other threads also report failure, but the delete operation performed by the main thread is not rolled back, confirming that @Transactional is ineffective in a multithreaded environment.
Solution: Use SqlSession to obtain a connection, set manual commit, and explicitly roll back or commit based on the outcome of all child threads.
@Resource
SqlContext sqlContext;
@Override
public void saveThread(List<EmployeeDO> employeeDOList) throws SQLException {
SqlSession sqlSession = sqlContext.getSqlSession();
Connection connection = sqlSession.getConnection();
try {
connection.setAutoCommit(false);
EmployeeMapper employeeMapper = sqlSession.getMapper(EmployeeMapper.class);
employeeMapper.delete(null);
ExecutorService service = ExecutorConfig.getThreadPool();
List<List<EmployeeDO>> lists = averageAssign(employeeDOList, 5);
AtomicBoolean atomicBoolean = new AtomicBoolean(true);
List<Callable<Integer>> callableList = new ArrayList<>();
for (int i = 0; i < lists.size(); i++) {
if (i == lists.size() - 1) {
atomicBoolean.set(false);
}
List<EmployeeDO> list = lists.get(i);
Callable<Integer> callable = () -> {
if (!atomicBoolean.get()) {
throw new ServiceException("001", "出现异常");
}
return employeeMapper.saveBatch(list);
};
callableList.add(callable);
}
List<Future<Integer>> futures = service.invokeAll(callableList);
for (Future<Integer> future : futures) {
if (future.get() <= 0) {
connection.rollback();
return;
}
}
connection.commit();
System.out.println("添加完毕");
} catch (Exception e) {
connection.rollback();
log.info("error", e);
throw new ServiceException("002", "出现异常");
} finally {
connection.close();
}
}SQL mapper for batch insert:
<insert id="saveBatch" parameterType="List">
INSERT INTO employee (employee_id, age, employee_name, birth_date, gender, id_number, creat_time, update_time, status)
VALUES
<foreach collection="list" item="item" index="index" separator=",">
(#{item.employeeId}, #{item.age}, #{item.employeeName}, #{item.birthDate}, #{item.gender},
#{item.idNumber}, #{item.creatTime}, #{item.updateTime}, #{item.status})
</foreach>
</insert>Test results show that with manual transaction control, if any child thread fails the whole operation rolls back, and when all threads succeed the data is inserted correctly.
Java Captain
Focused on Java technologies: SSM, the Spring ecosystem, microservices, MySQL, MyCat, clustering, distributed systems, middleware, Linux, networking, multithreading; occasionally covers DevOps tools like Jenkins, Nexus, Docker, ELK; shares practical tech insights and is dedicated to full‑stack Java development.
How this landed with the community
Was this worth your time?
0 Comments
Thoughtful readers leave field notes, pushback, and hard-won operational detail here.
