Mastering Request Merging: Hystrix Collapser, BatchCollapser & ConcurrentHashMultiset
Merging similar or duplicate requests upstream, before they are sent downstream, can dramatically reduce downstream load and raise overall throughput. This article compares Hystrix Collapser, a custom BatchCollapser, and Guava's ConcurrentHashMultiset, covering their implementations, configuration, and best-fit use cases.
Preface
In many services the request‑response model allocates a separate thread and memory space for each request, which makes each I/O operation costly. When a large number of requests perform the same I/O, merging them into a single operation can greatly reduce the burden on downstream servers.
The author spent considerable time on this problem: comparing existing libraries, reading through the Hystrix Javanica source, and finally implementing a simple custom collapser for a specific business need.
Hystrix Collapser
Overview
Hystrix, an open‑source library from Netflix, provides circuit breaking and request merging. The collapser component can batch similar requests, reducing downstream pressure.
To use Hystrix Javanica you need the hystrix-core and hystrix-javanica dependencies, and you must enable AspectJ auto-proxying and register the HystrixCommandAspect bean, for example via XML:
<aop:aspectj-autoproxy/>
<bean id="hystrixAspect" class="com.netflix.hystrix.contrib.javanica.aop.aspectj.HystrixCommandAspect" />
Collapser Annotation
The @HystrixCollapser annotation names the batch method that serves the collapsed calls and is used together with @HystrixCommand. Important points:
Annotate the method to be collapsed with @HystrixCollapser and the batch method with @HystrixCommand.
The single method accepts a single argument; for multiple arguments you must wrap them in a custom class. The batch method must accept java.util.List<SingleParam>.
The single method returns java.util.concurrent.Future<SingleReturn>, while the batch method returns java.util.List<SingleReturn>, and the result count must match the input count.
A minimal example:
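import com.netflix.hystrix.contrib.javanica.annotation.HystrixCollapser;
import com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand;
import java.util.List;
import java.util.concurrent.Future;
import java.util.stream.Collectors;

public class HystrixCollapserSample {
    @HystrixCollapser(batchMethod = "batch")
    public Future<Boolean> single(String input) {
        return null; // never executed: the aspect routes the call into batch()
    }
    @HystrixCommand
    public List<Boolean> batch(List<String> inputs) {
        // exactly one result per input, in the same order
        return inputs.stream().map(it -> Boolean.TRUE).collect(Collectors.toList());
    }
}
Implementation Details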
When a method annotated with @HystrixCollapser is invoked, Hystrix creates a collapser instance (per request scope or globally) and stores request arguments in a ConcurrentHashMap. A timer thread periodically consumes stored requests, builds a batch request, executes it, and maps the results back to the original futures.
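To make that cycle concrete, here is a minimal, JDK-only sketch of request collapsing. It is not Hystrix's code: MiniCollapser, submit() and flush() are hypothetical names, and it leaves out maxRequestsInBatch, request scope and caching. Each call queues its argument together with a future; a timer thread drains the queue, makes one batch call, and completes the futures by position, which is why the batch must return exactly one result per input.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// Hypothetical sketch of the collapse-and-map-back cycle; not Hystrix's implementation.
public class MiniCollapser<A, R> implements AutoCloseable {
    // One pending call: its argument plus the future handed back to the caller
    private static final class Pending<T, V> {
        final T arg;
        final CompletableFuture<V> future = new CompletableFuture<>();
        Pending(T arg) { this.arg = arg; }
    }

    private final ConcurrentLinkedQueue<Pending<A, R>> queue = new ConcurrentLinkedQueue<>();
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
    private final Function<List<A>, List<R>> batch;

    public MiniCollapser(Function<List<A>, List<R>> batch, long timerDelayMs) {
        this.batch = batch;
        // The timer thread fires every timerDelayMs and flushes whatever has queued up
        timer.scheduleAtFixedRate(this::flush, timerDelayMs, timerDelayMs, TimeUnit.MILLISECONDS);
    }

    // Plays the role of the "single" method: enqueue the argument, return a future
    public CompletableFuture<R> submit(A arg) {
        Pending<A, R> p = new Pending<>(arg);
        queue.add(p);
        return p.future;
    }

    private void flush() {
        List<Pending<A, R>> drained = new ArrayList<>();
        Pending<A, R> next;
        while ((next = queue.poll()) != null) {
            drained.add(next);
        }
        if (drained.isEmpty()) {
            return;
        }
        List<A> args = new ArrayList<>(drained.size());
        for (Pending<A, R> p : drained) {
            args.add(p.arg);
        }
        try {
            List<R> results = batch.apply(args); // one downstream call for the whole batch
            if (results.size() != args.size()) {
                throw new IllegalStateException("batch must return one result per input");
            }
            for (int i = 0; i < drained.size(); i++) {
                drained.get(i).future.complete(results.get(i)); // map back by position
            }
        } catch (RuntimeException e) {
            // Fail every caller in this batch; catching also keeps the periodic task alive
            for (Pending<A, R> p : drained) {
                p.future.completeExceptionally(e);
            }
        }
    }

    @Override
    public void close() {
        timer.shutdown();
        flush(); // complete anything still queued
    }
}
```

Callers wait on (or compose with) the returned CompletableFuture, much as the Hystrix single method hands back a Future.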
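Note: because each request waits for the next timer tick, the collapser adds on average about timerDelayInMilliseconds/2 to each request's latency, and up to a full timer interval in the worst case.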
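Where the one-half comes from, under the simplifying assumption (not a Hystrix guarantee) that requests arrive at uniformly random moments relative to the fixed-rate timer: let T be timerDelayInMilliseconds and U the time elapsed since the last tick when a request arrives; the request then waits W until the next tick.

```latex
W = T - U,\qquad U \sim \mathrm{Uniform}(0, T)
\quad\Longrightarrow\quad
\mathbb{E}[W] = \frac{1}{T}\int_0^T (T - u)\,du = \frac{T}{2},\qquad 0 \le W \le T
```

With T = 10 ms that is about 5 ms of added waiting on average; with T = 1000 ms, about 500 ms, before the batch call's own execution time.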
Configuration
Configuration is provided via the @HystrixCollapser annotation:
@HystrixCollapser(
    batchMethod = "batch",
    collapserKey = "single",
    scope = com.netflix.hystrix.HystrixCollapser.Scope.GLOBAL,
    collapserProperties = {
        @HystrixProperty(name = "maxRequestsInBatch", value = "100"),
        @HystrixProperty(name = "timerDelayInMilliseconds", value = "1000"),
        @HystrixProperty(name = "requestCache.enabled", value = "true")
    })
Key properties:
maxRequestsInBatch: the maximum number of requests allowed in one batch before a batch execution is triggered.
timerDelayInMilliseconds: how many milliseconds after a batch is created its execution is triggered by the timer thread.
requestCache.enabled: whether request caching is enabled for the collapser's execute() and queue() calls.
scope: GLOBAL merges calls from all threads; the default, REQUEST, merges only calls made within the same HystrixRequestContext.
BatchCollapser
Design
The custom BatchCollapser focuses on time‑based and count‑based triggering without caring about individual request results. Requests are placed into a container; when the container reaches a threshold or a timer expires, all accumulated requests are sent downstream in a single batch.
Thread safety comes from LinkedBlockingDeque: it is internally synchronized and preserves insertion order, so producers can add requests while the cleaner drains them, with no explicit locking in the collapser itself.
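A small, hypothetical demonstration of that claim (DrainDemo and run() are illustrative names, not part of the article's code): several producer threads add to one LinkedBlockingDeque and drain a batch whenever the size reaches the threshold, as submit() does, and a final drainTo stands in for the timer tick. Because drainTo(collection, max) moves elements under the deque's own lock, every element lands in exactly one batch and no batch exceeds the threshold.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo: concurrent add() and bounded drainTo() on one deque lose nothing.
public class DrainDemo {

    /** Returns {total elements drained, largest count-triggered batch}. */
    public static int[] run(int producers, int perProducer, int maxBatch) {
        LinkedBlockingDeque<Integer> container = new LinkedBlockingDeque<>();
        AtomicInteger drained = new AtomicInteger();
        AtomicInteger largestBatch = new AtomicInteger();
        List<Thread> threads = new ArrayList<>();
        for (int p = 0; p < producers; p++) {
            Thread t = new Thread(() -> {
                for (int i = 0; i < perProducer; i++) {
                    container.add(i);
                    // Count-based trigger: whichever producer sees a full container drains one batch
                    if (container.size() >= maxBatch) {
                        List<Integer> batch = new ArrayList<>(maxBatch);
                        container.drainTo(batch, maxBatch);
                        drained.addAndGet(batch.size());
                        largestBatch.accumulateAndGet(batch.size(), Math::max);
                    }
                }
            });
            threads.add(t);
            t.start();
        }
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        // Final flush, standing in for the timer tick
        List<Integer> rest = new ArrayList<>();
        container.drainTo(rest);
        drained.addAndGet(rest.size());
        return new int[] {drained.get(), largestBatch.get()};
    }
}
```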
Implementation
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BatchCollapser<E> {
    private static final Logger logger = LoggerFactory.getLogger(BatchCollapser.class);
    // One collapser per Handler implementation class
    private static final ConcurrentMap<Class<?>, BatchCollapser<?>> INSTANCES = new ConcurrentHashMap<>();
    // Single timer thread shared by all collapsers
    private static final ScheduledExecutorService SCHEDULE_EXECUTOR = Executors.newScheduledThreadPool(1);

    // Assumed shape of the article's Handler callback (its definition is not shown)
    public interface Handler<T, R> {
        R handle(T input);
    }

    private final LinkedBlockingDeque<E> batchContainer = new LinkedBlockingDeque<>();
    private final Handler<List<E>, Boolean> cleaner;
    private final long interval;
    private final int threshHold;

    private BatchCollapser(Handler<List<E>, Boolean> cleaner, int threshHold, long interval) {
        this.cleaner = cleaner;
        this.threshHold = threshHold;
        this.interval = interval;
    }

    // Time-based trigger, started from getInstance(): instances created with `new` are not
    // Spring beans, so an InitializingBean.afterPropertiesSet() callback would never run.
    private void start() {
        SCHEDULE_EXECUTOR.scheduleAtFixedRate(() -> {
            try { this.clean(); } catch (Exception e) { logger.error("clean container exception", e); }
        }, 0, interval, TimeUnit.MILLISECONDS);
    }

    public void submit(E event) {
        batchContainer.add(event);
        // Count-based trigger
        if (batchContainer.size() >= threshHold) { clean(); }
    }

    private void clean() {
        List<E> transferList = new ArrayList<>(threshHold);
        // drainTo runs under the deque's internal lock; take at most one batch
        batchContainer.drainTo(transferList, threshHold);
        if (transferList.isEmpty()) { return; }
        try { cleaner.handle(transferList); } catch (Exception e) { logger.error("batch execute error, transferList:{}", transferList, e); }
    }

    @SuppressWarnings("unchecked")
    public static <E> BatchCollapser<E> getInstance(Handler<List<E>, Boolean> cleaner, int threshHold, long interval) {
        // computeIfAbsent atomically creates and starts at most one collapser per Handler class
        return (BatchCollapser<E>) INSTANCES.computeIfAbsent(cleaner.getClass(), k -> {
            BatchCollapser<E> collapser = new BatchCollapser<>(cleaner, threshHold, interval);
            collapser.start();
            return collapser;
        });
    }
}
Key points:
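The collapser is a singleton per Handler class, so all callers of a given job share one container; later getInstance() calls for the same class return the existing instance and ignore their threshHold and interval arguments.
Keying instances by the Handler implementation's class lets several independent collapser types, each with its own container and thresholds, coexist.
A ScheduledExecutorService drives the timer instead of java.util.Timer: a Timer runs every task on one thread, and a single uncaught exception in a task terminates that thread and cancels the Timer. The scheduled body still catches exceptions, because a scheduleAtFixedRate task that throws is never run again.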
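Why the scheduled body catches exceptions can be checked with a small, hypothetical demo (FixedRateDemo and its methods are illustrative names). It relies on the documented behavior of ScheduledExecutorService.scheduleAtFixedRate: once a run throws, subsequent runs are suppressed, and the returned future's get() throws ExecutionException.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo of why a periodic clean() should be wrapped in try/catch.
public class FixedRateDemo {

    // Stands in for a clean() whose downstream call fails every time
    private static void failingClean() {
        throw new IllegalStateException("downstream failure");
    }

    /** Unguarded task: returns how many times it ran before the executor dropped it. */
    public static int unguardedRuns() {
        ScheduledExecutorService ses = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger runs = new AtomicInteger();
        try {
            ScheduledFuture<?> future = ses.scheduleAtFixedRate(() -> {
                runs.incrementAndGet();
                failingClean();
            }, 0, 10, TimeUnit.MILLISECONDS);
            try {
                future.get(); // can only end with the exception from the failed run
            } catch (ExecutionException expected) {
                // the periodic task is dead: later runs are suppressed
            }
            Thread.sleep(100); // several periods pass without another run
            return runs.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            ses.shutdownNow();
        }
    }

    /** Guarded task: returns true if it keeps running (3 runs) although every run fails. */
    public static boolean guardedKeepsRunning() {
        ScheduledExecutorService ses = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch threeRuns = new CountDownLatch(3);
        try {
            ses.scheduleAtFixedRate(() -> {
                threeRuns.countDown();
                try {
                    failingClean();
                } catch (RuntimeException e) {
                    // log and carry on, as BatchCollapser's scheduled lambda does
                }
            }, 0, 10, TimeUnit.MILLISECONDS);
            return threeRuns.await(2, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            ses.shutdownNow();
        }
    }
}
```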
ConcurrentHashMultiset
Design
For scenarios where request results are not needed but high‑frequency counting is required (e.g., statistics), Guava’s ConcurrentHashMultiset stores each element with a count, supporting lock‑free concurrent increments and decrements.
Implementation
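Typical usage iterates over the element set, reads each element's count, and folds the duplicates into a single batched request. Elements are counted by equals() and hashCode(), so the request type must implement both consistently: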
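// counter: a ConcurrentHashMultiset<Request> field that producers add to; Request is the application's own type
if (counter.isEmpty()) { return; }
List<Request> transferList = Lists.newArrayList();
counter.elementSet().forEach(request -> {
    int count = counter.count(request);
    if (count <= 0) { return; } // already drained; move on to the next element
    // Fold `count` identical requests into one request carrying the summed increment
    transferList.add(count == 1 ? request : new Request(request.getIncrement() * count));
    // Remove exactly the occurrences just folded; adds that arrived after count() stay for the next round
    counter.remove(request, count);
});
// transferList now holds one request per distinct element, ready for a single downstream batch call
Conclusion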
Choosing the right merging technique depends on the requirements:
Hystrix Collapser: needed when each request’s result must be returned and the extra latency is acceptable.
BatchCollapser: suitable when results are irrelevant and merging can be triggered by time or count thresholds.
ConcurrentHashMultiset: ideal for high‑frequency statistical aggregation where duplicate requests are common.
In practice, you can combine BatchCollapser with ConcurrentHashMultiset as the container, gaining the benefits of both time‑based merging and efficient counting.
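One way that combination might look, as a JDK-only sketch: a ConcurrentHashMap<K, Long> stands in for ConcurrentHashMultiset as the counting container (a swapped-in structure, not Guava's API), and BatchCollapser-style time and size triggers flush it. CountingBatcher, add() and flush() are hypothetical names. merge() folds each duplicate into a running count, and remove() detaches an entry atomically, so a concurrent add() either lands in the current batch or starts a fresh entry for the next one.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical combination: BatchCollapser-style triggers over a counting container.
public class CountingBatcher<K> implements AutoCloseable {
    private final ConcurrentHashMap<K, Long> counts = new ConcurrentHashMap<>();
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
    private final Consumer<Map<K, Long>> downstream;
    private final int maxDistinctKeys;

    public CountingBatcher(Consumer<Map<K, Long>> downstream, int maxDistinctKeys, long intervalMs) {
        this.downstream = downstream;
        this.maxDistinctKeys = maxDistinctKeys;
        // Time-based trigger; catch so one failed flush does not cancel the periodic task
        timer.scheduleAtFixedRate(() -> {
            try {
                flush();
            } catch (RuntimeException e) {
                // log in real code
            }
        }, intervalMs, intervalMs, TimeUnit.MILLISECONDS);
    }

    // Duplicates collapse into one entry whose value is the number of occurrences
    public void add(K key) {
        counts.merge(key, 1L, Long::sum);
        // Size-based trigger: bound how many distinct keys one batch carries
        if (counts.size() >= maxDistinctKeys) {
            flush();
        }
    }

    // Detach each entry atomically and send everything detached in one downstream call
    public void flush() {
        Map<K, Long> batch = new HashMap<>();
        for (K key : counts.keySet()) {
            Long c = counts.remove(key);
            if (c != null) {
                batch.put(key, c);
            }
        }
        if (!batch.isEmpty()) {
            downstream.accept(batch);
        }
    }

    @Override
    public void close() {
        timer.shutdown();
        flush(); // push out anything still pending
    }
}
```

The size trigger counts distinct keys rather than add() calls: duplicates only bump a counter, which is where both the memory and the downstream savings come from.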
This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contact us and we will review it promptly.
Su San Talks Tech
Su San, former staff at several leading tech companies, is a top creator on Juejin and a premium creator on CSDN, and runs the free coding practice site www.susan.net.cn.
