How to Merge Concurrent Requests in Spring Boot to Save Database Connections
This article explains how to combine multiple simultaneous user requests on the server side using a queue, scheduled thread pool and CompletableFuture in Spring Boot, reducing database connections while handling high concurrency, and discusses implementation details, testing, and potential pitfalls.
Introduction
Request merging can significantly reduce the number of database connections by combining multiple user queries into a single SQL statement.
For example, three users (IDs 1, 2, 3) each request their basic information, which would normally generate three separate database calls.
Replacing the database with a remote service follows the same principle.
By merging the requests on the server, only one SQL query is sent, and the results are distributed back to each user based on a unique request ID.
Technical Approach
LinkedBlockingQueue (blocking queue)
ScheduledThreadPoolExecutor (scheduled task thread pool)
CompletableFuture (future without built‑in timeout in Java 8)
Code Implementation
Query User Code
<code>public interface UserService {
Map<String, Users> queryUserByIdBatch(List<UserWrapBatchService.Request> userReqs);
}
@Service
public class UserServiceImpl implements UserService {
@Resource
UsersMapper usersMapper;
@Override
public Map<String, Users> queryUserByIdBatch(List<UserWrapBatchService.Request> userReqs) {
List<Long> userIds = userReqs.stream().map(UserWrapBatchService.Request::getUserId).collect(Collectors.toList());
QueryWrapper<Users> queryWrapper = new QueryWrapper<>();
queryWrapper.in("id", userIds);
List<Users> users = usersMapper.selectList(queryWrapper);
Map<Long, List<Users>> userGroup = users.stream().collect(Collectors.groupingBy(Users::getId));
HashMap<String, Users> result = new HashMap<>();
userReqs.forEach(val -> {
List<Users> usersList = userGroup.get(val.getUserId());
if (!CollectionUtils.isEmpty(usersList)) {
result.put(val.getRequestId(), usersList.get(0));
} else {
result.put(val.getRequestId(), null);
}
});
return result;
}
}
</code>Merge Request Implementation
<code>package com.springboot.sample.service.impl;
import com.springboot.sample.bean.Users;
import com.springboot.sample.service.UserService;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.*;
import java.util.concurrent.*;
@Service
public class UserWrapBatchService {
@Resource
UserService userService;
public static int MAX_TASK_NUM = 100;
public class Request {
String requestId;
Long userId;
CompletableFuture<Users> completableFuture;
// getters and setters omitted for brevity
}
private final Queue<Request> queue = new LinkedBlockingQueue();
@PostConstruct
public void init() {
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
scheduledExecutorService.scheduleAtFixedRate(() -> {
int size = queue.size();
if (size == 0) {
return;
}
List<Request> list = new ArrayList<>();
System.out.println("Merged [" + size + "] requests");
for (int i = 0; i < size; i++) {
if (i < MAX_TASK_NUM) {
list.add(queue.poll());
}
}
List<Request> userReqs = new ArrayList<>();
for (Request request : list) {
userReqs.add(request);
}
Map<String, Users> response = userService.queryUserByIdBatch(userReqs);
for (Request request : list) {
Users result = response.get(request.requestId);
request.completableFuture.complete(result);
}
}, 100, 10, TimeUnit.MILLISECONDS);
}
public Users queryUser(Long userId) {
Request request = new Request();
request.requestId = UUID.randomUUID().toString().replace("-", "");
request.userId = userId;
CompletableFuture<Users> future = new CompletableFuture<>();
request.completableFuture = future;
queue.offer(request);
try {
return future.get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
return null;
}
}
</code>Controller Call
<code>/*** Request merging ***/
@RequestMapping("/merge")
public Callable<Users> merge(Long userId) {
return new Callable<Users>() {
@Override
public Users call() throws Exception {
return userBatchService.queryUser(userId);
}
};
}
</code>High Concurrency Test
<code>package com.springboot.sample;
import org.springframework.web.client.RestTemplate;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
public class TestBatch {
private static int threadCount = 30;
private static final CountDownLatch COUNT_DOWN_LATCH = new CountDownLatch(threadCount);
private static final RestTemplate restTemplate = new RestTemplate();
public static void main(String[] args) {
for (int i = 0; i < threadCount; i++) {
new Thread(new Runnable() {
public void run() {
COUNT_DOWN_LATCH.countDown();
try { COUNT_DOWN_LATCH.await(); } catch (InterruptedException e) { e.printStackTrace(); }
for (int j = 1; j <= 3; j++) {
int param = new Random().nextInt(4);
if (param <= 0) { param++; }
String responseBody = restTemplate.getForObject(
"http://localhost:8080/asyncAndMerge/merge?userId=" + param,
String.class);
System.out.println(Thread.currentThread().getName() + " param " + param + " response " + responseBody);
}
}
}).start();
}
}
}
</code>Test Results
Issues to Note
Java 8 CompletableFuture does not provide a timeout mechanism.
SQL statements have length limits, so batch size must be limited (MAX_TASK_NUM in the example).
Problem Solving
Queue‑based Timeout Solution
<code>package com.springboot.sample.service.impl;
import com.springboot.sample.bean.Users;
import com.springboot.sample.service.UserService;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.*;
import java.util.concurrent.*;
@Service
public class UserWrapBatchQueueService {
@Resource
UserService userService;
public static int MAX_TASK_NUM = 100;
public class Request {
String requestId;
Long userId;
LinkedBlockingQueue<Users> usersQueue;
// getters and setters omitted
}
private final Queue<Request> queue = new LinkedBlockingQueue();
@PostConstruct
public void init() {
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
scheduledExecutorService.scheduleAtFixedRate(() -> {
int size = queue.size();
if (size == 0) { return; }
List<Request> list = new ArrayList<>();
System.out.println("Merged [" + size + "] requests");
for (int i = 0; i < size; i++) {
if (i < MAX_TASK_NUM) { list.add(queue.poll()); }
}
List<Request> userReqs = new ArrayList<>();
for (Request request : list) { userReqs.add(request); }
Map<String, Users> response = userService.queryUserByIdBatchQueue(userReqs);
for (Request userReq : userReqs) {
Users users = response.get(userReq.getRequestId());
userReq.usersQueue.offer(users);
}
}, 100, 10, TimeUnit.MILLISECONDS);
}
public Users queryUser(Long userId) {
Request request = new Request();
request.requestId = UUID.randomUUID().toString().replace("-", "");
request.userId = userId;
LinkedBlockingQueue<Users> usersQueue = new LinkedBlockingQueue<>();
request.usersQueue = usersQueue;
queue.offer(request);
try {
return usersQueue.poll(3000, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) { e.printStackTrace(); }
return null;
}
@Override
public Map<String, Users> queryUserByIdBatchQueue(List<UserWrapBatchQueueService.Request> userReqs) {
List<Long> userIds = userReqs.stream().map(UserWrapBatchQueueService.Request::getUserId).collect(Collectors.toList());
QueryWrapper<Users> queryWrapper = new QueryWrapper<>();
queryWrapper.in("id", userIds);
List<Users> users = usersMapper.selectList(queryWrapper);
Map<Long, List<Users>> userGroup = users.stream().collect(Collectors.groupingBy(Users::getId));
HashMap<String, Users> result = new HashMap<>();
userReqs.forEach(val -> {
List<Users> usersList = userGroup.get(val.getUserId());
if (!CollectionUtils.isEmpty(usersList)) {
result.put(val.getRequestId(), usersList.get(0));
} else {
result.put(val.getRequestId(), new Users());
}
});
return result;
}
}
</code>Conclusion
Request merging and batch processing can greatly save connection resources of the called system; the example uses a database, but the same principle applies to RPC calls. The downside is added waiting time before actual logic execution, making it unsuitable for low‑concurrency scenarios.
macrozheng
Dedicated to Java tech sharing and dissecting top open-source projects. Topics include Spring Boot, Spring Cloud, Docker, Kubernetes and more. Author’s GitHub project “mall” has 50K+ stars.
How this landed with the community
Was this worth your time?
0 Comments
Thoughtful readers leave field notes, pushback, and hard-won operational detail here.