How Distributed Delayed Scheduling Guarantees High Availability with Redis and RabbitMQ
This article explains how a distributed delayed‑execution component solves business scenarios that require timed actions, ensuring high availability and data consistency by leveraging Redis sorted sets and RabbitMQ dead‑letter queues, while providing a simple Java API and optional DB‑backed reliability.
Distributed Delay Scheduling: What Problems It Solves
In many business workflows a task must be triggered after a certain condition is met and after a configurable delay, such as sending reminder SMSes 1, 3, or 7 days after an order remains unpaid. The component described here is packaged as a JAR, can be integrated into third‑party systems with minimal configuration, and guarantees high availability and data consistency as long as not all nodes in the cluster are down.
Implementation Logic
The solution uses two different back‑ends:
Redis : stores delayed tasks in a zset (score = execution timestamp) and a string that holds the task payload. Workers poll the zset, fetch due task IDs, retrieve the payload from the string, and execute it.
RabbitMQ : relies on the dead‑letter‑queue mechanism. A message is published with an expiration time; once expired it is routed to a designated exchange and then to a specific queue where a consumer processes it.
Redis Implementation Details
When the JVM starts, a time‑wheel thread continuously polls the Redis zset for entries whose score is less than the current time. To avoid duplicate execution in a distributed environment, a Lua script atomically fetches the task ID from the zset, retrieves the payload from the string, and deletes the entry.
public Object pop(String appName, String key) {
String script = "local v = redis.call('get', KEYS[1]) "
+ "if v == nil then return v "
+ "else redis.call('del', KEYS[1]); return v "
+ "end";
return RedisUtil.evalByte(appName, script, Collections.singletonList(key), Collections.EMPTY_LIST);
}The core processing loop retrieves due task IDs, removes them from the bucket, deserializes the payload, and dispatches it to the appropriate handler:
private void dealJob() {
while (true) {
Set<String> traceIds = redisOperator.rangeByScoreWithSortedSet(null, DELAY_BUCKET, 0, DateUtil.getTodayLong(), 0, DELAY_BUCKET_ONCE_GET_MAX_COUNT);
if (ListHelper.isNotEmpty(traceIds)) {
redisOperator.zrem(null, DELAY_BUCKET, traceIds);
for (String traceId : traceIds) {
Object result = redisOperator.pop(null, DELAY_JOB_PRE + traceId);
if (result != null) {
DelayExecuteMessage message = SerializationUtil.decode((byte[]) result, DelayExecuteMessage.class);
String key = message.getExecuteKey();
ExecutorService pool = pools.get(key);
MessageHandler handler = dealhandles.get(key);
pool.execute(() -> {
try {
handler.execute((byte[]) result);
} catch (Exception e) {
logger.error("consumer error:[key:{},traceId:{},exception:{}]", key, message.getTraceId(), e);
}
});
} else {
// short sleep to avoid busy loop
try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); }
}
}
}
}
}Adding a new delayed task involves inserting the payload into a Redis hash pool and the task ID into the zset:
public <T extends Serializable> void sendDelayMessage(DelayExecuteMessage<T> message, int delayTime) throws Exception {
// add task to Redis hash pool
addJobToPool(message, delayTime);
// add task ID to the sorted set bucket
addToBucket(message, delayTime);
}RabbitMQ Implementation Details
RabbitMQ implements delayed delivery via dead‑letter queues. When a delayed message is published with an expiration, it is first routed to a dedicated exchange and stored in a queue. After expiration, the broker routes the message to another exchange, which forwards it to the queue associated with the @DelayExecute(dealKey="...") annotation. The consumer then processes the message.
Integration Example
Add the following Maven dependencies:
<dependency>
<groupId>com.ziroom.sms</groupId>
<artifactId>medusa-util</artifactId>
</dependency>
<dependency>
<groupId>com.ziroom.sms</groupId>
<artifactId>medusa-redis</artifactId>
</dependency>Send a delayed message:
public void sendDelayMessage(int delayTime) {
DelayExecuteMessage<String> message = new DelayExecuteMessage<>();
message.setExecuteKey("threeDatSendMsg");
message.setData("亲爱的用户,为了不影响您的后续使用……");
try {
SendDelayMessageUtil.sendDelayMessage(message, delayTime, false);
} catch (Exception e) {
e.printStackTrace();
}
}Receive and handle the delayed message:
@DelayExecute(dealKey = "threeDatSendMsg")
public void dealDelayMessage(DelayExecuteMessage<String> message) {
System.out.println("接收到延时消息:" + message.getData());
}The component also provides a DB‑backed reliability layer. Three tables ( ready, finish, error) store pending tasks, successfully processed tasks, and failed tasks respectively. When retries are enabled, task methods must be idempotent; otherwise, set the retry flag to false when sending.
Signed-in readers can open the original source through BestHub's protected redirect.
This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contactand we will review it promptly.
How this landed with the community
Was this worth your time?
0 Comments
Thoughtful readers leave field notes, pushback, and hard-won operational detail here.
