Understanding RocketMQ Consumer Types: Push, Simple, and Pull Explained
This article explains the three consumer types introduced in RocketMQ 5.0—PushConsumer, SimpleConsumer, and PullConsumer—detailing their workflows, ConsumerGroup semantics, code examples, configuration options, and best‑practice scenarios for each usage pattern.
Consumer Type Overview
RocketMQ 5.0 emphasizes the concept of client types, especially consumer types. Three consumer types are supported: PushConsumer , SimpleConsumer , and PullConsumer . Each type fits different business scenarios.
Common Workflow
All consumers receive messages by actively polling the server with long‑polling. The client continuously sends requests to ensure timely delivery; when a new message arrives on the server, it is pushed to the client. The server records the processing result based on the client’s response.
ConsumerGroup Concept
PushConsumer and SimpleConsumer share a ConsumerGroup identifier, which groups consumers with identical subscription relationships. The server tracks consumption progress per group, allowing coordinated consumption rather than independent processing.
Consumer Types in Detail
PushConsumer : Fully managed consumer. Users only register a MessageListener; matching messages trigger the listener automatically. Ideal for most business integrations.
SimpleConsumer : Decouples message receipt from progress synchronization. Users pull messages themselves, acknowledge each message, and can control consumption rate.
PullConsumer : Exposes queue management to the user. Consumers pull messages per queue (topic partition) and can choose automatic or manual offset commits.
PushConsumer Usage
PushConsumer pushConsumer = provider.newPushConsumerBuilder()
.setClientConfiguration(clientConfiguration)
.setConsumerGroup(consumerGroup)
.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
.setMessageListener(messageView -> {
LOGGER.info("consume message={}", messageView);
return ConsumeResult.SUCCESS;
})
.build();
Thread.sleep(Long.MAX_VALUE);
pushConsumer.close();The listener must return ConsumeResult.SUCCESS or ConsumeResult.FAILURE. On failure, the server retries delivery with exponential back‑off. Each ConsumerGroup can define a maximum consumption count; exceeding it routes the message to a dead‑letter queue.
Best Practices for PushConsumer
Process messages quickly and return a success result; avoid long‑running logic inside the listener. For heavy processing, acknowledge first and handle the payload asynchronously. Configure local cache limits to prevent memory leaks:
pushConsumer.setMaxCachedMessageCount(16);
pushConsumer.setMaxCachedMessageSizeInBytes(128 * 1024 * 1024);SimpleConsumer Usage
SimpleConsumer consumer = provider.newSimpleConsumerBuilder()
.setClientConfiguration(clientConfiguration)
.setConsumerGroup(consumerGroup)
.setAwaitDuration(awaitDuration)
.setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
.build();
int maxMessageNum = 16;
Duration invisibleDuration = Duration.ofSeconds(15);
List<MessageView> messages = consumer.receive(maxMessageNum, invisibleDuration);
LOGGER.info("Received {} message(s)", messages.size());
for (MessageView message : messages) {
MessageId messageId = message.getMessageId();
try {
consumer.ack(message);
LOGGER.info("Message is acknowledged successfully, messageId={}", messageId);
} catch (Throwable t) {
LOGGER.error("Message is failed to be acknowledged, messageId={}", messageId, t);
}
}
consumer.close();SimpleConsumer requires the user to call receive (long‑polling) to pull messages, then explicitly ack or adjust invisibility via changeInvisibleDuration. The invisible window prevents the same message from being redelivered during processing. Consumption counts are still limited by the ConsumerGroup’s maximum.
Best Practices for SimpleConsumer
Batch‑pull messages to improve throughput in I/O‑intensive scenarios. Adjust awaitDuration and maxMessageNum based on processing capacity, and consider increasing concurrency of receive calls for high‑throughput topics.
PullConsumer (LitePullConsumer) Usage
public interface LitePullConsumer {
void registerTopicMessageQueueChangeListener(String topic, TopicMessageQueueChangeListener listener) throws MQClientException;
void assign(Collection<MessageQueue> messageQueues);
List<MessageExt> poll(long timeout);
Long committed(MessageQueue messageQueue) throws MQClientException;
void setAutoCommit(boolean autoCommit);
void commitSync();
}PullConsumer exposes queue assignment to the client. After assigning queues, poll retrieves messages from those queues. If setAutoCommit(true) is enabled, offsets are committed automatically upon receipt; otherwise, commitSync must be called manually.
Best Practices for PullConsumer
PullConsumer gives full control over offset management, making it suitable for stream‑processing workloads that need precise rate and progress control. It is often integrated with dedicated stream‑processing frameworks.
Signed-in readers can open the original source through BestHub's protected redirect.
This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contactand we will review it promptly.
Alibaba Cloud Native
We publish cloud-native tech news, curate in-depth content, host regular events and live streams, and share Alibaba product and user case studies. Join us to explore and share the cloud-native insights you need.
How this landed with the community
Was this worth your time?
0 Comments
Thoughtful readers leave field notes, pushback, and hard-won operational detail here.
