RocketMQ Message Backtracking: Verification, Core Process, and Implementation Details
This article explains how to use RocketMQ's message backtracking feature to reset consumer offsets, covering problem background, verification steps, core process analysis, code examples, and comparative evaluation of different approaches in real-world scenarios.
1 Problem Background
Recently, Company A's SMS service encountered a problem, resulting in a batch of messages not being sent. After the service was repaired, the missing data needed to be re‑sent.
Because the SMS service is triggered directly by RocketMQ, Company A faced a dilemma when trying to recover the data, leading to the following dialogue:
Leader: "A, there's so much data, how do you plan to fix it?"
A: "It's a headache, leader. Most of our services keep a local message table for idempotency, so we just reset the state in the DB table and re-process the data, but the SMS service has no local table!"
Leader: "Do you have any ideas?"
A: "The simplest way is to let the upstream resend, then we just consume it again."
Leader: "That would make the problem worse. If the upstream resends, every consumer group would have to consume again, and other teams will start bothering you."
A: "Then it's hard to handle..."
Leader: "Actually RocketMQ provides a dedicated message backtracking capability, you can try it."
A: "Sounds magical? Let me investigate..."
2 Verification
2.1 Producer Startup
Prepare a new topic and send 10,000 messages.
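import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();
        for (int i = 0; i < 10000; i++) {
            try {
                Message msg = new Message("TopicTest", "TagA", ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                // Synchronous send: returns once the broker has acknowledged the message
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                e.printStackTrace();
                // Back off briefly before sending the next message
                Thread.sleep(1000);
            }
        }
        producer.shutdown();
    }
}
2.2 Consumer Startup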
Prepare a new consumer group, consume the data under the topic and record the total count.
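import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

public class Consumer {
    public static void main(String[] args) throws InterruptedException, MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.subscribe("TopicTest", "*");
        final AtomicInteger count = new AtomicInteger();
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                // Count messages rather than batches, so the total stays correct for any batch size
                count.addAndGet(msgs.size());
                System.out.printf("%s Receive New Messages End: %s %n", Thread.currentThread().getName(), msgs);
                System.out.println(count.get());
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
    }
}
Figure: consumer message record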
2.3 Execute Backtracking
Command line execution:
mqadmin.cmd resetOffsetByTime -n 127.0.0.1:9876 -t TopicTest -g please_rename_unique_group_name_4 -s 1722240069000
The same command can also be executed by calling MQAdminStartup.main directly.
Figure: manual execution of MQAdminStartup
Code execution:
public static void main(String[] args) {
    String[] params = new String[]{"resetOffsetByTime", "-n", "127.0.0.1:9876", "-t", "TopicTest", "-g", "please_rename_unique_group_name_4", "-s", "1722240069000"};
    MQAdminStartup.main(params);
}
2.4 Result Verification
Figure: client reset success record
Figure: consumer re-consumption record
2.5 Verification Summary
From the results, the consumer offset was reset to the specified timestamp position. Because the timestamp is earlier than the earliest message creation time, all undeleted messages were re‑consumed.
What exactly does RocketMQ do?
2.5.1 Parameter Analysis
Action identifier: resetOffsetByTime
Additional parameters:
-n: nameserver address
-t: target topic name
-g: consumer group name
-s: backtrack timestamp in milliseconds since the epoch (the literal value now selects the current time)
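The -s value is easy to get wrong by hand, so a small helper can derive it from a wall-clock time. The following is a minimal sketch, not part of RocketMQ: the class name BacktrackTimestamp and the method toEpochMillis are hypothetical, and the Asia/Shanghai zone is only an assumption chosen for illustration.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;

final class BacktrackTimestamp {
    /** Converts a wall-clock time in the given zone to epoch milliseconds, the unit -s expects. */
    static long toEpochMillis(LocalDateTime dateTime, ZoneId zone) {
        return dateTime.atZone(zone).toInstant().toEpochMilli();
    }

    public static void main(String[] args) {
        // 2024-07-29 16:01:09 in UTC+8 (zone chosen for illustration only)
        long millis = toEpochMillis(LocalDateTime.of(2024, 7, 29, 16, 1, 9), ZoneId.of("Asia/Shanghai"));
        // Prints 1722240069000, the value passed to -s in section 2.3
        System.out.println(millis);
    }
}
```

Passing the zone explicitly avoids depending on the default time zone of the machine that runs the tool.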
2.5.2 Thoughts
Figure: message backtracking considerations
3 Analysis
The following source code snippets are from version 4.2.0, with some simplifications.
3.1 Strategy Pattern – Command Line Parsing
Class:
org.apache.rocketmq.tools.command.MQAdminStartup#main
/* Parse the action identifier to obtain the corresponding handler class; for this request the actual handler is ResetOffsetByTimeCommand */
SubCommand cmd = findSubCommand(args[0]);
/* Arguments that follow the action identifier */
String[] subargs = parseSubArgs(args);
Options options = ServerUtil.buildCommandlineOptions(new Options());
CommandLine commandLine = ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser());
/* Submit the request for execution */
cmd.execute(commandLine, options, rpcHook);
3.2 Create Client and Interact with Server
Class:
org.apache.rocketmq.tools.command.offset.ResetOffsetByTimeCommand#execute
public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) throws SubCommandException {
    DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
    String group = commandLine.getOptionValue("g").trim(); // consumer group
    String topic = commandLine.getOptionValue("t").trim(); // topic
    String timeStampStr = commandLine.getOptionValue("s").trim(); // reset timestamp
    long timestamp = timeStampStr.equals("now") ? System.currentTimeMillis() : Long.parseLong(timeStampStr);
    boolean isC = false; // whether C client
    boolean force = true; // force reset (if true, offset larger than current may be set)
    if (commandLine.hasOption('f')) {
        force = Boolean.valueOf(commandLine.getOptionValue("f").trim());
    }
    /* Start the client that interacts with nameserver and broker */
    defaultMQAdminExt.start();
    /* Execute the command */
    Map<MessageQueue, Long> offsetTable = defaultMQAdminExt.resetOffsetByTimestamp(topic, group, timestamp, force, isC);
}
3.3 Get Broker Addresses for Topic and Submit Reset Request
Class:
org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl#resetOffsetByTimestamp
public Map<MessageQueue, Long> resetOffsetByTimestamp(String topic, String group, long timestamp, boolean isForce, boolean isC) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
    // Get broker addresses from nameserver
    TopicRouteData topicRouteData = this.examineTopicRouteInfo(topic);
    List<BrokerData> brokerDatas = topicRouteData.getBrokerDatas();
    Map<MessageQueue, Long> allOffsetTable = new HashMap<>();
    if (brokerDatas != null) {
        for (BrokerData brokerData : brokerDatas) {
            String addr = brokerData.selectBrokerAddr();
            if (addr != null) {
                Map<MessageQueue, Long> offsetTable = this.mqClientInstance.getMQClientAPIImpl().invokeBrokerToResetOffset(addr, topic, group, timestamp, isForce, timeoutMillis, isC);
                if (offsetTable != null) {
                    allOffsetTable.putAll(offsetTable);
                }
            }
        }
    }
    return allOffsetTable;
}
3.4 Interact with Nameserver to Get Broker Addresses
Class:
org.apache.rocketmq.client.impl.MQClientAPIImpl#getTopicRouteInfoFromNameServer
public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis, boolean allowTopicNotExist) throws MQClientException, InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
    GetRouteInfoRequestHeader requestHeader = new GetRouteInfoRequestHeader();
    requestHeader.setTopic(topic);
    RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINTO_BY_TOPIC, requestHeader);
    // A null address makes the remoting client pick a nameserver
    RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);
    byte[] body = response.getBody();
    if (body != null) {
        return TopicRouteData.decode(body, TopicRouteData.class);
    }
    return null;
}
3.4.1 Nameserver Receives Request and Returns Routing Info
Class:
org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#getRouteInfoByTopic
public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
    RemotingCommand response = RemotingCommand.createResponseCommand(null);
    GetRouteInfoRequestHeader requestHeader = (GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class);
    TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());
    byte[] content = topicRouteData.encode();
    response.setBody(content);
    response.setCode(ResponseCode.SUCCESS);
    response.setRemark(null);
    return response;
}
3.4.2 Core Attributes of RouteInfoManager
// topic routing information, used for load balancing, QueueData records brokerName
private final HashMap<String /* topic */, List<QueueData>> topicQueueTable;
// broker basic info: name, cluster, master/slave address, brokerId=0 means master, >0 means slave
private final HashMap<String /* brokerName */, BrokerData> brokerAddrTable;
// cluster information, contains all broker info of the cluster
private final HashMap<String /* clusterName */, Set<String /* brokerName */>> clusterAddrTable;
// live broker info and corresponding channel
private final HashMap<String /* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
// broker filter server info
private final HashMap<String /* brokerAddr */, List<String> /* Filter Server */> filterServerTable;
3.5 Interact with Broker to Execute Reset
Class:
org.apache.rocketmq.client.impl.MQClientAPIImpl#invokeBrokerToResetOffset
public Map<MessageQueue, Long> invokeBrokerToResetOffset(final String addr, final String topic, final String group, final long timestamp, final boolean isForce, final long timeoutMillis, boolean isC) throws RemotingException, MQClientException, InterruptedException {
    ResetOffsetRequestHeader requestHeader = new ResetOffsetRequestHeader();
    requestHeader.setTopic(topic);
    requestHeader.setGroup(group);
    requestHeader.setTimestamp(timestamp);
    requestHeader.setForce(isForce);
    RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.INVOKE_BROKER_TO_RESET_OFFSET, requestHeader);
    if (isC) {
        request.setLanguage(LanguageCode.CPP);
    }
    RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis);
    if (response.getBody() != null) {
        ResetOffsetBody body = ResetOffsetBody.decode(response.getBody(), ResetOffsetBody.class);
        return body.getOffsetTable();
    }
    return null;
}
The broker receives the request and starts processing it.
Class:
org.apache.rocketmq.broker.client.net.Broker2Client#resetOffset
public RemotingCommand resetOffset(String topic, String group, long timeStamp, boolean isForce, boolean isC) {
    RemotingCommand response = RemotingCommand.createResponseCommand(null);
    TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topic);
    // Record the offset to which each queue of the consumer group should be reset
    Map<MessageQueue, Long> offsetTable = new HashMap<>();
    for (int i = 0; i < topicConfig.getWriteQueueNums(); i++) {
        MessageQueue mq = new MessageQueue();
        mq.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName());
        mq.setTopic(topic);
        mq.setQueueId(i);
        long consumerOffset = this.brokerController.getConsumerOffsetManager().queryOffset(group, topic, i);
        if (consumerOffset == -1) {
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark(String.format("The consumer group <%s> not exist", group));
            return response;
        }
        long timeStampOffset;
        if (timeStamp == -1) {
            // No specific timestamp, use the max offset of the queue
            timeStampOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, i);
        } else {
            // Find offset by timestamp
            timeStampOffset = this.brokerController.getMessageStore().getOffsetInQueueByTime(topic, i, timeStamp);
        }
        if (timeStampOffset < 0) {
            // Message already deleted
            log.warn("reset offset is invalid. topic={}, queueId={}, timeStampOffset={}", topic, i, timeStampOffset);
            timeStampOffset = 0;
        }
        if (isForce || timeStampOffset < consumerOffset) {
            offsetTable.put(mq, timeStampOffset);
        } else {
            offsetTable.put(mq, consumerOffset);
        }
    }
    // Build request to reset consumer client offset (request code 220)
    ResetOffsetRequestHeader requestHeader = new ResetOffsetRequestHeader();
    requestHeader.setTopic(topic);
    requestHeader.setGroup(group);
    requestHeader.setTimestamp(timeStamp);
    RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.RESET_CONSUMER_CLIENT_OFFSET, requestHeader);
    if (isC) {
        ResetOffsetBodyForC body = new ResetOffsetBodyForC();
        List<MessageQueueForC> offsetList = convertOffsetTable2OffsetList(offsetTable);
        body.setOffsetTable(offsetList);
        request.setBody(body.encode());
    } else {
        ResetOffsetBody body = new ResetOffsetBody();
        body.setOffsetTable(offsetTable);
        request.setBody(body.encode());
    }
    ConsumerGroupInfo consumerGroupInfo = this.brokerController.getConsumerManager().getConsumerGroupInfo(group);
    if (consumerGroupInfo != null && !consumerGroupInfo.getAllChannel().isEmpty()) {
        ConcurrentMap<Channel, ClientChannelInfo> channelInfoTable = consumerGroupInfo.getChannelInfoTable();
        for (Map.Entry<Channel, ClientChannelInfo> entry : channelInfoTable.entrySet()) {
            int version = entry.getValue().getVersion();
            if (version >= MQVersion.Version.V3_0_7_SNAPSHOT.ordinal()) {
                try {
                    this.brokerController.getRemotingServer().invokeOneway(entry.getKey(), request, 5000);
                    log.info("[reset-offset] reset offset success. topic={}, group={}, clientId={}", topic, group, entry.getValue().getClientId());
                } catch (Exception e) {
                    log.error("[reset-offset] reset offset exception. topic={}, group={}", new Object[]{topic, group}, e);
                }
            } else {
                response.setCode(ResponseCode.SYSTEM_ERROR);
                response.setRemark("the client does not support this feature. version=" + MQVersion.getVersionDesc(version));
                log.warn("[reset-offset] the client does not support this feature. version={}", RemotingHelper.parseChannelRemoteAddr(entry.getKey()), MQVersion.getVersionDesc(version));
                return response;
            }
        }
    } else {
        String errorInfo = String.format("Consumer not online, so cannot reset offset, Group: %s Topic: %s Timestamp: %d", requestHeader.getGroup(), requestHeader.getTopic(), requestHeader.getTimestamp());
        log.error(errorInfo);
        response.setCode(ResponseCode.CONSUMER_NOT_ONLINE);
        response.setRemark(errorInfo);
        return response;
    }
    response.setCode(ResponseCode.SUCCESS);
    ResetOffsetBody resBody = new ResetOffsetBody();
    resBody.setOffsetTable(offsetTable);
    response.setBody(resBody.encode());
    return response;
}
3.6 Consumer Client Receives Request and Processes It
Class:
org.apache.rocketmq.client.impl.factory.MQClientInstance#resetOffset
public void resetOffset(String topic, String group, Map<MessageQueue, Long> offsetTable) {
    DefaultMQPushConsumerImpl consumer = null;
    try {
        MQConsumerInner impl = this.consumerTable.get(group);
        if (impl != null && impl instanceof DefaultMQPushConsumerImpl) {
            consumer = (DefaultMQPushConsumerImpl) impl;
        } else {
            log.info("[reset-offset] consumer does not exist. group={}", group);
            return;
        }
        consumer.suspend(); // pause consumption
        // Clear process queues for the target topic
        ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable = consumer.getRebalanceImpl().getProcessQueueTable();
        for (Map.Entry<MessageQueue, ProcessQueue> entry : processQueueTable.entrySet()) {
            MessageQueue mq = entry.getKey();
            if (topic.equals(mq.getTopic()) && offsetTable.containsKey(mq)) {
                ProcessQueue pq = entry.getValue();
                pq.setDropped(true);
                pq.clear();
            }
        }
        // Wait a short time to ensure broker-client sync
        try { TimeUnit.SECONDS.sleep(10); } catch (InterruptedException e) {}
        Iterator<MessageQueue> iterator = processQueueTable.keySet().iterator();
        while (iterator.hasNext()) {
            MessageQueue mq = iterator.next();
            Long offset = offsetTable.get(mq);
            if (topic.equals(mq.getTopic()) && offset != null) {
                try {
                    consumer.updateConsumeOffset(mq, offset);
                    consumer.getRebalanceImpl().removeUnnecessaryMessageQueue(mq, processQueueTable.get(mq));
                    iterator.remove();
                } catch (Exception e) {
                    log.warn("reset offset failed. group={}, {}", group, mq, e);
                }
            }
        }
    } finally {
        if (consumer != null) {
            consumer.resume(); // trigger rebalance
        }
    }
}
4 Core Process
Figure: full message backtracking flow
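One step of this flow, the broker's per-queue choice of target offset from section 3.5, can be restated as a pure function. This is a minimal sketch that mirrors the condition in the Broker2Client#resetOffset excerpt; the class ResetOffsetRule and the method targetOffset are hypothetical names used only for illustration.

```java
final class ResetOffsetRule {
    /**
     * Chooses the offset a queue is reset to.
     * consumerOffset:  the group's current progress on the queue
     * timeStampOffset: the offset found for the requested timestamp (negative if not found)
     * force:           the value of the -f option
     */
    static long targetOffset(long consumerOffset, long timeStampOffset, boolean force) {
        if (timeStampOffset < 0) {
            // The excerpt falls back to offset 0 when the lookup result is invalid
            timeStampOffset = 0;
        }
        // Without force, the offset only ever moves backward
        return (force || timeStampOffset < consumerOffset) ? timeStampOffset : consumerOffset;
    }

    public static void main(String[] args) {
        System.out.println(targetOffset(100, 40, false));  // moves back to 40
        System.out.println(targetOffset(100, 150, false)); // stays at 100
        System.out.println(targetOffset(100, 150, true));  // jumps ahead to 150
    }
}
```

Because the command-line tool defaults force to true, a timestamp later than the group's current progress skips messages unless -f false is passed.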
5 Summary
The message backtracking feature is RocketMQ's safety net for business teams; after any unrecoverable issue, they can quickly restore or correct data by backtracking. In streaming or batch‑processing scenarios, re‑running data is common.
RocketMQ can support backtracking because its offset management is simple: a consumer group's progress is one offset per queue, and the mqadmin tool can reset it. However, messages are stored on brokers and are eventually deleted, and the broker rejects the reset when the consumer group is not online. Make sure the target brokers and the group's consumers are online and that the messages at the backtrack timestamp have not yet been deleted; otherwise those messages cannot be re-consumed.
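Before running a reset, it can help to sanity-check the timestamp against the broker's retention window. The sketch below is a rough heuristic under stated assumptions, not a RocketMQ API: it assumes messages are kept for a fixed period (the broker setting fileReservedTime, 72 hours by default), whereas real deletion works on whole files and can also be triggered by disk usage. RetentionCheck and likelyRetained are hypothetical names.

```java
import java.time.Duration;
import java.time.Instant;

final class RetentionCheck {
    /** Returns true if backtrackTo is not older than the assumed retention window ending at now. */
    static boolean likelyRetained(Instant backtrackTo, Instant now, Duration retention) {
        Instant oldestKept = now.minus(retention);
        return !backtrackTo.isBefore(oldestKept);
    }

    public static void main(String[] args) {
        Instant backtrackTo = Instant.ofEpochMilli(1722240069000L); // the -s value from section 2.3
        Duration retention = Duration.ofHours(72);                  // assumed retention period
        System.out.println(likelyRetained(backtrackTo, Instant.now(), retention));
    }
}
```

A false result only signals that the messages may already be gone; the authoritative answer is the offset the broker returns for the reset.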
6 Extension
With backtracking, offsets can be moved backward or forward. For example, when consumption falls too far behind, the offset can be moved forward to skip the backlog. The skipped messages still have to be processed later, which means consuming a specific interval of offsets.
Note that org.apache.rocketmq.client.impl.factory.MQClientInstance#resetOffset only handles DefaultMQPushConsumerImpl. A DefaultMQPullConsumerImpl is not reset, because its progress is controlled entirely by the application. That same property makes a pull-mode consumer a good fit for interval-based consumption.
Example code:
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;

public class PullConsumerLocalTest {
    // Next offset to pull for each queue
    private static final Map<MessageQueue, Long> OFFSET_TABLE = new HashMap<>();
    // Offset interval {min, max} to consume for each queue; max is exclusive
    private static final Map<MessageQueue, long[]> QUEUE_OFFSET_SECTION_TABLE = new HashMap<>();
    private static final Long MIN_TIMESTAMP = 1722240069000L; // minimum timestamp
    private static final Long MAX_TIMESTAMP = 1722240160000L; // maximum timestamp

    public static void main(String[] args) throws MQClientException {
        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.start();
        String topic = "TopicTest";
        init(consumer, topic);
        Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues(topic);
        for (MessageQueue mq : mqs) {
            System.out.printf("Consume from the queue: %s%n", mq);
            long maxOffset = QUEUE_OFFSET_SECTION_TABLE.get(mq)[1];
            SINGLE_MQ:
            // Stop once the next offset to pull reaches the end of the interval
            while (getMessageQueueOffset(mq) < maxOffset) {
                try {
                    PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
                    System.out.printf("%s%n", pullResult);
                    putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
                    switch (pullResult.getPullStatus()) {
                        case FOUND:
                            for (MessageExt msg : pullResult.getMsgFoundList()) {
                                // A batch may run past the interval; skip anything at or beyond maxOffset
                                if (msg.getQueueOffset() < maxOffset) {
                                    System.out.printf("Process: %s%n", msg.getMsgId()); // business logic goes here
                                }
                            }
                            break;
                        case NO_MATCHED_MSG:
                            break;
                        case NO_NEW_MSG:
                            break SINGLE_MQ;
                        case OFFSET_ILLEGAL:
                            break;
                        default:
                            break;
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                    // Give up on this queue instead of retrying forever
                    break SINGLE_MQ;
                }
            }
        }
        consumer.shutdown();
    }

    // Resolve the timestamp interval to an offset interval for every queue of the topic
    private static void init(DefaultMQPullConsumer consumer, String topic) throws MQClientException {
        Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues(topic);
        for (MessageQueue mq : mqs) {
            long minOffset = consumer.searchOffset(mq, MIN_TIMESTAMP);
            long maxOffset = consumer.searchOffset(mq, MAX_TIMESTAMP);
            QUEUE_OFFSET_SECTION_TABLE.put(mq, new long[]{minOffset, maxOffset});
            OFFSET_TABLE.put(mq, minOffset);
        }
    }

    private static long getMessageQueueOffset(MessageQueue mq) {
        Long offset = OFFSET_TABLE.get(mq);
        if (offset != null) return offset;
        return 0L;
    }

    private static void putMessageQueueOffset(MessageQueue mq, long offset) {
        OFFSET_TABLE.put(mq, offset);
    }
}
7 Comparison
Method | Advantages | Disadvantages
Local message table in consumer | Business fully controllable | Additional storage overhead; duplicate consumption requires custom development
Message reset | No business changes needed; supports broadcast/cluster and ordered/unordered messages (requires idempotent state reset) | Not supported in versions prior to 3.0.7
Pull-mode manual control | Complete consumption progress control | Offset management complexity is higher
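The "idempotent state reset" caveat for message reset can be illustrated with a small sketch. It assumes the consumer de-duplicates by a business key and stands in for the local message table with an in-memory set; IdempotentHandler, handle, and resetState are hypothetical names, not RocketMQ APIs.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

final class IdempotentHandler {
    // Keys of messages that have already been processed (a real service would persist these)
    private final Set<String> processedKeys = ConcurrentHashMap.newKeySet();

    /** Runs the action once per key; returns false when the key was already processed. */
    boolean handle(String messageKey, Runnable action) {
        if (!processedKeys.add(messageKey)) {
            return false; // duplicate delivery, skip
        }
        action.run();
        return true;
    }

    /** Forgets a key so that a deliberately replayed message is processed again. */
    void resetState(String messageKey) {
        processedKeys.remove(messageKey);
    }
}
```

After a backtrack, every replayed message whose key is still recorded is silently skipped, which is why the recorded state for the affected messages has to be reset first.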
About the author: Li Zhihao, Java Development Engineer at CaiHuoXia
