Boost System Throughput with Hystrix Collapser, BatchCollapser, and ConcurrentHashMultiset
This article explains how merging similar requests with Hystrix Collapser, a custom BatchCollapser, and Guava's ConcurrentHashMultiset can sharply reduce downstream load and improve overall throughput. It covers implementation, configuration, and suitable scenarios for each technique.
Introduction
In typical request‑response models each request occupies a separate thread and memory space, causing high I/O cost when many requests perform the same operation. Merging similar requests upstream before sending them downstream can greatly reduce the load on downstream services and increase overall system throughput.
Hystrix Collapser
Hystrix, a Netflix open‑source library, offers a collapser that merges requests. It provides two ways to define the batch method: a custom BatchMethod or annotation‑based configuration. The annotation approach needs only two annotations but is poorly documented, so this article summarizes how to use it.
Key Points
Add @HystrixCollapser on the method to be merged and @HystrixCommand on the batch method.
The single method can accept only one parameter; for multiple parameters wrap them in a custom class.
The single method returns Future<T> and the batch method returns List<T>, with the result count matching the input count.
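To make the contract in these points concrete, here is a library-free sketch of what a collapser does: callers receive a Future, and one batch call later resolves every pending Future by position. This is not Hystrix code. MiniCollapser, PriceQuery, and the explicit flush() call are hypothetical names introduced for illustration only; a real collapser would trigger the flush from a timer or a size limit.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical wrapper class: bundles two arguments so the single method takes one parameter.
final class PriceQuery {
    final String sku;
    final int quantity;

    PriceQuery(String sku, int quantity) {
        this.sku = sku;
        this.quantity = quantity;
    }
}

// Hand-rolled stand-in for a collapser (an illustration, not the Hystrix implementation).
class MiniCollapser<I, O> {
    private final Function<List<I>, List<O>> batchMethod;
    private final List<I> inputs = new ArrayList<>();
    private final List<CompletableFuture<O>> futures = new ArrayList<>();

    MiniCollapser(Function<List<I>, List<O>> batchMethod) {
        this.batchMethod = batchMethod;
    }

    // The "single" method: queues the input and returns a pending Future immediately.
    synchronized Future<O> single(I input) {
        CompletableFuture<O> future = new CompletableFuture<>();
        inputs.add(input);
        futures.add(future);
        return future;
    }

    // Calls the batch method once and hands result i to the caller that submitted input i.
    synchronized void flush() {
        if (inputs.isEmpty()) {
            return;
        }
        try {
            List<O> results = batchMethod.apply(new ArrayList<>(inputs));
            if (results.size() != inputs.size()) {
                throw new IllegalStateException("result count must match input count");
            }
            for (int i = 0; i < results.size(); i++) {
                futures.get(i).complete(results.get(i));
            }
        } catch (RuntimeException e) {
            // A failed batch fails every request that was merged into it.
            futures.forEach(f -> f.completeExceptionally(e));
        } finally {
            inputs.clear();
            futures.clear();
        }
    }
}

class MiniCollapserDemo {
    public static void main(String[] args) throws Exception {
        MiniCollapser<PriceQuery, Integer> collapser = new MiniCollapser<PriceQuery, Integer>(
                queries -> queries.stream().map(q -> q.quantity * 10).collect(Collectors.toList()));
        Future<Integer> a = collapser.single(new PriceQuery("A", 1));
        Future<Integer> b = collapser.single(new PriceQuery("B", 3));
        collapser.flush(); // one downstream call serves both requests
        System.out.println(a.get() + " " + b.get()); // prints "10 30"
    }
}
```

The size check in flush() mirrors the rule that the batch method must return exactly one result per input; without it, a short result list would leave some callers waiting forever.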
Simple Example
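import com.netflix.hystrix.contrib.javanica.annotation.HystrixCollapser;
import com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand;
import java.util.List;
import java.util.concurrent.Future;
import java.util.stream.Collectors;

public class HystrixCollapserSample {
    @HystrixCollapser(batchMethod = "batch")
    public Future<Boolean> single(String input) {
        // The body of the single method is never executed.
        return null;
    }

    // Receives all merged inputs and returns one result per input.
    @HystrixCommand
    public List<Boolean> batch(List<String> inputs) {
        return inputs.stream().map(it -> Boolean.TRUE).collect(Collectors.toList());
    }
}
Configuration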
The collapser is configured through the attributes of the @HystrixCollapser annotation:
collapserKey: optional; defaults to the name of the annotated method.
batchMethod: the name of the batch method.
scope: REQUEST (default) or GLOBAL. With REQUEST, only calls made within the same request context are merged, which may limit the benefit; GLOBAL merges calls across requests.
collapserProperties: collapser properties such as maxRequestsInBatch, timerDelayInMilliseconds, and requestCache.enabled.
@HystrixCollapser(
    batchMethod = "batch",
    collapserKey = "single",
    scope = com.netflix.hystrix.HystrixCollapser.Scope.GLOBAL,
    collapserProperties = {
        @HystrixProperty(name = "maxRequestsInBatch", value = "100"),
        @HystrixProperty(name = "timerDelayInMilliseconds", value = "1000"),
        @HystrixProperty(name = "requestCache.enabled", value = "true")
    })
// The annotation goes on the single method, not on the class.
public Future<Boolean> single(String input) {
    return null;
}
BatchCollapser
A custom BatchCollapser was built to avoid the overhead of Futures and to trigger merging based on time or request count. It stores incoming requests in a thread‑safe container and uses a scheduled executor to periodically drain and process the batch.
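The drain step described above relies on drainTo from the JDK's blocking queues, which moves up to a given number of queued elements into another collection. The following JDK-only sketch isolates that step; DrainSketch and drainBatch are hypothetical names used only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingDeque;

class DrainSketch {
    // Moves at most maxBatch queued events into a fresh list.
    // Events beyond the cap stay in the container for the next drain.
    static <E> List<E> drainBatch(LinkedBlockingDeque<E> container, int maxBatch) {
        List<E> batch = new ArrayList<>(maxBatch);
        container.drainTo(batch, maxBatch);
        return batch;
    }

    public static void main(String[] args) {
        LinkedBlockingDeque<String> container = new LinkedBlockingDeque<>();
        for (int i = 1; i <= 5; i++) {
            container.add("event-" + i);
        }
        System.out.println(drainBatch(container, 3)); // [event-1, event-2, event-3]
        System.out.println(drainBatch(container, 3)); // [event-4, event-5]
    }
}
```

Capping each drain keeps the size of a single downstream call bounded even when submissions arrive faster than batches are processed.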
Implementation
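import com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;

import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Handler<T, R> is the project's own callback interface with a handle(T) method; it is not shown in this article.
public class BatchCollapser<E> implements InitializingBean {
    private static final Logger logger = LoggerFactory.getLogger(BatchCollapser.class);
    // One scheduler thread is shared by all collapser instances.
    private static final ScheduledExecutorService SCHEDULE_EXECUTOR = Executors.newScheduledThreadPool(1);

    private final LinkedBlockingDeque<E> batchContainer = new LinkedBlockingDeque<>();
    private final Handler<List<E>, Boolean> cleaner;
    private final long interval;
    private final int threshold;

    public BatchCollapser(Handler<List<E>, Boolean> cleaner, int threshold, long interval) {
        this.cleaner = cleaner;
        this.threshold = threshold;
        this.interval = interval;
    }

    // Time trigger: drain the container every `interval` milliseconds.
    @Override
    public void afterPropertiesSet() throws Exception {
        SCHEDULE_EXECUTOR.scheduleAtFixedRate(() -> {
            try {
                clean();
            } catch (Exception e) {
                logger.error("clean container exception", e);
            }
        }, 0, interval, TimeUnit.MILLISECONDS);
    }

    // Count trigger: drain on the submitting thread once the threshold is reached.
    public void submit(E event) {
        batchContainer.add(event);
        if (batchContainer.size() >= threshold) {
            clean();
        }
    }

    private void clean() {
        List<E> transferList = Lists.newArrayListWithExpectedSize(threshold);
        // Drain at most one threshold-sized batch (the original hard-coded 100 here).
        batchContainer.drainTo(transferList, threshold);
        if (transferList.isEmpty()) {
            return;
        }
        try {
            cleaner.handle(transferList);
        } catch (Exception e) {
            logger.error("batch execute error, transferList:{}", transferList, e);
        }
    }
}
ConcurrentHashMultiset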
Guava provides ConcurrentHashMultiset, a thread‑safe multiset that counts occurrences of each element instead of overwriting duplicates. It is ideal for high‑frequency counting scenarios, such as statistics or rate limiting, because it avoids locking by using a CAS loop.
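The counting idea can be sketched without Guava. The class below is a JDK-only stand-in that uses ConcurrentHashMap.merge in place of the multiset's add; it is not Guava's implementation and makes no claim about how Guava avoids locking. CountingMerger and the "like:42" keys are hypothetical names for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// JDK-only stand-in for a concurrent multiset (an illustration, not Guava code).
class CountingMerger<E> {
    private final ConcurrentHashMap<E, Integer> counts = new ConcurrentHashMap<>();

    // Records one occurrence; merge() updates the per-key count atomically.
    void add(E element) {
        counts.merge(element, 1, Integer::sum);
    }

    // Removes every key and returns element -> occurrences seen since the last drain.
    // Occurrences added while the drain is running are picked up by the next drain.
    Map<E, Integer> drain() {
        Map<E, Integer> snapshot = new HashMap<>();
        for (E key : counts.keySet()) {
            Integer count = counts.remove(key);
            if (count != null) {
                snapshot.put(key, count);
            }
        }
        return snapshot;
    }
}

class CountingMergerDemo {
    public static void main(String[] args) {
        CountingMerger<String> merger = new CountingMerger<>();
        merger.add("like:42");
        merger.add("like:42");
        merger.add("like:42");
        merger.add("like:7");
        Map<String, Integer> batch = merger.drain();
        // Four incoming requests collapse into two downstream updates.
        System.out.println(batch.get("like:42")); // 3
        System.out.println(batch.get("like:7"));  // 1
    }
}
```

The higher the duplication rate, the smaller the drained map is relative to the number of incoming requests, which is where the downstream saving comes from.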
Implementation Example
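// multiset is a shared ConcurrentHashMultiset<Request> field, e.g. created with ConcurrentHashMultiset.create()
if (multiset.isEmpty()) {
    return;
}
List<Request> transferList = Lists.newArrayList();
multiset.elementSet().forEach(request -> {
    int count = multiset.count(request);
    if (count <= 0) {
        return;
    }
    // Fold duplicates into one request whose increment is scaled by the occurrence count.
    transferList.add(count == 1 ? request : new Request(request.getIncrement() * count));
    // Remove only what was counted; occurrences added in the meantime stay for the next pass.
    multiset.remove(request, count);
});
Conclusion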
Each technique fits a different scenario:
Hystrix Collapser: when the result of each request is needed and the extra latency is acceptable.
BatchCollapser: when results are not required and merging should be triggered by time or count thresholds.
ConcurrentHashMultiset: when requests have a high duplication rate and simple counting or aggregation is sufficient.
By combining these approaches, developers can significantly reduce downstream load, improve system stability, and achieve higher throughput.
Top Architect
