RocketMQ Producer and Consumer Best Practices and Configuration Guide
This article provides a comprehensive guide to using RocketMQ, covering producer and consumer message handling, tag and key usage, logging, failure retry strategies, oneway sending, broker roles, name server addressing, client configuration, JVM and Linux kernel tuning, and practical code examples.
1 Producer
1.1 Sending Message Considerations
1 Tags Usage
Where possible, an application should use a single Topic and use tags to identify sub‑types of messages. Tags are set by the producer (e.g., message.setTags("TagA")) and can be used by consumers for broker‑side filtering.
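To make the filtering rule concrete, here is a minimal sketch of how a subscription expression such as "TagA || TagB" could be matched against a message tag. The class TagFilterSketch and its methods are hypothetical and use only the JDK; this is an illustration of the matching rule, not RocketMQ's own implementation, and it assumes "*" means "every tag".

```java
import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical helper, not part of RocketMQ: shows the tag-matching rule only.
final class TagFilterSketch {

    // Split "TagA || TagB" into the set {"TagA", "TagB"}.
    static Set<String> parse(String expression) {
        return Arrays.stream(expression.split("\\|\\|"))
                .map(String::trim)
                .filter(s -> !s.isEmpty())
                .collect(Collectors.toSet());
    }

    // A null expression or "*" is assumed to accept every message.
    static boolean matches(String expression, String tag) {
        if (expression == null || expression.trim().equals("*")) {
            return true;
        }
        return tag != null && parse(expression).contains(tag);
    }
}
```

With this sketch, a consumer subscribed to "TagA || TagB" would accept a message tagged TagA and skip one tagged TagC.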
2 Keys Usage
Each message should have a unique business identifier set in the keys field to facilitate later troubleshooting. The broker creates a hash index on this field, so keys should be as unique as possible to avoid hash collisions.
// Order Id
String orderId = "20034568923546";
message.setKeys(orderId);
3 Logging
When a message is sent, log the SendResult and the key. If send returns without throwing, the broker accepted the message, but how durable it is depends on the broker’s replication and flush settings (e.g., SYNC_MASTER or SYNC_FLUSH). The SendResult carries one of the following statuses:
SEND_OK – Message sent successfully (may still be lost if not persisted).
FLUSH_DISK_TIMEOUT – Message reached the broker’s memory queue but disk flush timed out.
FLUSH_SLAVE_TIMEOUT – Message reached the master but replication to a slave timed out.
SLAVE_NOT_AVAILABLE – The master is configured for synchronous replication (SYNC_MASTER) but no slave is available.
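As a rough sketch of the logging advice above, the snippet below builds a log line from the key and status and flags any status other than SEND_OK for follow‑up. The Status enum is a local stand‑in that mirrors the four names listed above so the example compiles with the JDK alone; SendStatusSketch, logLine and needsFollowUp are hypothetical names.

```java
// Hypothetical helper; Status is a local stand-in for the client's send status.
final class SendStatusSketch {

    enum Status { SEND_OK, FLUSH_DISK_TIMEOUT, FLUSH_SLAVE_TIMEOUT, SLAVE_NOT_AVAILABLE }

    // One line per send, so a message can later be traced by its key.
    static String logLine(String key, Status status) {
        return "key=" + key + " status=" + status;
    }

    // Anything other than SEND_OK means the broker accepted the message
    // but the configured flush or replication step did not complete.
    static boolean needsFollowUp(Status status) {
        return status != Status.SEND_OK;
    }
}
```

Whether a non‑SEND_OK status should trigger a resend or only an alert is an application decision; resending can create duplicates, which the consumer must then tolerate.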
1.2 Message Send Failure Handling
The producer’s send method retries internally:
Up to 2 retries for synchronous sends (0 for async).
If a send fails, the request is rotated to the next broker; total time does not exceed sendMsgTimeout (default 10 s).
No retry if a timeout exception occurs while contacting the broker.
For higher reliability, applications should add their own retry logic, such as persisting the message to a database and retrying later.
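One way to picture the "persist and retry later" idea is the sketch below. It is an assumption‑laden illustration, not a RocketMQ API: ResendSketch is a hypothetical class, the Predicate stands in for a call to the producer that returns true on success, and the in‑memory Deque stands in for a database table of unsent messages. It is not thread‑safe.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Predicate;

// Hypothetical application-level resend helper (JDK only).
final class ResendSketch {
    private final Predicate<String> sender;                  // stand-in for producer.send(...)
    private final Deque<String> pending = new ArrayDeque<>(); // stand-in for a DB table

    ResendSketch(Predicate<String> sender) {
        this.sender = sender;
    }

    // Try to send; on failure keep the payload for a later retry.
    boolean sendOrStore(String payload) {
        boolean ok = trySend(payload);
        if (!ok) {
            pending.addLast(payload);
        }
        return ok;
    }

    // Retry everything stored so far once; returns how many were resent.
    int retryPending() {
        int resent = 0;
        int n = pending.size();
        for (int i = 0; i < n; i++) {
            String payload = pending.pollFirst();
            if (trySend(payload)) {
                resent++;
            } else {
                pending.addLast(payload); // still failing, keep it
            }
        }
        return resent;
    }

    int pendingCount() {
        return pending.size();
    }

    // Treat both a false result and a runtime exception as a failed send.
    private boolean trySend(String payload) {
        try {
            return sender.test(payload);
        } catch (RuntimeException e) {
            return false;
        }
    }
}
```

In a real application retryPending would typically run from a scheduled job, and the stored rows would survive a process restart.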
1.3 Using Oneway Send
In scenarios where latency must be minimal and reliability is less critical (e.g., log collection), the oneway mode sends the request without waiting for a response, reducing the operation to a single OS socket write (microseconds).
2 Consumer
2.1 Idempotent Consumption
RocketMQ cannot guarantee exactly‑once delivery, so applications must handle deduplication, typically using a relational database keyed by msgId or another unique business field.
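A minimal sketch of deduplication by business key follows. It assumes an in‑memory set in place of the relational table mentioned above (in production a unique index would play the role of Set.add), and IdempotentHandlerSketch and handleOnce are hypothetical names.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical idempotent wrapper; the Set stands in for a DB unique-key table.
final class IdempotentHandlerSketch {
    private final Set<String> processed = ConcurrentHashMap.newKeySet();

    // Runs the action only the first time a key is seen.
    // Returns true if the action ran, false if the key was a duplicate.
    boolean handleOnce(String businessKey, Runnable action) {
        if (!processed.add(businessKey)) {
            return false; // already handled, skip silently
        }
        try {
            action.run();
            return true;
        } catch (RuntimeException e) {
            // Processing failed: forget the key so a redelivery can retry it.
            processed.remove(businessKey);
            throw e;
        }
    }
}
```

A redelivered message with the same key then becomes a no‑op, while a message whose processing failed can still be retried.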
2.2 Slow Consumption Handling
1 Increase Parallelism
Most consumption is I/O‑bound; increase parallelism by adding more consumer instances within the same consumer group or by raising consumeThreadMin and consumeThreadMax.
2 Batch Consumption
Enable batch consumption by setting consumeMessageBatchMaxSize to a value greater than 1, reducing per‑message overhead.
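The effect of the batch size can be sketched with plain lists: the larger the batch, the fewer listener invocations (and, if each invocation does one bulk write, the fewer DB round‑trips). BatchDeliverySketch is a hypothetical JDK‑only helper; it only models the "at most N messages per invocation" rule and ignores every other limit the client applies.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of delivering messages in batches of at most batchMaxSize.
final class BatchDeliverySketch {

    static <T> List<List<T>> split(List<T> msgs, int batchMaxSize) {
        if (batchMaxSize < 1) {
            throw new IllegalArgumentException("batchMaxSize must be >= 1");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int i = 0; i < msgs.size(); i += batchMaxSize) {
            int end = Math.min(i + batchMaxSize, msgs.size());
            batches.add(new ArrayList<>(msgs.subList(i, end))); // copy one batch
        }
        return batches;
    }
}
```

For ten pending messages, a batch size of 1 means ten listener calls, while a batch size of 4 means three.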
3 Skip Unimportant Messages
If message backlog exceeds a threshold (e.g., 100 000), optionally discard low‑priority messages to catch up.
public ConsumeConcurrentlyStatus consumeMessage(
        List<MessageExt> msgs,
        ConsumeConcurrentlyContext context) {
    // Backlog = newest offset in the queue minus this message's offset.
    long offset = msgs.get(0).getQueueOffset();
    String maxOffset = msgs.get(0).getProperty(MessageConst.PROPERTY_MAX_OFFSET);
    long diff = Long.parseLong(maxOffset) - offset;
    if (diff > 100000) {
        // TODO: special handling for backlog, e.g. discard low-priority messages
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
    // TODO: normal consumption logic
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
4 Optimize Per‑Message DB Access
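Reduce the number of DB round‑trips per message. For example, assuming roughly 5 ms per round‑trip plus 5 ms of business logic, going from four round‑trips to two cuts per‑message latency from 25 ms to 15 ms.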
2.3 Logging in Consumption
For low‑volume consumption, log the received messages and processing time.
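public ConsumeConcurrentlyStatus consumeMessage(
        List<MessageExt> msgs,
        ConsumeConcurrentlyContext context) {
    log.info("RECEIVE_MSG_BEGIN: " + msgs.toString());
    long start = System.currentTimeMillis();
    // TODO: normal consumption process
    log.info("RECEIVE_MSG_END: cost " + (System.currentTimeMillis() - start) + " ms");
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
2.4 Other Consumer Recommendations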
1 Consumer Groups and Subscriptions
Different consumer groups can independently consume the same topic; ensure all consumers in a group have identical subscription information.
2 Ordered Messages
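When order matters, the consumer locks each message queue so that its messages are consumed one at a time and in order, at some cost in throughput. If an exception occurs, return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT instead of throwing.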
3 Concurrent Consumption
Use concurrent consumption for better performance; return ConsumeConcurrentlyStatus.RECONSUME_LATER to indicate temporary failure.
4 Blocking
Avoid blocking listeners as they can exhaust thread pools.
5 Thread Count Settings
Adjust setConsumeThreadMin and setConsumeThreadMax to control the internal thread pool.
6 Consumption Offsets
When a new consumer group starts, choose a consumption strategy: CONSUME_FROM_LAST_OFFSET, CONSUME_FROM_FIRST_OFFSET, or CONSUME_FROM_TIMESTAMP.
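The "return a status instead of throwing" advice for ordered consumption can be sketched as a thin wrapper around the business handler. The Status enum below is a local stand‑in carrying only the two values this example needs, and OrderlyListenerSketch is a hypothetical name; the snippet uses only the JDK so it can be run on its own.

```java
import java.util.List;
import java.util.function.Consumer;

// Hypothetical wrapper showing the error-handling pattern for ordered consumption.
final class OrderlyListenerSketch {

    // Local stand-in for the client's orderly-consumption status.
    enum Status { SUCCESS, SUSPEND_CURRENT_QUEUE_A_MOMENT }

    static Status consume(List<String> msgs, Consumer<String> handler) {
        try {
            for (String msg : msgs) {
                handler.accept(msg); // business logic, processed in list order
            }
            return Status.SUCCESS;
        } catch (RuntimeException e) {
            // Do not let the exception escape: ask for the queue to be paused
            // briefly so the same messages are retried in order.
            return Status.SUSPEND_CURRENT_QUEUE_A_MOMENT;
        }
    }
}
```

The same shape applies to concurrent consumption, with RECONSUME_LATER taking the place of the suspend status.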
3 Broker
3.1 Broker Roles
Broker roles include ASYNC_MASTER, SYNC_MASTER, and SLAVE. Choose SYNC_MASTER + SLAVE for high reliability, or ASYNC_MASTER + SLAVE for lower latency.
3.2 FlushDiskType
SYNC_FLUSH provides durability at the cost of performance; ASYNC_FLUSH is faster but less reliable.
3.3 Broker Configuration
(Image omitted – shows broker configuration parameters.)
4 NameServer
NameServers provide simple routing information: brokers register routes, and clients retrieve the latest routes from the NameServer.
5 Client Configuration
5.1 Client Addressing
Clients locate NameServers, then brokers. Addressing can be set via code, JVM arguments, environment variables, or a static HTTP server.
producer.setNamesrvAddr("192.168.0.1:9876;192.168.0.2:9876");
consumer.setNamesrvAddr("192.168.0.1:9876;192.168.0.2:9876");
-Drocketmq.namesrv.addr=192.168.0.1:9876;192.168.0.2:9876
export NAMESRV_ADDR="192.168.0.1:9876;192.168.0.2:9876"
Clients poll the static HTTP URL http://jmenv.tbsite.net:8080/rocketmq/nsaddr every two minutes; the response body is the address list:
192.168.0.1:9876;192.168.0.2:9876
5.2 Common Client Settings
All producer and consumer classes inherit from ClientConfig, which provides getter/setter methods for configuration parameters.
1 Common Configuration
(Image omitted – shows common client config fields.)
2 Producer Configuration
(Image omitted – shows producer‑specific settings.)
3 PushConsumer Configuration
(Image omitted – shows push consumer settings.)
4 PullConsumer Configuration
(Image omitted – shows pull consumer settings.)
5 Message Data Structure
(Image omitted – illustrates the message model.)
6 System Configuration
6.1 JVM Options
JDK 1.8 is recommended. Set -Xms and -Xmx to the same value to keep the heap size stable:
-server -Xms8g -Xmx8g -Xmn4g
Additional options for pre‑touching the heap, disabling biased locking, and using G1 GC:
-XX:+AlwaysPreTouch
-XX:-UseBiasedLocking
-XX:+UseG1GC -XX:G1HeapRegionSize=16m
-XX:G1ReservePercent=25
-XX:InitiatingHeapOccupancyPercent=30
GC log rotation settings:
-XX:+UseGCLogFileRotation
-XX:NumberOfGCLogFiles=5
-XX:GCLogFileSize=30m
Redirect GC logs to a RAM‑disk to reduce latency:
-Xloggc:/dev/shm/mq_gc_%p.log
6.2 Linux Kernel Parameters
vm.extra_free_kbytes – Extra free memory for background reclamation.
vm.min_free_kbytes – Minimum free memory; setting too low can cause deadlocks under load.
vm.max_map_count – Increase for RocketMQ’s mmap usage.
vm.swappiness – Set to 10 to avoid swapping.
File descriptor limits – Recommend 655350.
Disk scheduler – Use the deadline I/O scheduler for predictable latency.
Architecture Digest
Focusing on Java backend development, covering application architecture from top-tier internet companies (high availability, high performance, high stability), big data, machine learning, Java architecture, and other popular fields.