How to Reset RocketMQ Consumer Offsets Using Message Backtracking – A Step‑by‑Step Guide

When a SMS service fails and messages need to be re‑sent, this article shows how to use RocketMQ's message backtracking feature to reset consumer offsets, verify the reset, and explore the internal workflow and alternative pull‑consumer strategies with concrete code examples and detailed analysis.

Architect
Architect
Architect
How to Reset RocketMQ Consumer Offsets Using Message Backtracking – A Step‑by‑Step Guide

Problem Background

A SMS service built on RocketMQ lost a batch of messages during an outage. Because the service does not maintain a local message table, the missing messages cannot be simply re‑sent by resetting a database flag. RocketMQ provides a built‑in message backtrack capability that can reset consumer offsets to a specific timestamp, allowing selective replay without changing business code.

Verification

Producer Startup

Create a new topic and send 10,000 messages.

public static void main(String[] args) throws MQClientException, InterruptedException {
    DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
    producer.setNamesrvAddr("127.0.0.1:9876");
    producer.start();
    for (int i = 0; i < 10000; i++) {
        try {
            Message msg = new Message("TopicTest", "TagA",
                ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        } catch (Exception e) {
            e.printStackTrace();
            Thread.sleep(1000);
        }
    }
    producer.shutdown();
}

Consumer Startup

Start a new consumer group that consumes the topic from the first offset and records the total number of messages.

public static void main(String[] args) throws InterruptedException, MQClientException {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
    consumer.setNamesrvAddr("127.0.0.1:9876");
    consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    consumer.subscribe("TopicTest", "*");
    final AtomicInteger count = new AtomicInteger();
    consumer.registerMessageListener((msgs, context) -> {
        System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
        count.incrementAndGet();
        System.out.println(count.get());
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    });
    consumer.start();
}

Execute Backtrack

Reset the consumer offset to a specific timestamp (e.g., 1722240069000) using the admin tool:

mqadmin.cmd resetOffsetByTime -n 127.0.0.1:9876 -t TopicTest -g please_rename_unique_group_name_4 -s 1722240069000

The same request can be issued programmatically by invoking MQAdminStartup.main with the same arguments.

Result Verification

The client reports a successful reset and the consumer re‑processes all retained messages because the chosen timestamp precedes the earliest stored message.

Verification Summary

Parameters used:

Action flag:

resetOffsetByTime
-n

: NameServer address -t: Topic name -g: Consumer group name -s: Backtrack timestamp

The reset works only if the broker still retains messages for the target timestamp.

Backtrack Mechanism Details

Command‑line Parsing (Strategy Pattern)

The entry point org.apache.rocketmq.tools.command.MQAdminStartup#main selects the sub‑command based on the first argument and builds the command‑line options.

SubCommand cmd = findSubCommand(args[0]);
Options options = ServerUtil.buildCommandlineOptions(new Options());
CommandLine commandLine = ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs,
    cmd.buildCommandlineOptions(options), new PosixParser());
cmd.execute(commandLine, options, rpcHook);

Create Admin Client and Execute Reset

ResetOffsetByTimeCommand#execute

creates a DefaultMQAdminExt instance, extracts the group, topic and timestamp, decides whether to force the reset, and starts the admin client.

DefaultMQAdminExt admin = new DefaultMQAdminExt(rpcHook);
String group = commandLine.getOptionValue("g").trim();
String topic = commandLine.getOptionValue("t").trim();
String timeStampStr = commandLine.getOptionValue("s").trim();
long timestamp = timeStampStr.equals("now") ? System.currentTimeMillis() : Long.parseLong(timeStampStr);
boolean force = true; // can be overridden with -f
admin.start();
Map<MessageQueue, Long> offsetTable = admin.resetOffsetByTimestamp(topic, group, timestamp, force, false);

Topic Route Lookup

The admin client queries the NameServer for the route information of the topic, then iterates over each broker to submit the reset request.

TopicRouteData route = this.examineTopicRouteInfo(topic);
for (BrokerData brokerData : route.getBrokerDatas()) {
    String addr = brokerData.selectBrokerAddr();
    if (addr != null) {
        Map<MessageQueue, Long> table = this.mqClientInstance.getMQClientAPIImpl()
            .invokeBrokerToResetOffset(addr, topic, group, timestamp, isForce, timeoutMillis, false);
        if (table != null) {
            allOffsetTable.putAll(table);
        }
    }
}

NameServer Interaction

The admin client sends a GET_ROUTEINTO_BY_TOPIC request; the NameServer returns a TopicRouteData containing broker lists.

GetRouteInfoRequestHeader requestHeader = new GetRouteInfoRequestHeader();
requestHeader.setTopic(topic);
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINTO_BY_TOPIC, requestHeader);
RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);
if (response.getBody() != null) {
    return TopicRouteData.decode(response.getBody(), TopicRouteData.class);
}

Broker‑Side Offset Reset

The broker looks up the offset that corresponds to the given timestamp for each MessageQueue. If force is true or the timestamp offset is smaller than the current consumer offset, the broker builds a new offset table and pushes a RESET_CONSUMER_CLIENT_OFFSET command to online consumer clients.

