Master RocketMQ on CentOS: Deployment, Client Code, and Best Practices
This guide walks through installing RocketMQ on a CentOS 6.5 VM, launching NameServer and Broker, provides sample Java consumer and producer code, and outlines best‑practice tips for idempotent consumption, batch processing, message filtering, performance tuning, and reliable message delivery.
Server Installation and Deployment
I deployed on a CentOS 6.5 virtual machine.
1. Download the program.
2. Extract the tarball: tar -xvf alibaba-rocketmq-3.0.7.tar.gz 3. Start RocketMQ NameServer: nohup sh mqnamesrv & 4. Start a Broker and point it to the NameServer:
nohup sh mqbroker -n "127.0.0.1:9876" &Client Development
Sample quickstart code includes a consumer and a producer.
Consumer example:
/**
* Consumer, subscribe messages
*/
public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("QuickStartConsumer");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.setInstanceName("QuickStartConsumer");
consumer.subscribe("QuickStart", "*");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
System.out.println("Consumer Started.");
}
}Producer example:
/**
* Producer, send messages
*/
public class Producer {
public static void main(String[] args) throws MQClientException, InterruptedException {
DefaultMQProducer producer = new DefaultMQProducer("QuickStartProducer");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.setInstanceName("QuickStartProducer");
producer.start();
for (int i = 0; i < 1000; i++) {
try {
Message msg = new Message("QuickStart", // topic
"TagA", // tag
("Hello RocketMQ ,QuickStart" + i).getBytes());
SendResult sendResult = producer.send(msg);
System.out.println(sendResult);
} catch (Exception e) {
e.printStackTrace();
Thread.sleep(1000);
}
}
producer.shutdown();
}
}Run the consumer first; it stays active to receive messages. Then run the producer to send messages; the broker forwards them to the consumer.
Consumer Best Practices
1. Ensure idempotent consumption (deduplication) at the business layer, e.g., using MsgId or a unique field such as order ID, checking existence in a DB or KV store before processing.
2. Use batch consumption by setting consumerMessageBatchMaxSize to improve throughput.
3. Skip non‑critical messages when the queue is back‑logged.
Example handling large back‑log:
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
long offset = msgs.get(0).getQueueOffset();
String maxOffset = msgs.get(0).getProperty(MessageConst.PROPERTY_MAX_OFFSET);
long diff = Long.parseLong(maxOffset) - offset;
if (diff > 100000) {
// discard or defer messages
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}4. Optimize per‑message processing by reducing DB round‑trips; combine queries or use caching to cut latency.
Producer Best Practices
1. Use a single topic per application and tags for sub‑types to enable broker‑side filtering.
2. Set a unique key for each message (e.g., order ID) in the keys field to facilitate later lookup.
3. Log send results and keys for traceability.
4. Understand send status codes: SEND_OK, FLUSH_DISK_TIMEOUT, FLUSH_SLAVE_TIMEOUT, SLAVE_NOT_AVAILABLE. Retry only when status is not SEND_OK for ordering‑sensitive scenarios.
5. Implement application‑level retry for non‑loss‑tolerant messages, storing failed messages in a DB and retrying asynchronously.
6. Consider one‑way sends for ultra‑low latency scenarios where reliability can be relaxed.
Pull Consumer Example
/**
* PullConsumer, subscribe messages
*/
public class PullConsumer {
private static final Map<MessageQueue, Long> offseTable = new HashMap<>();
public static void main(String[] args) throws MQClientException {
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("PullConsumerGroup");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.start();
Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TopicTestMapBody");
for (MessageQueue mq : mqs) {
System.out.println("Consume from the queue: " + mq);
SINGLE_MQ: while (true) {
try {
PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
List<MessageExt> list = pullResult.getMsgFoundList();
if (list != null && list.size() < 100) {
for (MessageExt msg : list) {
System.out.println(SerializableInterface.deserialize(msg.getBody()));
}
}
System.out.println(pullResult.getNextBeginOffset());
putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
switch (pullResult.getPullStatus()) {
case FOUND:
// TODO
break;
case NO_MATCHED_MSG:
break;
case NO_NEW_MSG:
break SINGLE_MQ;
case OFFSET_ILLEGAL:
break;
default:
break;
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
consumer.shutdown();
}
private static void putMessageQueueOffset(MessageQueue mq, long offset) {
offseTable.put(mq, offset);
}
private static long getMessageQueueOffset(MessageQueue mq) {
Long offset = offseTable.get(mq);
if (offset != null) {
System.out.println(offset);
return offset;
}
return 0;
}
}Note the static offseTable tracks the next offset for each queue, enabling incremental pulls.
Source: http://www.uml.org.cn/zjjs/201504021.asp
Signed-in readers can open the original source through BestHub's protected redirect.
This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contactand we will review it promptly.
ITFLY8 Architecture Home
ITFLY8 Architecture Home - focused on architecture knowledge sharing and exchange, covering project management and product design. Includes large-scale distributed website architecture (high performance, high availability, caching, message queues...), design patterns, architecture patterns, big data, project management (SCRUM, PMP, Prince2), product design, and more.
How this landed with the community
Was this worth your time?
0 Comments
Thoughtful readers leave field notes, pushback, and hard-won operational detail here.
