Mastering Multithreaded Transactions in Spring Boot with Custom Annotations
This article explains why @Transactional may fail in multithreaded Spring Boot scenarios, shows how to enable @Async with a custom thread pool, and demonstrates a pair of custom annotations together with AOP to achieve coordinated multithreaded transaction management.
In practice, adding @Transactional alone does not make work spread across several threads atomic. Spring binds the database connection of the current transaction to the calling thread through a ThreadLocal, so every thread obtains its own connection and therefore its own, independent transaction.
The approach here starts child threads with Spring's @Async and uses custom annotations combined with AOP to coordinate their transactions, so no business method has to commit or roll back manually.
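The thread-bound behaviour described above can be reproduced without Spring. The following is a minimal sketch in plain Java: the class name ThreadBoundConnectionDemo and the string standing in for a connection are illustrative assumptions, not Spring internals. It binds a value to a ThreadLocal on the calling thread and shows that a worker thread cannot see it.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

/** Shows that a value bound to a ThreadLocal on one thread is invisible on another. */
class ThreadBoundConnectionDemo {

    // Hypothetical stand-in for the per-thread slot that holds a transaction's connection.
    private static final ThreadLocal<String> CURRENT_CONNECTION = new ThreadLocal<>();

    /** Binds a value on the calling thread and returns what a worker thread reads. */
    static String valueSeenByWorker() {
        CURRENT_CONNECTION.set("connection-of-" + Thread.currentThread().getName());
        AtomicReference<String> seen = new AtomicReference<>("unset");
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            // The worker reads the same ThreadLocal, but from its own thread.
            worker.submit(() -> seen.set(CURRENT_CONNECTION.get())).get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            worker.shutdownNow();
            CURRENT_CONNECTION.remove();
        }
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println("worker sees: " + valueSeenByWorker()); // prints "worker sees: null"
    }
}
```

The worker reads null because nothing was bound on its own thread, which is why a transaction opened in the caller does not extend to methods that run on other threads.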
1. Declarative usage of Spring Boot multithreading
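Spring provides the @Async annotation to run a method asynchronously on a thread taken from an executor. The steps are:
(1) Add @EnableAsync to the main class or a configuration class to enable asynchronous method execution.
(2) Add @Async to the method that should run in a child thread.
Note: When no executor bean is available, plain Spring falls back to SimpleAsyncTaskExecutor, which starts a new thread for every task and reuses none; under high concurrency this can exhaust memory. Spring Boot 2.1 and later auto-configure a pooled executor instead, but its queue is unbounded unless configured otherwise.
Therefore a custom thread pool with explicit bounds should be configured, for example: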
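import java.util.concurrent.ThreadPoolExecutor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Configuration
@EnableAsync
public class ThreadPoolTaskConfig {

    @Bean("threadPoolTaskExecutor")
    public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(16);
        // Tasks are queued once all core threads are busy.
        executor.setQueueCapacity(124);
        // Threads beyond the core size are created only when the queue is full.
        executor.setMaxPoolSize(64);
        // Idle threads above the core size are released after 30 seconds.
        executor.setKeepAliveSeconds(30);
        executor.setThreadNamePrefix("CustomThread-");
        // When pool and queue are both full, the submitting thread runs the task itself.
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        // On shutdown, wait up to 60 seconds for submitted tasks to finish.
        executor.setWaitForTasksToCompleteOnShutdown(true);
        executor.setAwaitTerminationSeconds(60);
        executor.initialize();
        return executor;
    }
}
To start a child thread, annotate the method with @Async("threadPoolTaskExecutor"), where the parameter is the bean name of the custom thread pool.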
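The core size, queue capacity, maximum size and rejection policy configured above interact in a fixed order, which a small experiment makes visible. The sketch below uses java.util.concurrent.ThreadPoolExecutor directly (the class that ThreadPoolTaskExecutor wraps) with deliberately tiny limits. The class name CallerRunsDemo and the numbers are illustrative assumptions, not the article's settings.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class CallerRunsDemo {

    /** Saturates a tiny pool and returns the name of the thread that ran the overflow task. */
    static String threadThatRanOverflowTask() {
        // 1 core thread, 1 queue slot, at most 2 threads: a fourth task has nowhere to go.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 30, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await(); // keep the pool thread busy until released
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        AtomicReference<String> overflowThread = new AtomicReference<>();
        try {
            pool.execute(blocker); // occupies the single core thread
            pool.execute(blocker); // waits in the queue
            pool.execute(blocker); // queue is full, so a second (non-core) thread starts
            // Pool and queue are both full: CallerRunsPolicy runs this on the submitting thread.
            pool.execute(() -> overflowThread.set(Thread.currentThread().getName()));
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return overflowThread.get();
    }

    public static void main(String[] args) {
        System.out.println(threadThatRanOverflowTask()); // name of the submitting thread
    }
}
```

With the article's settings the same rule applies at a larger scale: threads beyond the 16 core threads are only created once 124 tasks are waiting in the queue, and the caller starts executing tasks itself only after all 64 threads are busy and the queue is full.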
2. Custom annotations for multithreaded transaction control
Custom annotations
This implementation uses two annotations: the main thread acts as the coordinator, and each child thread acts as a participant.
package com.example.anno;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
 * Multithread transaction annotation: main transaction
 */
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface MainTransaction {
    int value(); // number of child threads
}

package com.example.anno;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
 * Multithread transaction annotation: sub transaction
 */
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface SonTransaction {
    String value() default "";
}
Both annotations are placed on methods and should be used together with @Transactional(rollbackFor = Exception.class).
@MainTransaction is applied to the caller method; its required parameter specifies how many child threads are started (e.g., @MainTransaction(2) when two @Async methods are invoked). If the annotation is omitted, multithreaded transaction control is not applied.
@SonTransaction is applied to the callee (the method that runs in a child thread) and does not require parameters.
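Because both annotations use RetentionPolicy.RUNTIME, their parameters remain readable while the program runs, which is what lets an aspect pick up the declared number of child threads. A minimal reflection sketch follows; OrderService and placeOrder are hypothetical names, and the annotation is redeclared locally only so that the example compiles on its own.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

class AnnotationValueDemo {

    // Local copy of the article's annotation so the example is self-contained.
    @Target(ElementType.METHOD)
    @Retention(RetentionPolicy.RUNTIME)
    @interface MainTransaction {
        int value(); // number of child threads
    }

    // Hypothetical caller that starts two child threads.
    static class OrderService {
        @MainTransaction(2)
        public void placeOrder() {
        }
    }

    /** Reads the declared child-thread count from the annotated method at runtime. */
    static int declaredChildCount() {
        try {
            Method method = OrderService.class.getMethod("placeOrder");
            return method.getAnnotation(MainTransaction.class).value();
        } catch (NoSuchMethodException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(declaredChildCount()); // prints 2
    }
}
```

With a retention of SOURCE or CLASS, getAnnotation would return null here, so the RUNTIME retention in the article's declarations is what makes the value available.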
AOP implementation
The AOP class intercepts methods annotated with @MainTransaction and @SonTransaction to coordinate transaction boundaries across threads.
package com.example.aop;

import com.baomidou.mybatisplus.core.toolkit.CollectionUtils;
import com.example.anno.MainTransaction;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.springframework.stereotype.Component;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.DefaultTransactionDefinition;
import javax.annotation.Resource;
import java.util.Map;
import java.util.Vector;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

@Aspect
@Component
public class TransactionAop {

    // Shared by the main thread and all child threads, so it must be a thread-safe map.
    private static final Map<String, Object> map = new ConcurrentHashMap<>();

    @Resource
    private PlatformTransactionManager transactionManager;
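    /**
     * Coordinator: runs on the main thread, waits until every child thread has
     * finished its work, then releases the children to commit or roll back.
     */
    @Around("@annotation(mainTransaction)")
    public void mainIntercept(ProceedingJoinPoint joinPoint, MainTransaction mainTransaction) throws Throwable {
        // Shared state is keyed by the main thread's name so child threads can look it up.
        String threadName = Thread.currentThread().getName();
        // Opened once by the main thread to let the children commit or roll back.
        CountDownLatch mainDownLatch = new CountDownLatch(1);
        // Counted down by each child once its work has finished or failed.
        CountDownLatch sonDownLatch = new CountDownLatch(mainTransaction.value());
        AtomicBoolean rollBackFlag = new AtomicBoolean(false);
        Vector<Throwable> exceptionVector = new Vector<>();
        map.put(threadName + "mainDownLatch", mainDownLatch);
        map.put(threadName + "sonDownLatch", sonDownLatch);
        map.put(threadName + "rollBackFlag", rollBackFlag);
        map.put(threadName + "exceptionVector", exceptionVector);
        try {
            try {
                joinPoint.proceed();
            } catch (Throwable e) {
                // The main method failed: tell the children to roll back and release them.
                exceptionVector.add(0, e);
                rollBackFlag.set(true);
                mainDownLatch.countDown();
            }
            if (!rollBackFlag.get()) {
                try {
                    sonDownLatch.await();
                    mainDownLatch.countDown();
                } catch (Exception e) {
                    rollBackFlag.set(true);
                    exceptionVector.add(0, e);
                    // Release the children so they roll back instead of waiting forever.
                    mainDownLatch.countDown();
                }
            }
            if (CollectionUtils.isNotEmpty(exceptionVector)) {
                throw exceptionVector.get(0);
            }
        } finally {
            // Clear the shared state on success as well as on failure to avoid a leak.
            map.remove(threadName + "mainDownLatch");
            map.remove(threadName + "sonDownLatch");
            map.remove(threadName + "rollBackFlag");
            map.remove(threadName + "exceptionVector");
        }
    }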
    /**
     * Participant: runs on a child thread, does its work inside its own transaction,
     * then waits for the main thread's decision before committing or rolling back.
     */
    @Around("@annotation(com.example.anno.SonTransaction)")
    @SuppressWarnings("unchecked")
    public void sonIntercept(ProceedingJoinPoint joinPoint) throws Throwable {
        // Convention: the last argument of the advised method is the main thread.
        Object[] args = joinPoint.getArgs();
        Thread thread = (Thread) args[args.length - 1];
        String threadName = thread.getName();
        CountDownLatch mainDownLatch = (CountDownLatch) map.get(threadName + "mainDownLatch");
        if (mainDownLatch == null) {
            // The caller has no @MainTransaction: run without coordination.
            joinPoint.proceed();
            return;
        }
        CountDownLatch sonDownLatch = (CountDownLatch) map.get(threadName + "sonDownLatch");
        AtomicBoolean rollBackFlag = (AtomicBoolean) map.get(threadName + "rollBackFlag");
        Vector<Throwable> exceptionVector = (Vector<Throwable>) map.get(threadName + "exceptionVector");
        if (rollBackFlag.get()) {
            // Another participant has already failed: skip the work entirely.
            sonDownLatch.countDown();
            return;
        }
        // Open a new transaction bound to this child thread.
        DefaultTransactionDefinition def = new DefaultTransactionDefinition();
        def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
        TransactionStatus status = transactionManager.getTransaction(def);
        try {
            joinPoint.proceed();
            // Report completion, then wait for the main thread's decision.
            sonDownLatch.countDown();
            mainDownLatch.await();
            if (rollBackFlag.get()) {
                transactionManager.rollback(status);
            } else {
                transactionManager.commit(status);
            }
        } catch (Throwable e) {
            // Record the failure, roll back, and release every thread that is waiting.
            exceptionVector.add(0, e);
            transactionManager.rollback(status);
            rollBackFlag.set(true);
            mainDownLatch.countDown();
            sonDownLatch.countDown();
        }
    }
}
CountDownLatch explanation
CountDownLatch is a synchronization aid that allows one or more threads to wait until a set of operations performed in other threads completes.
A latch is created with a given count. countDown() decrements the count, and await() blocks until the count reaches zero.
In this article, a latch initialized with 1 (mainDownLatch) acts as a simple on/off gate that holds the child threads until the main thread has decided, while a latch initialized with the number of child threads (sonDownLatch) makes the main thread wait until all child threads have finished their work.
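The interplay of the two latches can be tried out without Spring or a database. The following is a simplified sketch of the coordination idea only, not the article's aspect: the class name LatchCoordinationDemo, the boolean flags that simulate failing children, and the "commit"/"rollback" strings are all assumptions made for illustration.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class LatchCoordinationDemo {

    /** Runs one child per flag (false simulates a failure) and returns each child's outcome. */
    static List<String> run(boolean... childSucceeds) {
        int n = childSucceeds.length;
        CountDownLatch mainLatch = new CountDownLatch(1); // gate opened by the coordinator
        CountDownLatch sonLatch = new CountDownLatch(n);  // one count per child
        AtomicBoolean rollBack = new AtomicBoolean(false);
        List<String> outcomes = new CopyOnWriteArrayList<>();
        // The pool needs n threads: every child blocks on the gate until all have reported.
        ExecutorService pool = Executors.newFixedThreadPool(n);
        try {
            for (boolean ok : childSucceeds) {
                pool.execute(() -> {
                    if (!ok) {
                        rollBack.set(true); // simulated failure in this child's work
                    }
                    sonLatch.countDown(); // report that the work is done
                    try {
                        mainLatch.await(); // hold the decision until the gate opens
                        outcomes.add(rollBack.get() ? "rollback" : "commit");
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        outcomes.add("rollback");
                    }
                });
            }
            sonLatch.await();      // coordinator waits for all children
            mainLatch.countDown(); // then lets them decide together
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            rollBack.set(true);
            mainLatch.countDown();
        } finally {
            pool.shutdownNow();
        }
        return outcomes;
    }

    public static void main(String[] args) {
        System.out.println(run(true, true, true));  // prints [commit, commit, commit]
        System.out.println(run(true, false, true)); // prints [rollback, rollback, rollback]
    }
}
```

The sketch also exposes a constraint that applies to any design of this shape: every child holds a pool thread while it waits on the gate, so the executor must have at least as many free threads as there are children. Otherwise the remaining children never start and the gate never opens.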
Annotation usage demo
Service methods annotated with @SonTransaction and @Async:
package com.example.demo.service;

import com.example.anno.SonTransaction;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
public class SonService {

    // The last parameter carries the main thread so the aspect can find the shared state.
    @Transactional(rollbackFor = Exception.class)
    @Async("threadPoolTaskExecutor")
    @SonTransaction
    public void sonMethod1(String args, Thread thread) {
        System.out.println(args + " started a thread");
    }

    @Transactional(rollbackFor = Exception.class)
    @Async("threadPoolTaskExecutor")
    @SonTransaction
    public void sonMethod2(String args1, String args2, Thread thread) {
        System.out.println(args1 + " and " + args2 + " started a thread");
    }

    @Transactional(rollbackFor = Exception.class)
    @Async("threadPoolTaskExecutor")
    @SonTransaction
    public void sonMethod3(String args, Thread thread) {
        System.out.println(args + " started a thread");
    }

    // Runs synchronously on the caller's thread: no @Async, no @SonTransaction.
    @Transactional(rollbackFor = Exception.class)
    public void sonMethod4(String args) {
        System.out.println(args + " did not start a thread");
    }
}
The calling service, annotated with @MainTransaction:
package com.example.demo.service;

import com.example.anno.MainTransaction;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;

@Service
public class MainService {

    @Resource
    private SonService sonService;

    // Three @SonTransaction methods are called, so the declared count is 3.
    @MainTransaction(3)
    @Transactional(rollbackFor = Exception.class)
    public void test1() {
        sonService.sonMethod1("Luffy", Thread.currentThread());
        sonService.sonMethod2("Zoro", "Sanji", Thread.currentThread());
        sonService.sonMethod3("Nami", Thread.currentThread());
        sonService.sonMethod4("Robin");
    }

    // In complex branching scenarios you may omit @MainTransaction; the child threads
    // then run without coordination and commit or roll back independently.
    @Transactional(rollbackFor = Exception.class)
    public void test2() {
        sonService.sonMethod1("Luffy", Thread.currentThread());
        sonService.sonMethod2("Zoro", "Sanji", Thread.currentThread());
        sonService.sonMethod3("Nami", Thread.currentThread());
        sonService.sonMethod4("Robin");
    }
}
When business logic has many conditional branches that start different numbers of threads, the fixed count passed to @MainTransaction may not match the number of child threads that actually run. Because the main thread waits until exactly that many children have reported, a mismatch leaves open transactions waiting and can keep rows or tables locked. In such cases you can omit @MainTransaction, accepting that an exception no longer rolls back all threads together.
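One possible mitigation for a mismatched count, which the implementation above does not include, is to bound the wait with the timed variant of CountDownLatch.await and treat a timeout as a reason to roll back. The sketch below only demonstrates the timed wait itself. The class name BoundedWaitDemo, its parameters, and the timeout values are assumptions chosen for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class BoundedWaitDemo {

    /** Returns true only if all expected children report before the timeout expires. */
    static boolean allChildrenReported(int expected, int actuallyStarted, long timeoutMillis) {
        CountDownLatch sonLatch = new CountDownLatch(expected);
        for (int i = 0; i < actuallyStarted; i++) {
            // Each started child reports once; children that never start never report.
            new Thread(sonLatch::countDown).start();
        }
        try {
            // The timed await returns false if the count is still above zero at the deadline.
            return sonLatch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(allChildrenReported(2, 2, 2000)); // prints true
        System.out.println(allChildrenReported(3, 2, 200));  // prints false
    }
}
```

In a coordinator, a false result could be handled like a failure: set the rollback flag and open the gate so that the children that did start release their transactions instead of waiting indefinitely.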
