Implementing Multi‑threaded Transactions in Spring Boot with Custom Annotations and AOP
This article explains why @Transactional fails in multithreaded Spring Boot scenarios, then demonstrates how to combine @Async, a custom thread‑pool, @MainTransaction and @SonTransaction annotations with AOP logic to achieve reliable multi‑threaded transaction control, complete with code examples and usage guidelines.
In Spring Boot, adding @Transactional alone does not guarantee transaction integrity across multiple threads. Spring binds the database connection and the transaction state to the current thread through ThreadLocal storage, so each thread obtains its own connection and therefore runs in a separate transaction.
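The thread-bound behaviour can be reproduced without Spring. The following is a minimal sketch, not Spring's actual code: the `CONNECTION` field and the class name `ThreadLocalDemo` are hypothetical stand-ins for the per-thread slot in which a framework would keep the bound connection.

```java
import java.util.concurrent.atomic.AtomicReference;

public class ThreadLocalDemo {
    // Hypothetical stand-in for the per-thread slot that holds the bound connection.
    private static final ThreadLocal<String> CONNECTION = new ThreadLocal<>();

    /** Binds a value on the calling thread, then reads the same slot from a second thread. */
    public static String readFromOtherThread(String boundOnCaller) {
        CONNECTION.set(boundOnCaller);
        AtomicReference<String> seenByWorker = new AtomicReference<>("unset");
        Thread worker = new Thread(() -> seenByWorker.set(CONNECTION.get()));
        worker.start();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seenByWorker.get();
    }

    public static void main(String[] args) {
        String seen = readFromOtherThread("conn-1");
        System.out.println("caller sees: " + CONNECTION.get()); // caller sees: conn-1
        System.out.println("worker sees: " + seen);             // worker sees: null
    }
}
```

The worker thread finds an empty slot, which is why a method running on a pool thread cannot join the transaction opened by its caller.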
The solution is to enable asynchronous execution with @Async, define two custom annotations, @MainTransaction (applied to the coordinating method) and @SonTransaction (applied to the asynchronous worker methods), and use an AOP aspect that coordinates the workers' separate transactions so they commit or roll back as a group.
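The source does not show the annotation definitions themselves. A plausible sketch, assuming @MainTransaction carries the expected number of workers as its `value` and @SonTransaction is a plain marker, might look like this. The wrapper class `TransactionAnnotations`, the nested `Demo` class, and the helper `expectedWorkers` are hypothetical and exist only to keep the example in one file; in a real project each annotation would normally sit in its own file.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

public class TransactionAnnotations {

    /** Marks the coordinating method; value is the number of workers to wait for (assumed meaning). */
    @Target(ElementType.METHOD)
    @Retention(RetentionPolicy.RUNTIME) // must be visible at runtime for an aspect to match it
    public @interface MainTransaction {
        int value();
    }

    /** Marks an asynchronous worker method. */
    @Target(ElementType.METHOD)
    @Retention(RetentionPolicy.RUNTIME)
    public @interface SonTransaction {
    }

    /** Hypothetical usage, only to show how the annotations are applied. */
    public static class Demo {
        @MainTransaction(3)
        public void coordinate() { }

        @SonTransaction
        public void work() { }
    }

    /** Reads the worker count from a no-argument method, or returns -1 if it is not annotated. */
    public static int expectedWorkers(Class<?> type, String methodName) {
        try {
            Method method = type.getDeclaredMethod(methodName);
            MainTransaction annotation = method.getAnnotation(MainTransaction.class);
            return annotation == null ? -1 : annotation.value();
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException(e);
        }
    }
}
```

Runtime retention is the important detail: without it, neither reflection nor an @annotation pointcut could see the annotation.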
First, configure a custom thread pool:
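import java.util.concurrent.ThreadPoolExecutor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Configuration
@EnableAsync
public class ThreadPoolTaskConfig {
    @Bean("threadPoolTaskExecutor")
    public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(16);
        executor.setMaxPoolSize(64);
        executor.setQueueCapacity(124);
        executor.setKeepAliveSeconds(30);
        executor.setThreadNamePrefix("CustomThreadPool-");
        // When the pool is saturated, CallerRunsPolicy runs the task on the submitting thread.
        // A worker that runs on the main thread and then waits for the main latch would block forever,
        // so size the pool to hold all concurrent workers.
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        // Let queued tasks finish on shutdown, waiting at most 60 seconds.
        executor.setWaitForTasksToCompleteOnShutdown(true);
        executor.setAwaitTerminationSeconds(60);
        executor.initialize();
        return executor;
    }
}

The AOP component intercepts methods annotated with @MainTransaction and @SonTransaction: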
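import java.util.Map;
import java.util.Vector;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.Resource; // jakarta.annotation.Resource on Spring Boot 3+
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.springframework.stereotype.Component;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.DefaultTransactionDefinition;
import com.huigu.common.anno.MainTransaction; // assumed to live in the same package as SonTransaction

@Aspect
@Component
public class TransactionAop {
    // Shared between the calling thread and the pool threads, so it must be thread-safe.
    private static final Map<String, Object> map = new ConcurrentHashMap<>();

    @Resource
    private PlatformTransactionManager transactionManager;

    // Advice returns void, so annotated methods are expected to return void as well.
    @Around("@annotation(mainTransaction)")
    public void mainIntercept(ProceedingJoinPoint joinPoint, MainTransaction mainTransaction) throws Throwable {
        String threadName = Thread.currentThread().getName();
        // mainDownLatch releases the workers; sonDownLatch tells the main thread that they are done.
        CountDownLatch mainDownLatch = new CountDownLatch(1);
        CountDownLatch sonDownLatch = new CountDownLatch(mainTransaction.value());
        AtomicBoolean rollBackFlag = new AtomicBoolean(false);
        Vector<Throwable> exceptionVector = new Vector<>();
        map.put(threadName + "mainDownLatch", mainDownLatch);
        map.put(threadName + "sonDownLatch", sonDownLatch);
        map.put(threadName + "rollBackFlag", rollBackFlag);
        map.put(threadName + "exceptionVector", exceptionVector);
        try {
            try {
                joinPoint.proceed();
            } catch (Throwable e) {
                exceptionVector.add(0, e);
                rollBackFlag.set(true);
                mainDownLatch.countDown();
            }
            if (!rollBackFlag.get()) {
                try {
                    // Wait until every worker has finished its work, then let them all commit.
                    sonDownLatch.await();
                    mainDownLatch.countDown();
                } catch (Exception e) {
                    rollBackFlag.set(true);
                    exceptionVector.add(0, e);
                    // Release the workers so they can roll back instead of waiting forever.
                    mainDownLatch.countDown();
                }
            }
            // Rethrow the most recently recorded failure, whether it came from the main method or a worker.
            if (!exceptionVector.isEmpty()) {
                throw exceptionVector.get(0);
            }
        } finally {
            // Always remove the entries, otherwise the static map grows with every call.
            mapCleanup(threadName);
        }
    }

    @Around("@annotation(com.huigu.common.anno.SonTransaction)")
    @SuppressWarnings("unchecked")
    public void sonIntercept(ProceedingJoinPoint joinPoint) throws Throwable {
        // By convention the caller passes its own Thread as the last argument.
        Object[] args = joinPoint.getArgs();
        Thread thread = (Thread) args[args.length - 1];
        String threadName = thread.getName();
        CountDownLatch mainDownLatch = (CountDownLatch) map.get(threadName + "mainDownLatch");
        if (mainDownLatch == null) {
            // Not called from a @MainTransaction method: run without coordination.
            joinPoint.proceed();
            return;
        }
        CountDownLatch sonDownLatch = (CountDownLatch) map.get(threadName + "sonDownLatch");
        AtomicBoolean rollBackFlag = (AtomicBoolean) map.get(threadName + "rollBackFlag");
        Vector<Throwable> exceptionVector = (Vector<Throwable>) map.get(threadName + "exceptionVector");
        if (rollBackFlag.get()) {
            // Another participant already failed: skip the work but still report completion.
            sonDownLatch.countDown();
            return;
        }
        DefaultTransactionDefinition def = new DefaultTransactionDefinition();
        def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
        TransactionStatus status = transactionManager.getTransaction(def);
        try {
            joinPoint.proceed();
            sonDownLatch.countDown();
            // Hold the transaction open until the main thread releases all workers.
            mainDownLatch.await();
            if (rollBackFlag.get()) {
                transactionManager.rollback(status);
            } else {
                transactionManager.commit(status);
            }
        } catch (Throwable e) {
            exceptionVector.add(0, e);
            transactionManager.rollback(status);
            rollBackFlag.set(true);
            mainDownLatch.countDown();
            sonDownLatch.countDown();
        }
    }

    private void mapCleanup(String threadName) {
        map.remove(threadName + "mainDownLatch");
        map.remove(threadName + "sonDownLatch");
        map.remove(threadName + "rollBackFlag");
        map.remove(threadName + "exceptionVector");
    }
}

Demo services illustrate the usage: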
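package com.example.demo.service;

import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import com.huigu.common.anno.SonTransaction;

@Service
public class SonService {
    // The aspect reads the caller's Thread from the last argument, so it must stay last.
    @Transactional(rollbackFor = Exception.class)
    @Async("threadPoolTaskExecutor")
    @SonTransaction
    public void sonMethod1(String args, Thread thread) {
        System.out.println(args + " opened thread");
    }

    // ... similar sonMethod2, sonMethod3, sonMethod4 (without async) ...
}

package com.example.demo.service;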
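import javax.annotation.Resource; // jakarta.annotation.Resource on Spring Boot 3+
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import com.huigu.common.anno.MainTransaction; // assumed to live in the same package as SonTransaction

@Service
public class MainService {
    @Resource
    private SonService sonService;

    // 3 = number of asynchronous workers the aspect waits for.
    @MainTransaction(3)
    @Transactional(rollbackFor = Exception.class)
    public void test1() {
        sonService.sonMethod1("Luffy", Thread.currentThread());
        sonService.sonMethod2("Zoro", "Sanji", Thread.currentThread());
        sonService.sonMethod3("Nami", Thread.currentThread());
        // Not asynchronous: runs on the calling thread.
        sonService.sonMethod4("Robin");
    }
}

The article also explains the role of the two CountDownLatch instances. The son latch is initialised with the number of child threads, and the main thread waits on it until every child has finished its work. The main latch is initialised with 1, and every child waits on it before deciding whether to commit or roll back. Together with the shared rollback flag, this makes the sub-transactions commit or roll back as a group. Each child still commits on its own connection, so a failure during the commit step itself is not covered.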
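The latch handshake can be tried in isolation, without Spring or a database. The sketch below is a simplified illustration rather than the aspect's exact logic: the class name `LatchCoordinationDemo`, the `run` method, and the string outcomes are hypothetical, a "failure" is simulated by a boolean, and the coordinator always waits for every worker before releasing them.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class LatchCoordinationDemo {

    /** Runs one worker per flag (at least one) and returns each worker's decision. */
    public static List<String> run(boolean... shouldFail) {
        int workers = shouldFail.length;
        CountDownLatch mainLatch = new CountDownLatch(1);       // released once by the coordinator
        CountDownLatch sonLatch = new CountDownLatch(workers);  // counted down once per worker
        AtomicBoolean rollBack = new AtomicBoolean(false);
        List<String> outcomes = new CopyOnWriteArrayList<>();
        // One thread per worker: every worker blocks on mainLatch, so none may sit in a queue.
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        try {
            for (boolean fail : shouldFail) {
                pool.execute(() -> {
                    if (fail) {
                        rollBack.set(true);      // simulated failure of this worker's work
                    }
                    sonLatch.countDown();        // "my work is done"
                    try {
                        mainLatch.await();       // wait for the group decision
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        rollBack.set(true);
                    }
                    outcomes.add(rollBack.get() ? "rollback" : "commit");
                });
            }
            sonLatch.await();        // coordinator waits for all workers
            mainLatch.countDown();   // then releases them together
            pool.shutdown();
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
        return outcomes;
    }

    public static void main(String[] args) {
        System.out.println(run(false, false, false)); // [commit, commit, commit]
        System.out.println(run(false, true, false));  // [rollback, rollback, rollback]
    }
}
```

Because each worker holds a thread while it waits, the pool must have at least as many threads as there are workers. The same constraint applies to the thread pool used with @Async.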
Architect's Tech Stack
Java backend, microservices, distributed systems, containerized programming, and more.
