Request Merging and Batch Processing in Spring Boot to Reduce Database Connections
This article explains how to merge multiple user requests on the server side, batch them into a single SQL query using a blocking queue and CompletableFuture, and handle high‑concurrency scenarios with scheduled tasks and timeout‑aware queues to save database connection resources.
The article introduces the problem of multiple users (ids 1, 2, 3) each sending separate requests to query their basic information, which leads to three separate database connections—a costly resource.
By merging these requests on the server, only one SQL query is issued; the result is then grouped by a unique request ID and returned to each user, dramatically reducing connection usage. The same principle applies when the database is replaced by a remote service.
Technical Means
LinkedBlockQueue – a blocking queue for request aggregation.
ScheduledThreadPoolExecutor – a scheduled thread pool to periodically process queued requests.
CompletableFuture – used to deliver results back to the original callers (note: Java 8 CompletableFuture lacks a timeout mechanism).
Code Implementation
Query Service Interface
public interface UserService {
Map
queryUserByIdBatch(List
userReqs);
}Batch Request Service
package com.springboot.sample.service.impl;
import com.springboot.sample.bean.Users;
import com.springboot.sample.service.UserService;
import org.springframework.stereotype.Service;
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.util.*;
import java.util.concurrent.*;
@Service
public class UserWrapBatchService {
@Resource
private UserService userService;
public static int MAX_TASK_NUM = 100;
public class Request {
String requestId;
Long userId;
CompletableFuture
completableFuture;
// getters and setters omitted for brevity
}
private final Queue
queue = new LinkedBlockingQueue();
@PostConstruct
public void init() {
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
scheduledExecutorService.scheduleAtFixedRate(() -> {
int size = queue.size();
if (size == 0) return;
List
list = new ArrayList<>();
for (int i = 0; i < size; i++) {
if (i < MAX_TASK_NUM) list.add(queue.poll());
}
List
userReqs = new ArrayList<>(list);
Map
response = userService.queryUserByIdBatch(userReqs);
for (Request request : list) {
Users result = response.get(request.requestId);
request.completableFuture.complete(result);
}
}, 100, 10, TimeUnit.MILLISECONDS);
}
public Users queryUser(Long userId) {
Request request = new Request();
request.requestId = UUID.randomUUID().toString().replace("-", "");
request.userId = userId;
CompletableFuture
future = new CompletableFuture<>();
request.completableFuture = future;
queue.offer(request);
try { return future.get(); } catch (Exception e) { e.printStackTrace(); }
return null;
}
}Controller Invocation
@RequestMapping("/merge")
public Callable
merge(Long userId) {
return () -> userBatchService.queryUser(userId);
}High‑Concurrency Test
public class TestBatch {
private static int threadCount = 30;
private static final CountDownLatch COUNT_DOWN_LATCH = new CountDownLatch(threadCount);
private static final RestTemplate restTemplate = new RestTemplate();
public static void main(String[] args) {
for (int i = 0; i < threadCount; i++) {
new Thread(() -> {
COUNT_DOWN_LATCH.countDown();
try { COUNT_DOWN_LATCH.await(); } catch (InterruptedException e) { e.printStackTrace(); }
for (int j = 1; j <= 3; j++) {
int param = new Random().nextInt(4);
if (param <= 0) param++;
String response = restTemplate.getForObject(
"http://localhost:8080/asyncAndMerge/merge?userId=" + param, String.class);
System.out.println(Thread.currentThread().getName() + " param " + param + " response " + response);
}
}).start();
}
}
}Issues to Note
Java 8 CompletableFuture does not provide a built‑in timeout; the article later shows a queue‑based workaround.
SQL statements have length limits, so the batch size must be capped (MAX_TASK_NUM).
Queue‑Based Timeout Solution
To add a timeout, the implementation replaces CompletableFuture with a LinkedBlockingQueue<Users> per request. The consumer thread polls the queue with a timeout (e.g., 3 seconds), allowing the caller to fail gracefully if the batch processing takes too long.
public Users queryUser(Long userId) {
Request request = new Request();
request.requestId = UUID.randomUUID().toString().replace("-", "");
request.userId = userId;
LinkedBlockingQueue
usersQueue = new LinkedBlockingQueue<>();
request.usersQueue = usersQueue;
queue.offer(request);
try { return usersQueue.poll(3000, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { e.printStackTrace(); }
return null;
}Conclusion
Request merging and batch processing can greatly reduce the number of connections to a database or remote service, but it introduces a small waiting period before actual logic execution, making it unsuitable for low‑concurrency scenarios.
Source code: https://gitee.com/apple_1030907690/spring-boot-kubernetes/tree/v1.0.5
Architecture Digest
Focusing on Java backend development, covering application architecture from top-tier internet companies (high availability, high performance, high stability), big data, machine learning, Java architecture, and other popular fields.
How this landed with the community
Was this worth your time?
0 Comments
Thoughtful readers leave field notes, pushback, and hard-won operational detail here.