Mastering Multithreaded Transactions in Spring Boot with Custom Annotations

This article explains why @Transactional may fail in multithreaded Spring Boot scenarios, shows how to enable @Async with a custom thread pool, and demonstrates a pair of custom annotations together with AOP to achieve coordinated multithreaded transaction management.

Java Interview Crash Guide

In day-to-day development we often run into transaction problems in multithreaded code. Adding @Transactional alone is not enough, because Spring binds the database connection to the current thread through a ThreadLocal: each thread gets its own connection and therefore its own transaction.
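
The effect of thread-bound state can be reproduced without Spring. The sketch below is not Spring's code: ThreadBoundDemo and its CONNECTION holder are hypothetical names standing in for the thread-bound connection. It only illustrates that a value bound on the calling thread is not visible from a worker thread.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for a thread-bound connection holder; not Spring code.
final class ThreadBoundDemo {
    private static final ThreadLocal<String> CONNECTION = new ThreadLocal<>();

    /** Binds a value on the calling thread and returns what the same thread reads back. */
    static String seenByCaller(String boundOnCaller) {
        CONNECTION.set(boundOnCaller);
        try {
            return CONNECTION.get();
        } finally {
            CONNECTION.remove();
        }
    }

    /** Binds a value on the calling thread and returns what a new worker thread reads. */
    static String seenByWorker(String boundOnCaller) {
        CONNECTION.set(boundOnCaller);
        AtomicReference<String> seen = new AtomicReference<>("unset");
        // The worker has its own (empty) slot, so get() returns null there.
        Thread worker = new Thread(() -> seen.set(String.valueOf(CONNECTION.get())));
        worker.start();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            CONNECTION.remove();
        }
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println("caller sees: " + seenByCaller("conn-1"));
        System.out.println("worker sees: " + seenByWorker("conn-1"));
    }
}
```

The same reasoning applies to an @Async method: it runs on a pool thread, so it does not see the connection (or the transaction) that the calling thread holds.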

This article uses Spring Boot's @Async to start threads and combines custom annotations with AOP to coordinate transactions across those threads, so that commit and rollback do not have to be written by hand in each business method.

1. Declarative usage of Spring Boot multithreading

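Spring provides the @Async annotation to run a method on a thread pool. The steps are:

(1) Add @EnableAsync to the main class or a configuration class to enable asynchronous method execution.

(2) Add @Async to the method that should run in a child thread.

Note: Without further configuration, the executor behind @Async is effectively unbounded. Plain Spring falls back to SimpleAsyncTaskExecutor, which starts a new thread for every task, and the pool that recent Spring Boot versions auto-configure has an unbounded queue by default. Under high concurrency either can exhaust memory.

Therefore a custom thread pool should be configured, for example: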

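import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.ThreadPoolExecutor;

@Configuration
@EnableAsync
public class ThreadPoolTaskConfig {
    @Bean("threadPoolTaskExecutor")
    public ThreadPoolTaskExecutor threadPoolTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(16);      // threads kept alive even when idle
        executor.setQueueCapacity(124);    // tasks queued once the core threads are busy
        executor.setMaxPoolSize(64);       // upper bound, reached only when the queue is full
        executor.setKeepAliveSeconds(30);  // idle time before threads above the core size are stopped
        executor.setThreadNamePrefix("CustomThread-");
        // When the pool and the queue are both full, run the task on the submitting thread.
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        // On shutdown, let queued tasks finish, waiting at most 60 seconds.
        executor.setWaitForTasksToCompleteOnShutdown(true);
        executor.setAwaitTerminationSeconds(60);
        executor.initialize();
        return executor;
    }
}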

To start a child thread, annotate the method with @Async("threadPoolTaskExecutor"), where the parameter is the name of the custom thread pool.
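
The pool settings above correspond to the parameters of java.util.concurrent.ThreadPoolExecutor, so the rejection behaviour can be tried without Spring. The following is a minimal sketch, assuming a deliberately tiny pool (one thread, one queue slot) so that a rejection is forced; CallerRunsDemo and its method names are hypothetical. It shows that with CallerRunsPolicy a task that fits neither the pool nor the queue runs on the submitting thread.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; the tiny sizes are chosen only to force a rejection.
final class CallerRunsDemo {

    /** Submits three tasks and returns, in completion order, the name of the thread that ran each. */
    static List<String> runThreeTasks() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1,                          // core and max pool size
                30, TimeUnit.SECONDS,          // keep-alive for idle threads
                new ArrayBlockingQueue<>(1),   // bounded queue with one slot
                r -> new Thread(r, "CustomThread-1"),
                new ThreadPoolExecutor.CallerRunsPolicy());
        List<String> ranOn = new CopyOnWriteArrayList<>();
        CountDownLatch release = new CountDownLatch(1);
        try {
            // Task 1 occupies the only worker until it is released.
            pool.execute(() -> {
                awaitQuietly(release);
                ranOn.add(Thread.currentThread().getName());
            });
            // Task 2 fills the single queue slot.
            pool.execute(() -> ranOn.add(Thread.currentThread().getName()));
            // Task 3 is rejected, so CallerRunsPolicy runs it on the submitting thread.
            pool.execute(() -> ranOn.add(Thread.currentThread().getName()));
            release.countDown();
            pool.shutdown();
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return ranOn;
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println(runThreeTasks());
    }
}
```

Running a rejected task on the caller slows the submitter down instead of dropping work, which is why this policy is a common choice when tasks must not be lost.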

2. Custom annotations for multithreaded transaction control

Custom annotations

This implementation uses two annotations: the main thread acts as the coordinator, and each child thread acts as a participant.

package com.example.anno;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
 * Multithread transaction annotation: main transaction
 */
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface MainTransaction {
    int value(); // number of child threads
}

package com.example.anno;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
 * Multithread transaction annotation: sub transaction
 */
@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
public @interface SonTransaction {
    String value() default "";
}

Both annotations are placed on methods and should be used together with @Transactional(rollbackFor = Exception.class).

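@MainTransaction is applied to the caller method; its required parameter specifies how many child threads are started (e.g., @MainTransaction(2) when two @Async methods are invoked). The value must equal the number of @SonTransaction methods actually invoked, because the main thread waits until that many children have reported in. If the annotation is omitted, multithreaded transaction control is not applied.

@SonTransaction is applied to the callee (the method that runs in a child thread) and does not require parameters. The annotated method must, however, take the calling thread as its last argument (pass Thread.currentThread()), because the aspect uses that thread's name to look up the shared coordination state.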
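
Because both annotations are declared with RetentionPolicy.RUNTIME, their values can be read while the program runs, which is what allows an aspect to obtain the child-thread count. The sketch below reads that value through plain reflection; it redeclares a local copy of the annotation so that it is self-contained, and AnnotationReadDemo, caller and plain are hypothetical names.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

// Hypothetical demo class with a local copy of the annotation.
final class AnnotationReadDemo {

    @Target(ElementType.METHOD)
    @Retention(RetentionPolicy.RUNTIME)
    @interface MainTransaction {
        int value(); // number of child threads
    }

    @MainTransaction(2)
    static void caller() { }

    static void plain() { }

    /** Returns the declared child-thread count, or -1 if the method is not annotated. */
    static int childCount(String methodName) {
        try {
            Method method = AnnotationReadDemo.class.getDeclaredMethod(methodName);
            MainTransaction annotation = method.getAnnotation(MainTransaction.class);
            return annotation == null ? -1 : annotation.value();
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("caller: " + childCount("caller"));
        System.out.println("plain:  " + childCount("plain"));
    }
}
```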

AOP implementation

The AOP class intercepts methods annotated with @MainTransaction and @SonTransaction to coordinate transaction boundaries across threads.

package com.example.aop;

import com.example.anno.MainTransaction;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.springframework.stereotype.Component;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.TransactionDefinition;
import org.springframework.transaction.TransactionStatus;
import org.springframework.transaction.support.DefaultTransactionDefinition;
import javax.annotation.Resource;
import java.util.Map;
import java.util.Vector;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

@Aspect
@Component
public class TransactionAop {
    // Coordination state shared between a main thread and its child threads,
    // keyed by the main thread's name. Several threads read and write it at
    // the same time, so it must be a concurrent map.
    private static final Map<String, Object> map = new ConcurrentHashMap<>();
    @Resource
    private PlatformTransactionManager transactionManager;

    @Around("@annotation(mainTransaction)")
    public void mainIntercept(ProceedingJoinPoint joinPoint, MainTransaction mainTransaction) throws Throwable {
        Thread thread = Thread.currentThread();
        String threadName = thread.getName();
        // Gate that is opened to let every child commit or roll back.
        CountDownLatch mainDownLatch = new CountDownLatch(1);
        // Counts the children that have finished their work (or failed).
        CountDownLatch sonDownLatch = new CountDownLatch(mainTransaction.value());
        AtomicBoolean rollBackFlag = new AtomicBoolean(false);
        Vector<Throwable> exceptionVector = new Vector<>();
        map.put(threadName + "mainDownLatch", mainDownLatch);
        map.put(threadName + "sonDownLatch", sonDownLatch);
        map.put(threadName + "rollBackFlag", rollBackFlag);
        map.put(threadName + "exceptionVector", exceptionVector);
        try {
            joinPoint.proceed();
        } catch (Throwable e) {
            // The main method failed: flag a rollback and release the children.
            exceptionVector.add(0, e);
            rollBackFlag.set(true);
            mainDownLatch.countDown();
        }
        if (!rollBackFlag.get()) {
            try {
                // Wait for every child, then open the gate.
                sonDownLatch.await();
                mainDownLatch.countDown();
            } catch (Exception e) {
                rollBackFlag.set(true);
                exceptionVector.add(0, e);
                // Release the children so that they do not wait forever.
                mainDownLatch.countDown();
            }
        }
        // Clear this invocation's state whether it succeeded or failed.
        map.remove(threadName + "mainDownLatch");
        map.remove(threadName + "sonDownLatch");
        map.remove(threadName + "rollBackFlag");
        map.remove(threadName + "exceptionVector");
        if (!exceptionVector.isEmpty()) {
            throw exceptionVector.get(0);
        }
    }

    @Around("@annotation(com.example.anno.SonTransaction)")
    public void sonIntercept(ProceedingJoinPoint joinPoint) throws Throwable {
        // Convention: the last argument of a @SonTransaction method is the calling (main) thread.
        Object[] args = joinPoint.getArgs();
        Thread thread = (Thread) args[args.length - 1];
        String threadName = thread.getName();
        CountDownLatch mainDownLatch = (CountDownLatch) map.get(threadName + "mainDownLatch");
        if (mainDownLatch == null) {
            // No coordination state is registered for the caller: run without coordination.
            joinPoint.proceed();
            return;
        }
        CountDownLatch sonDownLatch = (CountDownLatch) map.get(threadName + "sonDownLatch");
        AtomicBoolean rollBackFlag = (AtomicBoolean) map.get(threadName + "rollBackFlag");
        @SuppressWarnings("unchecked")
        Vector<Throwable> exceptionVector = (Vector<Throwable>) map.get(threadName + "exceptionVector");
        if (rollBackFlag.get()) {
            // Another participant has already failed: skip the work.
            sonDownLatch.countDown();
            return;
        }
        // Open a separate transaction bound to this child thread.
        DefaultTransactionDefinition def = new DefaultTransactionDefinition();
        def.setPropagationBehavior(TransactionDefinition.PROPAGATION_REQUIRES_NEW);
        TransactionStatus status = transactionManager.getTransaction(def);
        try {
            joinPoint.proceed();
            // Report completion, then wait for the gate to open.
            sonDownLatch.countDown();
            mainDownLatch.await();
            if (rollBackFlag.get()) {
                transactionManager.rollback(status);
            } else {
                transactionManager.commit(status);
            }
        } catch (Throwable e) {
            // This child failed: roll back, flag the failure, and release everyone.
            exceptionVector.add(0, e);
            transactionManager.rollback(status);
            rollBackFlag.set(true);
            mainDownLatch.countDown();
            sonDownLatch.countDown();
        }
    }
}

CountDownLatch explanation

CountDownLatch is a synchronization aid that allows one or more threads to wait until a set of operations performed in other threads completes.

A latch is created with a given count. countDown() decrements the count, and await() blocks until the count reaches zero.

In this article, the latch initialized with 1 (mainDownLatch) acts as a simple on/off gate that releases the waiting child threads, while the latch initialized with the number of child threads (sonDownLatch) makes the main thread wait until all child threads have finished.
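
The two-latch handshake can be tried in isolation. The following is a simplified sketch, not the aspect itself: it uses no database, records "commit" or "rollback" as strings, and always lets the main thread open the gate (in the aspect a failing child also opens it). LatchHandshakeDemo, mainGate and childrenDone are hypothetical names.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo of the coordinator/participant handshake without a database.
final class LatchHandshakeDemo {

    /**
     * Starts one child per flag (at least one); a flag of true makes that child fail.
     * Returns each child's final decision, "commit" or "rollback".
     */
    static List<String> run(boolean... failFlags) {
        int children = failFlags.length;
        CountDownLatch mainGate = new CountDownLatch(1);            // opened by the main thread
        CountDownLatch childrenDone = new CountDownLatch(children); // one count per child
        AtomicBoolean rollback = new AtomicBoolean(false);
        List<String> outcomes = new CopyOnWriteArrayList<>();
        // The pool needs one thread per child: every child blocks until the gate opens.
        ExecutorService pool = Executors.newFixedThreadPool(children);
        try {
            for (boolean fail : failFlags) {
                pool.execute(() -> {
                    if (fail) {
                        rollback.set(true);       // simulated failure of the child's work
                    }
                    childrenDone.countDown();     // report in
                    try {
                        mainGate.await();         // wait for the decision
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        rollback.set(true);
                    }
                    outcomes.add(rollback.get() ? "rollback" : "commit");
                });
            }
            childrenDone.await();                 // main waits for all children
            mainGate.countDown();                 // then opens the gate
            pool.shutdown();
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            pool.shutdownNow();
            throw new IllegalStateException(e);
        }
        return outcomes;
    }

    public static void main(String[] args) {
        System.out.println(run(false, false));
        System.out.println(run(false, true, false));
    }
}
```

One consequence visible in the sketch is that the pool must have at least as many free threads as there are children: a child that is still queued can never report in while the running children block on the gate.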

Annotation usage demo

Service methods annotated with @SonTransaction and @Async:

package com.example.demo.service;
import com.example.anno.SonTransaction;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

@Service
public class SonService {
    // The last parameter of each @SonTransaction method is the calling thread.
    @Transactional(rollbackFor = Exception.class)
    @Async("threadPoolTaskExecutor")
    @SonTransaction
    public void sonMethod1(String args, Thread thread) {
        System.out.println(args + " started a thread");
    }
    @Transactional(rollbackFor = Exception.class)
    @Async("threadPoolTaskExecutor")
    @SonTransaction
    public void sonMethod2(String args1, String args2, Thread thread) {
        System.out.println(args1 + " and " + args2 + " started a thread");
    }
    @Transactional(rollbackFor = Exception.class)
    @Async("threadPoolTaskExecutor")
    @SonTransaction
    public void sonMethod3(String args, Thread thread) {
        System.out.println(args + " started a thread");
    }
    // Runs synchronously on the caller's thread, inside the caller's transaction.
    @Transactional(rollbackFor = Exception.class)
    public void sonMethod4(String args) {
        System.out.println(args + " did not start a thread");
    }
}

Calling service with @MainTransaction:

package com.example.demo.service;
import com.example.anno.MainTransaction;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.Resource;

@Service
public class MainService {
    @Resource
    private SonService sonService;

    // Three @SonTransaction methods are invoked, so the count is 3.
    @MainTransaction(3)
    @Transactional(rollbackFor = Exception.class)
    public void test1() {
        sonService.sonMethod1("Luffy", Thread.currentThread());
        sonService.sonMethod2("Zoro", "Sanji", Thread.currentThread());
        sonService.sonMethod3("Nami", Thread.currentThread());
        sonService.sonMethod4("Robin");
    }
    // In complex branching scenarios you may omit @MainTransaction to avoid lock contention.
    @Transactional(rollbackFor = Exception.class)
    public void test2() {
        sonService.sonMethod1("Luffy", Thread.currentThread());
        sonService.sonMethod2("Zoro", "Sanji", Thread.currentThread());
        sonService.sonMethod3("Nami", Thread.currentThread());
        sonService.sonMethod4("Robin");
    }
}

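When the business logic has many conditional branches that start different numbers of threads, a single fixed count in @MainTransaction cannot match every path, and a mismatch leaves threads waiting on the latches with their transactions, and any locks those transactions hold, still open. In that case you can omit @MainTransaction to avoid such table-lock problems, accepting that an exception will then no longer roll back all threads together.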
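
The risk of a fixed child count can be seen with a bare CountDownLatch. The sketch below is an illustration only: LatchMismatchDemo and opens are hypothetical names, and it uses a timed await so that the demo terminates, whereas the aspect in this article calls the untimed await() and would keep waiting.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical demo: what happens when fewer children report in than were declared.
final class LatchMismatchDemo {

    /** Returns true if a latch expecting `declared` children opens after `started` children report in. */
    static boolean opens(int declared, int started) {
        CountDownLatch sonDownLatch = new CountDownLatch(declared);
        for (int i = 0; i < started; i++) {
            sonDownLatch.countDown();
        }
        try {
            // Timed wait, used here only so that the mismatch case returns.
            return sonDownLatch.await(200, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("declared 3, started 3: " + opens(3, 3));
        System.out.println("declared 3, started 2: " + opens(3, 2));
    }
}
```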

Written by

Java Interview Crash Guide
