Merge Duplicate Requests Using Hystrix Collapser, BatchCollapser, and ConcurrentHashMultiset

This article explains how merging similar or duplicate requests upstream can sharply reduce downstream load and improve system throughput. It covers three techniques (Hystrix Collapser, a custom BatchCollapser, and Guava's ConcurrentHashMultiset) along with their implementations, configurations, and ideal use-cases.

Architect's Guide

Combining similar or duplicate requests in the upstream system before sending them downstream can greatly reduce downstream load and improve overall system throughput. The article introduces three request‑merging techniques—Hystrix Collapser, a custom BatchCollapser, and Guava's ConcurrentHashMultiset—and compares their implementations and applicable scenarios.

Preface

In typical request‑response models, each request occupies its own thread and memory space, and each I/O operation incurs significant cost. If many requests perform the same I/O, merging them into a single I/O operation can substantially lower the burden on downstream services.

Hystrix Collapser

Hystrix

Hystrix, an open‑source library from Netflix, provides circuit‑breaker functionality to keep web servers stable under high concurrency. Request collapsing is a natural extension of Hystrix's protection mechanisms.

When using Hystrix, the hystrix-javanica module allows commands to be defined with annotations. Projects typically depend on hystrix-core and hystrix-javanica, and a HystrixCommandAspect bean must be registered (here in Spring XML) to enable AOP support.

<aop:aspectj-autoproxy/>
<bean id="hystrixAspect" class="com.netflix.hystrix.contrib.javanica.aop.aspectj.HystrixCommandAspect"/>

Collapser

Hystrix Collapser is the request-merging component inside Hystrix. It can be used in two ways: by implementing the batch method yourself, or through annotations. The annotation style needs only two annotations, but it is poorly documented, so the key points are collected here.

Key points for implementation:

Add @HystrixCollapser on the method to be merged and @HystrixCommand on the batch method.

The single method can accept only one parameter; for multiple parameters, wrap them in a custom class. The batch method must accept java.util.List<SingleParam>.

The single method returns java.util.concurrent.Future<SingleReturn>; the batch method returns java.util.List<SingleReturn>, and the number of results must match the number of inputs.

Simple example:

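import java.util.List;
import java.util.concurrent.Future;
import java.util.stream.Collectors;

import com.netflix.hystrix.contrib.javanica.annotation.HystrixCollapser;
import com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand;

public class HystrixCollapserSample {
    @HystrixCollapser(batchMethod = "batch")
    public Future<Boolean> single(String input) {
        return null; // never executed: the aspect intercepts the call and supplies the Future
    }

    @HystrixCommand // the batch method must carry this annotation
    public List<Boolean> batch(List<String> inputs) {
        return inputs.stream().map(it -> Boolean.TRUE).collect(Collectors.toList());
    }
}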

Source Code Analysis

The collapser works as follows:

Spring Boot registers an aspect bean that contains the @HystrixCollapser annotation handling.

When a method annotated with @HystrixCollapser is invoked, Spring calls methodsAnnotatedWithHystrixCommand to create a Hystrix proxy.

Hystrix obtains a collapser instance (created per scope if not already present).

Each request's argument is stored in a ConcurrentHashMap<RequestArgumentType, CollapsedRequest>; an Observable is created for it and a Future is returned to the business thread.

A timer thread periodically consumes stored requests, builds a batch request, invokes the batch method, maps results back to the original callers, and completes the futures.
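The steps above can be sketched with the standard library alone. This is not Hystrix code: MiniCollapser, Pending, submit, and flush are hypothetical names, and matching results to callers by position is an assumption of this sketch. In a real setup a timer thread would call flush(); here it is called by hand so the behaviour is easy to follow.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Function;

/** Hypothetical, stripped-down collapser: the same idea as Hystrix, not its implementation. */
public class MiniCollapser<A, R> {
    /** One pending call: its argument plus the future handed back to the caller. */
    private static final class Pending<A, R> {
        final A argument;
        final CompletableFuture<R> future = new CompletableFuture<>();

        Pending(A argument) {
            this.argument = argument;
        }
    }

    private final ConcurrentLinkedQueue<Pending<A, R>> queue = new ConcurrentLinkedQueue<>();
    private final Function<List<A>, List<R>> batchMethod;

    public MiniCollapser(Function<List<A>, List<R>> batchMethod) {
        this.batchMethod = batchMethod;
    }

    /** Called by business threads: enqueue the argument and return a future immediately. */
    public CompletableFuture<R> submit(A argument) {
        Pending<A, R> pending = new Pending<>(argument);
        queue.add(pending);
        return pending.future;
    }

    /** Drains the queue, runs the batch method once, and returns the number of requests flushed. */
    public int flush() {
        List<Pending<A, R>> batch = new ArrayList<>();
        for (Pending<A, R> p; (p = queue.poll()) != null; ) {
            batch.add(p);
        }
        if (batch.isEmpty()) {
            return 0;
        }
        List<A> arguments = new ArrayList<>(batch.size());
        for (Pending<A, R> p : batch) {
            arguments.add(p.argument);
        }
        try {
            List<R> results = batchMethod.apply(arguments);
            if (results.size() != batch.size()) {
                throw new IllegalStateException("batch returned " + results.size()
                        + " results for " + batch.size() + " requests");
            }
            // Map each result back to its caller by position.
            for (int i = 0; i < batch.size(); i++) {
                batch.get(i).future.complete(results.get(i));
            }
        } catch (RuntimeException e) {
            // Fail every caller in the batch rather than leaving futures pending forever.
            for (Pending<A, R> p : batch) {
                p.future.completeExceptionally(e);
            }
        }
        return batch.size();
    }
}
```

A caller would hold on to the future returned by submit() and read it only after the next flush, which is where the extra latency of request collapsing comes from.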

Important configuration, set on the @HystrixCollapser annotation:

collapserKey – optional; defaults to the method name.

batchMethod – name of the batch method, usually defined in the same class.

scope – REQUEST (the default) creates a collapser per request context; GLOBAL shares one collapser across all requests.

collapserProperties – collapser-level properties such as maxRequestsInBatch, timerDelayInMilliseconds, and requestCache.enabled.

BatchCollapser

Design

For scenarios where the result of merged requests is not needed, a simple custom collapser was built. Business threads submit requests to a container; when the container reaches a size threshold or a time interval elapses, the accumulated requests are sent downstream in a single batch.

The design mirrors Hystrix: a container holds requests, a timer thread periodically drains the container, and an optional second trigger fires when the request count exceeds a threshold. Thread‑safety is achieved using LinkedBlockingDeque from java.util.concurrent, which provides a blocking deque with safe concurrent access and a drainTo method.
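As a small illustration of why drainTo suits this design, the sketch below moves at most a fixed number of queued events into a list in one call and leaves the rest for the next round. DrainToDemo and drainBatch are hypothetical names used only for this example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingDeque;

public class DrainToDemo {
    /** Moves at most maxBatch queued elements into a new list, oldest first. */
    public static <E> List<E> drainBatch(LinkedBlockingDeque<E> container, int maxBatch) {
        List<E> batch = new ArrayList<>(maxBatch);
        container.drainTo(batch, maxBatch);
        return batch;
    }

    public static void main(String[] args) {
        LinkedBlockingDeque<String> container = new LinkedBlockingDeque<>();
        for (int i = 0; i < 5; i++) {
            container.add("event-" + i);
        }
        // prints "[event-0, event-1, event-2] remaining=2"
        System.out.println(drainBatch(container, 3) + " remaining=" + container.size());
        // prints "[event-3, event-4] remaining=0"
        System.out.println(drainBatch(container, 3) + " remaining=" + container.size());
    }
}
```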

Implementation

Key implementation details:

The collapser is a singleton; a double‑checked locking factory stores instances in a ConcurrentHashMap keyed by the handler class.

A ScheduledExecutorService replaces a traditional Timer to avoid blocking issues.

The submit method adds events to the deque and triggers immediate cleaning if the threshold is reached.

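import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.InitializingBean;

// Handler<T, R> is the project's own callback interface (not shown here); it exposes handle(T).
public class BatchCollapser<E> implements InitializingBean {
    private static final Logger logger = LoggerFactory.getLogger(BatchCollapser.class);
    // One collapser per handler class.
    private static final ConcurrentHashMap<Class<?>, BatchCollapser<?>> instance = new ConcurrentHashMap<>();
    private static final ScheduledExecutorService SCHEDULE_EXECUTOR = Executors.newScheduledThreadPool(1);
    private final LinkedBlockingDeque<E> batchContainer = new LinkedBlockingDeque<>();
    private final Handler<List<E>, Boolean> cleaner;
    private final long interval;
    private final int threshHold;

    // Private: instances are obtained through getInstance.
    private BatchCollapser(Handler<List<E>, Boolean> cleaner, int threshHold, long interval) {
        this.cleaner = cleaner;
        this.threshHold = threshHold;
        this.interval = interval;
    }

    // Starts the time-based trigger. Spring calls this for managed beans;
    // instances that are not Spring-managed need it invoked explicitly.
    @Override
    public void afterPropertiesSet() throws Exception {
        SCHEDULE_EXECUTOR.scheduleAtFixedRate(() -> {
            try { this.clean(); } catch (Exception e) { logger.error("clean container exception", e); }
        }, 0, interval, TimeUnit.MILLISECONDS);
    }

    // Count-based trigger: flush as soon as the threshold is reached.
    public void submit(E event) {
        batchContainer.add(event);
        if (batchContainer.size() >= threshHold) { clean(); }
    }

    private void clean() {
        List<E> transferList = Lists.newArrayListWithExpectedSize(threshHold);
        // Drain at most one threshold's worth of events per batch.
        batchContainer.drainTo(transferList, threshHold);
        if (transferList.isEmpty()) { return; }
        try { cleaner.handle(transferList); }
        catch (Exception e) { logger.error("batch execute error, transferList:{}", transferList, e); }
    }

    @SuppressWarnings("unchecked")
    public static <E> BatchCollapser<E> getInstance(Handler<List<E>, Boolean> cleaner, int threshHold, long interval) {
        Class<?> jobClass = cleaner.getClass();
        if (instance.get(jobClass) == null) {
            synchronized (BatchCollapser.class) {
                if (instance.get(jobClass) == null) {
                    instance.put(jobClass, new BatchCollapser<>(cleaner, threshHold, interval));
                }
            }
        }
        return (BatchCollapser<E>) instance.get(jobClass);
    }
}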
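The try/catch around clean() inside the scheduled task matters more than it may appear. With scheduleAtFixedRate, a task that throws has all of its later executions suppressed, so an unguarded failure would silently stop the time-based trigger. The sketch below demonstrates this with the standard library only; ScheduledGuardDemo and countRuns are hypothetical names, and the always-failing task stands in for a downstream error.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ScheduledGuardDemo {
    /**
     * Schedules a task that always throws and reports how often it ran
     * before the executor was shut down after waitMillis.
     */
    public static int countRuns(boolean guarded, long periodMillis, long waitMillis) {
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
        AtomicInteger runs = new AtomicInteger();
        Runnable failing = () -> {
            runs.incrementAndGet();
            throw new IllegalStateException("simulated downstream failure");
        };
        Runnable guardedTask = () -> {
            try {
                failing.run();
            } catch (RuntimeException e) {
                // A real collapser would log here; catching keeps the schedule alive.
            }
        };
        executor.scheduleAtFixedRate(guarded ? guardedTask : failing, 0, periodMillis, TimeUnit.MILLISECONDS);
        try {
            Thread.sleep(waitMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            executor.shutdownNow();
        }
        return runs.get();
    }

    public static void main(String[] args) {
        System.out.println("unguarded runs: " + countRuns(false, 10, 300));
        System.out.println("guarded runs:   " + countRuns(true, 10, 300));
    }
}
```

The unguarded task runs once and is never rescheduled, while the guarded one keeps firing for the whole wait; the exact guarded count depends on timing.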

ConcurrentHashMultiset

Design

Guava's ConcurrentHashMultiset stores elements with a count rather than overwriting duplicates. It provides lock‑free thread safety by using a CAS‑based loop. This structure is ideal for high‑frequency counting scenarios, allowing in‑memory aggregation before persisting to a downstream system.
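To make the CAS-based loop concrete, here is a minimal standard-library sketch of the same counting idea. It is not Guava's source: CasCounter, add, count, and takeAll are hypothetical names, and the sketch never removes map entries, which a real multiset would have to handle.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical mini multiset that illustrates the CAS retry loop. */
public class CasCounter<E> {
    private final ConcurrentHashMap<E, AtomicInteger> counts = new ConcurrentHashMap<>();

    /** Adds one occurrence and returns the new count. */
    public int add(E element) {
        // The first insertion of a key may briefly lock a hash bin; later increments are CAS only.
        AtomicInteger counter = counts.computeIfAbsent(element, k -> new AtomicInteger());
        while (true) {
            int current = counter.get();
            int next = current + 1;
            // Retry if another thread changed the value between get() and compareAndSet().
            if (counter.compareAndSet(current, next)) {
                return next;
            }
        }
    }

    /** Returns the current number of occurrences, or 0 if the element was never added. */
    public int count(E element) {
        AtomicInteger counter = counts.get(element);
        return counter == null ? 0 : counter.get();
    }

    /** Atomically resets the count to zero and returns how many occurrences were taken. */
    public int takeAll(E element) {
        AtomicInteger counter = counts.get(element);
        if (counter == null) {
            return 0;
        }
        while (true) {
            int current = counter.get();
            if (counter.compareAndSet(current, 0)) {
                return current;
            }
        }
    }
}
```

A flushing thread could call takeAll for each key and send one downstream request carrying the taken count, while business threads keep calling add without blocking on a shared lock.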

Implementation

Using ConcurrentHashMultiset for request merging looks similar to using a regular container, but the count of each element is considered when building the batch request.

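// multiset is the shared ConcurrentHashMultiset<Request> that business threads add requests to.
if (multiset.isEmpty()) { return; }
List<Request> transferList = Lists.newArrayList();
multiset.elementSet().forEach(request -> {
    int count = multiset.count(request);
    if (count <= 0) { return; }
    // Collapse duplicates into a single request carrying the summed increment.
    transferList.add(count == 1 ? request : new Request(request.getIncrement() * count));
    // Remove only the occurrences just read; any added in the meantime stay for the next batch.
    multiset.remove(request, count);
});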

Summary

Suitable scenarios for each technique:

Hystrix Collapser: when each request's result is required and the extra latency introduced by the timer is acceptable.

BatchCollapser: when results are not needed and merging can be triggered by time or count thresholds.

ConcurrentHashMultiset: counting or statistical aggregation scenarios with a high duplicate rate.

By combining a custom BatchCollapser with ConcurrentHashMultiset, one can enjoy both time‑based merging and efficient duplicate counting.
