
Understanding RocketMQ Push and Pull Consumption Mechanisms and Long Polling Implementation

This article explains RocketMQ's core components, the differences between push and pull consumption modes, and how the framework implements a pseudo‑push mechanism using long polling, including detailed code analysis of the consumer and broker side processes.

政采云技术

Introduction

RocketMQ is an open‑source distributed message queue written in pure Java and originally developed at Alibaba. It has been proven under the traffic of the Double‑11 shopping festival and offers transactional, ordered, batch, and delayed messages, as well as message tracing.

Core Components

NameServer: a registration center that stores topic routing information, manages brokers, and supports dynamic broker registration and discovery.

Broker: the core role responsible for storing and delivering messages; multiple broker instances can form a broker group.

Topic: a logical name for a collection of messages, which may be spread across different broker groups.

Queue: each topic contains several queues (default 4 per broker group).

Producer and Producer Group: a producer sends messages; producers can optionally be organized into a producer group.

Consumer and Consumer Group: a consumer receives messages; consumer groups isolate consumption from one another.

Consumption Modes

RocketMQ supports two consumption modes: Push (the broker pushes messages to the consumer) and Pull (the consumer actively pulls messages).

Push Mode

In push mode the broker sends each message to the consumer as soon as it arrives. Latency is low, but the broker does not know how fast the consumer can process messages: under high message volume the consumer can be overwhelmed, it cannot control the push rate, and consumer failures are difficult to handle.

Pull Mode

Consumers request messages from the broker at their own pace, which reduces consumer load but may cause latency if pull intervals are too long or increase server pressure if pulls are too frequent. Pull mode also enables flow‑control based on message count or size.
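The count‑ and size‑based flow control mentioned above can be sketched as a small gate that is consulted before each pull. This is a minimal illustration and not RocketMQ code: the class PullFlowControl, its method names, and the thresholds passed to it are hypothetical and chosen only for the example.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical sketch: decides whether a consumer should postpone its next pull. */
final class PullFlowControl {
    private final long maxCachedCount;    // assumed threshold: messages cached locally
    private final long maxCachedSizeMiB;  // assumed threshold: cached payload size in MiB
    private final AtomicLong cachedCount = new AtomicLong();
    private final AtomicLong cachedBytes = new AtomicLong();

    PullFlowControl(long maxCachedCount, long maxCachedSizeMiB) {
        this.maxCachedCount = maxCachedCount;
        this.maxCachedSizeMiB = maxCachedSizeMiB;
    }

    /** Record messages that were pulled but not yet consumed. */
    void onPulled(int messages, long bytes) {
        cachedCount.addAndGet(messages);
        cachedBytes.addAndGet(bytes);
    }

    /** Record messages that the application has finished consuming. */
    void onConsumed(int messages, long bytes) {
        cachedCount.addAndGet(-messages);
        cachedBytes.addAndGet(-bytes);
    }

    /** True if either the count or the size threshold is exceeded. */
    boolean shouldDelayPull() {
        long cachedSizeMiB = cachedBytes.get() / (1024 * 1024);
        return cachedCount.get() > maxCachedCount || cachedSizeMiB > maxCachedSizeMiB;
    }
}
```

A pull loop would call shouldDelayPull() before each request and, when it returns true, reschedule the pull after a short delay instead of contacting the broker.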

How RocketMQ Implements Push

RocketMQ actually implements Push as a “pseudo‑push” based on long polling. The consumer continuously issues Pull requests that carry a maximum suspend time. If no messages are available, the broker holds the request (long poll) instead of answering immediately, and responds as soon as new messages arrive or the suspend time expires.
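The idea of a push‑style API built on top of repeated pulls can be sketched with plain JDK classes. Everything here is an assumption made for illustration: ToyBroker and PseudoPushConsumer are hypothetical names, the broker is an in‑memory queue, and BlockingQueue.poll with a timeout stands in for a pull request that the broker holds.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical in-memory stand-in for one broker queue. */
final class ToyBroker {
    private final BlockingQueue<String> messages = new LinkedBlockingQueue<>();

    void send(String message) {
        messages.add(message);
    }

    /** Long poll: waits up to suspendMillis for a message; returns null on timeout. */
    String pull(long suspendMillis) throws InterruptedException {
        return messages.poll(suspendMillis, TimeUnit.MILLISECONDS);
    }
}

/** Hypothetical consumer that exposes a push-style callback but pulls internally. */
final class PseudoPushConsumer {
    private final Thread pullThread;

    PseudoPushConsumer(ToyBroker broker, long suspendMillis, Consumer<String> listener) {
        pullThread = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    String message = broker.pull(suspendMillis);
                    if (message != null) {
                        listener.accept(message); // looks like a push to the application
                    }
                    // on timeout (null) the loop simply issues the next pull
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // restore the flag so the loop exits
                }
            }
        }, "pseudo-push-pull-thread");
        pullThread.setDaemon(true);
    }

    void start() {
        pullThread.start();
    }

    void shutdown() {
        pullThread.interrupt();
    }
}
```

The application only registers a listener and never calls pull itself, which is why the mode feels like push even though every message is fetched by a consumer‑initiated request.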

Long Polling vs. Short Polling

Short polling sends periodic requests regardless of data changes, wasting resources. Long polling keeps the request open until new data appears, achieving push‑like immediacy.
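The trade‑off can be made concrete with a small arithmetic model. This is a simplified sketch under stated assumptions, not a measurement: PollingModel and its methods are hypothetical, polling starts at time 0, a single message arrives at arrivalMs, and network time is ignored.

```java
/** Hypothetical arithmetic model comparing short and long polling; times are in milliseconds. */
final class PollingModel {
    private PollingModel() { }

    /** Requests sent by short polling (one every intervalMs) until the message is seen. */
    static int shortPollRequests(long arrivalMs, long intervalMs) {
        requirePositive(intervalMs);
        int requests = 0;
        for (long now = 0; ; now += intervalMs) {
            requests++;
            if (now >= arrivalMs) {
                return requests; // this request finds the message
            }
        }
    }

    /** Delay between the message arriving and short polling noticing it. */
    static long shortPollDelayMs(long arrivalMs, long intervalMs) {
        long seenAt = (shortPollRequests(arrivalMs, intervalMs) - 1) * intervalMs;
        return seenAt - arrivalMs;
    }

    /** Requests sent by long polling, where each request is held for up to holdMs. */
    static int longPollRequests(long arrivalMs, long holdMs) {
        requirePositive(holdMs);
        int requests = 0;
        for (long now = 0; ; now += holdMs) {
            requests++;
            if (arrivalMs <= now + holdMs) {
                return requests; // answered while the request is being held
            }
        }
    }

    private static void requirePositive(long value) {
        if (value <= 0) {
            throw new IllegalArgumentException("duration must be positive: " + value);
        }
    }
}
```

For a message arriving at 12,500 ms, this model gives 14 requests and a 500 ms delay for short polling at a 1,000 ms interval, versus a single request answered on arrival for long polling with a 15,000 ms hold.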

Key Source Code

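The excerpts below trace the pull‑to‑push workflow: first PullMessageService and DefaultMQPushConsumerImpl on the consumer side, then PullRequestHoldService on the broker side.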

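// PullMessageService (consumer side): hands a queued PullRequest to its consumer
private void pullMessage(final PullRequest pullRequest) {
  // look up the consumer instance by the consumer group carried in the pull request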
  final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
  if (consumer != null) {
    // cast to push consumer
    DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
    // actually pull message
    impl.pullMessage(pullRequest);
  } else {
    log.warn("No matched consumer for the PullRequest {}, drop it", pullRequest);
  }
}
/** PullMessageService loop: take pull requests off the blocking queue and dispatch them */
@Override
public void run() {
  log.info(this.getServiceName() + " service started");
  while (!this.isStopped()) {
    try {
      PullRequest pullRequest = this.pullRequestQueue.take();
      // call pullMessage
      this.pullMessage(pullRequest);
    } catch (InterruptedException ignored) {
    } catch (Exception e) {
      log.error("Pull Message Service Run Method exception", e);
    }
  }
  log.info(this.getServiceName() + " service end");
}
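// DefaultMQPushConsumerImpl (consumer side): applies flow control before pulling
public void pullMessage(final PullRequest pullRequest) {
  // local cache of pulled but not yet consumed messages for this queue
  final ProcessQueue processQueue = pullRequest.getProcessQueue();
  // service state check ...
  // flow control check
  long cachedMessageCount = processQueue.getMsgCount().get();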
  long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);
  if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
    this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
    if ((queueFlowControlTimes++ % 1000) == 0) {
      log.warn("the cached message count exceeds the threshold {}, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
        this.defaultMQPushConsumer.getPullThresholdForQueue(),
        processQueue.getMsgTreeMap().firstKey(),
        processQueue.getMsgTreeMap().lastKey(),
        cachedMessageCount,
        cachedMessageSizeInMiB,
        pullRequest,
        queueFlowControlTimes);
    }
    return;
  }
  if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
    this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
    if ((queueFlowControlTimes++ % 1000) == 0) {
      log.warn("the cached message size exceeds the threshold {} MiB, so do flow control, minOffset={}, maxOffset={}, count={}, size={} MiB, pullRequest={}, flowControlTimes={}",
        this.defaultMQPushConsumer.getPullThresholdSizeForQueue(),
        processQueue.getMsgTreeMap().firstKey(),
        processQueue.getMsgTreeMap().lastKey(),
        cachedMessageCount,
        cachedMessageSizeInMiB,
        pullRequest,
        queueFlowControlTimes);
    }
    return;
  }
  // ... further checks and finally call pullAPIWrapper.pullKernelImpl()
}
// PullRequestHoldService (broker side): park a pull request under its topic and queue key
protected void suspendPullRequest(final String topic, final int queueId, final PullRequest pullRequest) {
  String key = this.buildKey(topic, queueId);
  ManyPullRequest mpr = this.pullRequestTable.get(key);
  if (null == mpr) {
    mpr = new ManyPullRequest();
    ManyPullRequest prev = this.pullRequestTable.putIfAbsent(key, mpr);
    if (prev != null) {
      mpr = prev;
    }
  }
  mpr.addPullRequest(pullRequest);
}
public void notifyMessageArriving(final String topic, final int queueId, final long maxOffset, final Long tagsCode,
  long msgStoreTime, byte[] filterBitMap, Map<String, String> properties) {
  String key = this.buildKey(topic, queueId);
  ManyPullRequest mpr = this.pullRequestTable.get(key);
  if (mpr != null) {
    List<PullRequest> requestList = mpr.cloneListAndClear();
    if (requestList != null) {
      for (PullRequest request : requestList) {
        long newestOffset = maxOffset;
        if (newestOffset <= request.getPullFromThisOffset()) {
          newestOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, queueId);
        }
        if (newestOffset > request.getPullFromThisOffset()) {
          boolean match = request.getMessageFilter().isMatchedByConsumeQueue(tagsCode,
            new ConsumeQueueExt.CqExtUnit(tagsCode, msgStoreTime, filterBitMap));
          if (match && properties != null) {
            match = request.getMessageFilter().isMatchedByCommitLog(null, properties);
          }
          if (match) {
            try {
              this.brokerController.getPullMessageProcessor().executeRequestWhenWakeup(
                request.getClientChannel(), request.getRequestCommand());
            } catch (Throwable e) {
              log.error("execute request when wakeup failed.", e);
            }
            continue;
          }
        }
        // otherwise keep request for next check
      }
    }
  }
}

The broker's PullRequestHoldService stores suspended pull requests in a concurrent map keyed by topic and queue ID. Its background thread periodically checks whether new messages have arrived; when they have, notifyMessageArriving re‑executes the waiting requests whose filters match, so the consumer's pull thread retrieves the messages promptly.
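The hold‑and‑wake pattern can be reduced to a few lines of plain Java. The following is a simplified sketch and not the broker's implementation: ToyHoldService, Held, and the "@" key separator are assumptions for illustration, a CompletableFuture stands in for the suspended network request, and message filtering and timeouts are left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical, simplified model of holding pull requests per topic and queue. */
final class ToyHoldService {
    /** A parked request: the offset the consumer wants next and a future completed on wake-up. */
    static final class Held {
        final long pullFromOffset;
        final CompletableFuture<Long> wakeUp = new CompletableFuture<>();

        Held(long pullFromOffset) {
            this.pullFromOffset = pullFromOffset;
        }
    }

    private final ConcurrentMap<String, List<Held>> table = new ConcurrentHashMap<>();

    private static String key(String topic, int queueId) {
        return topic + "@" + queueId; // assumed key format
    }

    /** Park a request until the queue's max offset passes the requested offset. */
    CompletableFuture<Long> suspend(String topic, int queueId, long pullFromOffset) {
        Held held = new Held(pullFromOffset);
        table.compute(key(topic, queueId), (k, list) -> {
            List<Held> updated = (list == null) ? new ArrayList<>() : list;
            updated.add(held);
            return updated;
        });
        return held.wakeUp;
    }

    /** Wake every parked request that now has data; keep the rest parked. Returns the number woken. */
    int notifyMessageArriving(String topic, int queueId, long maxOffset) {
        List<Held> ready = new ArrayList<>();
        table.computeIfPresent(key(topic, queueId), (k, list) -> {
            List<Held> stillWaiting = new ArrayList<>();
            for (Held held : list) {
                if (maxOffset > held.pullFromOffset) {
                    ready.add(held);
                } else {
                    stillWaiting.add(held);
                }
            }
            return stillWaiting.isEmpty() ? null : stillWaiting; // null removes the entry
        });
        for (Held held : ready) {
            held.wakeUp.complete(maxOffset); // completed outside the map's atomic section
        }
        return ready.size();
    }
}
```

Requests whose offset has not been passed stay in the table for the next check, mirroring the "keep request for next check" branch in the excerpt above.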

Conclusion

This article explained how the DefaultMQPushConsumer client implements a pseudo‑push mechanism by issuing long‑polling pull requests, and how the broker's PullRequestHoldService wakes pending requests to achieve near‑real‑time delivery.

Written by

政采云技术

ZCY Technology Team (Zero), based in Hangzhou, is a growth-oriented team passionate about technology and craftsmanship. With around 500 members, we are building comprehensive engineering, project management, and talent development systems. We are committed to innovation and creating a cloud service ecosystem for government and enterprise procurement. We look forward to your joining us.
