Merging Requests and Batch Querying in Spring Boot to Reduce Database Connections
This article explains how to merge concurrent user requests into a single batch SQL query using Java's LinkedBlockingQueue, ScheduledThreadPoolExecutor and CompletableFuture in a Spring Boot application, thereby saving database connections and improving performance under high concurrency.
When multiple users request their basic information simultaneously, the server may issue separate SQL queries for each request, wasting valuable database connections. By merging these requests into one batch query, the system can issue a single SQL statement, retrieve all needed rows, and then distribute the results back to the individual callers.
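As a rough illustration of the "one statement for many callers" idea, the sketch below collapses the IDs of several pending requests into a single parameterized IN query. The class name, the table name users and the column id are assumptions made for the example, not taken from the project.

```java
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

class MergedQuerySketch {

    // Builds one parameterized statement covering every pending request.
    // Table "users" and column "id" are illustrative assumptions.
    // The caller is expected to skip the query when the list is empty.
    static String buildBatchSql(List<Long> requestedIds) {
        // Several callers may ask for the same user; query each ID only once.
        Set<Long> distinctIds = new LinkedHashSet<>(requestedIds);
        String placeholders = distinctIds.stream()
                .map(id -> "?")
                .collect(Collectors.joining(", "));
        return "SELECT * FROM users WHERE id IN (" + placeholders + ")";
    }

    public static void main(String[] args) {
        // Five concurrent requests that touch three distinct users.
        List<Long> ids = Arrays.asList(1L, 2L, 1L, 3L, 2L);
        // prints: SELECT * FROM users WHERE id IN (?, ?, ?)
        System.out.println(buildBatchSql(ids));
    }
}
```

Five requests cost one round trip here instead of five; the rows that come back still have to be matched to the callers that asked for them.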
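The implementation relies on a LinkedBlockingQueue to collect incoming requests, a ScheduledThreadPoolExecutor to trigger periodic merging, and a CompletableFuture to hand each result back to its original caller. In Java 8, CompletableFuture has no orTimeout or completeOnTimeout (both arrived in Java 9); the only bound available is the blocking get(long, TimeUnit) inherited from Future. A custom queue‑based timeout is therefore introduced, in which the caller waits on a queue with poll(timeout, unit).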
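A queue-based timeout of this kind might look like the following minimal sketch. PendingRequest and its methods are hypothetical names invented for the example; the idea is only that the batch thread offers the result into a one-slot queue while the caller polls it with a deadline. Because LinkedBlockingQueue rejects null elements, the result is wrapped in an Optional so that "user not found" can still be delivered.

```java
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class QueueTimeoutSketch {

    // Hypothetical per-request holder: one slot, written once by the batch thread.
    static class PendingRequest<T> {
        private final BlockingQueue<Optional<T>> slot = new LinkedBlockingQueue<>(1);

        // Called by the batch thread; a null value means "no row found".
        void complete(T value) {
            slot.offer(Optional.ofNullable(value));
        }

        // Called by the request thread; gives up after the timeout elapses.
        T await(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException {
            Optional<T> result = slot.poll(timeout, unit);
            if (result == null) {
                throw new TimeoutException("no result within " + timeout + " " + unit);
            }
            return result.orElse(null);
        }
    }

    public static void main(String[] args) throws Exception {
        PendingRequest<String> answered = new PendingRequest<>();
        new Thread(() -> answered.complete("user-1")).start();
        System.out.println(answered.await(1, TimeUnit.SECONDS));

        PendingRequest<String> unanswered = new PendingRequest<>();
        try {
            unanswered.await(50, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            System.out.println("timed out: " + e.getMessage());
        }
    }
}
```

Throwing TimeoutException rather than returning null keeps a timeout distinguishable from a missing user, which matters once callers start retrying.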
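public interface UserService {
    // Returns one entry per request, keyed by requestId; the value is null when no user matches.
    Map<String, Users> queryUserByIdBatch(List<UserWrapBatchService.Request> userReqs);
}

@Service
public class UserServiceImpl implements UserService {

    @Resource
    UsersMapper usersMapper;

    @Override
    public Map<String, Users> queryUserByIdBatch(List<UserWrapBatchService.Request> userReqs) {
        // Collect the user IDs of every merged request.
        List<Long> userIds = userReqs.stream()
                .map(UserWrapBatchService.Request::getUserId)
                .collect(Collectors.toList());

        // One query for the whole batch: SELECT ... WHERE id IN (...)
        QueryWrapper<Users> queryWrapper = new QueryWrapper<>();
        queryWrapper.in("id", userIds);
        List<Users> users = usersMapper.selectList(queryWrapper);

        // Index the rows by user ID so they can be handed back to each request.
        Map<Long, List<Users>> userGroup = users.stream()
                .collect(Collectors.groupingBy(Users::getId));

        // Different requests may ask for the same user, so results are keyed by requestId.
        Map<String, Users> result = new HashMap<>();
        userReqs.forEach(req -> {
            List<Users> list = userGroup.get(req.getUserId());
            result.put(req.getRequestId(), CollectionUtils.isEmpty(list) ? null : list.get(0));
        });
        return result;
    }
}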
The batch service collects requests:
@Service
public class UserWrapBatchService {

    @Resource
    UserService userService;

    // Upper bound on the number of requests merged into one SQL statement.
    public static final int MAX_TASK_NUM = 100;

    // Requests waiting for the next merge tick.
    private final Queue<Request> queue = new LinkedBlockingQueue<>();

    @PostConstruct
    public void init() {
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
        // First run after 100 ms, then every 10 ms.
        executor.scheduleAtFixedRate(() -> {
            int size = queue.size();
            if (size == 0) {
                return;
            }
            List<Request> list = new ArrayList<>();
            for (int i = 0; i < size && i < MAX_TASK_NUM; i++) {
                list.add(queue.poll());
            }
            try {
                Map<String, Users> response = userService.queryUserByIdBatch(list);
                for (Request r : list) {
                    r.getCompletableFuture().complete(response.get(r.getRequestId()));
                }
            } catch (RuntimeException e) {
                // Fail the waiting callers instead of leaving them blocked. An uncaught
                // exception would also cancel every later run of this scheduled task.
                for (Request r : list) {
                    r.getCompletableFuture().completeExceptionally(e);
                }
            }
        }, 100, 10, TimeUnit.MILLISECONDS);
    }

    public Users queryUser(Long userId) {
        Request request = new Request();
        // The request ID, not the user ID, identifies the caller.
        request.setRequestId(UUID.randomUUID().toString().replace("-", ""));
        request.setUserId(userId);
        CompletableFuture<Users> future = new CompletableFuture<>();
        request.setCompletableFuture(future);
        queue.offer(request);
        try {
            // Blocks until the merge thread completes the future.
            return future.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } catch (ExecutionException e) {
            e.printStackTrace();
            return null;
        }
    }

    public static class Request {
        String requestId;
        Long userId;
        CompletableFuture<Users> completableFuture;
        // getters and setters omitted for brevity
    }
}
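To see the same mechanism without Spring or a database, here is a JDK-only sketch under stated assumptions: RequestMergerSketch, Pending and the batchLoader function are hypothetical names, and the loader stands in for the batch SQL query. It uses drainTo to take a capped batch and fails the waiting futures if the loader throws.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;

class RequestMergerSketch<K, V> implements AutoCloseable {

    // One queued request: the key to look up and the future its caller waits on.
    private static final class Pending<K, V> {
        final K key;
        final CompletableFuture<V> future = new CompletableFuture<>();
        Pending(K key) { this.key = key; }
    }

    private final BlockingQueue<Pending<K, V>> queue = new LinkedBlockingQueue<>();
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private final Function<List<K>, Map<K, V>> batchLoader;
    private final int maxBatchSize;

    RequestMergerSketch(Function<List<K>, Map<K, V>> batchLoader, int maxBatchSize, long periodMillis) {
        this.batchLoader = batchLoader;
        this.maxBatchSize = maxBatchSize;
        scheduler.scheduleAtFixedRate(this::flush, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
    }

    // Called by request threads: enqueue and return immediately.
    CompletableFuture<V> submit(K key) {
        Pending<K, V> pending = new Pending<>(key);
        queue.offer(pending);
        return pending.future;
    }

    // Runs on the scheduler thread: take a capped batch, load once, fan results out.
    private void flush() {
        List<Pending<K, V>> batch = new ArrayList<>();
        queue.drainTo(batch, maxBatchSize);
        if (batch.isEmpty()) {
            return;
        }
        try {
            List<K> keys = batch.stream().map(p -> p.key).distinct().collect(Collectors.toList());
            Map<K, V> loaded = batchLoader.apply(keys);
            batch.forEach(p -> p.future.complete(loaded.get(p.key)));
        } catch (RuntimeException e) {
            batch.forEach(p -> p.future.completeExceptionally(e));
        }
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }

    public static void main(String[] args) throws Exception {
        // Stand-in for the batch SQL query: maps each ID to a fake user name.
        Function<List<Long>, Map<Long, String>> fakeLoader =
                ids -> ids.stream().collect(Collectors.toMap(id -> id, id -> "user-" + id));
        try (RequestMergerSketch<Long, String> merger = new RequestMergerSketch<>(fakeLoader, 100, 10)) {
            CompletableFuture<String> a = merger.submit(1L);
            CompletableFuture<String> b = merger.submit(2L);
            System.out.println(a.get(1, TimeUnit.SECONDS) + " " + b.get(1, TimeUnit.SECONDS));
        }
    }
}
```

Returning the future from submit leaves the waiting policy to the caller, who can bound it with get(timeout, unit) instead of blocking indefinitely.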
A simple controller demonstrates the usage:
@RestController
@RequestMapping("/merge")
public class MergeController {

    @Resource
    UserWrapBatchService userBatchService;

    // Returning a Callable lets Spring MVC run the blocking wait off the servlet request thread.
    @GetMapping
    public Callable<Users> merge(Long userId) {
        return () -> userBatchService.queryUser(userId);
    }
}
To verify the effect under high concurrency, a test spawns 30 threads, each sending three requests after synchronizing on a CountDownLatch. The console output shows how many requests are merged per batch.
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import org.springframework.web.client.RestTemplate;

public class TestBatch {

    private static final int THREAD_COUNT = 30;
    private static final CountDownLatch LATCH = new CountDownLatch(THREAD_COUNT);
    private static final RestTemplate restTemplate = new RestTemplate();

    public static void main(String[] args) {
        for (int i = 0; i < THREAD_COUNT; i++) {
            new Thread(() -> {
                // Hold every thread until all 30 are ready, then fire together.
                LATCH.countDown();
                try {
                    LATCH.await();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                for (int j = 1; j <= 3; j++) {
                    // Random user ID in 1..3 (a 0 is bumped to 1).
                    int param = new Random().nextInt(4);
                    if (param <= 0) {
                        param++;
                    }
                    // The controller maps /merge; the /asyncAndMerge prefix is presumably
                    // the application's context path.
                    String resp = restTemplate.getForObject(
                            "http://localhost:8080/asyncAndMerge/merge?userId=" + param,
                            String.class);
                    System.out.println(Thread.currentThread().getName()
                            + " param " + param + " resp " + resp);
                }
            }).start();
        }
    }
}
Key issues addressed:
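Java 8's CompletableFuture lacks orTimeout and completeOnTimeout, so the queue's poll(timeout, unit) is used to impose a wait limit. The listing above still calls future.get() with no timeout, so it blocks until the merge thread completes the future.
SQL statements have length limits, and some databases also cap the number of items in an IN list; MAX_TASK_NUM therefore caps the number of requests merged in a single batch.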
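The batch cap can be sketched in isolation as follows. BlockingQueue.drainTo(collection, maxElements) removes at most the given number of elements in one call, so anything beyond the cap simply stays queued for the next tick. The class and method names are assumptions for the example; only MAX_TASK_NUM comes from the listing above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class BatchCapSketch {

    static final int MAX_TASK_NUM = 100;

    // Removes at most MAX_TASK_NUM queued items; the rest wait for the next tick.
    static <T> List<T> nextBatch(BlockingQueue<T> queue) {
        List<T> batch = new ArrayList<>(MAX_TASK_NUM);
        queue.drainTo(batch, MAX_TASK_NUM);
        return batch;
    }

    public static void main(String[] args) {
        BlockingQueue<Long> queue = new LinkedBlockingQueue<>();
        for (long id = 1; id <= 250; id++) {
            queue.offer(id);
        }
        // A backlog of 250 requests is served in three ticks: 100, 100, 50.
        while (!queue.isEmpty()) {
            System.out.println("batch of " + nextBatch(queue).size());
        }
    }
}
```

A sustained backlog means some requests wait several ticks, so the cap trades per-statement size against tail latency.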
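In summary, request merging and batch querying can substantially reduce the number of database (or RPC) calls, and with them the connections held at any moment, which improves throughput for high‑traffic services. The trade‑off is added latency: every request waits for the next merge tick (up to one scheduling period, 10 ms in the listing above) before any real work starts, so the technique does not pay off in low‑concurrency scenarios.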
Full source code is available at https://gitee.com/apple_1030907690/spring-boot-kubernetes/tree/v1.0.5