Request Collapsing Techniques: Hystrix Collapser, BatchCollapser, and ConcurrentHashMultiset
This article explains how merging similar or duplicate requests upstream (using Hystrix Collapser, a custom BatchCollapser implementation, and Guava's ConcurrentHashMultiset) can significantly reduce downstream load and improve throughput, and it outlines each technique's configuration, usage patterns, and suitable scenarios.
Combining similar or duplicate requests in the upstream system before sending them downstream can greatly reduce downstream load and improve overall system throughput. This article introduces three request-collapsing techniques (Hystrix Collapser, ConcurrentHashMultiset, and a custom BatchCollapser) and compares the scenarios where each is appropriate.
Preface
In typical request‑response models, each request occupies its own thread and memory space, and the I/O cost of each request is high. When many requests perform the same I/O operation, merging those I/O calls into a single operation can dramatically lessen the burden on downstream services.
The author spent considerable time investigating this problem, comparing existing libraries, reviewing the hystrix-javanica source code, and eventually implementing a simple custom collapser to address a specific business need.
Hystrix Collapser
Overview
Hystrix, an open‑source library from Netflix, provides circuit‑breaker functionality to keep web servers stable under high concurrency. Its collapser feature merges multiple requests into a single batch request, which aligns with the goal of protecting downstream services.
When using Hystrix, the hystrix-javanica module allows developers to annotate methods, reducing code intrusion. Typical dependencies include hystrix-core and hystrix-javanica. Hystrix is enabled via Spring AOP, requiring explicit configuration such as:
<aop:aspectj-autoproxy/>
<bean id="hystrixAspect" class="com.netflix.hystrix.contrib.javanica.aop.aspectj.HystrixCommandAspect"/>

Collapser Annotation
The Hystrix collapser can be defined either with a custom BatchMethod or with annotations. The annotation approach needs only two annotations but has little official documentation, so usage references are scarce.
Add @HystrixCollapser on the method that should be collapsed and @HystrixCommand on the batch method.
The single method must accept a single argument; for multiple arguments, wrap them in a custom class. The batch method receives a java.util.List<SingleParam>.
The single method returns java.util.concurrent.Future<SingleReturn>, while the batch method returns java.util.List<SingleReturn>, with a one-to-one mapping between inputs and outputs.
Simple example:
public class HystrixCollapserSample {

    @HystrixCollapser(batchMethod = "batch")
    public Future<Boolean> single(String input) {
        return null; // the single method is never executed directly
    }

    public List<Boolean> batch(List<String> inputs) {
        return inputs.stream().map(it -> Boolean.TRUE).collect(Collectors.toList());
    }
}

Implementation Details
Key steps performed by Hystrix:
Spring registers the Hystrix aspect bean (HystrixCommandAspect), which intercepts methods annotated for collapsing.
When a method annotated with @HystrixCollapser is invoked, Spring calls methodsAnnotatedWithHystrixCommand to create a Hystrix proxy.
Hystrix obtains a collapser instance (creating one if none exists in the current scope).
The request arguments are stored in a ConcurrentHashMap<RequestArgumentType, CollapsedRequest>, returning an Observable wrapped in a Future to the caller.
A timer thread periodically consumes stored requests, builds a batch request, executes the batch method, maps results back to the original order, and completes the pending Future s.
Note: Because the timer must wait before executing the real request, the collapser adds roughly timerInterval/2 ms to each request’s latency.
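The steps above can be sketched in a few lines. This is a deliberately simplified illustration, not Hystrix's actual implementation; the class name MiniCollapser and its members are hypothetical. Callers park a CompletableFuture per argument, and a timer thread periodically drains the pending requests, runs the batch method once, and completes each Future with its matching result:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

public class MiniCollapser<A, R> {
    // Pending requests: one Future per distinct argument.
    private final ConcurrentHashMap<A, CompletableFuture<R>> pending = new ConcurrentHashMap<>();
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();

    public MiniCollapser(Function<List<A>, List<R>> batchMethod, long intervalMs) {
        timer.scheduleAtFixedRate(() -> {
            if (pending.isEmpty()) { return; }
            // Snapshot and remove the requests that form this batch.
            List<A> args = new ArrayList<>(pending.keySet());
            List<CompletableFuture<R>> futures = new ArrayList<>();
            for (A a : args) { futures.add(pending.remove(a)); }
            List<R> results = batchMethod.apply(args); // one downstream call
            for (int i = 0; i < args.size(); i++) {
                futures.get(i).complete(results.get(i)); // map results back in order
            }
        }, intervalMs, intervalMs, TimeUnit.MILLISECONDS);
    }

    // Duplicate arguments share one Future, so identical requests collapse.
    public Future<R> submit(A arg) {
        return pending.computeIfAbsent(arg, a -> new CompletableFuture<>());
    }
}
```

As in Hystrix, a caller blocking on the returned Future pays up to one timer interval of extra latency before the batch runs.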
Configuration
Configuration is applied on the @HystrixCollapser annotation and includes both collapser‑specific and generic Hystrix command properties:
collapserKey (optional, defaults to method name).
batchMethod – name of the batch method.
scope – REQUEST or GLOBAL. REQUEST creates a collapser per request, often resulting in a batch size of 1.
collapserProperties – allows setting generic Hystrix command properties such as maxRequestsInBatch, timerDelayInMilliseconds, and requestCache.enabled.
Example configuration:
@HystrixCollapser(
batchMethod = "batch",
collapserKey = "single",
scope = com.netflix.hystrix.HystrixCollapser.Scope.GLOBAL,
collapserProperties = {
@HystrixProperty(name = "maxRequestsInBatch", value = "100"),
@HystrixProperty(name = "timerDelayInMilliseconds", value = "1000"),
@HystrixProperty(name = "requestCache.enabled", value = "true")
})

BatchCollapser
Design
To avoid the overhead of many Future s, the author implemented a lightweight collapser that simply queues request parameters, merges them when a count threshold or time interval is reached, and sends a single batch request downstream. The business thread returns immediately after enqueuing, without waiting for the result.
The design mirrors Hystrix: a container holds pending requests, and a timer thread periodically drains the container for processing. An additional trigger fires when the container size reaches a configured threshold.
Implementation
Key points of the implementation:
Uses java.util.concurrent.LinkedBlockingDeque for thread‑safe storage and drainTo for bulk extraction.
A singleton factory (double‑checked locking with a ConcurrentMap ) provides one collapser instance per handler class.
A ScheduledExecutorService replaces java.util.Timer to avoid blocking behavior.
Core code (kept unchanged):
public class BatchCollapser<E> implements InitializingBean {
    private static final Logger logger = LoggerFactory.getLogger(BatchCollapser.class);
    private static volatile Map<Class, BatchCollapser> instance = Maps.newConcurrentMap();
    private static final ScheduledExecutorService SCHEDULE_EXECUTOR = Executors.newScheduledThreadPool(1);

    private volatile LinkedBlockingDeque<E> batchContainer = new LinkedBlockingDeque<>();
    private Handler<List<E>, Boolean> cleaner;
    private long interval;
    private int threshHold;

    private BatchCollapser(Handler<List<E>, Boolean> cleaner, int threshHold, long interval) {
        this.cleaner = cleaner;
        this.threshHold = threshHold;
        this.interval = interval;
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        SCHEDULE_EXECUTOR.scheduleAtFixedRate(() -> {
            try {
                this.clean();
            } catch (Exception e) {
                logger.error("clean container exception", e);
            }
        }, 0, interval, TimeUnit.MILLISECONDS);
    }

    public void submit(E event) {
        batchContainer.add(event);
        if (batchContainer.size() >= threshHold) {
            clean();
        }
    }

    private void clean() {
        List<E> transferList = Lists.newArrayListWithExpectedSize(threshHold);
        batchContainer.drainTo(transferList, 100);
        if (CollectionUtils.isEmpty(transferList)) {
            return;
        }
        try {
            cleaner.handle(transferList);
        } catch (Exception e) {
            logger.error("batch execute error, transferList:{}", transferList, e);
        }
    }

    public static <E> BatchCollapser<E> getInstance(Handler<List<E>, Boolean> cleaner, int threshHold, long interval) {
        Class jobClass = cleaner.getClass();
        if (instance.get(jobClass) == null) {
            synchronized (BatchCollapser.class) {
                if (instance.get(jobClass) == null) {
                    instance.put(jobClass, new BatchCollapser<>(cleaner, threshHold, interval));
                }
            }
        }
        return instance.get(jobClass);
    }
}

ConcurrentHashMultiset
Design
Guava’s ConcurrentHashMultiset stores each element together with a count, incrementing the count on duplicate insertions. It achieves thread‑safety without explicit locks by repeatedly attempting CAS updates inside a while(true) loop.
This structure is ideal for high‑frequency counting scenarios, such as aggregating scores or statistics before persisting them, thereby reducing downstream pressure.
Implementation Example
ConcurrentHashMultiset<Request> container = ConcurrentHashMultiset.create();

// ... on each drain cycle:
if (container.isEmpty()) { return; }
List<Request> transferList = Lists.newArrayList();
container.elementSet().forEach(request -> {
    int count = container.count(request);
    if (count <= 0) { return; }
    transferList.add(count == 1 ? request : new Request(request.getIncrement() * count));
    container.remove(request, count);
});

Summary of Scenarios
Hystrix Collapser: use when each request's result is needed and the extra latency is acceptable.
BatchCollapser: use when results are not required and you want merging triggered by time or count thresholds.
ConcurrentHashMultiset: ideal for high-duplicate-rate statistical aggregation.
These techniques can also be combined; for example, a BatchCollapser can use a ConcurrentHashMultiset as its internal container to leverage both time‑based merging and duplicate counting.
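A hypothetical sketch of that combination (the class CountingCollapser and its names are illustrative; it uses the JDK's ConcurrentHashMap as the counting container rather than Guava's class): duplicates are counted as they arrive, multiset-style, and a timer periodically drains the aggregated counts and makes one downstream call per window, collapser-style.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class CountingCollapser {
    // Counting container: key -> number of duplicate submissions.
    private final ConcurrentHashMap<String, Long> container = new ConcurrentHashMap<>();
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();

    public CountingCollapser(Consumer<Map<String, Long>> batchSender, long intervalMs) {
        timer.scheduleAtFixedRate(() -> {
            if (container.isEmpty()) { return; }
            Map<String, Long> batch = new ConcurrentHashMap<>();
            // Remove each key atomically so concurrent submits are not lost.
            for (String key : container.keySet()) {
                Long count = container.remove(key);
                if (count != null) { batch.put(key, count); }
            }
            batchSender.accept(batch); // one downstream call per window
        }, intervalMs, intervalMs, TimeUnit.MILLISECONDS);
    }

    public void submit(String key) {
        container.merge(key, 1L, Long::sum); // CAS-based duplicate counting
    }
}
```

The business thread returns immediately after submit, as in BatchCollapser, while duplicates collapse into a single counted entry, as with the multiset.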
Code Ape Tech Column
Former Ant Group P8 engineer, pure technologist, sharing full‑stack Java, job interview and career advice through a column. Site: java-family.cn