Unlocking RocketMQ: How Scheduled Tasks Keep Your Messaging System Healthy
This article examines the numerous scheduled tasks in RocketMQ—covering architecture, producer and consumer maintenance, broker housekeeping, and NameServer management—to reveal how periodic jobs ensure reliable message handling, performance monitoring, and system self‑protection.
In this article we explore RocketMQ's scheduled tasks to deepen understanding of its message processing mechanisms and design philosophy.
1 Architecture Review
First, a quick recap of RocketMQ's architecture.
NameServer nodes are deployed as a cluster but do not synchronize data with each other. Every node holds the full routing information, so the failure of a single node does not affect the cluster. Brokers can be deployed as master‑slave clusters for replication and high availability. Each broker keeps a long connection to every NameServer, which it uses to register its topic routes and send heartbeats. Producers and consumers keep a long connection to one of the NameServer nodes and periodically pull routing information from it.
2 Producer and Consumer
2.1 Fetching NameServer Addresses
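Producers and consumers must first obtain NameServer addresses. When no address is configured explicitly (namesrvAddr), both start a scheduled task that fetches the address list from an address server every two minutes, after an initial 10‑second delay, and refreshes the local cache.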
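Every snippet in this article wraps the task body in try/catch. The reason is a documented JDK rule: if any execution of a task scheduled with `scheduleAtFixedRate` throws, all later executions of that task are suppressed. The small demo that follows uses only the JDK (no RocketMQ classes; `TryCatchDemo` is a hypothetical name) to show the difference between a guarded and an unguarded task.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Shows why periodic task bodies are wrapped in try/catch: per the
// ScheduledExecutorService contract, an exception escaping run()
// suppresses all subsequent executions of that periodic task.
class TryCatchDemo {

    // Schedules a task that fails on every run and reports how often it actually ran.
    static int runFor(boolean guarded, long millis) {
        ScheduledExecutorService ses = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger runs = new AtomicInteger();
        Runnable failing = () -> {
            runs.incrementAndGet();
            throw new IllegalStateException("simulated fetchNameServerAddr failure");
        };
        Runnable task = guarded
            ? () -> {
                try {
                    failing.run();
                } catch (Exception e) {
                    // RocketMQ logs here (log.error); swallowing the error keeps the schedule alive.
                }
            }
            : failing;
        ses.scheduleAtFixedRate(task, 0, 20, TimeUnit.MILLISECONDS);
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        ses.shutdownNow();
        return runs.get();
    }

    public static void main(String[] args) {
        System.out.println("unguarded runs: " + runFor(false, 200)); // stops after the first failure
        System.out.println("guarded runs:   " + runFor(true, 200));  // keeps firing every 20 ms
    }
}
```

The unguarded task runs exactly once and then silently stops, which is why a transient network error in a task such as fetchNameServerAddr must never escape `run()`.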
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.mQClientAPIImpl.fetchNameServerAddr();
} catch (Exception e) {
log.error("ScheduledTask fetchNameServerAddr exception", e);
}
}
}, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
2.2 Updating Routing Information
Producers and consumers periodically pull topic routing information from the NameServer and update their local cache. The interval defaults to 30 seconds and is configurable through pollNameServerInterval.
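The client compares each fetched route with the cached one and only rebuilds its local tables when something changed. A minimal sketch of that compare-and-replace loop, assuming a hypothetical `RouteCache` class whose `nameServer` function stands in for the NameServer RPC and whose routes are simplified to lists of broker addresses:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical sketch of the "pull, compare, replace" route refresh.
class RouteCache {
    private final Map<String, List<String>> routes = new ConcurrentHashMap<>();
    private final Function<String, List<String>> nameServer; // stand-in for the NameServer query

    RouteCache(Function<String, List<String>> nameServer) {
        this.nameServer = nameServer;
    }

    void subscribe(String topic) {
        routes.putIfAbsent(topic, Collections.emptyList());
    }

    List<String> route(String topic) {
        return routes.get(topic);
    }

    /** Called by the periodic task; returns how many topics actually changed. */
    int refresh() {
        int changed = 0;
        for (String topic : new ArrayList<>(routes.keySet())) {
            List<String> latest = nameServer.apply(topic);
            if (latest == null) {
                continue; // no route returned: keep the cached one
            }
            if (!Objects.equals(latest, routes.get(topic))) {
                routes.put(topic, Collections.unmodifiableList(new ArrayList<>(latest)));
                changed++;
            }
        }
        return changed;
    }
}
```

Skipping unchanged routes keeps the 30‑second poll cheap: most rounds end without touching the cache at all.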
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.updateTopicRouteInfoFromNameServer();
} catch (Exception e) {
log.error("ScheduledTask updateTopicRouteInfoFromNameServer exception", e);
}
}
}, 10, this.clientConfig.getPollNameServerInterval(), TimeUnit.MILLISECONDS);
2.3 Sending Heartbeats to Brokers
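Producers and consumers first remove offline brokers (brokers that no longer appear in any cached topic route) from their local broker table, then send heartbeats to the brokers. The interval defaults to 30 seconds and is configurable through heartbeatBrokerInterval.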
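Cleaning offline brokers amounts to dropping every cached broker address that no current topic route mentions. A minimal sketch, assuming the local table maps broker name to (broker id to address); `OfflineBrokerCleaner` is a hypothetical name, and the set of live addresses is assumed to have been collected from the cached routes beforehand.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of cleaning offline brokers before sending heartbeats.
class OfflineBrokerCleaner {

    /**
     * brokerAddrTable: brokerName -> (brokerId -> address).
     * liveAddrs: addresses still referenced by some cached topic route.
     * Returns the number of addresses removed.
     */
    static int clean(Map<String, Map<Long, String>> brokerAddrTable, Set<String> liveAddrs) {
        int removed = 0;
        Iterator<Map.Entry<String, Map<Long, String>>> brokers = brokerAddrTable.entrySet().iterator();
        while (brokers.hasNext()) {
            Map<Long, String> ids = brokers.next().getValue();
            Iterator<String> addrs = ids.values().iterator();
            while (addrs.hasNext()) {
                if (!liveAddrs.contains(addrs.next())) {
                    addrs.remove(); // no route points at this address any more
                    removed++;
                }
            }
            if (ids.isEmpty()) {
                brokers.remove(); // neither master nor slave left under this broker name
            }
        }
        return removed;
    }
}
```

Doing this before each heartbeat round avoids wasting connection attempts on brokers that have left the cluster.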
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.cleanOfflineBroker();
MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
} catch (Exception e) {
log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
}
}
}, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);
2.4 Persisting Consumer Offsets
Consumers periodically persist MessageQueue offsets, defaulting to every 5 seconds. In cluster mode offsets are stored on the broker; in broadcast mode they are stored locally.
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.persistAllConsumerOffset();
} catch (Exception e) {
log.error("ScheduledTask persistAllConsumerOffset exception", e);
}
}
}, 1000 * 10, this.clientConfig.getPersistConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
2.5 Adjusting Core Thread Count
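For push‑mode consumers, this task is meant to resize the core size of the consume thread pool according to the number of messages waiting to be consumed. It runs once a minute, but in version 4.9.4 the methods that would grow or shrink the pool are empty placeholders, so the task currently has no effect.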
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
MQClientInstance.this.adjustThreadPool();
} catch (Exception e) {
log.error("ScheduledTask adjustThreadPool exception", e);
}
}
}, 1, 1, TimeUnit.MINUTES);
2.6 Expiring Requests
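Producers and consumers scan their locally cached in‑flight requests. A request is considered expired when its start time plus its timeout plus 1 second is no later than the current time; it is then removed and its callback is triggered. The scan runs every second after an initial 3‑second delay.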
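The expiry rule can be sketched as a table of pending requests keyed by request id. `ResponseTable` and `Pending` are hypothetical names; the 1‑second grace period follows the rule just described, and `now` is passed in as a parameter so the logic can be checked without waiting.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical sketch of a response table that is scanned for timed-out requests.
class ResponseTable {

    static final class Pending {
        final long beginTimestamp;
        final long timeoutMillis;
        final Consumer<Integer> onTimeout; // receives the request id

        Pending(long beginTimestamp, long timeoutMillis, Consumer<Integer> onTimeout) {
            this.beginTimestamp = beginTimestamp;
            this.timeoutMillis = timeoutMillis;
            this.onTimeout = onTimeout;
        }
    }

    private final Map<Integer, Pending> table = new ConcurrentHashMap<>();

    void register(int requestId, Pending pending) {
        table.put(requestId, pending);
    }

    int size() {
        return table.size();
    }

    /** One scan pass; returns how many requests expired. */
    int scan(long now) {
        Map<Integer, Pending> expired = new LinkedHashMap<>();
        Iterator<Map.Entry<Integer, Pending>> it = table.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Integer, Pending> e = it.next();
            Pending p = e.getValue();
            // start time + timeout + 1 s grace period <= now  => expired
            if (p.beginTimestamp + p.timeoutMillis + 1000 <= now) {
                expired.put(e.getKey(), p);
                it.remove();
            }
        }
        // Callbacks run after the table is cleaned, outside the iteration.
        expired.forEach((id, p) -> p.onTimeout.accept(id));
        return expired.size();
    }
}
```

Running callbacks only after removal means a slow callback cannot be fired twice by overlapping scans.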
this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
NettyRemotingClient.this.scanResponseTable();
} catch (Throwable e) {
log.error("scanResponseTable exception", e);
}
}
}, 1000 * 3, 1000);
2.7 Producer Performance Recording
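RocketMQ's bundled benchmark producer (part of the example code, not the client library itself) records success/failure counts and latency after each send. One task snapshots these counters every second and keeps the latest 10; a second task uses that window every 10 seconds to calculate and print TPS and average response time.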
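The snapshot-window technique is easy to reproduce on its own. A sketch under stated assumptions: the hypothetical `SnapshotWindow` class stores cumulative counters as `{timestampMs, successCount, failCount, totalRtMs}` (our own layout, not the benchmark's exact array), and TPS is the change in successes divided by the time elapsed between the oldest and newest snapshots.

```java
import java.util.LinkedList;

// Hypothetical sketch: a sliding window of cumulative counters sampled once per second.
class SnapshotWindow {
    // Each snapshot: {timestampMs, successCount, failCount, totalRtMs}, all cumulative.
    private final LinkedList<long[]> snapshots = new LinkedList<>();
    private final int capacity;

    SnapshotWindow(int capacity) {
        this.capacity = capacity;
    }

    /** Called by the 1-second task. */
    void add(long[] snapshot) {
        snapshots.addLast(snapshot);
        if (snapshots.size() > capacity) {
            snapshots.removeFirst(); // keep only the newest `capacity` samples
        }
    }

    boolean full() {
        return snapshots.size() >= capacity;
    }

    /** Successful sends per second between the oldest and newest snapshot. */
    long tps() {
        long[] begin = snapshots.getFirst();
        long[] end = snapshots.getLast();
        return (long) ((end[1] - begin[1]) / (double) (end[0] - begin[0]) * 1000L);
    }

    /** Average latency per successful send in the window, in milliseconds. */
    double averageRt() {
        long[] begin = snapshots.getFirst();
        long[] end = snapshots.getLast();
        long sent = end[1] - begin[1];
        return sent == 0 ? 0.0 : (end[3] - begin[3]) / (double) sent;
    }
}
```

Because the counters are cumulative, subtracting the oldest snapshot from the newest gives the activity inside the window without storing individual send events.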
executorService.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
snapshotList.addLast(statsBenchmark.createSnapshot());
if (snapshotList.size() > 10) {
snapshotList.removeFirst();
}
}
}, 1000, 1000, TimeUnit.MILLISECONDS);
executorService.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
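// printStats() is a private method declared inside this anonymous TimerTask in the benchmark source (omitted here).
this.printStats();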
} catch (Exception e) {
e.printStackTrace();
}
}
}, 10000, 10000, TimeUnit.MILLISECONDS);
2.8 Consumer Tasks
2.8.1 Locking MessageQueue for Ordered Consumption
For ordered messages, a consumer periodically (default 20 s) sends a lock request to the broker; the broker binds the MessageQueue, group, and clientId, preventing other consumers from pulling from the same queue.
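On the broker side, the lock can be modeled as a table keyed by consumer group and queue. The sketch assumes a lock that expires if its owner stops renewing it for 60 seconds, so the 20‑second renewal keeps it alive with margin; the class name, key format, and lifetime are illustrative assumptions rather than RocketMQ's code.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of a broker-side lock table for ordered consumption.
class QueueLockTable {
    static final long LOCK_MAX_LIVE_MS = 60_000; // assumed lifetime of an un-renewed lock

    private static final class Holder {
        final String clientId;
        long lastRenewed;

        Holder(String clientId, long now) {
            this.clientId = clientId;
            this.lastRenewed = now;
        }
    }

    // key: consumerGroup + "@" + queue identifier
    private final Map<String, Holder> locks = new HashMap<>();

    /** Returns true if clientId holds the lock on (group, queue) after this call. */
    synchronized boolean tryLock(String group, String queue, String clientId, long now) {
        String key = group + "@" + queue;
        Holder h = locks.get(key);
        if (h == null || now - h.lastRenewed > LOCK_MAX_LIVE_MS) {
            locks.put(key, new Holder(clientId, now)); // free or expired: grant it
            return true;
        }
        if (h.clientId.equals(clientId)) {
            h.lastRenewed = now; // periodic renewal by the current owner
            return true;
        }
        return false; // still held by another live consumer
    }
}
```

The expiry is what lets a queue move to another consumer when its owner crashes without unlocking.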
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
ConsumeMessageOrderlyService.this.lockMQPeriodically();
} catch (Throwable e) {
log.error("scheduleAtFixedRate lockMQPeriodically exception", e);
}
}
}, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS);
2.8.2 Performance Snapshots
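The benchmark consumer records a snapshot of its counters every second: total born‑to‑consume time (B2C, from message creation to consumption), total store‑to‑consume time (S2C, from storage on the broker to consumption), total received messages, and failures.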
executorService.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
snapshotList.addLast(statsBenchmarkConsumer.createSnapshot());
if (snapshotList.size() > 10) {
snapshotList.removeFirst();
}
}
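}, 1000, 1000, TimeUnit.MILLISECONDS);
Every 10 seconds the consumer uses the oldest and newest snapshots in this window to compute and print TPS, failure count, and average B2C and S2C latency, along with the maximum latencies observed since the previous report.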
private void printStats() {
// Only report once the window holds 10 one-second snapshots.
if (snapshotList.size() >= 10) {
Long[] begin = snapshotList.getFirst();
Long[] end = snapshotList.getLast();
// Snapshot layout: [0] timestamp, [1] messages received, [2] total B2C time, [3] total S2C time, [4] failures.
long consumeTps = (long) (((end[1] - begin[1]) / (double) (end[0] - begin[0])) * 1000L);
double averageB2CRT = (end[2] - begin[2]) / (double) (end[1] - begin[1]);
double averageS2CRT = (end[3] - begin[3]) / (double) (end[1] - begin[1]);
long failCount = end[4] - begin[4];
// Maximum latencies are tracked in separate counters and reset after each report.
long b2cMax = statsBenchmarkConsumer.getBorn2ConsumerMaxRT().get();
long s2cMax = statsBenchmarkConsumer.getStore2ConsumerMaxRT().get();
statsBenchmarkConsumer.getBorn2ConsumerMaxRT().set(0);
statsBenchmarkConsumer.getStore2ConsumerMaxRT().set(0);
System.out.printf("Current Time: %s TPS: %d FAIL: %d AVG(B2C) RT(ms): %7.3f AVG(S2C) RT(ms): %7.3f MAX(B2C) RT(ms): %d MAX(S2C) RT(ms): %d%n",
System.currentTimeMillis(), consumeTps, failCount, averageB2CRT, averageS2CRT, b2cMax, s2cMax);
}
}
2.8.3 Cleaning Expired Messages
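For concurrent (non‑ordered) consumption, the consumer periodically checks the messages it has pulled but not yet finished consuming. If a message has been in consumption for longer than consumeTimeout (default 15 minutes), it is sent back to the broker for redelivery and removed from the local process queue. The task runs every consumeTimeout minutes and checks at most 16 messages per queue on each run.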
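A minimal sketch of that sweep over one process queue, assuming in‑flight messages are tracked as queue offset to consume start time and recorded in start order; `ConsumeTimeoutSweeper` and the `sendBack` callback are hypothetical stand‑ins for the consumer's process queue and its send‑back call.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical sketch of the consume-timeout sweep over one local process queue.
class ConsumeTimeoutSweeper {
    static final int MAX_CHECKS_PER_ROUND = 16; // bound the work done in one pass

    // queue offset -> consume start time (ms); insertion order = start order (assumption)
    private final Map<Long, Long> inFlight = new LinkedHashMap<>();

    synchronized void startConsuming(long offset, long startMs) {
        inFlight.put(offset, startMs);
    }

    /** Sends back and removes messages whose consumption exceeded timeoutMs; returns their offsets. */
    synchronized List<Long> sweep(long now, long timeoutMs, Consumer<Long> sendBack) {
        List<Long> expired = new ArrayList<>();
        Iterator<Map.Entry<Long, Long>> it = inFlight.entrySet().iterator();
        for (int checked = 0; checked < MAX_CHECKS_PER_ROUND && it.hasNext(); checked++) {
            Map.Entry<Long, Long> e = it.next();
            Long offset = e.getKey();
            if (now - e.getValue() <= timeoutMs) {
                break; // entries are in start order, so later ones started even more recently
            }
            sendBack.accept(offset); // the broker will redeliver it later
            it.remove();
            expired.add(offset);
        }
        return expired;
    }
}
```

Capping each pass at 16 checks keeps the sweep cheap even when a queue holds thousands of buffered messages.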
this.cleanExpireMsgExecutors.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
cleanExpireMsg();
} catch (Throwable e) {
log.error("scheduleAtFixedRate cleanExpireMsg exception", e);
}
}
}, this.defaultMQPushConsumer.getConsumeTimeout(), this.defaultMQPushConsumer.getConsumeTimeout(), TimeUnit.MINUTES);
2.8.4 Refreshing MessageQueue Information
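The lite pull consumer (DefaultLitePullConsumer) pulls MessageQueue information for its topics from the NameServer every 30 seconds by default (topicMetadataCheckIntervalMillis) and updates its local cache when it detects differences.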
scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
fetchTopicMessageQueuesAndCompare();
} catch (Exception e) {
log.error("ScheduledTask fetchMessageQueuesAndCompare exception", e);
}
}
}, 1000 * 10, this.getDefaultLitePullConsumer().getTopicMetadataCheckIntervalMillis(), TimeUnit.MILLISECONDS);
3 Broker
3.1 State Sampling
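Brokers collect statistics such as the number of messages sent per topic, per MessageQueue, and per consumer group. Six scheduled tasks in the statistics component handle this: three take samples (every 10 seconds, every 10 minutes, and every hour), and three print the aggregated per‑minute, per‑hour, and per‑day figures. The code shows the 10‑second sampler.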
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
samplingInSeconds();
} catch (Throwable ignored) {}
}
}, 0, 10, TimeUnit.SECONDS);
3.2 Recording Message Delay
Brokers record how far consumers lag behind, that is, the time between a message being stored and being read, and print these values every 5 minutes; the first run is aligned to the start of the next minute.
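The initial delay `Math.abs(UtilAll.computeNextMinutesTimeMillis() - System.currentTimeMillis())` aligns the first run to the next whole minute. A plain `java.time` equivalent, written as a hypothetical helper `AlignedDelay.millisUntilNext` rather than RocketMQ's `UtilAll`, works for minute, hour, or day boundaries; the daily task in 3.6 needs the same kind of delay aligned to midnight.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.temporal.ChronoUnit;

// Sketch: initial delay that aligns a periodic task to the next minute/hour/day boundary.
class AlignedDelay {

    /** Milliseconds from `now` until the next boundary of `unit` (MINUTES, HOURS or DAYS) in `zone`. */
    static long millisUntilNext(Instant now, ChronoUnit unit, ZoneId zone) {
        ZonedDateTime current = now.atZone(zone);
        ZonedDateTime next = current.truncatedTo(unit).plus(1, unit);
        return Duration.between(current, next).toMillis();
    }

    public static void main(String[] args) {
        Instant now = Instant.now();
        ZoneId zone = ZoneId.systemDefault();
        System.out.println("until next minute: " + millisUntilNext(now, ChronoUnit.MINUTES, zone) + " ms");
        System.out.println("until midnight:    " + millisUntilNext(now, ChronoUnit.DAYS, zone) + " ms");
    }
}
```

Passed as the `initialDelay` of `scheduleAtFixedRate` with a period of one minute (or 5 minutes, one hour, one day), this makes log lines land on round clock times, which simplifies comparing statistics across brokers.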
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
printAtMinutes();
} catch (Throwable ignored) {}
}
}, Math.abs(UtilAll.computeNextMinutesTimeMillis() - System.currentTimeMillis()), 1000 * 60 * 5, TimeUnit.MILLISECONDS);
3.3 Persisting Data
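Brokers periodically persist state such as consumer offsets, topic configurations, and subscription group settings. The task shown here is the one in ScheduleMessageService that flushes the offsets of delayed messages every 10 seconds by default (configurable through flushDelayOffsetInterval); 3.20 covers it in more detail.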
this.deliverExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
if (started.get()) {
ScheduleMessageService.this.persist();
}
} catch (Throwable e) {
log.error("scheduleAtFixedRate flush exception", e);
}
}
}, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval(), TimeUnit.MILLISECONDS);
3.4 Expiring Requests
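Brokers scan their cached in‑flight requests in the same way as clients: a request expires when its start time plus its timeout plus 1 second is no later than the current time, which triggers its callback. The scan runs every second after an initial 3‑second delay.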
this.timer.scheduleAtFixedRate(new TimerTask() {
@Override
public void run() {
try {
NettyRemotingServer.this.scanResponseTable();
} catch (Throwable e) {
log.error("scanResponseTable exception", e);
}
}
}, 1000 * 3, 1000);
3.5 Filter Service Creation
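Consumers that register a filterClass rely on FilterServer processes. If filter servers are enabled (filterServerNums greater than 0), the broker checks every 30 seconds whether the configured number of FilterServer processes is running and starts any that are missing.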
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
FilterServerManager.this.createFilterServer();
} catch (Exception e) {
log.error("", e);
}
}
}, 1000 * 5, 1000 * 30, TimeUnit.MILLISECONDS);
3.6 Daily Message Count Recording
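Once a day, at midnight, brokers record the cumulative counts of messages stored and fetched, so they can report the totals for the previous day.
// First run at the next midnight, then every 24 hours.
final long initialDelay = UtilAll.computeNextMorningTimeMillis() - System.currentTimeMillis();
final long period = 1000 * 60 * 60 * 24;
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {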
@Override
public void run() {
try {
BrokerController.this.getBrokerStats().record();
} catch (Throwable e) {
log.error("schedule record error.", e);
}
}
}, initialDelay, period, TimeUnit.MILLISECONDS);
3.7 Persisting Offset
The broker persists consumer offsets every 5 seconds by default (configurable through flushConsumerOffsetInterval).
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.consumerOffsetManager.persist();
} catch (Throwable e) {
log.error("schedule persist consumerOffset error.", e);
}
}
}, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
3.8 Persisting Filter Parameters
Consumer filter data (the filter expressions registered by consumers) is persisted to a file every 10 seconds.
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.consumerFilterManager.persist();
} catch (Throwable e) {
log.error("schedule persist consumer filter error.", e);
}
}
}, 1000 * 10, 1000 * 10, TimeUnit.MILLISECONDS);
3.9 Broker Self‑Protection
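If self‑protection is enabled (disableConsumeIfConsumerReadSlowly), the broker checks every 3 minutes how far each consumer group has fallen behind. When a group lags by more than a configurable threshold (consumerFallbehindThreshold), the broker disables consumption for that group so it can no longer pull messages.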
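The self‑protection rule is a threshold check over per‑group lag. A minimal sketch, assuming the lag is already available as a map from consumer group to bytes behind; `BrokerProtector.protect` is a hypothetical helper, and the threshold is simply a parameter rather than RocketMQ's configured default.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of the broker self-protection check.
class BrokerProtector {

    /**
     * fallBehindBytes: consumer group -> bytes the group lags behind the newest message.
     * Adds offending groups to `disabled` and returns the ones disabled by this call.
     */
    static List<String> protect(boolean enabled, Map<String, Long> fallBehindBytes,
                                long thresholdBytes, Set<String> disabled) {
        List<String> newlyDisabled = new ArrayList<>();
        if (!enabled) {
            return newlyDisabled; // feature switched off: never throttle consumers
        }
        for (Map.Entry<String, Long> e : fallBehindBytes.entrySet()) {
            if (e.getValue() > thresholdBytes && disabled.add(e.getKey())) {
                newlyDisabled.add(e.getKey());
            }
        }
        return newlyDisabled;
    }
}
```

For example, with a threshold of 16 GiB, a group lagging 20 GiB is disabled on the first check, and later checks report nothing new because the group is already in the disabled set.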
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.protectBroker();
} catch (Throwable e) {
log.error("protectBroker error.", e);
}
}
}, 3, 3, TimeUnit.MINUTES);
3.10 Watermark Printing
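Every second, brokers print watermarks for their request queues (send, pull, transaction, and query): the length of each queue and how long the request at its head has been waiting.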
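A watermark here is just two numbers per queue: its length and the age of the oldest waiting request. A sketch, assuming each queued request carries its creation time; `WaterMark` and `Request` are hypothetical names, and the output format is our own rather than RocketMQ's log format.

```java
import java.util.concurrent.BlockingQueue;

// Hypothetical sketch of a queue "watermark": length plus head-of-queue wait time.
class WaterMark {

    static final class Request {
        final long createdMs;

        Request(long createdMs) {
            this.createdMs = createdMs;
        }
    }

    /** How long the oldest queued request has been waiting, or 0 if the queue is empty. */
    static long headWaitMillis(BlockingQueue<Request> queue, long now) {
        Request head = queue.peek(); // inspect without removing
        return head == null ? 0 : Math.max(0, now - head.createdMs);
    }

    static String describe(String name, BlockingQueue<Request> queue, long now) {
        return String.format("%s queue size: %d, head wait: %d ms", name, queue.size(), headWaitMillis(queue, now));
    }
}
```

The head wait is often a better overload signal than the length alone: a short queue whose head has waited seconds means the worker threads are stuck.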
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.printWaterMark();
} catch (Throwable e) {
log.error("printWaterMark error.", e);
}
}
}, 10, 1, TimeUnit.SECONDS);
3.11 Offset Difference Logging
Every minute, brokers log how many bytes of the CommitLog have not yet been dispatched to the ConsumeQueue and IndexFile.
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
log.info("dispatch behind commit log {} bytes", BrokerController.this.getMessageStore().dispatchBehindBytes());
} catch (Throwable e) {
log.error("schedule dispatchBehindBytes error.", e);
}
}
}, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
3.12 Periodic NameServer Address Refresh
If the broker is configured to fetch NameServer addresses from an address server (fetchNamesrvAddrByAddressServer), it refreshes its local address cache every 2 minutes.
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.brokerOuterAPI.fetchNameServerAddr();
} catch (Throwable e) {
log.error("ScheduledTask fetchNameServerAddr exception", e);
}
}
}, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS);
3.13 Master‑Slave Offset Difference
Master brokers log the offset gap between themselves and their slaves once a minute.
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.printMasterAndSlaveDiff();
} catch (Throwable e) {
log.error("schedule printMasterAndSlaveDiff error.", e);
}
}
}, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
3.14 Registering with NameServer
Brokers register themselves with all NameServers every 30 seconds by default. The period is configurable through registerNameServerPeriod and is clamped to between 10 and 60 seconds.
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());
} catch (Throwable e) {
log.error("registerBrokerAll Exception", e);
}
}
}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);
3.15 Slave Synchronization
A slave broker pulls metadata from its master every 10 seconds: topic configuration, consumer offsets, delay offsets, and subscription group configuration.
slaveSyncFuture = this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
try {
BrokerController.this.slaveSynchronize.syncAll();
} catch (Throwable e) {
log.error("ScheduledTask SlaveSynchronize syncAll error.", e);
}
}
}, 1000 * 3, 1000 * 10, TimeUnit.MILLISECONDS);
3.16 Deleting Expired Files
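Brokers periodically delete expired CommitLog and ConsumeQueue files. The check runs every 10 seconds by default (configurable through cleanResourceInterval), but a file is only removed once it is older than the retention time, normally at the scheduled deletion hour or when disk usage is high.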
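The decision of which files to delete can be sketched as follows. This is a simplified model under assumed parameters (a retention time such as 72 hours and a daily deletion hour such as 4 a.m., which we believe match RocketMQ's fileReservedTime and deleteWhen defaults); RocketMQ's actual logic handles more cases, such as forced cleanup when the disk is very full. `ExpiredFileSelector` and `StoreFile` are hypothetical names.

```java
import java.time.LocalTime;
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified sketch of choosing store files to delete on one cleanup pass.
class ExpiredFileSelector {

    static final class StoreFile {
        final String name;
        final long lastModifiedMs;

        StoreFile(String name, long lastModifiedMs) {
            this.name = name;
            this.lastModifiedMs = lastModifiedMs;
        }
    }

    /**
     * Deletes only in the configured hour or under disk pressure, and only files older than
     * the retention time. The newest file is always kept because it may still be written.
     */
    static List<String> select(List<StoreFile> oldestFirst, long now, long retentionMs,
                               LocalTime localNow, int deleteHour, boolean diskNearlyFull) {
        List<String> toDelete = new ArrayList<>();
        boolean timeToDelete = localNow.getHour() == deleteHour;
        if (!timeToDelete && !diskNearlyFull) {
            return toDelete; // most 10-second checks end here
        }
        for (int i = 0; i < oldestFirst.size() - 1; i++) {
            StoreFile f = oldestFirst.get(i);
            if (now - f.lastModifiedMs > retentionMs) {
                toDelete.add(f.name);
            }
        }
        return toDelete;
    }
}
```

Restricting routine deletion to a quiet hour avoids competing with peak traffic for disk I/O, while the disk‑pressure branch keeps the broker from filling up.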
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
DefaultMessageStore.this.cleanFilesPeriodically();
}
}, 1000 * 60, this.messageStoreConfig.getCleanResourceInterval(), TimeUnit.MILLISECONDS);
3.17 File Size Check
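Every 10 minutes the broker runs a self‑check that verifies the CommitLog and ConsumeQueue files are contiguous, that is, each file starts exactly one file size after the previous one.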
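RocketMQ names each CommitLog file after the offset at which it starts, and the files have a fixed size (1 GB by default for the CommitLog), so contiguity can be verified by comparing neighboring start offsets. A minimal sketch; `MappedFileContinuity.firstGap` is a hypothetical helper.

```java
// Hypothetical sketch of a contiguity check over a queue of fixed-size mapped files.
class MappedFileContinuity {

    /**
     * startOffsets: each file's starting offset, in file order.
     * Returns the index of the first file that does not start exactly one fileSize
     * after its predecessor, or -1 if the sequence is contiguous.
     */
    static int firstGap(long[] startOffsets, long fileSize) {
        for (int i = 1; i < startOffsets.length; i++) {
            if (startOffsets[i] - startOffsets[i - 1] != fileSize) {
                return i;
            }
        }
        return -1;
    }
}
```

A gap or overlap means an offset lookup could land in the wrong file, so the check is cheap insurance against on‑disk damage.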
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
DefaultMessageStore.this.checkSelf();
}
}, 1, 10, TimeUnit.MINUTES);
3.18 Recording Lock‑Holding Stack Traces
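If debugLockEnable is on, the broker checks every second whether the CommitLog write lock has been held for more than 1 second. If so, it dumps the stack traces of all live threads to a file under ~/debug/lock/.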
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
if (DefaultMessageStore.this.getMessageStoreConfig().isDebugLockEnable()) {
try {
if (DefaultMessageStore.this.commitLog.getBeginTimeInLock() != 0) {
long lockTime = System.currentTimeMillis() - DefaultMessageStore.this.commitLog.getBeginTimeInLock();
if (lockTime > 1000 && lockTime < 10000000) {
String stack = UtilAll.jstack();
String fileName = System.getProperty("user.home") + File.separator + "debug/lock/stack-" +
DefaultMessageStore.this.commitLog.getBeginTimeInLock() + "-" + lockTime;
MixAll.string2FileNotSafe(stack, fileName);
}
}
} catch (Exception e) {}
}
}
}, 1, 1, TimeUnit.SECONDS);
3.19 Disk Space Check
The broker checks the disk space used by CommitLog every 10 seconds and logs an error if the threshold is exceeded.
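The disk check boils down to computing how full the partition holding the CommitLog is. A sketch using the JDK's `File.getTotalSpace()` and `File.getFreeSpace()`; the class name and the warning ratio are illustrative assumptions, and RocketMQ's own helper may differ in detail.

```java
import java.io.File;

// Hypothetical sketch of the disk-usage check for the partition holding a store path.
class DiskUsage {

    /** Fraction (0.0..1.0) of the partition containing `path` that is in use, or -1 if unknown. */
    static double usedRatio(File path) {
        long total = path.getTotalSpace();
        if (total <= 0) {
            return -1; // path missing or size unavailable
        }
        long free = path.getFreeSpace();
        return (total - free) / (double) total;
    }

    /** True if usage is above the warning level, e.g. 0.90 for 90%. */
    static boolean aboveWarningLevel(File path, double warningRatio) {
        double ratio = usedRatio(path);
        return ratio >= 0 && ratio > warningRatio;
    }
}
```

Running this on a dedicated executor, as the broker does with diskCheckScheduledExecutorService, keeps a slow file‑system call from delaying the other housekeeping tasks.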
this.diskCheckScheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
DefaultMessageStore.this.cleanCommitLogService.isSpaceFull();
}
}, 1000L, 10000L, TimeUnit.MILLISECONDS);
3.20 Delayed Message Offset Persistence
RocketMQ defines 18 delay levels (e.g., 1s, 5s, …, 2h). Offsets for each level are stored in a ConcurrentMap and persisted to disk every 10 seconds.
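Each level in the messageDelayLevel string maps to a delay in milliseconds, with levels numbered from 1. RocketMQ parses this string at startup; the hypothetical `DelayLevels` class sketches one way to do it, supporting the s, m, h, and d suffixes.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: turn a delay-level string into level -> delay in milliseconds.
class DelayLevels {

    static Map<Integer, Long> parse(String levels) {
        Map<Integer, Long> table = new LinkedHashMap<>();
        String[] parts = levels.trim().split("\\s+");
        for (int i = 0; i < parts.length; i++) {
            String p = parts[i];
            char unit = p.charAt(p.length() - 1);
            long amount = Long.parseLong(p.substring(0, p.length() - 1));
            long unitMs;
            switch (unit) {
                case 's': unitMs = 1000L; break;
                case 'm': unitMs = 60_000L; break;
                case 'h': unitMs = 3_600_000L; break;
                case 'd': unitMs = 86_400_000L; break;
                default: throw new IllegalArgumentException("unknown unit in " + p);
            }
            table.put(i + 1, amount * unitMs); // levels are numbered from 1
        }
        return table;
    }
}
```

With the default string, level 1 is 1,000 ms, level 5 ("1m") is 60,000 ms, and level 18 ("2h") is 7,200,000 ms; the persisted offset table then holds one consume offset per level.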
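// MessageStoreConfig: the 18 delay levels, numbered 1 to 18
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
// ScheduleMessageService: flush the per-level offset table to disk
this.deliverExecutorService.scheduleAtFixedRate(new Runnable() {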
@Override
public void run() {
try {
if (started.get()) {
ScheduleMessageService.this.persist();
}
} catch (Throwable e) {
log.error("scheduleAtFixedRate flush exception", e);
}
}
}, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval(), TimeUnit.MILLISECONDS);
4 NameServer
4.1 Scanning Expired Brokers
NameServers maintain a table of live brokers and scan it every 10 seconds, removing brokers whose last registration is more than 120 seconds old.
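A minimal sketch of that liveness table, assuming a hypothetical `LiveBrokerTable` keyed by broker address: each registration refreshes a timestamp, and the 10‑second scan drops entries older than 120 seconds. In RocketMQ an expired broker's routing data is removed as well, which the sketch only notes in a comment.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of the NameServer's live-broker table.
class LiveBrokerTable {
    static final long BROKER_EXPIRED_MS = 120_000; // 120 s without a registration

    private final Map<String, Long> lastHeartbeat = new ConcurrentHashMap<>();

    /** Called on every broker registration (the brokers' periodic registerBrokerAll). */
    void onRegister(String brokerAddr, long now) {
        lastHeartbeat.put(brokerAddr, now);
    }

    /** Called every 10 s; removes and returns brokers silent for more than 120 s. */
    List<String> scanNotActive(long now) {
        List<String> expired = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = lastHeartbeat.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> e = it.next();
            if (e.getValue() + BROKER_EXPIRED_MS < now) {
                String addr = e.getKey();
                it.remove();
                expired.add(addr); // the real NameServer also purges this broker's routes
            }
        }
        return expired;
    }

    boolean isLive(String brokerAddr) {
        return lastHeartbeat.containsKey(brokerAddr);
    }
}
```

With brokers registering every 30 seconds, a broker has to miss roughly four consecutive registrations before it is dropped, which tolerates brief network hiccups.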
this.scheduledExecutorService.scheduleAtFixedRate(NamesrvController.this.routeInfoManager::scanNotActiveBroker, 5, 10, TimeUnit.SECONDS);
4.2 Printing Configuration
On startup, the NameServer loads its KV‑format configuration into a config table, and it prints the entire configuration every 10 minutes.
this.scheduledExecutorService.scheduleAtFixedRate(NamesrvController.this.kvConfigManager::printAllPeriodically, 1, 10, TimeUnit.MINUTES);
5 Summary
RocketMQ contains many scheduled tasks that enhance its design, covering business processing, monitoring logs, heartbeats, cleanup operations, connection termination, and data persistence. Understanding these tasks provides deeper insight into RocketMQ's architecture and reliability.
This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contact us and we will review it promptly.
Su San Talks Tech
Su San, former staff at several leading tech companies, is a top creator on Juejin and a premium creator on CSDN, and runs the free coding practice site www.susan.net.cn.
