
Understanding RocketMQ Consumer Load Balancing and Message Queue Allocation Strategies

This article explains how increasing consumers affects RocketMQ message backlog, details conditions where adding consumers helps or not, describes ProcessQueue flow control, outlines six load‑balancing strategies—including average, round‑robin, custom, machine‑room, nearby, and consistent‑hash—and provides corresponding Java code examples.

Code Ape Tech Column

The interview opens with a question: can adding more consumers relieve a RocketMQ message backlog? Within a consumer group, each MessageQueue is assigned to at most one consumer, so adding consumers speeds up consumption only while there are fewer consumers than MessageQueues. Once the consumer count reaches the queue count, any further consumer is assigned no queue and has no effect.
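The rule can be written as a small piece of arithmetic. The sketch below is illustrative only; the class and method names are hypothetical and not part of RocketMQ.

```java
class ConsumerScalingSketch {
    // Consumers that actually receive at least one queue.
    static int activeConsumers(int queueCount, int consumerCount) {
        return Math.min(queueCount, consumerCount);
    }

    // Consumers that receive no queue and therefore add no throughput.
    static int idleConsumers(int queueCount, int consumerCount) {
        return Math.max(0, consumerCount - queueCount);
    }

    public static void main(String[] args) {
        int queues = 8;
        for (int consumers : new int[] {4, 8, 12}) {
            System.out.println(consumers + " consumers: "
                + activeConsumers(queues, consumers) + " active, "
                + idleConsumers(queues, consumers) + " idle");
        }
    }
}
```

With 8 queues, going from 4 to 8 consumers doubles the number of active consumers, while going from 8 to 12 only adds idle ones.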

Consumer pull speed also depends on local consumption speed. If local processing is slow, the consumer delays pulling new messages. Three ProcessQueue thresholds can trigger this delay: more than 1000 cached messages (default), more than 100 MiB of cached messages (default), and, for non-ordered consumption, a gap of more than 2000 between the largest and smallest cached offsets. For ordered consumption, failing to lock the queue delays the next pull by a fixed 3 seconds.
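The three thresholds can be read as a simple check that runs before each pull. The following is a minimal sketch of that decision, not RocketMQ's actual code: the class, enum, and parameter names are hypothetical, and the constants only mirror the defaults quoted above (in RocketMQ they are, as far as I know, configurable on DefaultMQPushConsumer as pullThresholdForQueue, pullThresholdSizeForQueue, and consumeConcurrentlyMaxSpan).

```java
class FlowControlSketch {
    static final long MAX_CACHED_MESSAGES = 1000; // default message-count threshold
    static final long MAX_CACHED_SIZE_MIB = 100;  // default cached-size threshold
    static final long MAX_OFFSET_SPAN = 2000;     // default span, non-ordered consumption only

    enum Decision { PULL_NOW, DELAY_COUNT, DELAY_SIZE, DELAY_SPAN }

    // Decide whether the next pull for one queue should be postponed.
    static Decision check(long cachedMessages, long cachedSizeMiB,
                          long minOffset, long maxOffset, boolean ordered) {
        if (cachedMessages > MAX_CACHED_MESSAGES) {
            return Decision.DELAY_COUNT;
        }
        if (cachedSizeMiB > MAX_CACHED_SIZE_MIB) {
            return Decision.DELAY_SIZE;
        }
        // The offset-span check applies to non-ordered consumption only.
        if (!ordered && maxOffset - minOffset > MAX_OFFSET_SPAN) {
            return Decision.DELAY_SPAN;
        }
        return Decision.PULL_NOW;
    }
}
```

The span check matters because one stuck message keeps the smallest cached offset fixed while newer messages keep arriving, so the gap grows even when the message count stays under its limit.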

Slow consumption may be caused by complex business logic, slow database queries, slow cache access (e.g., Redis), or slow calls to external services. For slow external calls, the consumer can retry asynchronously or fall back to a default value.
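One way to apply the fallback idea is to bound the external call with a timeout and substitute a default value when it is slow or fails. The sketch below uses only the JDK (CompletableFuture.completeOnTimeout requires Java 9 or later); the class and method names are hypothetical, and a real consumer would likely also log the failure or schedule a retry.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

class FallbackCall {
    // Runs remoteCall on another thread and waits at most timeoutMillis.
    // Returns defaultValue if the call is too slow or throws.
    static <T> T callWithFallback(Supplier<T> remoteCall, T defaultValue, long timeoutMillis) {
        return CompletableFuture.supplyAsync(remoteCall)
            .completeOnTimeout(defaultValue, timeoutMillis, TimeUnit.MILLISECONDS)
            .exceptionally(ex -> defaultValue)
            .join();
    }

    public static void main(String[] args) {
        String price = callWithFallback(() -> {
            throw new IllegalStateException("external service unavailable");
        }, "0.00", 200);
        System.out.println(price);
    }
}
```

The listener thread is blocked for at most the timeout, so a slow dependency no longer stalls the whole ProcessQueue.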

When adding consumers, one must consider the impact on downstream services: a sudden increase in request volume may exceed the external system's throughput, and local database or cache pressure may also increase.

RocketMQ assigns MessageQueues to consumers through a periodic rebalance: each consumer reruns the allocation on a timer (every 20 seconds by default). Six allocation strategies are provided:

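Average Allocation (AllocateMessageQueueAveragely): Sort the queues and consumers, give each consumer a contiguous block of queues of average size, and give one extra queue to each of the first (queue count mod consumer count) consumers.

Round-Robin Allocation (AllocateMessageQueueAveragelyByCircle): Deal the queues out to the consumers one at a time in a circle, so queue i goes to consumer i mod (consumer count).

Custom Allocation (AllocateMessageQueueByConfig): Users specify explicitly which queues a consumer should handle.

Machine-Room Allocation (AllocateMessageQueueByMachineRoom): Consumers only consume queues from the specified machine rooms (data centers); those queues are then split by average allocation.

Machine-Room Nearby Allocation (AllocateMachineRoomNearby): Consumers take queues from their own machine room first; queues in rooms that have no consumer are shared across all consumers in the cluster.

Consistent-Hash Allocation (AllocateMessageQueueConsistentHash): Consumers are placed on a hash ring; each queue is hashed and bound to the first consumer node found clockwise from its position.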

Example Java code for the average allocation strategy:

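// Position of this consumer in the sorted consumer list.
int index = cidAll.indexOf(currentCID);
// Queues left over after an even split; the first `mod` consumers take one extra each.
int mod = mqAll.size() % cidAll.size();
int averageSize = mqAll.size() <= cidAll.size() ? 1
    : (mod > 0 && index < mod ? mqAll.size() / cidAll.size() + 1 : mqAll.size() / cidAll.size());
// Consumers after the first `mod` are shifted by the extra queues handed out before them.
int startIndex = (mod > 0 && index < mod) ? index * averageSize : index * averageSize + mod;
// range is zero or negative for surplus consumers, so they receive no queue.
int range = Math.min(averageSize, mqAll.size() - startIndex);
for (int i = 0; i < range; i++) {
    result.add(mqAll.get((startIndex + i) % mqAll.size()));
}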
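The fragment above relies on variables (cidAll, mqAll, currentCID, result) that are defined elsewhere. To try the arithmetic in isolation, the same logic can be wrapped in a small stand-alone class. This is a sketch: queues are represented as plain strings instead of MessageQueue objects, and the class name and the guard for an unknown consumer are assumptions added for the demo.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class AverageAllocationDemo {
    static List<String> allocate(String currentCID, List<String> mqAll, List<String> cidAll) {
        List<String> result = new ArrayList<>();
        int index = cidAll.indexOf(currentCID);
        if (index < 0 || mqAll.isEmpty()) {
            return result; // unknown consumer or nothing to allocate
        }
        int mod = mqAll.size() % cidAll.size();
        int averageSize = mqAll.size() <= cidAll.size() ? 1
            : (mod > 0 && index < mod ? mqAll.size() / cidAll.size() + 1
                                      : mqAll.size() / cidAll.size());
        int startIndex = (mod > 0 && index < mod) ? index * averageSize
                                                  : index * averageSize + mod;
        int range = Math.min(averageSize, mqAll.size() - startIndex);
        for (int i = 0; i < range; i++) {
            result.add(mqAll.get((startIndex + i) % mqAll.size()));
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> queues = Arrays.asList("q0", "q1", "q2", "q3", "q4", "q5", "q6", "q7");
        List<String> consumers = Arrays.asList("c0", "c1", "c2");
        for (String cid : consumers) {
            System.out.println(cid + " -> " + allocate(cid, queues, consumers));
        }
        // c0 -> [q0, q1, q2]
        // c1 -> [q3, q4, q5]
        // c2 -> [q6, q7]
    }
}
```

With 2 queues and 3 consumers, the third consumer gets an empty list, which is the "adding consumers has no effect" case from the opening question.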

Round‑robin allocation example:

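// Position of this consumer in the sorted consumer list.
int index = cidAll.indexOf(currentCID);
// Take every queue whose position modulo the consumer count equals this consumer's position.
for (int i = index; i < mqAll.size(); i++) {
    if (i % cidAll.size() == index) {
        result.add(mqAll.get(i));
    }
}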
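For comparison with average allocation, the round-robin loop can be run stand-alone on the same input. As before, this is a sketch in which queues are plain strings and the class name is hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class RoundRobinAllocationDemo {
    static List<String> allocate(String currentCID, List<String> mqAll, List<String> cidAll) {
        List<String> result = new ArrayList<>();
        int index = cidAll.indexOf(currentCID);
        if (index < 0) {
            return result; // unknown consumer
        }
        for (int i = index; i < mqAll.size(); i++) {
            if (i % cidAll.size() == index) {
                result.add(mqAll.get(i));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> queues = Arrays.asList("q0", "q1", "q2", "q3", "q4", "q5", "q6", "q7");
        List<String> consumers = Arrays.asList("c0", "c1", "c2");
        for (String cid : consumers) {
            System.out.println(cid + " -> " + allocate(cid, queues, consumers));
        }
        // c0 -> [q0, q3, q6]
        // c1 -> [q1, q4, q7]
        // c2 -> [q2, q5]
    }
}
```

Both strategies give each consumer the same number of queues; they differ only in whether a consumer's queues are contiguous or interleaved in the sorted queue list.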

Custom allocation example:

AllocateMessageQueueByConfig allocateMessageQueueByConfig = new AllocateMessageQueueByConfig();
// MessageQueue arguments are (topic, brokerName, queueId); this consumer handles only the listed queues.
allocateMessageQueueByConfig.setMessageQueueList(Arrays.asList(new MessageQueue("messageQueue1", "broker1", 0)));
consumer.setAllocateMessageQueueStrategy(allocateMessageQueueByConfig);
consumer.start();

Machine‑room allocation example:

AllocateMessageQueueByMachineRoom allocateMessageQueueByMachineRoom = new AllocateMessageQueueByMachineRoom();
// Only queues on brokers named "room1@..." or "room2@..." are eligible; the room is the part of the broker name before "@".
allocateMessageQueueByMachineRoom.setConsumeridcs(new HashSet<>(Arrays.asList("room1", "room2")));
consumer.setAllocateMessageQueueStrategy(allocateMessageQueueByMachineRoom);
consumer.start();
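The machine-room nearby strategy described earlier can be sketched without the RocketMQ client. The code below is a simplified model under stated assumptions, not the library's implementation: queues and consumers are strings of the hypothetical form "room@name", the room is the part before "@", and the base strategy used inside each room is a plain modulo split standing in for whichever strategy is actually configured.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class NearbyAllocationSketch {
    // Assumed naming convention for this demo: "room@name".
    static String roomOf(String id) {
        return id.substring(0, id.indexOf('@'));
    }

    // Stand-in base strategy: queue i goes to consumer i % n.
    static List<String> split(String cid, List<String> queues, List<String> consumers) {
        List<String> out = new ArrayList<>();
        int index = consumers.indexOf(cid);
        for (int i = 0; index >= 0 && i < queues.size(); i++) {
            if (i % consumers.size() == index) {
                out.add(queues.get(i));
            }
        }
        return out;
    }

    static List<String> allocate(String currentCid, List<String> mqAll, List<String> cidAll) {
        Map<String, List<String>> queuesByRoom = new TreeMap<>();
        for (String mq : mqAll) {
            queuesByRoom.computeIfAbsent(roomOf(mq), r -> new ArrayList<>()).add(mq);
        }
        Map<String, List<String>> consumersByRoom = new TreeMap<>();
        for (String cid : cidAll) {
            consumersByRoom.computeIfAbsent(roomOf(cid), r -> new ArrayList<>()).add(cid);
        }
        List<String> result = new ArrayList<>();
        // Step 1: queues in this consumer's own room are shared among that room's consumers.
        String myRoom = roomOf(currentCid);
        if (queuesByRoom.containsKey(myRoom)) {
            result.addAll(split(currentCid, queuesByRoom.get(myRoom), consumersByRoom.get(myRoom)));
        }
        // Step 2: queues in rooms with no consumer are shared among all consumers.
        for (Map.Entry<String, List<String>> e : queuesByRoom.entrySet()) {
            if (!consumersByRoom.containsKey(e.getKey())) {
                result.addAll(split(currentCid, e.getValue(), cidAll));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> queues = Arrays.asList("A@q0", "A@q1", "B@q0", "C@q0", "C@q1");
        List<String> consumers = Arrays.asList("A@c0", "A@c1", "B@c0");
        for (String cid : consumers) {
            System.out.println(cid + " -> " + allocate(cid, queues, consumers));
        }
    }
}
```

Room C has queues but no consumer, so its two queues are spread over the whole cluster, while rooms A and B keep their traffic local.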

Consistent‑hash allocation example:

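// Wrap every consumer ID in a node that can be placed on the hash ring.
Collection<ClientNode> cidNodes = new ArrayList<>();
for (String cid : cidAll) {
    cidNodes.add(new ClientNode(cid));
}
// Build the ring, using the custom hash function if one was supplied.
ConsistentHashRouter<ClientNode> router = customHashFunction != null ?
    new ConsistentHashRouter<>(cidNodes, virtualNodeCnt, customHashFunction) :
    new ConsistentHashRouter<>(cidNodes, virtualNodeCnt);
// Keep only the queues that route to the current consumer.
List<MessageQueue> results = new ArrayList<>();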
for (MessageQueue mq : mqAll) {
    ClientNode clientNode = router.routeNode(mq.toString());
    if (clientNode != null && currentCID.equals(clientNode.getKey())) {
        results.add(mq);
    }
}
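ConsistentHashRouter and ClientNode are RocketMQ classes, so the fragment above does not run on its own. The idea can be reproduced with a TreeMap as the ring. The following is a minimal sketch under assumptions: the class name, the MD5-based hash, and the "cid#i" virtual-node naming are choices made for this demo and are not taken from the RocketMQ source.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class ConsistentHashSketch {
    // First 8 bytes of the MD5 digest, interpreted as a long.
    static long hash(String key) {
        try {
            byte[] d = MessageDigest.getInstance("MD5").digest(key.getBytes(StandardCharsets.UTF_8));
            long h = 0;
            for (int i = 0; i < 8; i++) {
                h = (h << 8) | (d[i] & 0xFF);
            }
            return h;
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    // Place each consumer on the ring several times (virtual nodes) to even out the load.
    static TreeMap<Long, String> buildRing(List<String> cidAll, int virtualNodes) {
        TreeMap<Long, String> ring = new TreeMap<>();
        for (String cid : cidAll) {
            for (int i = 0; i < virtualNodes; i++) {
                ring.put(hash(cid + "#" + i), cid);
            }
        }
        return ring;
    }

    // First node at or after the key's position, wrapping around to the start of the ring.
    static String route(TreeMap<Long, String> ring, String queueKey) {
        if (ring.isEmpty()) {
            return null;
        }
        Map.Entry<Long, String> e = ring.ceilingEntry(hash(queueKey));
        return (e != null ? e : ring.firstEntry()).getValue();
    }

    static List<String> allocate(String currentCID, List<String> mqAll,
                                 List<String> cidAll, int virtualNodes) {
        TreeMap<Long, String> ring = buildRing(cidAll, virtualNodes);
        List<String> results = new ArrayList<>();
        for (String mq : mqAll) {
            if (currentCID.equals(route(ring, mq))) {
                results.add(mq);
            }
        }
        return results;
    }
}
```

The benefit over the average and round-robin strategies is stability: when a consumer leaves, only the queues it owned move to other consumers, and the remaining assignments stay where they were.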

Finally, the interview concludes with a congratulatory note.

Written by

Code Ape Tech Column

Former Ant Group P8 engineer, pure technologist, sharing full‑stack Java, job interview and career advice through a column. Site: java-family.cn
