RocketMQ Message Backtracking: Verification, Core Process, and Implementation Details

This article explains how to use RocketMQ's message backtracking feature to reset consumer offsets, covering problem background, verification steps, core process analysis, code examples, and comparative evaluation of different approaches in real-world scenarios.

Zhuanzhuan Tech

1 Problem Background

Recently, Company A's SMS service encountered a problem, resulting in a batch of messages not being sent. After the service was repaired, the missing data needed to be re‑sent.

Because the SMS service is triggered directly by RocketMQ messages, the engineer in charge, A, faced a dilemma when trying to recover the data, which led to the following dialogue:

Leader: "A, there's so much data, how do you plan to fix it?"

A: "It's a headache, leader. For most of our business we keep a local message table for idempotency, so we just reset the state in the DB table and re-process the data. But the SMS service has no local table!"

Leader: "Do you have any ideas?"

A: "The simplest way is to ask the upstream to resend, and then we just consume the messages again."

Leader: "That would make the problem worse. If the upstream resends, every consumer group has to consume the messages again, and the other teams will come after you."

A: "Then it's hard to handle..."

Leader: "Actually, RocketMQ provides a dedicated message backtracking capability. You can try it."

A: "Sounds like magic. Let me investigate..."

2 Verification

2.1 Producer Startup

Prepare a new topic and send 10,000 messages.

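import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();
        // Send 10,000 test messages synchronously
        for (int i = 0; i < 10000; i++) {
            try {
                Message msg = new Message("TopicTest", "TagA", ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                e.printStackTrace();
                Thread.sleep(1000); // back off briefly before sending the next message
            }
        }
        producer.shutdown();
    }
}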

2.2 Consumer Startup

Prepare a new consumer group, consume the data under the topic and record the total count.

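import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;

public class Consumer {
    public static void main(String[] args) throws InterruptedException, MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.subscribe("TopicTest", "*");
        final AtomicInteger count = new AtomicInteger();
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                count.addAndGet(msgs.size()); // count messages rather than batches
                System.out.printf("%s Receive New Messages End: %s %n", Thread.currentThread().getName(), msgs);
                System.out.println(count.get());
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
    }
}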

[Figure: consumer message record]

2.3 Execute Backtracking

Command line execution:

mqadmin.cmd resetOffsetByTime -n 127.0.0.1:9876 -t TopicTest -g please_rename_unique_group_name_4 -s 1722240069000

mqadmin.cmd ultimately invokes MQAdminStartup.main, so the same command can also be executed from code by calling that method directly:

public static void main(String[] args) {
    String[] params = new String[]{"resetOffsetByTime","-n","127.0.0.1:9876","-t","TopicTest","-g","please_rename_unique_group_name_4","-s","1722240069000"};
    MQAdminStartup.main(params);
}

2.4 Result Verification

[Figure: client reset success record]

[Figure: consumer re-consumption record]

2.5 Verification Summary

From the results, the consumer offset was reset to the specified timestamp position. Because the timestamp is earlier than the earliest message creation time, all undeleted messages were re‑consumed.

What exactly does RocketMQ do?

2.5.1 Parameter Analysis

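Action identifier: resetOffsetByTime

Additional parameters:

-n: nameserver address

-t: target topic name

-g: consumer group name

-s: timestamp to backtrack to, in milliseconds since the Unix epoch (the command also accepts the literal now for the current time)

-f: optional; whether to force the reset (true by default)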
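The -s value is easy to get wrong when computed by hand. The following is a minimal sketch of deriving it from a wall-clock time; the class name, method name, and the choice of the Asia/Shanghai time zone are assumptions made for illustration and are not part of RocketMQ.

```java
import java.time.LocalDateTime;
import java.time.ZoneId;

class BacktrackTimestamp {
    /** Converts a wall-clock time in the given zone to epoch milliseconds for the -s option. */
    static long toEpochMillis(LocalDateTime localTime, ZoneId zone) {
        return localTime.atZone(zone).toInstant().toEpochMilli();
    }

    public static void main(String[] args) {
        // Assumption: the point to backtrack to is 2024-07-29 16:01:09, Beijing time.
        long ts = toEpochMillis(LocalDateTime.of(2024, 7, 29, 16, 1, 9), ZoneId.of("Asia/Shanghai"));
        System.out.println(ts); // prints 1722240069000, the value used in the command above
    }
}
```

Passing the zone explicitly avoids depending on the default time zone of the machine that runs the conversion.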

2.5.2 Thoughts

[Figure: message backtracking considerations]

3 Analysis

The following source code snippets are from version 4.2.0, with some simplifications.

3.1 Strategy Pattern – Command Line Parsing

Class:

org.apache.rocketmq.tools.command.MQAdminStartup#main
/* Parse the action identifier to obtain the corresponding handler class; for this request the actual handler is ResetOffsetByTimeCommand */
SubCommand cmd = findSubCommand(args[0]);
Options options = ServerUtil.buildCommandlineOptions(new Options());
CommandLine commandLine = ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs, cmd.buildCommandlineOptions(options), new PosixParser());
/* Submit the request for execution */
cmd.execute(commandLine, options, rpcHook);
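The dispatch idea can be shown in isolation. This is a simplified sketch of the strategy pattern rather than RocketMQ's code: the SubCommand interface here is a hypothetical, trimmed-down stand-in, and the map-based registry is a choice made for brevity.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

class SubCommandDispatchSketch {
    /** Hypothetical, trimmed-down stand-in for RocketMQ's SubCommand interface. */
    interface SubCommand {
        String commandName();
        String execute(String[] subargs);
    }

    /** Registry that maps an action identifier to its handler (the "strategy"). */
    private static final Map<String, SubCommand> COMMANDS = new HashMap<>();

    static void register(SubCommand cmd) {
        COMMANDS.put(cmd.commandName(), cmd);
    }

    /** Picks the handler named by args[0] and passes it the remaining arguments. */
    static String dispatch(String[] args) {
        if (args.length == 0) {
            return "no command given";
        }
        SubCommand cmd = COMMANDS.get(args[0]);
        if (cmd == null) {
            return "unknown command: " + args[0];
        }
        String[] subargs = Arrays.copyOfRange(args, 1, args.length);
        return cmd.execute(subargs);
    }

    static {
        // Illustrative handler only; the real ResetOffsetByTimeCommand talks to the broker.
        register(new SubCommand() {
            @Override public String commandName() { return "resetOffsetByTime"; }
            @Override public String execute(String[] subargs) {
                return "resetOffsetByTime called with " + subargs.length + " arguments";
            }
        });
    }

    public static void main(String[] args) {
        System.out.println(dispatch(new String[] {"resetOffsetByTime", "-t", "TopicTest"}));
        System.out.println(dispatch(new String[] {"noSuchCommand"}));
    }
}
```

Adding a new action then only requires registering another handler; the dispatch code itself does not change.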

3.2 Create Client and Interact with Server

Class:

org.apache.rocketmq.tools.command.offset.ResetOffsetByTimeCommand#execute
public void execute(CommandLine commandLine, Options options, RPCHook rpcHook) throws SubCommandException {
    DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
    String group = commandLine.getOptionValue("g").trim(); // consumer group
    String topic = commandLine.getOptionValue("t").trim(); // topic
    String timeStampStr = commandLine.getOptionValue("s").trim(); // reset timestamp
    long timestamp = timeStampStr.equals("now") ? System.currentTimeMillis() : Long.parseLong(timeStampStr);
    boolean isC = false; // whether C client
    boolean force = true; // force reset (if true, offset larger than current may be set)
    if (commandLine.hasOption('f')) {
        force = Boolean.valueOf(commandLine.getOptionValue("f").trim());
    }
    /* Start the client that interacts with nameserver and broker */
    defaultMQAdminExt.start();
    /* Execute the command */
    Map<MessageQueue, Long> offsetTable = defaultMQAdminExt.resetOffsetByTimestamp(topic, group, timestamp, force, isC);
}

3.3 Get Broker Addresses for Topic and Submit Reset Request

Class:

org.apache.rocketmq.tools.admin.DefaultMQAdminExtImpl#resetOffsetByTimestamp
public Map<MessageQueue, Long> resetOffsetByTimestamp(String topic, String group, long timestamp, boolean isForce, boolean isC) throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
    // Get broker addresses from nameserver
    TopicRouteData topicRouteData = this.examineTopicRouteInfo(topic);
    List<BrokerData> brokerDatas = topicRouteData.getBrokerDatas();
    Map<MessageQueue, Long> allOffsetTable = new HashMap<>();
    if (brokerDatas != null) {
        for (BrokerData brokerData : brokerDatas) {
            String addr = brokerData.selectBrokerAddr();
            if (addr != null) {
                Map<MessageQueue, Long> offsetTable = this.mqClientInstance.getMQClientAPIImpl().invokeBrokerToResetOffset(addr, topic, group, timestamp, isForce, timeoutMillis, isC);
                if (offsetTable != null) {
                    allOffsetTable.putAll(offsetTable);
                }
            }
        }
    }
    return allOffsetTable;
}

3.4 Interact with Nameserver to Get Broker Addresses

Class:

org.apache.rocketmq.client.impl.MQClientAPIImpl#getTopicRouteInfoFromNameServer
public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis, boolean allowTopicNotExist) throws MQClientException, InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
    GetRouteInfoRequestHeader requestHeader = new GetRouteInfoRequestHeader();
    requestHeader.setTopic(topic);
    RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINTO_BY_TOPIC, requestHeader);
    RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);
    byte[] body = response.getBody();
    if (body != null) {
        return TopicRouteData.decode(body, TopicRouteData.class);
    }
    return null;
}

3.4.1 Nameserver Receives Request and Returns Routing Info

Class:

org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#getRouteInfoByTopic
public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
    RemotingCommand response = RemotingCommand.createResponseCommand(null);
    GetRouteInfoRequestHeader requestHeader = (GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class);
    TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());
    byte[] content = topicRouteData.encode();
    response.setBody(content);
    response.setCode(ResponseCode.SUCCESS);
    response.setRemark(null);
    return response;
}

3.4.2 Core Attributes of RouteInfoManager

// topic routing information, used for load balancing, QueueData records brokerName
private final HashMap<String /* topic */, List<QueueData>> topicQueueTable;
// broker basic info: name, cluster, master/slave address, brokerId=0 means master, >0 means slave
private final HashMap<String /* brokerName */, BrokerData> brokerAddrTable;
// cluster information, contains all broker info of the cluster
private final HashMap<String /* clusterName */, Set<String /* brokerName */>> clusterAddrTable;
// live broker info and corresponding channel
private final HashMap<String /* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
// broker filter server info
private final HashMap<String /* brokerAddr */, List<String> /* Filter Server */> filterServerTable;
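To make the relationship between the first two tables concrete, here is a simplified model of how a topic can be resolved to broker addresses. It is a sketch under stated assumptions: plain maps stand in for QueueData and BrokerData, locking is omitted, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class RouteLookupSketch {
    // topic -> names of the brokers hosting its queues (stand-in for topicQueueTable)
    final Map<String, List<String>> topicToBrokerNames = new HashMap<>();
    // brokerName -> (brokerId -> address); brokerId 0 is the master (stand-in for brokerAddrTable)
    final Map<String, Map<Long, String>> brokerAddrTable = new HashMap<>();

    /** Records that the broker instance at addr serves the given topic. */
    void registerBroker(String topic, String brokerName, long brokerId, String addr) {
        List<String> names = topicToBrokerNames.computeIfAbsent(topic, k -> new ArrayList<>());
        if (!names.contains(brokerName)) {
            names.add(brokerName);
        }
        brokerAddrTable.computeIfAbsent(brokerName, k -> new HashMap<>()).put(brokerId, addr);
    }

    /** Returns the master address of every broker that serves the topic. */
    List<String> masterAddresses(String topic) {
        List<String> result = new ArrayList<>();
        List<String> names = topicToBrokerNames.get(topic);
        if (names == null) {
            return result; // unknown topic: no route
        }
        for (String name : names) {
            Map<Long, String> addrs = brokerAddrTable.get(name);
            String master = (addrs == null) ? null : addrs.get(0L);
            if (master != null) {
                result.add(master);
            }
        }
        return result;
    }
}
```

The lookup is two-step: the topic yields broker names, and each broker name yields its addresses. This is why the admin client ends up with one address per broker to send the reset request to.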

3.5 Interact with Broker to Execute Reset

Class:

org.apache.rocketmq.client.impl.MQClientAPIImpl#invokeBrokerToResetOffset
public Map<MessageQueue, Long> invokeBrokerToResetOffset(final String addr, final String topic, final String group, final long timestamp, final boolean isForce, final long timeoutMillis, boolean isC) throws RemotingException, MQClientException, InterruptedException {
    ResetOffsetRequestHeader requestHeader = new ResetOffsetRequestHeader();
    requestHeader.setTopic(topic);
    requestHeader.setGroup(group);
    requestHeader.setTimestamp(timestamp);
    requestHeader.setForce(isForce);
    RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.INVOKE_BROKER_TO_RESET_OFFSET, requestHeader);
    if (isC) {
        request.setLanguage(LanguageCode.CPP);
    }
    RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr), request, timeoutMillis);
    if (response.getBody() != null) {
        ResetOffsetBody body = ResetOffsetBody.decode(response.getBody(), ResetOffsetBody.class);
        return body.getOffsetTable();
    }
    return null;
}

The broker receives the request and starts processing it.

Class:

org.apache.rocketmq.broker.client.net.Broker2Client#resetOffset
public RemotingCommand resetOffset(String topic, String group, long timeStamp, boolean isForce, boolean isC) {
    RemotingCommand response = RemotingCommand.createResponseCommand(null);
    TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topic);
    // Record the offset to which each queue of the consumer group should be reset
    Map<MessageQueue, Long> offsetTable = new HashMap<>();
    for (int i = 0; i < topicConfig.getWriteQueueNums(); i++) {
        MessageQueue mq = new MessageQueue();
        mq.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName());
        mq.setTopic(topic);
        mq.setQueueId(i);
        long consumerOffset = this.brokerController.getConsumerOffsetManager().queryOffset(group, topic, i);
        if (consumerOffset == -1) {
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark(String.format("The consumer group <%s> not exist", group));
            return response;
        }
        long timeStampOffset;
        if (timeStamp == -1) {
            // No specific timestamp, use the max offset of the queue
            timeStampOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, i);
        } else {
            // Find offset by timestamp
            timeStampOffset = this.brokerController.getMessageStore().getOffsetInQueueByTime(topic, i, timeStamp);
        }
        if (timeStampOffset < 0) {
            // Message already deleted
            log.warn("reset offset is invalid. topic={}, queueId={}, timeStampOffset={}", topic, i, timeStampOffset);
            timeStampOffset = 0;
        }
        if (isForce || timeStampOffset < consumerOffset) {
            offsetTable.put(mq, timeStampOffset);
        } else {
            offsetTable.put(mq, consumerOffset);
        }
    }
    // Build request to reset consumer client offset (request code 220)
    ResetOffsetRequestHeader requestHeader = new ResetOffsetRequestHeader();
    requestHeader.setTopic(topic);
    requestHeader.setGroup(group);
    requestHeader.setTimestamp(timeStamp);
    RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.RESET_CONSUMER_CLIENT_OFFSET, requestHeader);
    if (isC) {
        ResetOffsetBodyForC body = new ResetOffsetBodyForC();
        List<MessageQueueForC> offsetList = convertOffsetTable2OffsetList(offsetTable);
        body.setOffsetTable(offsetList);
        request.setBody(body.encode());
    } else {
        ResetOffsetBody body = new ResetOffsetBody();
        body.setOffsetTable(offsetTable);
        request.setBody(body.encode());
    }
    ConsumerGroupInfo consumerGroupInfo = this.brokerController.getConsumerManager().getConsumerGroupInfo(group);
    if (consumerGroupInfo != null && !consumerGroupInfo.getAllChannel().isEmpty()) {
        ConcurrentMap<Channel, ClientChannelInfo> channelInfoTable = consumerGroupInfo.getChannelInfoTable();
        for (Map.Entry<Channel, ClientChannelInfo> entry : channelInfoTable.entrySet()) {
            int version = entry.getValue().getVersion();
            if (version >= MQVersion.Version.V3_0_7_SNAPSHOT.ordinal()) {
                try {
                    this.brokerController.getRemotingServer().invokeOneway(entry.getKey(), request, 5000);
                    log.info("[reset-offset] reset offset success. topic={}, group={}, clientId={}", topic, group, entry.getValue().getClientId());
                } catch (Exception e) {
                    log.error("[reset-offset] reset offset exception. topic={}, group={}", new Object[]{topic, group}, e);
                }
            } else {
                response.setCode(ResponseCode.SYSTEM_ERROR);
                response.setRemark("the client does not support this feature. version=" + MQVersion.getVersionDesc(version));
                log.warn("[reset-offset] the client does not support this feature. version={}", RemotingHelper.parseChannelRemoteAddr(entry.getKey()), MQVersion.getVersionDesc(version));
                return response;
            }
        }
    } else {
        String errorInfo = String.format("Consumer not online, so cannot reset offset, Group: %s Topic: %s Timestamp: %d", requestHeader.getGroup(), requestHeader.getTopic(), requestHeader.getTimestamp());
        log.error(errorInfo);
        response.setCode(ResponseCode.CONSUMER_NOT_ONLINE);
        response.setRemark(errorInfo);
        return response;
    }
    response.setCode(ResponseCode.SUCCESS);
    ResetOffsetBody resBody = new ResetOffsetBody();
    resBody.setOffsetTable(offsetTable);
    response.setBody(resBody.encode());
    return response;
}
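The per-queue decision in the loop above reduces to a small rule, restated here as a standalone function so that the effect of the force flag is easy to see. The class and method names are hypothetical; the logic mirrors the snippet above.

```java
class OffsetResetRule {
    /**
     * Chooses the offset a queue is reset to.
     *
     * @param timestampOffset offset found for the requested timestamp (negative if none was found)
     * @param consumerOffset  the consumer group's current offset on the queue
     * @param force           when true, the timestamp offset is used even if it is ahead of the current one
     */
    static long resolveTargetOffset(long timestampOffset, long consumerOffset, boolean force) {
        if (timestampOffset < 0) {
            timestampOffset = 0; // invalid lookup result: fall back to the start of the queue
        }
        if (force || timestampOffset < consumerOffset) {
            return timestampOffset;
        }
        return consumerOffset; // without force, the offset is never moved forward
    }

    public static void main(String[] args) {
        System.out.println(resolveTargetOffset(100, 500, false)); // backward move: 100
        System.out.println(resolveTargetOffset(800, 500, false)); // forward move refused: 500
        System.out.println(resolveTargetOffset(800, 500, true));  // forward move forced: 800
    }
}
```

Since the command defaults force to true, a timestamp later than the group's current position skips messages unless -f false is passed.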

3.6 Consumer Client Receives Request and Processes It

Class:

org.apache.rocketmq.client.impl.factory.MQClientInstance#resetOffset
public void resetOffset(String topic, String group, Map<MessageQueue, Long> offsetTable) {
    DefaultMQPushConsumerImpl consumer = null;
    try {
        MQConsumerInner impl = this.consumerTable.get(group);
        if (impl != null && impl instanceof DefaultMQPushConsumerImpl) {
            consumer = (DefaultMQPushConsumerImpl) impl;
        } else {
            log.info("[reset-offset] consumer does not exist. group={}", group);
            return;
        }
        consumer.suspend(); // pause consumption
        // Clear process queues for the target topic
        ConcurrentMap<MessageQueue, ProcessQueue> processQueueTable = consumer.getRebalanceImpl().getProcessQueueTable();
        for (Map.Entry<MessageQueue, ProcessQueue> entry : processQueueTable.entrySet()) {
            MessageQueue mq = entry.getKey();
            if (topic.equals(mq.getTopic()) && offsetTable.containsKey(mq)) {
                ProcessQueue pq = entry.getValue();
                pq.setDropped(true);
                pq.clear();
            }
        }
        // Wait a short time to ensure broker‑client sync
        try { TimeUnit.SECONDS.sleep(10); } catch (InterruptedException e) {}
        Iterator<MessageQueue> iterator = processQueueTable.keySet().iterator();
        while (iterator.hasNext()) {
            MessageQueue mq = iterator.next();
            Long offset = offsetTable.get(mq);
            if (topic.equals(mq.getTopic()) && offset != null) {
                try {
                    consumer.updateConsumeOffset(mq, offset);
                    consumer.getRebalanceImpl().removeUnnecessaryMessageQueue(mq, processQueueTable.get(mq));
                    iterator.remove();
                } catch (Exception e) {
                    log.warn("reset offset failed. group={}, {}", group, mq, e);
                }
            }
        }
    } finally {
        if (consumer != null) {
            consumer.resume(); // trigger rebalance
        }
    }
}

4 Core Process

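[Figure: full message backtracking flow]

In outline, the flow traced in section 3 is as follows. mqadmin parses the resetOffsetByTime action and dispatches it to ResetOffsetByTimeCommand. The admin client fetches the topic's route data from the nameserver to find the brokers that host the topic, then sends a reset request to each of them. Each broker computes, per queue, the offset that matches the timestamp and pushes the resulting offset table to every online client of the consumer group. Each push consumer client then suspends consumption, drops and clears the affected process queues, waits 10 seconds, updates the offsets, and resumes, which triggers a rebalance.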

5 Summary

Message backtracking is RocketMQ's safety net for business teams: after a failure that cannot be repaired in place, they can restore or correct data quickly by re-consuming messages. Re-running data in this way is also common in streaming and batch-processing scenarios.

RocketMQ can support backtracking because its offset management is simple: a consumer group's progress is just one offset per queue, which the mqadmin tool can reset. However, messages are stored on brokers and are eventually deleted, so before backtracking make sure that the target broker cluster is online and that the messages at the backtrack timestamp have not yet been deleted; otherwise the deleted messages cannot be re-consumed. The consumer group must also be online, because the broker rejects the reset with CONSUMER_NOT_ONLINE when no client of the group is connected.

6 Extension

With backtracking, offsets can be moved forward or backward arbitrarily. Sometimes, however, only a specific interval needs to be consumed. For example, if consumption is too slow and the offset is moved forward to catch up, the skipped messages still have to be processed later, which requires consuming a bounded interval.

Note that org.apache.rocketmq.client.impl.factory.MQClientInstance#resetOffset only handles DefaultMQPushConsumerImpl. A pull consumer (DefaultMQPullConsumerImpl) does not support offset reset, because its progress is controlled entirely by the application. That same property makes a pull-mode consumer a good fit for interval-based consumption.

Example code:

public class PullConsumerLocalTest {
    // Next offset to pull from, per queue
    private static final Map<MessageQueue, Long> OFFSET_TABLE = new HashMap<>();
    // Offset interval to consume, per queue
    private static final Map<MessageQueue, Pair<Long/*min*/, Long/*max*/>> QUEUE_OFFSET_SECTION_TABLE = new HashMap<>();
    private static final Long MIN_TIMESTAMP = 1722240069000L; // minimum timestamp
    private static final Long MAX_TIMESTAMP = 1722240160000L; // maximum timestamp

    public static void main(String[] args) throws MQClientException {
        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.start();
        String topic = "TopicTest";
        init(consumer, topic);
        Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues(topic);
        for (MessageQueue mq : mqs) {
            System.out.printf("Consume from the queue: %s%n", mq);
            SINGLE_MQ:
            while (true) {
                try {
                    PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
                    System.out.printf("%s%n", pullResult);
                    putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
                    switch (pullResult.getPullStatus()) {
                        case FOUND:
                            // process messages, check against the max offset in QUEUE_OFFSET_SECTION_TABLE, etc.
                            break;
                        case NO_MATCHED_MSG:
                            break;
                        case NO_NEW_MSG:
                            break SINGLE_MQ;
                        case OFFSET_ILLEGAL:
                            break;
                        default:
                            break;
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
        consumer.shutdown();
    }

    // Translate the timestamp interval into an offset interval for every queue of the topic
    private static void init(DefaultMQPullConsumer consumer, String topic) throws MQClientException {
        Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues(topic);
        for (MessageQueue mq : mqs) {
            long minOffset = consumer.searchOffset(mq, MIN_TIMESTAMP);
            long maxOffset = consumer.searchOffset(mq, MAX_TIMESTAMP);
            QUEUE_OFFSET_SECTION_TABLE.put(mq, new Pair<>(minOffset, maxOffset));
            OFFSET_TABLE.put(mq, minOffset);
        }
    }

    private static long getMessageQueueOffset(MessageQueue mq) {
        Long offset = OFFSET_TABLE.get(mq);
        if (offset != null) return offset;
        return 0L;
    }

    private static void putMessageQueueOffset(MessageQueue mq, long offset) {
        OFFSET_TABLE.put(mq, offset);
    }
}
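The example above leaves the interval check as a comment. One way to bound consumption is sketched below against a simulated queue instead of a real broker, so that the stopping condition can be followed without a cluster. It assumes the interval is half-open, [minOffset, maxOffset), and every name in it is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

class IntervalPullSketch {
    /**
     * Simulates pulling a queue in batches and returns the offsets that would be processed.
     *
     * @param queueMaxOffset number of messages in the simulated queue (offsets 0..queueMaxOffset-1)
     * @param minOffset      first offset of the interval (inclusive)
     * @param maxOffset      end of the interval (exclusive)
     * @param batchSize      maximum number of messages returned by one simulated pull
     */
    static List<Long> consumeInterval(long queueMaxOffset, long minOffset, long maxOffset, int batchSize) {
        List<Long> processed = new ArrayList<>();
        long next = minOffset;
        // Stop when the interval is exhausted or the queue has no more messages.
        while (next < maxOffset && next < queueMaxOffset) {
            long end = Math.min(next + batchSize, queueMaxOffset); // one simulated pull
            for (long offset = next; offset < end; offset++) {
                if (offset < maxOffset) {
                    processed.add(offset); // the last batch may overshoot, so filter per message
                }
            }
            next = end; // plays the role of pullResult.getNextBeginOffset()
        }
        return processed;
    }

    public static void main(String[] args) {
        List<Long> processed = consumeInterval(100, 10, 50, 32);
        System.out.println(processed.size() + " messages, from " + processed.get(0)
                + " to " + processed.get(processed.size() - 1));
    }
}
```

In the real consumer the same two checks would apply in the FOUND branch: skip messages whose queue offset is at or beyond the interval's end, and leave the loop once the next begin offset reaches it.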

7 Comparison

Method | Advantages | Disadvantages
Local message table in consumer | Business fully controllable | Additional storage overhead; duplicate consumption requires custom development
Message reset | No business changes needed; supports broadcast/cluster and ordered/unordered messages (requires idempotent state reset) | Not supported in versions prior to 3.0.7
Pull-mode manual control | Complete consumption progress control | Offset management complexity is higher

About the author: Li Zhihao, Java Development Engineer at CaiHuoXia
