Preventing JVM OOM in Bulk Data Queries with MyBatis‑Plus Bulk Executor
The article explains why unrestricted large‑table queries cause JVM Out‑Of‑Memory errors, analyzes the drawbacks of simple LIMIT pagination, and provides a MyBatis‑Plus based bulk‑execution utility with code examples for safe, efficient batch processing and multithreaded execution.
Background
Developers often query large tables without limiting the result set. This works while the table is small, but as the data grows the JVM can run out of memory (OOM).
Problem Analysis
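Loading an entire large table into memory turns every row into a live object. The result list keeps all of them reachable, so the garbage collector cannot reclaim them and the heap eventually fills up, leading to OOM. An unbounded select * from table; is therefore unsafe. Adding LIMIT bounds each result set, but it becomes inefficient at very large offsets because the database still has to read and discard the skipped rows.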
Recommended Query Practices
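Use LIMIT with a reasonable page size, e.g., select * from table limit 0,200;. For large offsets, prefer range queries on an indexed column, e.g., select * from table where id > 0 and id < 201;. Note that a fixed id range only returns a full batch when the ids have no gaps; with gaps, batches simply come back smaller.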
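The range-query idea can be sketched as a loop that always resumes after the last id it saw. This is a minimal illustration, not part of the original utility: the class name KeysetPagingSketch and its methods are hypothetical, an in-memory TreeMap stands in for the table, and ids are assumed to be positive and indexed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;
import java.util.function.Consumer;

public class KeysetPagingSketch {

    /** Stand-in for: SELECT id FROM table WHERE id > ? ORDER BY id LIMIT ? */
    static List<Long> fetchAfter(NavigableMap<Long, String> table, long lastId, int limit) {
        List<Long> ids = new ArrayList<>();
        for (Long id : table.tailMap(lastId, false).keySet()) {
            if (ids.size() == limit) {
                break;
            }
            ids.add(id);
        }
        return ids;
    }

    /** Walks the table batch by batch and returns the number of batches handled. */
    static int forEachBatch(NavigableMap<Long, String> table, int limit,
                            Consumer<List<Long>> handler) {
        long lastId = 0L; // assumption: ids are positive
        int batches = 0;
        while (true) {
            List<Long> batch = fetchAfter(table, lastId, limit);
            if (batch.isEmpty()) {
                return batches;
            }
            handler.accept(batch);
            lastId = batch.get(batch.size() - 1); // resume after the last id seen
            batches++;
        }
    }

    public static void main(String[] args) {
        NavigableMap<Long, String> table = new TreeMap<>();
        for (long id = 1; id <= 450; id++) {
            table.put(id, "row-" + id);
        }
        int batches = forEachBatch(table, 200, batch ->
                System.out.println(batch.get(0) + ".." + batch.get(batch.size() - 1)));
        System.out.println("batches=" + batches);
    }
}
```

Only one batch is held in memory at a time, and each fetch starts from an indexed position rather than an offset, so the cost per batch does not grow with how far into the table the loop has advanced.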
Code Implementation
The following Java classes demonstrate a bulk executor built on MyBatis‑Plus. BulkExecutorParam defines parameters such as batch size, start page, query wrapper, service, multithreading options, and executor pool.
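import com.baomidou.mybatisplus.core.conditions.Wrapper;
import com.baomidou.mybatisplus.extension.service.IService;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import java.util.function.Function;

/**
 * Defines the parameters required for a bulk execution run.
 */
@Data
@Builder
// @Builder relies on an all-args constructor, so both constructors are declared explicitly
@NoArgsConstructor
@AllArgsConstructor
public class BulkExecutorParam<T> {
    /**
     * batch size
     */
    @Builder.Default
    private Integer batchSize = 500;
    /**
     * start page
     */
    @Builder.Default
    private Integer start = 1;
    /**
     * query wrapper
     */
    private Wrapper<T> queryWrapper;
    /**
     * query service
     */
    private IService<T> service;
    /**
     * whether batches run on multiple threads
     */
    @Builder.Default
    private Boolean isMultiThreaded = Boolean.TRUE;
    /**
     * parallelism of the executor pool; 0 uses the number of available processors
     */
    @Builder.Default
    private Integer parallelism = 0;
    /**
     * total record count; used if greater than 0, otherwise service.count() is called
     */
    @Builder.Default
    private Integer count = 0;
    /**
     * query function, used instead of the service when service is null
     */
    private Function<Integer, List<T>> queryFunc;
    /**
     * execute consumer
     */
    private Consumer<List<T>> execConsumer;
    /**
     * thread pool
     */
    private ExecutorService executors;

    /**
     * Returns the configured pool, or creates one on first use and caches it,
     * so that all batches share a single pool instead of creating one per call.
     */
    public synchronized ExecutorService getExecutors() {
        if (this.executors == null) {
            this.executors = this.parallelism > 0
                    ? Executors.newWorkStealingPool(this.parallelism)
                    : Executors.newWorkStealingPool();
        }
        return this.executors;
    }
}
The core utility BulkExecutorUtil provides static methods execute and submit to process batches either synchronously or asynchronously, handling pagination, multithreading, and result collection.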
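The pagination the utility performs comes down to two small calculations: a ceiling division to get the number of pages, and the row offset that a one-based page number corresponds to. The sketch below isolates them; the class and method names are hypothetical and only illustrate the arithmetic.

```java
public class PageMathSketch {

    /** Ceiling division: number of pages needed to cover count rows. */
    static int pageCount(int count, int batchSize) {
        return (count + batchSize - 1) / batchSize;
    }

    /** Zero-based row offset of a one-based page, as LIMIT offset,size would use it. */
    static long offsetOf(int page, int batchSize) {
        return (long) (page - 1) * batchSize;
    }

    public static void main(String[] args) {
        System.out.println(pageCount(1001, 500)); // 3: the last page holds a single row
        System.out.println(pageCount(1000, 500)); // 2: an exact multiple adds no extra page
        System.out.println(offsetOf(3, 500));     // 1000: page 3 starts after 1000 rows
    }
}
```

Adding batchSize - 1 before dividing is what rounds the result up without resorting to floating-point math.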
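// CollectionUtil is assumed to be Hutool's; swap in your own emptiness check if you use another library
import cn.hutool.core.collection.CollectionUtil;
import com.baomidou.mybatisplus.core.conditions.Wrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.baomidou.mybatisplus.extension.service.IService;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Future;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.IntStream;

public class BulkExecutorUtil {
    /**
     * execute batch
     */
    public static <T> void execute(BulkExecutorParam<T> param, Consumer<List<T>> consumer) {
        param.setExecConsumer(consumer);
        submit(param, null);
    }

    /**
     * batch submit with result
     */
    public static <T, R> List<Future<R>> submit(BulkExecutorParam<T> param, Function<List<T>, R> execFunc) {
        List<Future<R>> futures = new ArrayList<>();
        IService<T> service = param.getService();
        Wrapper<T> queryWrapper = param.getQueryWrapper();
        // When only queryFunc is set (service is null), count must be supplied by the caller.
        // count() returns long in newer MyBatis-Plus versions; the cast is harmless on older ones.
        int count = param.getCount() > 0 ? param.getCount() : (int) service.count(queryWrapper);
        if (count == 0) {
            return futures;
        }
        Integer batchSize = param.getBatchSize();
        // ceiling division: the last page may be partially filled
        int pageCount = (count + batchSize - 1) / batchSize;
        IntStream.rangeClosed(param.getStart(), pageCount).forEach(currentPage -> {
            if (param.getIsMultiThreaded()) {
                if (execFunc == null) {
                    // fire-and-forget: the consumer handles each batch, no result is collected
                    param.getExecutors().execute(() -> {
                        List<T> records = getRecords(param.getQueryFunc(), service, queryWrapper, batchSize, currentPage);
                        if (CollectionUtil.isEmpty(records)) {
                            return;
                        }
                        param.getExecConsumer().accept(records);
                    });
                } else {
                    // one Future per batch so the caller can collect the results
                    Future<R> submit = param.getExecutors().submit(() -> {
                        List<T> records = getRecords(param.getQueryFunc(), service, queryWrapper, batchSize, currentPage);
                        return execFunc.apply(records);
                    });
                    futures.add(submit);
                }
            } else {
                // single-threaded: batches are processed one after another on the calling thread
                List<T> records = getRecords(param.getQueryFunc(), service, queryWrapper, batchSize, currentPage);
                if (CollectionUtil.isEmpty(records)) {
                    return;
                }
                param.getExecConsumer().accept(records);
            }
        });
        return futures;
    }

    private static <T> List<T> getRecords(Function<Integer, List<T>> queryFunc,
                                          IService<T> service,
                                          Wrapper<T> queryWrapper,
                                          Integer batchSize,
                                          int currentPage) {
        return service == null ? queryFunc.apply(currentPage)
                : service.page(new Page<>(currentPage, batchSize), queryWrapper).getRecords();
    }
}
Usage Examples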
Example of using execute with a consumer to process each batch:
private void batchProcessor(List<T> records) {
    // implement your processing logic
}
LambdaQueryWrapper<T> wrapper = Wrappers.lambdaQuery(T.class);
BulkExecutorParam<T> param = BulkExecutorParam.<T>builder()
        .service(this)
        .queryWrapper(wrapper)
        .build();
BulkExecutorUtil.execute(param, this::batchProcessor);
Example of using submit to obtain Future results for I/O‑intensive tasks:
private File batchProcessorFile(List<T> records) {
    // implement your processing logic
    File file = new File("/your-path");
    return file;
}
int parallelism = Runtime.getRuntime().availableProcessors() * 2 + 1;
BulkExecutorParam<T> exeParam = BulkExecutorParam.<T>builder()
        .queryWrapper(param.getQueryWrapper())
        .batchSize(param.getBatchSize())
        .service(this)
        .parallelism(parallelism)
        .build();
List<Future<File>> futures = BulkExecutorUtil.submit(exeParam, this::batchProcessorFile);
if (CollectionUtil.isEmpty(futures)) {
    return downloadFilePath;
}
for (Future<File> future : futures) {
    try {
        File file = future.get();
        // handle file
    } catch (Exception e) {
        log.error("future get error:{}", e.getMessage(), e);
    }
}
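The submit-and-collect pattern used above can also be tried without MyBatis‑Plus. The following is a minimal JDK-only sketch under stated assumptions: FutureCollectSketch, runBatches, and loadPage are hypothetical names, a fixed thread pool replaces the work-stealing pool, and the page loader is a stand-in for a real paged query.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;
import java.util.function.IntFunction;

public class FutureCollectSketch {

    /** Submits one task per page to a shared pool and returns results in page order. */
    static <T, R> List<R> runBatches(int pageCount, int parallelism,
                                     IntFunction<List<T>> loadPage,
                                     Function<List<T>, R> handler) {
        ExecutorService pool = Executors.newFixedThreadPool(parallelism);
        try {
            List<Future<R>> futures = new ArrayList<>();
            for (int page = 1; page <= pageCount; page++) {
                final int current = page;
                Callable<R> task = () -> handler.apply(loadPage.apply(current));
                futures.add(pool.submit(task));
            }
            List<R> results = new ArrayList<>();
            for (Future<R> future : futures) {
                try {
                    results.add(future.get()); // blocks until this batch is done
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("interrupted while waiting for a batch", e);
                } catch (ExecutionException e) {
                    throw new IllegalStateException("batch failed", e.getCause());
                }
            }
            return results;
        } finally {
            pool.shutdown(); // release the pool's threads once all batches are collected
        }
    }

    public static void main(String[] args) {
        // Stand-in loader: page p yields p placeholder rows; the handler returns the batch size.
        List<Integer> sizes = FutureCollectSketch.<String, Integer>runBatches(
                3, 2, p -> Collections.nCopies(p, "row"), List::size);
        System.out.println(sizes); // [1, 2, 3]
    }
}
```

Because the futures are read in submission order, the results line up with the page numbers even though the batches may finish in a different order. Shutting the pool down in a finally block keeps threads from lingering when a batch fails.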
