Unlocking RocketMQ: How Scheduled Tasks Keep Your Messaging System Healthy

This article examines the numerous scheduled tasks in RocketMQ—covering architecture, producer and consumer maintenance, broker housekeeping, and NameServer management—to reveal how periodic jobs ensure reliable message handling, performance monitoring, and system self‑protection.

Su San Talks Tech

In this article we explore RocketMQ's scheduled tasks to deepen understanding of its message processing mechanisms and design philosophy.

1 Architecture Review

First, a quick recap of RocketMQ's architecture.

RocketMQ architecture diagram

NameServer nodes are deployed as a cluster but do not synchronize data with each other. Because every broker registers with every NameServer, each node still holds the complete routing information, so the failure of a single node does not affect the cluster. Brokers can be deployed in master-slave groups for replication and high availability; each broker keeps long connections to all NameServers, through which it registers its topic routes and sends heartbeats. Producers and consumers each keep a long connection to one NameServer chosen from the configured list and periodically pull routing information from it.

2 Producer and Consumer

2.1 Fetching NameServer Addresses

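Producers and consumers must first obtain NameServer addresses. When no address is configured explicitly (namesrvAddr), both fetch the list from an HTTP address server and refresh the local cache with a scheduled task that runs every two minutes, starting 10 seconds after startup.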

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        try {
            MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
        } catch (Exception e) {
            log.error("ScheduledTask fetchNameServerAddr exception", e);
        }
    }
}, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
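Every task in this article wraps its body in try/catch, and that is more than tidy logging. The ScheduledExecutorService.scheduleAtFixedRate contract states that if any execution of a task throws, subsequent executions are suppressed, so a single unexpected exception would silently stop a refresh like this one for good (java.util.Timer, used in 2.6, is harsher still: an uncaught exception kills its thread). The sketch below, with hypothetical class and method names, shows the effect: the unguarded task runs exactly once, while the guarded one keeps running.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch, not RocketMQ code.
final class GuardedScheduling {

    // Wraps a periodic task so an exception is recorded (RocketMQ logs it)
    // instead of cancelling every later run of the schedule.
    static Runnable guarded(Runnable task, AtomicInteger failures) {
        return () -> {
            try {
                task.run();
            } catch (Throwable t) {
                failures.incrementAndGet(); // stands in for log.error(...)
            }
        };
    }

    // Schedules an always-failing task twice, bare and guarded, and returns
    // {bareRuns, guardedRuns} once the guarded copy has run three times.
    static int[] demo() {
        ScheduledExecutorService ses = Executors.newScheduledThreadPool(2);
        AtomicInteger bareRuns = new AtomicInteger();
        AtomicInteger guardedRuns = new AtomicInteger();
        AtomicInteger failures = new AtomicInteger();
        CountDownLatch threeGuardedRuns = new CountDownLatch(3);
        try {
            ses.scheduleAtFixedRate(() -> {
                bareRuns.incrementAndGet();
                throw new IllegalStateException("boom"); // suppresses all later runs
            }, 0, 10, TimeUnit.MILLISECONDS);
            ses.scheduleAtFixedRate(guarded(() -> {
                guardedRuns.incrementAndGet();
                threeGuardedRuns.countDown();
                throw new IllegalStateException("boom"); // caught by the wrapper
            }, failures), 0, 10, TimeUnit.MILLISECONDS);
            threeGuardedRuns.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            ses.shutdownNow();
        }
        return new int[] {bareRuns.get(), guardedRuns.get()};
    }
}
```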

2.2 Updating Routing Information

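Producers and consumers periodically pull topic route information (which brokers and queues serve each topic they publish to or subscribe to) from the NameServer and update their local cache. The default interval is 30 seconds, configurable through pollNameServerInterval.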

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        try {
            MQClientInstance.this.updateTopicRouteInfoFromNameServer();
        } catch (Exception e) {
            log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
        }
    }
}, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);
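The refresh only pays off when something changed: the client compares the fetched route with its cached copy and rebuilds its publish/subscribe tables only when they differ. A minimal sketch of that comparison, assuming a simplified route model (broker name to queue count) in a hypothetical TopicRouteCache class rather than RocketMQ's TopicRouteData:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: topic -> (brokerName -> queue count).
final class TopicRouteCache {
    private final Map<String, Map<String, Integer>> routes = new ConcurrentHashMap<>();

    // Called by the periodic refresh with the (non-null) route just fetched.
    // Returns true only when it differs from the cached route.
    boolean update(String topic, Map<String, Integer> fetched) {
        Map<String, Integer> old = routes.get(topic);
        if (Objects.equals(old, fetched)) {
            return false; // unchanged: nothing to rebuild
        }
        routes.put(topic, new HashMap<>(fetched)); // defensive copy
        return true;
    }

    Map<String, Integer> get(String topic) {
        return routes.get(topic);
    }
}
```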

2.3 Sending Heartbeats to Brokers

Producers and consumers remove offline brokers from their cached broker list and send heartbeats to all brokers, by default every 30 seconds (configurable through heartbeatBrokerInterval).

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        try {
            MQClientInstance.this.cleanOfflineBroker();
            MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
        } catch (Exception e) {
            log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
        }
    }
}, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);
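Cleaning offline brokers means pruning addresses that no topic route refers to any more, so heartbeats stop going to brokers that have left the cluster. The sketch assumes a table shaped like brokerName to (brokerId to address), where id 0 denotes the master; the class name and method signature are hypothetical.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of pruning the client's broker address table.
final class OfflineBrokerCleaner {
    // Removes every address that no current topic route mentions, then drops
    // broker names that are left with no addresses at all.
    static void clean(Map<String, Map<Long, String>> brokerAddrTable, Set<String> addrsInRoutes) {
        Iterator<Map.Entry<String, Map<Long, String>>> it = brokerAddrTable.entrySet().iterator();
        while (it.hasNext()) {
            Map<Long, String> addrs = it.next().getValue();
            addrs.values().removeIf(addr -> !addrsInRoutes.contains(addr));
            if (addrs.isEmpty()) {
                it.remove();
            }
        }
    }
}
```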

2.4 Persisting Consumer Offsets

Consumers periodically persist MessageQueue offsets, defaulting to every 5 seconds. In cluster mode offsets are stored on the broker; in broadcast mode they are stored locally.

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        try {
            MQClientInstance.this.persistAllConsumerOffset();
        } catch (Exception e) {
            log.error("ScheduledTask persistAllConsumerOffset exception", e);
        }
    }
}, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
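The split between broker-side and local storage follows from the message model: in clustering mode the group shares progress, so offsets live on the broker; in broadcasting mode every instance consumes every message, so each keeps its own. RocketMQ implements this with RemoteBrokerOffsetStore and LocalFileOffsetStore. The sketch below collapses both into one hypothetical class whose writer is chosen by the model, with the real network and file I/O replaced by callbacks.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical enum mirroring the two message models.
enum MessageModelSketch { CLUSTERING, BROADCASTING }

// Hypothetical sketch, not RocketMQ's OffsetStore API.
final class OffsetStoreSketch {
    private final Map<String, Long> offsets = new ConcurrentHashMap<>();
    private final Consumer<Map<String, Long>> writer;

    // Clustering: shared group progress goes to the broker.
    // Broadcasting: each instance keeps its own local copy.
    OffsetStoreSketch(MessageModelSketch model,
                      Consumer<Map<String, Long>> toBroker,
                      Consumer<Map<String, Long>> toLocalFile) {
        this.writer = model == MessageModelSketch.CLUSTERING ? toBroker : toLocalFile;
    }

    // In this sketch an offset never moves backwards.
    void updateOffset(String queue, long offset) {
        offsets.merge(queue, offset, Long::max);
    }

    // What the 5-second task would call.
    void persistAll() {
        writer.accept(new HashMap<>(offsets));
    }
}
```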

2.5 Adjusting Core Thread Count

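For push-mode consumers, this task is meant to resize the consumption thread pool according to the number of messages waiting to be consumed. In version 4.9.4, however, the methods that would change the pool size are empty placeholders, so the task currently has no effect.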

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        try {
            MQClientInstance.this.adjustThreadPool();
        } catch (Exception e) {
            log.error("ScheduledTask adjustThreadPool exception", e);
        }
    }
}, 1, 1, TimeUnit.MINUTES);
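Because the 4.9.4 adjustment is a placeholder, the policy below is purely hypothetical: one reasonable way to resize a ThreadPoolExecutor from the backlog, growing when pending messages reach a threshold and shrinking only below 80% of it so the pool size does not oscillate. The class name, threshold, and bounds are assumptions, and the pool's maximum size must be at least maxCore, since recent JDKs reject a core size above the maximum.

```java
import java.util.concurrent.ThreadPoolExecutor;

// Hypothetical resizing policy; RocketMQ 4.9.4 does not resize the pool.
final class ConsumePoolAdjuster {
    private final ThreadPoolExecutor pool;
    private final long threshold; // backlog that triggers growth (assumed value)
    private final int minCore;
    private final int maxCore;

    ConsumePoolAdjuster(ThreadPoolExecutor pool, long threshold, int minCore, int maxCore) {
        this.pool = pool;
        this.threshold = threshold;
        this.minCore = minCore;
        this.maxCore = maxCore;
    }

    // Called once a minute with the number of messages waiting to be consumed.
    // Grow at >= threshold, shrink only below 80% of it (hysteresis band).
    void adjust(long pendingMessages) {
        int core = pool.getCorePoolSize();
        if (pendingMessages >= threshold && core < maxCore) {
            pool.setCorePoolSize(core + 1);
        } else if (pendingMessages < threshold * 0.8 && core > minCore) {
            pool.setCorePoolSize(core - 1);
        }
    }
}
```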

2.6 Expiring Requests

Producers and consumers scan their table of in-flight requests every second, after an initial 3-second delay. A request whose start time plus timeout plus 1 second is not later than the current time is treated as expired: it is removed from the table and its callback is invoked.

this.timer.scheduleAtFixedRate(new TimerTask() {
    @Override
    public void run() {
        try {
            NettyRemotingClient.this.scanResponseTable();
        } catch (Throwable e) {
            log.error("scanResponseTable exception", e);
        }
    }
}, 1000 * 3, 1000);
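The expiry rule can be sketched as a scan over a map keyed by request id, with the clock passed in so it is easy to test. ResponseTableSketch, PendingRequest, and the string handed to the callback are hypothetical; as in the description above, an entry expires once begin + timeout + 1 s is no later than now, and it is removed before its callback runs.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical sketch of a response-table expiry scan.
final class ResponseTableSketch {
    static final class PendingRequest {
        final long beginTimestamp;
        final long timeoutMillis;
        final Consumer<String> callback; // receives a failure reason (hypothetical)

        PendingRequest(long beginTimestamp, long timeoutMillis, Consumer<String> callback) {
            this.beginTimestamp = beginTimestamp;
            this.timeoutMillis = timeoutMillis;
            this.callback = callback;
        }
    }

    // request id -> in-flight request
    final Map<Integer, PendingRequest> table = new ConcurrentHashMap<>();

    // Runs every second; returns how many requests expired in this pass.
    int scan(long now) {
        int expired = 0;
        Iterator<Map.Entry<Integer, PendingRequest>> it = table.entrySet().iterator();
        while (it.hasNext()) {
            PendingRequest req = it.next().getValue();
            if (req.beginTimestamp + req.timeoutMillis + 1000 <= now) {
                it.remove();                    // drop it first
                req.callback.accept("timeout"); // then notify the caller
                expired++;
            }
        }
        return expired;
    }
}
```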

2.7 Producer Performance Recording

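RocketMQ's benchmark producer (in the example module) records send success and failure counts and latency. One task takes a snapshot every second and keeps the latest 10; another prints TPS and response time every 10 seconds.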

executorService.scheduleAtFixedRate(new TimerTask() {
    @Override
    public void run() {
        snapshotList.addLast(statsBenchmark.createSnapshot());
        if (snapshotList.size() > 10) {
            snapshotList.removeFirst();
        }
    }
}, 1000, 1000, TimeUnit.MILLISECONDS);

executorService.scheduleAtFixedRate(new TimerTask() {
    @Override
    public void run() {
        try {
            this.printStats(); // printStats() (omitted here) computes TPS and RT from the first and last snapshots
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}, 10000, 10000, TimeUnit.MILLISECONDS);
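The snapshot window can be sketched as follows. The snapshot layout {timestampMs, successCount, failureCount, totalRtMs} and the class name are assumptions for illustration, not the benchmark's exact format. TPS and average RT come from the difference between the oldest and newest snapshots, so ten one-second snapshots span nine seconds.

```java
import java.util.LinkedList;

// Hypothetical sketch; snapshot layout {timestampMs, successCount, failureCount, totalRtMs}.
final class SendStatsWindow {
    private final LinkedList<long[]> snapshots = new LinkedList<>();

    // Every second: append the newest snapshot, keep at most 10.
    void addSnapshot(long[] snapshot) {
        snapshots.addLast(snapshot);
        if (snapshots.size() > 10) {
            snapshots.removeFirst();
        }
    }

    // Every 10 seconds: successful sends per second across the window.
    double tps() {
        if (snapshots.size() < 2) return 0.0;
        long[] begin = snapshots.getFirst();
        long[] end = snapshots.getLast();
        return (end[1] - begin[1]) * 1000.0 / (end[0] - begin[0]);
    }

    // Average response time per successful send across the window.
    double averageRtMs() {
        if (snapshots.size() < 2) return 0.0;
        long[] begin = snapshots.getFirst();
        long[] end = snapshots.getLast();
        long sends = end[1] - begin[1];
        return sends == 0 ? 0.0 : (end[3] - begin[3]) / (double) sends;
    }
}
```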

2.8 Consumer Tasks

2.8.1 Locking MessageQueue for Ordered Consumption

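For ordered consumption, a consumer sends a lock request to the broker periodically (every 20 seconds by default). The broker binds the MessageQueue to the consumer group and clientId, so other consumers in the same group cannot consume from that queue while the lock is held. A lock that is not renewed expires on the broker after 60 seconds by default.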

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        try {
            ConsumeMessageOrderlyService.this.lockMQPeriodically();
        } catch (Throwable e) {
            log.error("scheduleAtFixedRate lockMQPeriodically exception", e);
        }
    }
}, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS);
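On the broker, such a lock behaves like a lease: it is granted when the queue is free, renewed for the current holder, and handed to another client only after the holder has stopped renewing for longer than the maximum lock lifetime (60 seconds by default in the 4.x broker, as far as we know). A 20-second client interval therefore tolerates two missed renewals. A hypothetical, single-process sketch with the clock passed in:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical single-broker lock table with lease expiry.
final class QueueLockTable {
    static final long LOCK_MAX_LIVE_MS = 60_000; // assumed broker-side lease

    private static final class Lock {
        final String clientId;
        final long lastRenewed;

        Lock(String clientId, long lastRenewed) {
            this.clientId = clientId;
            this.lastRenewed = lastRenewed;
        }
    }

    private final Map<String, Lock> locks = new HashMap<>(); // "group@queue" -> lock

    // Grants or renews the lock if it is free, already held by this client,
    // or held by a client whose lease has expired.
    synchronized boolean tryLock(String group, String queue, String clientId, long now) {
        String key = group + "@" + queue;
        Lock current = locks.get(key);
        boolean available = current == null
                || current.clientId.equals(clientId)
                || now - current.lastRenewed > LOCK_MAX_LIVE_MS;
        if (available) {
            locks.put(key, new Lock(clientId, now));
        }
        return available;
    }
}
```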

2.8.2 Performance Snapshots

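RocketMQ's benchmark consumer records a snapshot every second and keeps the latest 10. Each snapshot captures the total number of received messages, the cumulative born-to-consume (B2C) time from message creation to consumption, the cumulative store-to-consume (S2C) time, and the failure count.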

executorService.scheduleAtFixedRate(new TimerTask() {
    @Override
    public void run() {
        snapshotList.addLast(statsBenchmarkConsumer.createSnapshot());
        if (snapshotList.size() > 10) {
            snapshotList.removeFirst();
        }
    }
}, 1000, 1000, TimeUnit.MILLISECONDS);

Every 10 seconds it compares the oldest and newest snapshots to compute TPS, the failure count, and the average B2C and S2C latency, prints them together with the maximum latencies, and then resets the maxima.

private void printStats() {
    if (snapshotList.size() >= 10) {
        Long[] begin = snapshotList.getFirst();
        Long[] end = snapshotList.getLast();
        long consumeTps = (long) (((end[1] - begin[1]) / (double) (end[0] - begin[0])) * 1000L);
        double averageB2CRT = (end[2] - begin[2]) / (double) (end[1] - begin[1]);
        double averageS2CRT = (end[3] - begin[3]) / (double) (end[1] - begin[1]);
        long failCount = end[4] - begin[4];
        long b2cMax = statsBenchmarkConsumer.getBorn2ConsumerMaxRT().get();
        long s2cMax = statsBenchmarkConsumer.getStore2ConsumerMaxRT().get();
        statsBenchmarkConsumer.getBorn2ConsumerMaxRT().set(0);
        statsBenchmarkConsumer.getStore2ConsumerMaxRT().set(0);
        System.out.printf("Current Time: %s TPS: %d FAIL: %d AVG(B2C) RT(ms): %7.3f AVG(S2C) RT(ms): %7.3f MAX(B2C) RT(ms): %d MAX(S2C) RT(ms): %d%n",
                System.currentTimeMillis(), consumeTps, failCount, averageB2CRT, averageS2CRT, b2cMax, s2cMax);
    }
}

2.8.3 Cleaning Expired Messages

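In concurrent (non-orderly) consumption, the consumer periodically checks the messages it has pulled locally. Any message whose consumption started more than consumeTimeout ago (15 minutes by default) is sent back to the broker for redelivery and removed from the local queue. The check itself also runs every consumeTimeout minutes.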

this.cleanExpireMsgExecutors.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        try {
            cleanExpireMsg();
        } catch (Throwable e) {
            log.error("scheduleAtFixedRate cleanExpireMsg exception", e);
        }
    }
}, this.defaultMQPushConsumer.getConsumeTimeout(), this.defaultMQPushConsumer.getConsumeTimeout(), TimeUnit.MINUTES);
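The cleanup step can be sketched with a TreeMap keyed by queue offset, similar in spirit to the ordered map a ProcessQueue keeps. Here the map holds only each message's consumption start time; ProcessQueueSketch is hypothetical, and the actual send-back to the broker is left to the caller, which receives the expired offsets.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: queue offset -> consumption start time (ms).
final class ProcessQueueSketch {
    private final TreeMap<Long, Long> consumeStartTimes = new TreeMap<>();

    synchronized void startConsuming(long queueOffset, long startMs) {
        consumeStartTimes.put(queueOffset, startMs);
    }

    // Removes messages whose consumption started more than consumeTimeoutMs
    // ago and returns their offsets; the caller would send them back to the
    // broker for redelivery.
    synchronized List<Long> cleanExpired(long now, long consumeTimeoutMs) {
        List<Long> expired = new ArrayList<>();
        Iterator<Map.Entry<Long, Long>> it = consumeStartTimes.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Long> e = it.next();
            if (now - e.getValue() > consumeTimeoutMs) {
                expired.add(e.getKey());
                it.remove();
            }
        }
        return expired;
    }
}
```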

2.8.4 Refreshing MessageQueue Information

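The lite pull consumer (DefaultLitePullConsumer) fetches the MessageQueues of its subscribed topics from the NameServer every 30 seconds by default (topicMetadataCheckIntervalMillis). If the set of queues differs from the cached one, it updates the cache and notifies the registered queue-change listener.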

scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        try {
            fetchTopicMessageQueuesAndCompare();
        } catch (Exception e) {
            log.error("ScheduledTask fetchMessageQueuesAndCompare exception", e);
        }
    }
}, 1000 * 10, this.getDefaultLitePullConsumer().getTopicMetadataCheckIntervalMillis(), TimeUnit.MILLISECONDS);
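The compare-and-notify step reduces to a set comparison. QueueChangeDetector and its BiConsumer listener are hypothetical stand-ins, and queues are plain strings here instead of MessageQueue objects.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;

// Hypothetical sketch of "fetch, compare, update, notify".
final class QueueChangeDetector {
    private final Map<String, Set<String>> queuesByTopic = new ConcurrentHashMap<>();
    private final BiConsumer<String, Set<String>> onChanged; // hypothetical listener

    QueueChangeDetector(BiConsumer<String, Set<String>> onChanged) {
        this.onChanged = onChanged;
    }

    // Compares the freshly fetched queue set with the cached one; on any
    // difference, replaces the cache and notifies the listener.
    void compare(String topic, Set<String> fetched) {
        Set<String> cached = queuesByTopic.get(topic);
        if (!fetched.equals(cached)) {
            Set<String> snapshot = Collections.unmodifiableSet(new HashSet<>(fetched));
            queuesByTopic.put(topic, snapshot);
            onChanged.accept(topic, snapshot);
        }
    }
}
```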

3 Broker

3.1 State Sampling

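Brokers collect statistics such as the number of messages sent per Topic, per MessageQueue, and per consumer group. Six scheduled tasks maintain them: three take samples (every 10 seconds, every 10 minutes, and every hour) and three print aggregated results (every minute, hour, and day). The 10-second sampling task is shown below.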

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        try {
            samplingInSeconds();
        } catch (Throwable ignored) {}
    }
}, 0, 10, TimeUnit.SECONDS);
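A minimal sketch of one statistics item, assuming counters keyed by a string such as topic@group: callers increment a LongAdder on the hot path, and the timer only snapshots the running total. Keeping 7 samples taken 10 seconds apart gives a one-minute window for a per-second rate. The class names, key format, and window size are assumptions.

```java
import java.util.LinkedList;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical sketch of one sampled counter.
final class StatsItemSketch {
    private final LongAdder total = new LongAdder();            // hot-path counter
    private final LinkedList<long[]> samples = new LinkedList<>(); // {timestampMs, total}
    private final int maxSamples;

    StatsItemSketch(int maxSamples) {
        this.maxSamples = maxSamples;
    }

    void add(long n) {
        total.add(n);
    }

    // Run by the timer, e.g. every 10 seconds.
    synchronized void sample(long nowMs) {
        samples.addLast(new long[] {nowMs, total.sum()});
        if (samples.size() > maxSamples) {
            samples.removeFirst();
        }
    }

    // Average events per second across the retained samples.
    synchronized double perSecond() {
        if (samples.size() < 2) return 0.0;
        long[] first = samples.getFirst(), last = samples.getLast();
        return (last[1] - first[1]) * 1000.0 / (last[0] - first[0]);
    }
}

// Hypothetical: one item per statistics key.
final class StatsItemSetSketch {
    final Map<String, StatsItemSketch> items = new ConcurrentHashMap<>();

    StatsItemSketch get(String key) {
        return items.computeIfAbsent(key, k -> new StatsItemSketch(7));
    }
}
```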

3.2 Recording Message Delay

Brokers record how long messages wait between being persisted to disk and being read by consumers, and print these delays every 5 minutes; the first run is aligned to the start of the next minute.

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        try {
            printAtMinutes();
        } catch (Throwable ignored) {}
    }
}, Math.abs(UtilAll.computeNextMinutesTimeMillis() - System.currentTimeMillis()), 1000 * 60 * 5, TimeUnit.MILLISECONDS);
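The initial delay in the excerpt makes the first run land on a minute boundary, so later log lines match clock minutes. The same arithmetic for an epoch-millisecond timestamp, sketched without Calendar (minute boundaries coincide in UTC and in any zone whose offset is a whole number of minutes); the class name is hypothetical:

```java
// Hypothetical helper: delay until the next whole minute.
final class MinuteAlignment {
    // Milliseconds from nowMs until the next minute boundary. Exactly on a
    // boundary, it returns a full minute (the *next* boundary).
    static long delayToNextMinute(long nowMs) {
        long minute = 60_000L;
        return minute - (nowMs % minute);
    }
}
```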

3.3 Persisting Data

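Broker metadata such as consumer offsets, topic configuration, and subscription group settings is saved to JSON files through a shared persist() method (each manager extends ConfigManager). The excerpt below is the periodic flush in ScheduleMessageService, which persists delay-level offsets every 10 seconds by default (flushDelayOffsetInterval); section 3.20 returns to it.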

this.deliverExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        try {
            if (started.get()) {
                ScheduleMessageService.this.persist();
            }
        } catch (Throwable e) {
            log.error("scheduleAtFixedRate flush exception", e);
        }
    }
}, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval(), TimeUnit.MILLISECONDS);
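A periodic flush like this should never leave a half-written file behind if the broker dies mid-write. One common approach, sketched here with java.nio and a hypothetical ConfigFileWriter, writes to a temporary sibling file and then renames it over the target; it illustrates the idea rather than RocketMQ's own file helper.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.AtomicMoveNotSupportedException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical sketch of a crash-safe config write.
final class ConfigFileWriter {
    // Writes to "<name>.tmp" first, then moves it over the target, so readers
    // see either the old file or the complete new one.
    static void persist(Path target, String json) throws IOException {
        Path tmp = target.resolveSibling(target.getFileName() + ".tmp");
        Files.write(tmp, json.getBytes(StandardCharsets.UTF_8));
        try {
            Files.move(tmp, target, StandardCopyOption.REPLACE_EXISTING, StandardCopyOption.ATOMIC_MOVE);
        } catch (AtomicMoveNotSupportedException e) {
            // Fall back to a plain replace on file systems without atomic rename.
            Files.move(tmp, target, StandardCopyOption.REPLACE_EXISTING);
        }
    }
}
```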

3.4 Expiring Requests

Like the clients, brokers scan their table of in-flight requests every second, after an initial 3-second delay. A request whose start time plus timeout plus 1 second is not later than the current time expires, and its callback is invoked.

this.timer.scheduleAtFixedRate(new TimerTask() {
    @Override
    public void run() {
        try {
            NettyRemotingServer.this.scanResponseTable();
        } catch (Throwable e) {
            log.error("scanResponseTable exception", e);
        }
    }
}, 1000 * 3, 1000);

3.5 Filter Service Creation

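For class-based filtering (a consumer registering a filterClass), the broker relies on separate FilterServer processes. A scheduled task runs every 30 seconds, compares the number of running FilterServers with the configured filterServerNums, and starts any that are missing.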

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        try {
            FilterServerManager.this.createFilterServer();
        } catch (Exception e) {
            log.error("", e);
        }
    }
}, 1000 * 5, 1000 * 30, TimeUnit.MILLISECONDS);
FilterServer diagram

3.6 Daily Message Count Recording

Once a day, at midnight, the broker records the total number of messages put and got, so it can report the previous day's totals.

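// First run at the next midnight, then every 24 hours
final long initialDelay = UtilAll.computeNextMorningTimeMillis() - System.currentTimeMillis();
final long period = 1000 * 60 * 60 * 24;
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {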
    @Override
    public void run() {
        try {
            BrokerController.this.getBrokerStats().record();
        } catch (Throwable e) {
            log.error("schedule record error.", e);
        }
    }
}, initialDelay, period, TimeUnit.MILLISECONDS);
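The bookkeeping behind a previous-day total can be sketched with two snapshots of a running counter that shift at every midnight; their difference is the previous day's count. The class and field names are hypothetical, and delayToNextMidnight shows one way to compute the initial delay to the next local midnight with java.time.

```java
import java.time.Duration;
import java.time.ZonedDateTime;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of daily message-count bookkeeping.
final class DailyMessageStats {
    final AtomicLong putTotal = new AtomicLong(); // running total since startup
    private long totalAtTodayMorning;
    private long totalAtYesterdayMorning;

    // Runs at each midnight: shift the two "morning" snapshots.
    synchronized void record() {
        totalAtYesterdayMorning = totalAtTodayMorning;
        totalAtTodayMorning = putTotal.get();
    }

    // Messages put during the previous full day.
    synchronized long yesterday() {
        return totalAtTodayMorning - totalAtYesterdayMorning;
    }

    // Messages put since the most recent midnight.
    synchronized long todaySoFar() {
        return putTotal.get() - totalAtTodayMorning;
    }

    // Initial delay for a task that should first run at the next local midnight.
    static long delayToNextMidnight(ZonedDateTime now) {
        ZonedDateTime next = now.toLocalDate().plusDays(1).atStartOfDay(now.getZone());
        return Duration.between(now, next).toMillis();
    }
}
```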

3.7 Persisting Offset

The broker persists consumer offsets (consumerOffset.json) every 5 seconds by default, configurable through flushConsumerOffsetInterval.

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        try {
            BrokerController.this.consumerOffsetManager.persist();
        } catch (Throwable e) {
            log.error("schedule persist consumerOffset error.", e);
        }
    }
}, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
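The offsets being flushed live in a two-level map. RocketMQ's ConsumerOffsetManager keys the outer level by topic@group and the inner level by queue id; the hypothetical sketch below keeps that shape and returns -1 when nothing has been committed yet, a convention chosen for this sketch.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: "topic@group" -> (queueId -> committed offset).
final class ConsumerOffsetTable {
    private final Map<String, Map<Integer, Long>> table = new ConcurrentHashMap<>();

    void commitOffset(String group, String topic, int queueId, long offset) {
        table.computeIfAbsent(topic + "@" + group, k -> new ConcurrentHashMap<>())
             .put(queueId, offset);
    }

    // -1 means no offset has been committed for this queue yet.
    long queryOffset(String group, String topic, int queueId) {
        Map<Integer, Long> perQueue = table.get(topic + "@" + group);
        Long offset = perQueue == null ? null : perQueue.get(queueId);
        return offset == null ? -1 : offset;
    }
}
```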

3.8 Persisting Filter Parameters

The broker persists consumer filter data (the filter expressions, such as SQL92, registered by consumers) to a file every 10 seconds.

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        try {
            BrokerController.this.consumerFilterManager.persist();
        } catch (Throwable e) {
            log.error("schedule persist consumer filter error.", e);
        }
    }
}, 1000 * 10, 1000 * 10, TimeUnit.MILLISECONDS);

3.9 Broker Self‑Protection

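If a consumer group reads too slowly, the broker can protect itself by disabling consumption for that group. The check runs every 3 minutes and only takes effect when disableConsumeIfConsumerReadSlowly is enabled (it is off by default); a group is disabled once it falls behind by more than consumerFallbehindThreshold (16 GB by default).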

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        try {
            BrokerController.this.protectBroker();
        } catch (Throwable e) {
            log.error("protectBroker error.", e);
        }
    }
}, 3, 3, TimeUnit.MINUTES);
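The protection check boils down to a threshold test per consumer group. This sketch assumes each group's lag in bytes is already available and that the feature flag is passed in; BrokerProtector is hypothetical, and its 16 GiB constant reflects what we understand to be the default consumerFallbehindThreshold.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the slow-consumer protection decision.
final class BrokerProtector {
    static final long DEFAULT_FALL_BEHIND_THRESHOLD = 16L * 1024 * 1024 * 1024; // 16 GiB

    // Returns the groups whose consumption should be disabled because they are
    // more than `threshold` bytes behind; returns nothing when the feature is off.
    static List<String> groupsToDisable(boolean enabled, Map<String, Long> fallBehindBytes, long threshold) {
        List<String> result = new ArrayList<>();
        if (!enabled) {
            return result;
        }
        for (Map.Entry<String, Long> e : fallBehindBytes.entrySet()) {
            if (e.getValue() > threshold) {
                result.add(e.getKey());
            }
        }
        return result;
    }
}
```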

3.10 Watermark Printing

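Every second, the broker logs a watermark for each of its request queues (send, pull, query, and end-transaction): the queue size and how long the request at the head of the queue has been waiting.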

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        try {
            BrokerController.this.printWaterMark();
        } catch (Throwable e) {
            log.error("printWaterMark error.", e);
        }
    }
}, 10, 1, TimeUnit.SECONDS);
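The head-of-queue wait can be sketched as the age of the oldest queued request, assuming each request records when it was created. TimedTask, QueueWatermark, and the log format are hypothetical.

```java
import java.util.concurrent.BlockingQueue;

// Hypothetical sketch of a queue watermark.
final class QueueWatermark {
    // A queued request that remembers its creation time.
    static final class TimedTask {
        final long createTimestamp;

        TimedTask(long createTimestamp) {
            this.createTimestamp = createTimestamp;
        }
    }

    // How long the oldest request (the queue head) has waited; 0 if empty.
    static long headWaitMillis(BlockingQueue<TimedTask> queue, long now) {
        TimedTask head = queue.peek();
        return head == null ? 0 : now - head.createTimestamp;
    }

    static String format(String name, BlockingQueue<TimedTask> queue, long now) {
        return String.format("%s queue size=%d, head wait=%d ms", name, queue.size(), headWaitMillis(queue, now));
    }
}
```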

3.11 Offset Difference Logging

Every minute, the broker logs how many CommitLog bytes have not yet been dispatched to the ConsumeQueue and IndexFile.

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        try {
            log.info("dispatch behind commit log {} bytes", BrokerController.this.getMessageStore().dispatchBehindBytes());
        } catch (Throwable e) {
            log.error("schedule dispatchBehindBytes error.", e);
        }
    }
}, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);

3.12 Periodic NameServer Address Refresh

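If no NameServer address is configured and fetchNamesrvAddrByAddressServer is enabled, the broker fetches NameServer addresses from the address server every 2 minutes and updates its local cache.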

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        try {
            BrokerController.this.brokerOuterAPI.fetchNameServerAddr();
        } catch (Throwable e) {
            log.error("ScheduledTask fetchNameServerAddr exception", e);
        }
    }
}, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);

3.13 Master‑Slave Offset Difference

In master-slave mode, the master logs every minute how many bytes its slave lags behind it.

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        try {
            BrokerController.this.printMasterAndSlaveDiff();
        } catch (Throwable e) {
            log.error("schedule printMasterAndSlaveDiff error.", e);
        }
    }
}, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);

3.14 Registering with NameServer

Brokers register themselves with all NameServers every 30 seconds by default (registerNameServerPeriod); the configured value is clamped to between 10 and 60 seconds.

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        try {
            BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
        } catch (Throwable e) {
            log.error("registerBrokerAll Exception", e);
        }
    }
}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
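The clamp matters because a NameServer drops a broker that has not registered for 120 seconds (see section 4.1). Capping the period at 60 seconds guarantees at least two registrations per expiry window, while the 10-second floor keeps brokers from flooding the NameServers. A small sketch of that arithmetic, with hypothetical names:

```java
// Hypothetical helper reproducing the clamp in the excerpt above.
final class RegisterPeriod {
    static final long BROKER_EXPIRY_MS = 120_000L; // NameServer expiry (section 4.1)

    static long effectivePeriodMillis(long configured) {
        return Math.max(10_000L, Math.min(configured, 60_000L));
    }

    // How many registrations fit into one expiry window.
    static long registrationsPerExpiryWindow(long configured) {
        return BROKER_EXPIRY_MS / effectivePeriodMillis(configured);
    }
}
```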

3.15 Slave Synchronization

A slave broker pulls metadata from its master every 10 seconds: topic configuration, consumer offsets, delay offsets, and subscription group configuration. Message data itself is replicated separately by the HA service.

slaveSyncFuture = this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        try {
            BrokerController.this.slaveSynchronize.syncAll();
        } catch (Throwable e) {
            log.error("ScheduledTask SlaveSynchronize syncAll error.", e);
        }
    }
}, 1000 * 3, 1000 * 10, TimeUnit.MILLISECONDS);
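Syncing every 10 seconds stays cheap because the slave can skip unchanged data: topic configuration, for example, carries a data version, and the slave overwrites its copy only when the master's version differs. A hypothetical sketch using a single long as the version (RocketMQ's DataVersion holds more than that):

```java
// Hypothetical versioned config holder.
final class VersionedConfig {
    long version;      // bumped on every change (stand-in for DataVersion)
    String content = "";
}

// Hypothetical slave-side sync step.
final class SlaveSyncSketch {
    // Copies the master's config only when its version differs, so an
    // unchanged master costs one comparison instead of a rewrite.
    static boolean sync(VersionedConfig master, VersionedConfig slave) {
        if (master.version == slave.version) {
            return false;
        }
        slave.content = master.content;
        slave.version = master.version;
        return true;
    }
}
```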

3.16 Deleting Expired Files

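Brokers periodically delete expired CommitLog and ConsumeQueue files. The check runs every 10 seconds by default (cleanResourceInterval), starting one minute after startup. CommitLog files are kept for 72 hours by default (fileReservedTime) and are normally deleted at 4 a.m. (deleteWhen), or earlier when disk space runs low.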

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        DefaultMessageStore.this.cleanFilesPeriodically();
    }
}, 1000 * 60, this.messageStoreConfig.getCleanResourceInterval(), TimeUnit.MILLISECONDS);
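The deletion decision can be sketched as follows, assuming files are listed oldest first by last-modified time. Nothing happens outside the deletion hour unless the disk is nearly full, and the newest file is always kept because it is still being written. The class and parameters are hypothetical, and the sketch omits any per-pass limit or pause between deletions a production broker may apply.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of choosing which CommitLog files to delete.
final class ExpiredFileSelector {
    // lastModified: per-file timestamps, oldest first. Returns deletable indexes.
    static List<Integer> deletable(List<Long> lastModified, long now, long reserveMillis,
                                   boolean atDeleteHour, boolean diskNearlyFull) {
        List<Integer> result = new ArrayList<>();
        if (!atDeleteHour && !diskNearlyFull) {
            return result; // outside the deletion window and the disk is fine
        }
        // Stop before the last file: it is the one currently being written.
        for (int i = 0; i < lastModified.size() - 1; i++) {
            if (diskNearlyFull || lastModified.get(i) + reserveMillis <= now) {
                result.add(i);
            }
        }
        return result;
    }
}
```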

3.17 File Size Check

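Every 10 minutes the broker checks that its CommitLog and ConsumeQueue files are contiguous, that is, each file starts exactly where the previous one ends, and logs an error if they are not.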

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        DefaultMessageStore.this.checkSelf();
    }
}, 1, 10, TimeUnit.MINUTES);
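Contiguity is easy to check because CommitLog and ConsumeQueue files have a fixed size and are named after their starting offset, zero-padded to 20 digits. A hypothetical sketch of the check and the file-name format, using the default 1 GiB CommitLog file size in the example values:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of a mapped-file contiguity check.
final class MappedFileCheck {
    // File i must start exactly fileSize bytes after file i-1.
    // Returns the indexes of files that break this rule.
    static List<Integer> gaps(long[] startOffsets, long fileSize) {
        List<Integer> bad = new ArrayList<>();
        for (int i = 1; i < startOffsets.length; i++) {
            if (startOffsets[i] - startOffsets[i - 1] != fileSize) {
                bad.add(i);
            }
        }
        return bad;
    }

    // 20-digit, zero-padded file name for a start offset.
    static String fileName(long startOffset) {
        return String.format("%020d", startOffset);
    }
}
```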

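3.18 Lock Stack Dumps

If debugLockEnable is on, the broker checks every second whether the CommitLog write lock has been held for more than 1 second; if so, it dumps the stack traces of all live threads to a file under ~/debug/lock/.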

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        if (DefaultMessageStore.this.getMessageStoreConfig().isDebugLockEnable()) {
            try {
                if (DefaultMessageStore.this.commitLog.getBeginTimeInLock() != 0) {
                    long lockTime = System.currentTimeMillis() - DefaultMessageStore.this.commitLog.getBeginTimeInLock();
                    if (lockTime > 1000 && lockTime < 10000000) {
                        String stack = UtilAll.jstack();
                        String fileName = System.getProperty("user.home") + File.separator + "debug/lock/stack-" +
                                DefaultMessageStore.this.commitLog.getBeginTimeInLock() + "-" + lockTime;
                        MixAll.string2FileNotSafe(stack, fileName);
                    }
                }
            } catch (Exception e) {}
        }
    }
}, 1, 1, TimeUnit.SECONDS);
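UtilAll.jstack() is RocketMQ's helper; a JDK-only equivalent can be built on Thread.getAllStackTraces(). The sketch below, with hypothetical names, also isolates the trigger condition from the excerpt: the lock must be held (non-zero start time) for more than 1 second and less than the excerpt's upper sanity bound.

```java
import java.util.Map;

// Hypothetical jstack-like helper using only the JDK.
final class StackDumper {
    static String dumpAllThreads() {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<Thread, StackTraceElement[]> e : Thread.getAllStackTraces().entrySet()) {
            Thread t = e.getKey();
            sb.append('"').append(t.getName()).append("\" state=").append(t.getState()).append('\n');
            for (StackTraceElement frame : e.getValue()) {
                sb.append("    at ").append(frame).append('\n');
            }
            sb.append('\n');
        }
        return sb.toString();
    }

    // Same condition as the excerpt: lock held, for 1 s < held < 10,000,000 ms.
    static boolean shouldDump(long lockBeginMillis, long nowMillis) {
        if (lockBeginMillis == 0) {
            return false; // lock not currently held
        }
        long held = nowMillis - lockBeginMillis;
        return held > 1000 && held < 10_000_000;
    }
}
```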

3.19 Disk Space Check

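Every 10 seconds the broker checks how much of the disk holding the CommitLog is in use. Above the warning level (90% by default) it logs an error and marks the disk as full, after which the store rejects new messages until space is freed.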

this.diskCheckScheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        DefaultMessageStore.this.cleanCommitLogService.isSpaceFull();
    }
}, 1000L, 10000L, TimeUnit.MILLISECONDS);
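The used-space ratio can be computed with java.io.File alone. In this hypothetical sketch, -1 means the partition size could not be determined; the warning level is a parameter, and we believe the broker's default is 0.90.

```java
import java.io.File;

// Hypothetical disk-usage helper.
final class DiskUsage {
    // Fraction (0.0 to 1.0) of the partition holding `path` that is in use,
    // or -1 if the size cannot be determined.
    static double usedRatio(File path) {
        long total = path.getTotalSpace();
        if (total == 0) {
            return -1;
        }
        return (double) (total - path.getFreeSpace()) / total;
    }

    // The "mark disk full" rule applied to that ratio.
    static boolean isSpaceFull(double usedRatio, double warningLevel) {
        return usedRatio > warningLevel;
    }
}
```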

3.20 Delayed Message Offset Persistence

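RocketMQ defines 18 delay levels by default (1s, 5s, …, 2h), configured by messageDelayLevel. ScheduleMessageService keeps the current offset of each level in a ConcurrentMap and persists it to disk every 10 seconds by default (flushDelayOffsetInterval).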

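// In MessageStoreConfig: the 18 default delay levels
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

// In ScheduleMessageService: periodically persist the per-level offset table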
this.deliverExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        try {
            if (started.get()) {
                ScheduleMessageService.this.persist();
            }
        } catch (Throwable e) {
            log.error("scheduleAtFixedRate flush exception", e);
        }
    }
}, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval(), TimeUnit.MILLISECONDS);
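The delay-level string maps to a level-to-milliseconds table, with levels counted from 1. A sketch of the parsing, assuming s/m/h/d unit suffixes; DelayLevels is a hypothetical name:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical parser for a delay-level specification string.
final class DelayLevels {
    // "1s 5s ... 2h" -> {1: 1000, 2: 5000, ...}; level 1 is the first entry.
    static Map<Integer, Long> parse(String spec) {
        Map<Integer, Long> table = new HashMap<>();
        String[] parts = spec.trim().split("\\s+");
        for (int i = 0; i < parts.length; i++) {
            String p = parts[i];
            char unit = p.charAt(p.length() - 1);
            long value = Long.parseLong(p.substring(0, p.length() - 1));
            long unitMillis;
            switch (unit) {
                case 's': unitMillis = 1000L; break;
                case 'm': unitMillis = 60_000L; break;
                case 'h': unitMillis = 3_600_000L; break;
                case 'd': unitMillis = 86_400_000L; break;
                default: throw new IllegalArgumentException("unknown unit in " + p);
            }
            table.put(i + 1, value * unitMillis);
        }
        return table;
    }
}
```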

4 NameServer

4.1 Scanning Expired Brokers

Each NameServer keeps a table of live brokers and scans it every 10 seconds (after a 5-second initial delay), removing any broker that has not registered or sent a heartbeat for more than 120 seconds.

this.scheduledExecutorService.scheduleAtFixedRate(NamesrvController.this.routeInfoManager::scanNotActiveBroker, 5, 10, TimeUnit.SECONDS);
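The scan can be sketched as a sweep over a table of last-update timestamps. BrokerLiveTable is hypothetical; the 120-second constant mirrors the expiry described above, and the returned addresses are the brokers whose routes would then be removed and whose channels would be closed.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of the NameServer's inactive-broker sweep.
final class BrokerLiveTable {
    static final long BROKER_CHANNEL_EXPIRED_TIME = 120_000L; // 2 minutes

    // broker address -> time of its last registration/heartbeat
    final Map<String, Long> lastUpdate = new ConcurrentHashMap<>();

    void onRegister(String brokerAddr, long now) {
        lastUpdate.put(brokerAddr, now);
    }

    // Runs every 10 s: removes brokers silent for more than 2 minutes and
    // returns their addresses for route cleanup.
    List<String> scanNotActive(long now) {
        List<String> removed = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = lastUpdate.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> e = it.next();
            if (e.getValue() + BROKER_CHANNEL_EXPIRED_TIME < now) {
                removed.add(e.getKey());
                it.remove();
            }
        }
        return removed;
    }
}
```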

4.2 Printing Configuration

Upon startup, NameServer loads KV‑format configuration into a config table and periodically prints the configuration every 10 minutes.

this.scheduledExecutorService.scheduleAtFixedRate(NamesrvController.this.kvConfigManager::printAllPeriodically, 1, 10, TimeUnit.MINUTES);

5 Summary

RocketMQ relies on many scheduled tasks, covering business processing, monitoring logs, heartbeats, cleanup, closing inactive connections, and data persistence. Understanding them gives deeper insight into RocketMQ's architecture and how it keeps itself reliable.

Republication Notice

This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contact admin@besthub.dev and we will review it promptly.
