Merging Requests and Batch Querying in Spring Boot to Reduce Database Connections
This article explains how to merge concurrent user requests into a single batch SQL query using Java's LinkedBlockingQueue, ScheduledThreadPoolExecutor and CompletableFuture in a Spring Boot application, thereby saving database connections and improving performance under high concurrency.
When multiple users request their basic information simultaneously, the server may issue separate SQL queries for each request, wasting valuable database connections. By merging these requests into one batch query, the system can issue a single SQL statement, retrieve all needed rows, and then distribute the results back to the individual callers.
The implementation relies on a LinkedBlockingQueue to collect incoming requests, a ScheduledThreadPoolExecutor to trigger periodic merging, and a CompletableFuture to deliver each result to its original caller. Java 8's CompletableFuture has no orTimeout or completeOnTimeout (both were added in Java 9), although the blocking get(timeout, unit) is available, so a custom queue-based timeout is introduced to bound the wait.
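As a rough, Spring-free sketch of how the three pieces fit together, the class below queues each lookup, lets a single scheduler thread drain the queue, and completes each caller's future from one shared result map. The names MergeSketch, Pending and fakeBatchQuery, and the 50 ms period, are assumptions made for illustration; fakeBatchQuery merely stands in for the real IN query.

```java
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class MergeSketch {
    // One pending lookup: the key to fetch plus the future its caller waits on.
    static final class Pending {
        final long userId;
        final CompletableFuture<String> future = new CompletableFuture<>();
        Pending(long userId) { this.userId = userId; }
    }

    private final BlockingQueue<Pending> queue = new LinkedBlockingQueue<>();
    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
    final AtomicInteger batchCalls = new AtomicInteger(); // counts simulated SQL statements

    MergeSketch(long periodMillis) {
        scheduler.scheduleAtFixedRate(this::flush, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
    }

    // Hypothetical stand-in for "SELECT ... WHERE id IN (...)".
    private Map<Long, String> fakeBatchQuery(Set<Long> ids) {
        batchCalls.incrementAndGet();
        Map<Long, String> rows = new HashMap<>();
        for (Long id : ids) rows.put(id, "user-" + id);
        return rows;
    }

    // Runs on the scheduler thread: drain everything queued so far and answer it in one query.
    private void flush() {
        List<Pending> batch = new ArrayList<>();
        queue.drainTo(batch);
        if (batch.isEmpty()) return;
        try {
            Set<Long> ids = new HashSet<>();
            for (Pending p : batch) ids.add(p.userId);
            Map<Long, String> rows = fakeBatchQuery(ids);
            for (Pending p : batch) p.future.complete(rows.get(p.userId));
        } catch (RuntimeException e) {
            // Fail the callers instead of letting the exception cancel the schedule.
            for (Pending p : batch) p.future.completeExceptionally(e);
        }
    }

    // Called by request threads; blocks until the batch containing this request completes.
    String query(long userId) {
        Pending p = new Pending(userId);
        queue.offer(p);
        return p.future.join();
    }

    void shutdown() { scheduler.shutdownNow(); }

    // Fires `callers` concurrent lookups and returns {correct answers, batch queries run}.
    public static int[] demo(int callers) {
        MergeSketch merger = new MergeSketch(50);
        ExecutorService pool = Executors.newFixedThreadPool(callers);
        List<Future<String>> answers = new ArrayList<>();
        for (int i = 0; i < callers; i++) {
            final long id = i % 3 + 1;
            Callable<String> task = () -> merger.query(id);
            answers.add(pool.submit(task));
        }
        int ok = 0;
        try {
            for (int i = 0; i < callers; i++) {
                if (("user-" + (i % 3 + 1)).equals(answers.get(i).get())) ok++;
            }
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
            merger.shutdown();
        }
        return new int[] { ok, merger.batchCalls.get() };
    }

    public static void main(String[] args) {
        int[] r = demo(30);
        System.out.println(r[0] + " answers from " + r[1] + " batch queries");
    }
}
```

How many batch queries run depends on thread timing, but it can never exceed the number of callers, and with a 50 ms window it is normally far smaller.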
The service layer exposes a single batch query:

public interface UserService {
    Map<String, Users> queryUserByIdBatch(List<UserWrapBatchService.Request> userReqs);
}

@Service
public class UserServiceImpl implements UserService {
    @Resource
    UsersMapper usersMapper;

    @Override
    public Map<String, Users> queryUserByIdBatch(List<UserWrapBatchService.Request> userReqs) {
        // Collect the distinct ids so the IN clause carries no duplicates.
        List<Long> userIds = userReqs.stream()
                .map(UserWrapBatchService.Request::getUserId)
                .distinct()
                .collect(Collectors.toList());

        // One statement for the whole batch: SELECT ... WHERE id IN (...)
        QueryWrapper<Users> queryWrapper = new QueryWrapper<>();
        queryWrapper.in("id", userIds);
        List<Users> users = usersMapper.selectList(queryWrapper);

        // Group the rows by id so each request can look up its own row.
        Map<Long, List<Users>> userGroup = users.stream()
                .collect(Collectors.groupingBy(Users::getId));

        // Key the result by requestId: several requests may ask for the same user.
        Map<String, Users> result = new HashMap<>();
        userReqs.forEach(req -> {
            List<Users> list = userGroup.get(req.getUserId());
            if (!CollectionUtils.isEmpty(list)) {
                result.put(req.getRequestId(), list.get(0));
            } else {
                result.put(req.getRequestId(), null); // no such user
            }
        });
        return result;
    }
}

The batch service collects requests:
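@Service
public class UserWrapBatchService {
    @Resource
    UserService userService;

    // Upper bound on the number of requests merged into one SQL statement.
    public static int MAX_TASK_NUM = 100;

    // Request threads produce into this queue; the scheduler thread is the only consumer.
    private final Queue<Request> queue = new LinkedBlockingQueue<>();

    @PostConstruct
    public void init() {
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
        // First run after 100 ms, then every 10 ms.
        executor.scheduleAtFixedRate(() -> {
            // Take at most MAX_TASK_NUM requests; the rest wait for the next tick.
            List<Request> list = new ArrayList<>();
            Request req;
            while (list.size() < MAX_TASK_NUM && (req = queue.poll()) != null) {
                list.add(req);
            }
            if (list.isEmpty()) return;
            try {
                Map<String, Users> response = userService.queryUserByIdBatch(list);
                for (Request r : list) {
                    r.getCompletableFuture().complete(response.get(r.getRequestId()));
                }
            } catch (Exception e) {
                // An uncaught exception would suppress every later run of this task,
                // so fail the waiting callers and keep the schedule alive.
                for (Request r : list) {
                    r.getCompletableFuture().completeExceptionally(e);
                }
            }
        }, 100, 10, TimeUnit.MILLISECONDS);
    }

    public Users queryUser(Long userId) {
        Request request = new Request();
        // The requestId tells apart concurrent requests for the same userId.
        request.setRequestId(UUID.randomUUID().toString().replace("-", ""));
        request.setUserId(userId);
        CompletableFuture<Users> future = new CompletableFuture<>();
        request.setCompletableFuture(future);
        queue.offer(request);
        try {
            // Blocks until the scheduler thread completes the future.
            return future.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return null;
        } catch (ExecutionException e) {
            e.printStackTrace();
            return null;
        }
    }

    static class Request {
        String requestId;
        Long userId;
        CompletableFuture<Users> completableFuture;
        // getters and setters omitted for brevity
    }
}

A simple controller demonstrates the usage: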
@RestController
@RequestMapping("/merge")
public class MergeController {
    @Resource
    UserWrapBatchService userBatchService;

    @GetMapping
    public Callable<Users> merge(Long userId) {
        return () -> userBatchService.queryUser(userId);
    }
}

To verify the effect under high concurrency, a test spawns 30 threads, each sending three requests after synchronizing with a CountDownLatch. The console output shows how many requests are merged per batch.
public class TestBatch {
    private static final int THREAD_COUNT = 30;
    private static final CountDownLatch LATCH = new CountDownLatch(THREAD_COUNT);
    private static final RestTemplate restTemplate = new RestTemplate();

    public static void main(String[] args) {
        for (int i = 0; i < THREAD_COUNT; i++) {
            new Thread(() -> {
                // Hold every thread at the latch so all of them start sending together.
                LATCH.countDown();
                try { LATCH.await(); } catch (InterruptedException e) { e.printStackTrace(); }
                for (int j = 1; j <= 3; j++) {
                    // Pick a userId in 1..3 (a drawn 0 is bumped to 1).
                    int param = new Random().nextInt(4);
                    if (param <= 0) param++;
                    String resp = restTemplate.getForObject(
                            "http://localhost:8080/asyncAndMerge/merge?userId=" + param,
                            String.class);
                    System.out.println(Thread.currentThread().getName() + " param " + param + " resp " + resp);
                }
            }).start();
        }
    }
}

Key issues addressed:
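Java 8's CompletableFuture lacks orTimeout and completeOnTimeout (added in Java 9), so the queue's poll(timeout, unit) is used to impose a wait limit. The simplified listing above still waits with an unbounded future.get().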
SQL statements have length limits; therefore MAX_TASK_NUM caps the number of requests merged in a single batch.
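Both points can be sketched in a few lines of plain Java 8. This is an illustration under stated assumptions rather than the article's code: BoundedWaitSketch, getOrFallback and nextBatch are hypothetical names, and the wait is bounded with Future.get(long, TimeUnit), which CompletableFuture inherits, instead of the queue-based poll the article mentions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BoundedWaitSketch {
    static final int MAX_TASK_NUM = 100; // same cap as the article's constant

    // Waits at most timeoutMillis for the future; returns fallback on timeout or failure.
    public static <T> T getOrFallback(CompletableFuture<T> future, long timeoutMillis, T fallback) {
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            future.cancel(true); // a later complete() from the batch thread becomes a no-op
            return fallback;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return fallback;
        } catch (ExecutionException e) {
            return fallback;
        }
    }

    // Moves at most MAX_TASK_NUM queued items into a new list; the rest stay queued.
    public static <T> List<T> nextBatch(BlockingQueue<T> queue) {
        List<T> batch = new ArrayList<>();
        queue.drainTo(batch, MAX_TASK_NUM);
        return batch;
    }

    public static void main(String[] args) {
        CompletableFuture<String> never = new CompletableFuture<>();
        System.out.println(getOrFallback(never, 50, "timed out")); // prints "timed out"

        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        for (int i = 0; i < 250; i++) queue.offer(i);
        // prints "100 drained, 150 left"
        System.out.println(nextBatch(queue).size() + " drained, " + queue.size() + " left");
    }
}
```

Returning a fallback keeps the sketch short; a real service might prefer to surface the timeout to the caller as an error.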
In summary, request merging and batch querying cut the number of database (or RPC) calls, and with them the connections held at any moment, which improves throughput for high-traffic services. The trade-off is added latency: each request waits for the next scheduler tick (up to 10 ms with the settings above) before its query runs, so the technique is a poor fit for low-concurrency scenarios.
Full source code is available at https://gitee.com/apple_1030907690/spring-boot-kubernetes/tree/v1.0.5 .
