How to Build a High‑Performance Redis Delay Queue with SpringBoot
This article explains the design, implementation, and optimization of a Redis‑based delay queue using SpringBoot, Redisson, and distributed locks, covering use cases, architecture, data structures, job lifecycle, core code snippets, and future improvements for reliability and scalability.
Background
Typical business scenarios need a delay queue, that is, a queue whose messages are processed at a specified future time rather than immediately. Examples include automatically closing unpaid orders, periodically checking refund status, synchronizing order status step by step, and handling asynchronous payment notifications.
Solution Options
Simple table scan: periodically scan the database (e.g., every 2 seconds) and close expired orders. Easy to implement, but it wastes resources on empty scans and can lag behind under high load.
Message broker: use a broker such as RabbitMQ (for example, message TTL plus a dead-letter exchange, or the delayed-message plugin) as a ready-made delay queue. Open source and stable, but it adds operational overhead if the team does not already run a broker.
Redis: build a custom delay queue on Redis data structures (ZSET, LIST), referred to here as RedisDelayQueue.
Design Goals
Real-time: jobs should fire within roughly one second of their scheduled time.
High availability: support both single-node and cluster deployments.
Message deletion: allow a specific message to be removed at any time.
Reliability: guarantee at-least-once consumption.
Persistence: rely on Redis persistence, with optional backup to MongoDB.
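At-least-once delivery means the same job can occasionally arrive twice. In this design that can happen, for example, if the carrier pushes IDs to the ready list but fails before removing them from the bucket, so the next round pushes them again. The callback side should therefore be idempotent. The following is a minimal sketch, assuming an in-process `Set` of handled job IDs. The `IdempotentHandler` name is hypothetical. Across several nodes, the set would have to live in shared storage instead, such as a Redis key written with `SET ... NX`.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical idempotent wrapper (in-process only): under at-least-once delivery the
// same job ID may be delivered more than once, so the action runs only for the first one.
final class IdempotentHandler {
    private final Set<String> processed = ConcurrentHashMap.newKeySet();
    private final Consumer<String> action;

    IdempotentHandler(Consumer<String> action) {
        this.action = action;
    }

    /** Returns true if the action ran, false if this job ID was already handled. */
    boolean handle(String jobId) {
        // add() returns false when the ID is already present
        if (!processed.add(jobId)) {
            return false;
        }
        try {
            action.accept(jobId);
            return true;
        } catch (RuntimeException e) {
            // Forget the ID so a later retry can run the action again
            processed.remove(jobId);
            throw e;
        }
    }
}
```

Marking the ID before running the action keeps two concurrent deliveries from both executing. Removing the mark on failure preserves the retry path.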
Design Overview
Store all messages in Redis as a key‑value pool.
Use a ZSET as a priority queue, ordered by execution timestamp (score).
Use a LIST for FIFO consumption.
Maintain routing objects that map ZSET entries to the appropriate LIST.
Employ a timer to move due messages from the ZSET to the LIST.
Implement TTL‑based delay handling.
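To make the moving parts concrete, here is a minimal single-threaded, in-memory model of the same design. It assumes plain Java collections stand in for Redis:

- a `HashMap` plays the job pool;
- a `TreeMap` keyed by execution timestamp plays the ZSET;
- one `ArrayDeque` per topic plays the ready LISTs.

The class name `InMemoryDelayQueue` and its methods are hypothetical. They illustrate the data flow only and are not Redis or Redisson APIs.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical single-threaded model: collections stand in for the Redis keys.
class InMemoryDelayQueue {
    // Job pool: jobId -> topic (a real pool would hold the full job)
    private final Map<String, String> jobPool = new HashMap<>();
    // Bucket: execution timestamp (the ZSET score) -> job IDs due at that instant
    private final TreeMap<Long, Set<String>> bucket = new TreeMap<>();
    // Ready lists, one per topic: the "routing" from bucket to list
    private final Map<String, Deque<String>> readyQueues = new HashMap<>();

    void add(String jobId, String topic, long executeAtMillis) {
        jobPool.put(jobId, topic);
        bucket.computeIfAbsent(executeAtMillis, k -> new LinkedHashSet<>()).add(jobId);
    }

    // Deletion removes the job from the pool and from the bucket
    void delete(String jobId) {
        jobPool.remove(jobId);
        bucket.values().forEach(ids -> ids.remove(jobId));
        bucket.values().removeIf(Set::isEmpty);
    }

    // Timer step: move every job with score <= now to its topic's ready list, oldest first
    int carry(long now) {
        Map<Long, Set<String>> due = bucket.headMap(now, true);
        int moved = 0;
        for (Set<String> ids : due.values()) {
            for (String id : ids) {
                String topic = jobPool.get(id);
                if (topic != null) {
                    readyQueues.computeIfAbsent(topic, t -> new ArrayDeque<>()).addLast(id);
                    moved++;
                }
            }
        }
        due.clear(); // headMap is a view, so this also removes the entries from the bucket
        return moved;
    }

    // Consumer step: FIFO pop, skipping IDs whose pool entry was deleted after carrying
    String poll(String topic) {
        Deque<String> q = readyQueues.get(topic);
        while (q != null && !q.isEmpty()) {
            String id = q.pollFirst();
            if (jobPool.containsKey(id)) {
                return id;
            }
        }
        return null;
    }

    int pendingInBucket() {
        int n = 0;
        for (Set<String> ids : bucket.values()) n += ids.size();
        return n;
    }
}
```

In Redis, the `headMap(now, true)` range corresponds to a score-range query on the ZSET (`ZRANGEBYSCORE`). The per-topic deques correspond to LISTs popped from the head.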
Data Structures
ZING:DELAY_QUEUE:JOB_POOL – a hash table storing complete job information.
ZING:DELAY_QUEUE:BUCKET – a ZSET where each member is a job ID with its execution timestamp as the score.
ZING:DELAY_QUEUE:QUEUE – a LIST for each topic that holds jobs ready for consumption.
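The code later in this article refers to a `RedisQueueKey` class (`JOB_POOL_KEY`, `RD_ZSET_BUCKET_PRE`, `RD_LIST_TOPIC_PRE`, `getTopicId(topic, jobId)`), but the article does not show it. The following is a hypothetical stand-in named `QueueKeys`. Only the three key names come from the article. The lock key and the `topic:jobId` format of `getTopicId` are assumptions.

```java
// Hypothetical key holder; the article's real RedisQueueKey class is not shown.
final class QueueKeys {
    static final String PREFIX = "ZING:DELAY_QUEUE:";
    // Key names taken from the article
    static final String JOB_POOL_KEY = PREFIX + "JOB_POOL";
    static final String RD_ZSET_BUCKET_PRE = PREFIX + "BUCKET";
    static final String RD_LIST_TOPIC_PRE = PREFIX + "QUEUE";
    // Assumed lock key name
    static final String CARRY_THREAD_LOCK = PREFIX + "CARRY_LOCK";

    private QueueKeys() {
    }

    // Assumed format "topic:jobId", so equal job IDs in different topics do not collide
    static String getTopicId(String topic, String jobId) {
        if (topic == null || topic.isEmpty() || jobId == null || jobId.isEmpty()) {
            throw new IllegalArgumentException("topic and jobId must be non-empty");
        }
        return topic + ":" + jobId;
    }

    // Inverse of getTopicId; assumes topic names contain no ':'
    static String topicOf(String topicId) {
        int i = topicId.indexOf(':');
        return i < 0 ? topicId : topicId.substring(0, i);
    }
}
```

Prefixing the member with the topic lets the carrier recover the target topic list from a bucket member alone, without reading the pool.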
Job Lifecycle
When a job is created, it is inserted into JOB_POOL and its execution time is recorded in BUCKET.
A carrier thread periodically scans BUCKET, takes every job whose execution timestamp has passed, removes it from the bucket, and pushes its ID to the corresponding QUEUE.
Each topic’s LIST has a consumer thread that batches pending jobs and forwards them to a thread pool.
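The consumer thread retrieves the full job data from JOB_POOL and invokes the callback. On success the job is removed. On failure its retry count is incremented and it is put back into BUCKET with a later execution time, until the retry limit is exceeded and the job is dropped.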
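The failure branch of this lifecycle (retry with a growing delay, then give up) is plain arithmetic. The consumer code later calls `RetryStrategyEnum.getDelayTime(retry)` and checks against `RetryStrategyEnum.RETRY_FIVE`, but the article does not show that enum. The hypothetical `RetryPolicy` below therefore uses made-up delays (10 s, 30 s, 60 s, 5 min, 10 min). Only two things are taken from the consumer code: the shape `nextTime = delay + getDelayTime(retry) * 1000` and the five-retry cap.

```java
import java.util.OptionalLong;

// Hypothetical retry policy; the delay values are assumptions, not the article's.
enum RetryPolicy {
    RETRY_ONE(1, 10), RETRY_TWO(2, 30), RETRY_THREE(3, 60), RETRY_FOUR(4, 300), RETRY_FIVE(5, 600);

    final int retry;
    final long delaySeconds;

    RetryPolicy(int retry, long delaySeconds) {
        this.retry = retry;
        this.delaySeconds = delaySeconds;
    }

    static long delaySecondsFor(int retry) {
        for (RetryPolicy p : values()) {
            if (p.retry == retry) return p.delaySeconds;
        }
        throw new IllegalArgumentException("no policy for retry " + retry);
    }

    /**
     * Mirrors the consumer's failure branch: returns the next execution time in epoch
     * millis, or empty when the retry limit is exceeded and the job should be dropped.
     */
    static OptionalLong nextExecution(long scheduledAtMillis, int retriesSoFar) {
        int next = retriesSoFar + 1;
        if (next > RETRY_FIVE.retry) {
            return OptionalLong.empty();
        }
        return OptionalLong.of(scheduledAtMillis + delaySecondsFor(next) * 1000L);
    }
}
```

Like the article's consumer, this offsets retries from the job's originally scheduled time rather than from "now". If a job is consumed very late, its retry may therefore already be due when it is re-added.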
Message Structure
jobId: unique identifier.
topic: business category.
delay: delay in seconds as submitted; the server converts it to an absolute execution timestamp in epoch milliseconds, which becomes the BUCKET score.
body: JSON payload for business processing.
retry: number of retry attempts so far.
url: callback URL invoked when the job fires.
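The `delay` field arrives in seconds. The bucket score, however, and the carrier's comparison against `System.currentTimeMillis()` both work in epoch milliseconds. The article does not show where the conversion happens. A minimal sketch, assuming a hypothetical `DelayConverter` that receives an injected `java.time.Clock`:

```java
import java.time.Clock;

// Hypothetical helper: converts the client's relative delay (seconds) into the
// absolute epoch-millisecond timestamp used as the BUCKET score.
final class DelayConverter {
    private final Clock clock;

    DelayConverter(Clock clock) {
        this.clock = clock;
    }

    long toExecuteAtMillis(long delaySeconds) {
        if (delaySeconds < 0) {
            throw new IllegalArgumentException("delay must be >= 0");
        }
        // multiplyExact/addExact throw ArithmeticException instead of silently overflowing
        return Math.addExact(clock.millis(), Math.multiplyExact(delaySeconds, 1000L));
    }
}
```

In production, `Clock.systemUTC()` would be passed in. A fixed clock makes the conversion testable. The conversion should happen exactly once, before the job is stored, so that retries computed from `delay` stay consistent.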
Core Code Implementation
Technical Stack
SpringBoot, Redisson, Redis, distributed locks, scheduled tasks.
Job Entity
// Job.java
import java.io.Serializable;
import javax.validation.constraints.NotBlank;
import lombok.Data;
/**
 * Message structure
 *
 * @author 睁眼看世界
 * @date 2020-01-15
 */
@Data
public class Job implements Serializable {
    private static final long serialVersionUID = 1L;
    /**
     * Job's unique identifier.
     */
    @NotBlank
    private String jobId;
    /**
     * Job type, representing the business name.
     */
    @NotBlank
    private String topic;
    /**
     * Delay in seconds as submitted; the server converts it to an absolute
     * execution timestamp (epoch milliseconds) before it is used as the BUCKET score.
     */
    private Long delay;
    /**
     * Job payload in JSON format.
     */
    @NotBlank
    private String body;
    /**
     * Number of retry attempts so far.
     */
    private int retry = 0;
    /**
     * Notification (callback) URL.
     */
    @NotBlank
    private String url;
}
// JobDie.java (same imports as Job.java)
/**
 * Job deletion object
 */
@Data
public class JobDie implements Serializable {
    private static final long serialVersionUID = 1L;
    @NotBlank
    private String jobId;
    @NotBlank
    private String topic;
}
Carrier Thread
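// CarryJobScheduled.java
// LOCK_WAIT_TIME, LOCK_RELEASE_TIME, RD_ZSET_BUCKET_PRE and RD_LIST_TOPIC_PRE are static
// imports from the project's constants (not shown); RedisQueueKey is a project class.
import java.util.Collection;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RList;
import org.redisson.api.RLock;
import org.redisson.api.RScoredSortedSet;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
/**
 * Carrier thread
 *
 * @author 睁眼看世界
 * @date 2020-01-17
 */
@Slf4j
@Component
public class CarryJobScheduled {
    @Autowired
    private RedissonClient redissonClient;
    /**
     * Runs every second (requires @EnableScheduling on a configuration class) and
     * moves due jobs from the BUCKET zset to the ready queue.
     */
    @Scheduled(cron = "*/1 * * * * *")
    public void carryJobToQueue() {
        log.debug("carryJobToQueue --->");
        RLock lock = redissonClient.getLock(RedisQueueKey.CARRY_THREAD_LOCK);
        boolean locked = false;
        try {
            locked = lock.tryLock(LOCK_WAIT_TIME, LOCK_RELEASE_TIME, TimeUnit.SECONDS);
            if (!locked) {
                // Another instance is carrying in this round; skip quietly instead of throwing
                return;
            }
            RScoredSortedSet<Object> bucketSet = redissonClient.getScoredSortedSet(RD_ZSET_BUCKET_PRE);
            long now = System.currentTimeMillis();
            // Members whose score (execution timestamp in ms) is <= now
            Collection<Object> jobCollection = bucketSet.valueRange(0, false, now, true);
            if (jobCollection.isEmpty()) {
                return;
            }
            List<String> jobList = jobCollection.stream().map(String::valueOf).collect(Collectors.toList());
            RList<String> readyQueue = redissonClient.getList(RD_LIST_TOPIC_PRE);
            readyQueue.addAll(jobList);
            // Remove synchronously; push and remove are still two separate commands, not one atomic step
            bucketSet.removeAll(jobList);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.error("carryJobToQueue error", e);
        } finally {
            // Unlock only a lock this thread actually holds (tryLock may have failed or the lease expired)
            if (locked && lock.isHeldByCurrentThread()) {
                lock.unlock();
            }
        }
    }
}
Consumer Thread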
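// ReadyQueueContext.java
// CONSUMER_TOPIC_LOCK, LOCK_WAIT_TIME, LOCK_RELEASE_TIME, RD_LIST_TOPIC_PRE and JOB_POOL_KEY are
// static imports from the project's constants; RedisQueueKey, ConsumerService, TaskManager,
// RetryStrategyEnum and DateUtil are project classes (not shown).
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RLock;
import org.redisson.api.RMap;
import org.redisson.api.RScoredSortedSet;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class ReadyQueueContext {
    @Autowired
    private RedissonClient redissonClient;
    @Autowired
    private ConsumerService consumerService;
    /**
     * Start the topic consumer thread once the bean is initialized.
     */
    @PostConstruct
    public void startTopicConsumer() {
        TaskManager.doTask(this::runTopicThreads, "Start TOPIC consumer thread");
    }
    private void runTopicThreads() {
        while (!Thread.currentThread().isInterrupted()) {
            RLock lock = redissonClient.getLock(CONSUMER_TOPIC_LOCK);
            boolean locked = false;
            try {
                locked = lock.tryLock(LOCK_WAIT_TIME, LOCK_RELEASE_TIME, TimeUnit.SECONDS);
                if (!locked) {
                    continue;
                }
                // 1. Pop the next job ID from the ready queue (waits up to 60 s)
                RBlockingQueue<String> queue = redissonClient.getBlockingQueue(RD_LIST_TOPIC_PRE);
                String topicId = queue.poll(60, TimeUnit.SECONDS);
                if (topicId == null || topicId.isEmpty()) {
                    continue;
                }
                // 2. Get job metadata; null if the job was deleted after it was carried
                RMap<String, Job> jobPoolMap = redissonClient.getMap(JOB_POOL_KEY);
                Job job = jobPoolMap.get(topicId);
                if (job == null) {
                    continue;
                }
                // 3. Consume through the callback and wait for the result
                FutureTask<Boolean> taskResult = TaskManager.doFutureTask(
                        () -> consumerService.consumerMessage(job.getUrl(), job.getBody()),
                        job.getTopic() + "-->consume JobId-->" + job.getJobId());
                if (Boolean.TRUE.equals(taskResult.get())) {
                    // Success: remove the job
                    jobPoolMap.remove(topicId);
                    continue;
                }
                int retrySum = job.getRetry() + 1;
                if (retrySum > RetryStrategyEnum.RETRY_FIVE.getRetry()) {
                    // Retry limit exceeded: drop the job
                    jobPoolMap.remove(topicId);
                    continue;
                }
                job.setRetry(retrySum);
                long nextTime = job.getDelay() + RetryStrategyEnum.getDelayTime(job.getRetry()) * 1000L;
                log.info("next retryTime is [{}]", DateUtil.long2Str(nextTime));
                // Update the pool first so the carrier never moves an ID whose job is stale
                jobPoolMap.put(topicId, job);
                RScoredSortedSet<Object> delayBucket = redissonClient.getScoredSortedSet(RedisQueueKey.RD_ZSET_BUCKET_PRE);
                delayBucket.add(nextTime, topicId);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                log.error("runTopicThreads interrupted", e);
            } catch (Exception e) {
                log.error("runTopicThreads error", e);
            } finally {
                // Unlock only if this thread still holds the lock (tryLock may have failed or the lease expired)
                if (locked && lock.isHeldByCurrentThread()) {
                    lock.unlock();
                }
            }
        }
    }
}
Job Add/Delete Service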
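// RedisDelayQueueServiceImpl.java
// ADD_JOB_LOCK, DELETE_JOB_LOCK, LOCK_WAIT_TIME and LOCK_RELEASE_TIME are static imports from the
// project's constants; RedisQueueKey, BusinessException and ErrorMessageEnum are project classes (not shown).
import java.util.concurrent.TimeUnit;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RLock;
import org.redisson.api.RMap;
import org.redisson.api.RScoredSortedSet;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Slf4j
@Service
public class RedisDelayQueueServiceImpl implements RedisDelayQueueService {
    @Autowired
    private RedissonClient redissonClient;
    @Override
    public void addJob(Job job) {
        RLock lock = redissonClient.getLock(ADD_JOB_LOCK + job.getJobId());
        boolean locked = false;
        try {
            locked = lock.tryLock(LOCK_WAIT_TIME, LOCK_RELEASE_TIME, TimeUnit.SECONDS);
            if (!locked) {
                throw new BusinessException(ErrorMessageEnum.ACQUIRE_LOCK_FAIL);
            }
            String topicId = RedisQueueKey.getTopicId(job.getTopic(), job.getJobId());
            RMap<String, Job> jobPool = redissonClient.getMap(RedisQueueKey.JOB_POOL_KEY);
            if (jobPool.get(topicId) != null) {
                throw new BusinessException(ErrorMessageEnum.JOB_ALREADY_EXIST);
            }
            jobPool.put(topicId, job);
            // Score = execution time: job.getDelay() must already be an absolute epoch-millisecond timestamp here
            RScoredSortedSet<Object> delayBucket = redissonClient.getScoredSortedSet(RedisQueueKey.RD_ZSET_BUCKET_PRE);
            delayBucket.add(job.getDelay(), topicId);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.error("addJob error", e);
        } finally {
            // Unlock only if acquired; unlocking a lock that is not held throws IllegalMonitorStateException
            if (locked && lock.isHeldByCurrentThread()) {
                lock.unlock();
            }
        }
    }
    @Override
    public void deleteJob(JobDie jobDie) {
        RLock lock = redissonClient.getLock(DELETE_JOB_LOCK + jobDie.getJobId());
        boolean locked = false;
        try {
            locked = lock.tryLock(LOCK_WAIT_TIME, LOCK_RELEASE_TIME, TimeUnit.SECONDS);
            if (!locked) {
                throw new BusinessException(ErrorMessageEnum.ACQUIRE_LOCK_FAIL);
            }
            String topicId = RedisQueueKey.getTopicId(jobDie.getTopic(), jobDie.getJobId());
            RMap<String, Job> jobPool = redissonClient.getMap(RedisQueueKey.JOB_POOL_KEY);
            jobPool.remove(topicId);
            RScoredSortedSet<Object> delayBucket = redissonClient.getScoredSortedSet(RedisQueueKey.RD_ZSET_BUCKET_PRE);
            delayBucket.remove(topicId);
            // An ID already carried to the ready queue is not removed here; the consumer will find no pool entry for it
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.error("deleteJob error", e);
        } finally {
            if (locked && lock.isHeldByCurrentThread()) {
                lock.unlock();
            }
        }
    }
}
Future Optimizations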
Introduce multiple queue instances and routing to improve throughput when message volume is high.
Persist messages to MongoDB to avoid data loss.
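One way to read "multiple queue instances and routing" is to shard the single bucket and ready list into N keys, each with its own carrier and lock. The sketch below is hypothetical. The `ShardRouter` name, the `:<index>` key suffix, and routing by topic hash are all assumptions rather than the article's design.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical router for a sharded layout: N bucket keys and N ready-list keys instead of one each.
final class ShardRouter {
    private final String bucketPrefix;
    private final String queuePrefix;
    private final int shards;

    ShardRouter(String bucketPrefix, String queuePrefix, int shards) {
        if (shards <= 0) {
            throw new IllegalArgumentException("shards must be > 0");
        }
        this.bucketPrefix = bucketPrefix;
        this.queuePrefix = queuePrefix;
        this.shards = shards;
    }

    // Stable shard index: the same topic always maps to the same shard
    int shardOf(String topic) {
        return Math.floorMod(topic.hashCode(), shards);
    }

    String bucketKey(String topic) {
        return bucketPrefix + ":" + shardOf(topic);
    }

    String queueKey(String topic) {
        return queuePrefix + ":" + shardOf(topic);
    }

    // All bucket keys, e.g. to run one carrier (and one lock) per shard
    List<String> allBucketKeys() {
        List<String> keys = new ArrayList<>(shards);
        for (int i = 0; i < shards; i++) {
            keys.add(bucketPrefix + ":" + i);
        }
        return keys;
    }
}
```

Routing by topic keeps each topic's jobs in one FIFO list, but a single hot topic still lands on one shard. Routing by job ID spreads load more evenly at the cost of per-topic ordering.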
That concludes this article.