for (int i = 0; i < topicConfig.getWriteQueueNums(); i++) {
    MessageQueue mq = new MessageQueue();
    mq.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName());
    mq.setTopic(topic);
    mq.setQueueId(i);
    long consumerOffset = this.brokerController.getConsumerOffsetManager().queryOffset(group, topic, i);
    long timeStampOffset = (timestamp == -1) ?
        this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, i) :
        this.brokerController.getMessageStore().getOffsetInQueueByTime(topic, i, timestamp);
    if (isForce || timeStampOffset < consumerOffset) {
        offsetTable.put(mq, timeStampOffset);
    } else {
        offsetTable.put(mq, consumerOffset);
    }
}
ResetOffsetRequestHeader reqHeader = new ResetOffsetRequestHeader();
reqHeader.setTopic(topic);
reqHeader.setGroup(group);
reqHeader.setTimestamp(timestamp);
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.RESET_CONSUMER_CLIENT_OFFSET, reqHeader);
for (Channel channel : consumerGroupInfo.getChannelInfoTable().keySet()) {
    this.brokerController.getRemotingServer().invokeOneway(channel, request, 5000);
}

Consumer Client Processing

When the consumer receives the reset command it suspends consumption, clears the affected ProcessQueue, updates the local offset table, and then resumes to trigger a rebalance.

consumer.suspend();
for (Map.Entry<MessageQueue, ProcessQueue> entry : consumer.getRebalanceImpl().getProcessQueueTable().entrySet()) {
    MessageQueue mq = entry.getKey();
    if (topic.equals(mq.getTopic()) && offsetTable.containsKey(mq)) {
        ProcessQueue pq = entry.getValue();
        pq.setDropped(true);
        pq.clear();
    }
}
TimeUnit.SECONDS.sleep(10); // wait for broker to push new offsets
for (MessageQueue mq : offsetTable.keySet()) {
    consumer.updateConsumeOffset(mq, offsetTable.get(mq));
    consumer.getRebalanceImpl().removeUnnecessaryMessageQueue(mq, consumer.getRebalanceImpl().getProcessQueueTable().get(mq));
}
consumer.resume();

Core Flow Diagram

Message backtrack flow diagram
Message backtrack flow diagram

Summary

RocketMQ’s message backtrack provides a reliable way to replay data after loss or processing errors. By resetting consumer offsets to a timestamp earlier than the earliest retained message, all messages are re‑consumed without code changes. The feature requires that the broker still stores the target messages and that the consumer client version is ≥ 3.0.7.

Extension – Consuming a Specific Offset Range

When precise control over the consumed range is needed (e.g., to catch up after a slow consumer), a pull consumer can be used because it fully manages offsets.

public class PullConsumerLocalTest {
    private static final Map<MessageQueue, Long> OFFSET_TABLE = new HashMap<>();
    private static final Map<MessageQueue, Pair<Long, Long>> QUEUE_OFFSET_SECTION_TABLE = new HashMap<>();
    private static final Long MIN_TIMESTAMP = 1722240069000L;
    private static final Long MAX_TIMESTAMP = 1722240160000L;

    public static void main(String[] args) throws MQClientException {
        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.start();
        String topic = "TopicTest";
        init(consumer, topic);
        Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues(topic);
        for (MessageQueue mq : mqs) {
            System.out.printf("Consume from the queue: %s%n", mq);
            while (true) {
                try {
                    PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
                    System.out.printf("%s%n", pullResult);
                    putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
                    switch (pullResult.getPullStatus()) {
                        case FOUND:
                            // process messages
                            break;
                        case NO_NEW_MSG:
                            break SINGLE_MQ;
                        default:
                            break;
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
        consumer.shutdown();
    }

    private static void init(DefaultMQPullConsumer consumer, String topic) throws MQClientException {
        Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues(topic);
        for (MessageQueue mq : mqs) {
            long minOffset = consumer.searchOffset(mq, MIN_TIMESTAMP);
            long maxOffset = consumer.searchOffset(mq, MAX_TIMESTAMP);
            QUEUE_OFFSET_SECTION_TABLE.put(mq, new Pair<>(minOffset, maxOffset));
            OFFSET_TABLE.put(mq, minOffset);
        }
    }

    private static long getMessageQueueOffset(MessageQueue mq) {
        return OFFSET_TABLE.getOrDefault(mq, 0L);
    }

    private static void putMessageQueueOffset(MessageQueue mq, long offset) {
        OFFSET_TABLE.put(mq, offset);
    }
}

Comparison of Recovery Strategies

Local message table – full business control; requires extra storage and duplicate‑consume logic.

Message reset (backtrack) – no code change; works for broadcast/cluster and ordered/unordered messages (requires idempotent state reset); unsupported on client versions < 3.0.7.

Pull consumer manual control – complete offset control; higher implementation complexity and offset lifecycle management.

Original Source

Signed-in readers can open the original source through BestHub's protected redirect.

Sign in to view source
Republication Notice

This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contactadmin@besthub.devand we will review it promptly.

JavaRocketMQDistributed MessagingOffset ResetMessage Backtracking
Architect
Written by

Architect

Professional architect sharing high‑quality architecture insights. Topics include high‑availability, high‑performance, high‑stability architectures, big data, machine learning, Java, system and distributed architecture, AI, and practical large‑scale architecture case studies. Open to ideas‑driven architects who enjoy sharing and learning.

0 followers
Reader feedback

How this landed with the community

Sign in to like

Rate this article

Was this worth your time?

Sign in to rate
Discussion

0 Comments

Thoughtful readers leave field notes, pushback, and hard-won operational detail here.