Building a Redis-Based Delay Queue with Spring Boot: Design & Implementation
This article walks through the design, core components, task states, API endpoints, and full Java implementation of a Redis-powered delay queue using Spring Boot, complete with code samples, container structures, timer processing, and testing procedures.
Design Overview
The article implements a Redis‑based delay queue for Spring Boot, following the design published by the Youzan team. The system stores tasks in a job pool, schedules execution times, and moves tasks through delay buckets to ready queues based on topic.
Business Process
User submits a task; the task is pushed to the delay queue.
The delay queue stores the task metadata in a job pool (key: task ID, value: Job) and calculates the absolute execution timestamp.
A lightweight DelayJob containing only the task ID and execution time is inserted into one of several delay buckets.
A timer component continuously polls each bucket. When the scheduled time arrives, it retrieves the full task metadata from the job pool.
If the task has been deleted, the entry is discarded; otherwise the timer re-checks the execution time against the job metadata.
If that time has arrived, the task is moved to the ready queue for its topic and removed from the bucket. If it is still in the future, the task is put back into a bucket with the recomputed timestamp.
Consumers poll the ready queue for their topic. When a job is popped, the server marks it reserved, computes a new execution time from the job's TTR (time‑to‑run), and places it back into a delay bucket so that it is delivered again if the consumer does not finish in time.
After successful consumption, a finish message is sent and the server deletes the task metadata from the job pool.
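The steps above can be sketched with plain Java collections standing in for Redis. This is a minimal in-memory illustration, not the article's implementation: the class DelayQueueSketch, its JobMeta type, and the method names are hypothetical, and the current time is passed in explicitly so the flow is easy to follow.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// In-memory stand-in for the Redis structures; every name here is hypothetical.
final class DelayQueueSketch {
    // Minimal job metadata: topic, delay and TTR in milliseconds.
    static final class JobMeta {
        final long id;
        final String topic;
        final long delayMs;
        final long ttrMs;

        JobMeta(long id, String topic, long delayMs, long ttrMs) {
            this.id = id;
            this.topic = topic;
            this.delayMs = delayMs;
            this.ttrMs = ttrMs;
        }
    }

    private final Map<Long, JobMeta> jobPool = new HashMap<>();           // job pool: id -> metadata
    private final TreeMap<Long, Deque<Long>> bucket = new TreeMap<>();    // bucket: execution time -> ids
    private final Map<String, Deque<Long>> readyQueues = new HashMap<>(); // one ready queue per topic

    // Store the metadata, then schedule the id at now + delay.
    void add(JobMeta job, long now) {
        jobPool.put(job.id, job);
        schedule(job.id, now + job.delayMs);
    }

    private void schedule(long id, long executeAt) {
        bucket.computeIfAbsent(executeAt, t -> new ArrayDeque<>()).addLast(id);
    }

    // Timer pass: move every entry whose time has arrived to its topic's ready queue.
    void tick(long now) {
        while (!bucket.isEmpty() && bucket.firstKey() <= now) {
            for (long id : bucket.pollFirstEntry().getValue()) {
                JobMeta job = jobPool.get(id);
                if (job == null) {
                    continue; // deleted or finished: drop the stale entry
                }
                readyQueues.computeIfAbsent(job.topic, t -> new ArrayDeque<>()).addLast(id);
            }
        }
    }

    // Consumer pop: hand out a ready job and reschedule it at now + TTR for redelivery.
    Long pop(String topic, long now) {
        Deque<Long> queue = readyQueues.get(topic);
        Long id = (queue == null) ? null : queue.pollFirst();
        if (id == null) {
            return null;
        }
        JobMeta job = jobPool.get(id);
        if (job == null) {
            return null; // removed while waiting in the ready queue
        }
        schedule(id, now + job.ttrMs);
        return id;
    }

    // Finish: the consumer confirms completion, so the metadata is removed.
    void finish(long id) {
        jobPool.remove(id);
    }
}
```

With a 1000 ms delay and a 500 ms TTR, a job added at time 0 can be popped after tick(1000). If it is popped at time 1000 and finish is not called, tick(1500) makes it available again; after finish, later ticks only discard the stale bucket entry.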
Components
Delay Queue (Redis) – orchestrates delayed message delivery.
Job Pool – a Redis hash that stores full Job objects (key = ID, value = job).
Delay Bucket – a set of Redis sorted‑set (ZSet) buckets; each bucket holds DelayJob entries ordered by execution timestamp.
Timer – a thread pool where each thread scans a single bucket, extracts the earliest DelayJob, validates it, and moves it to the appropriate ready queue.
Ready Queue – a Redis list per topic that stores tasks ready for immediate processing.
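Because several buckets exist, each new DelayJob has to be assigned to one of them. A simple option is round-robin over a shared counter, sketched below. BucketSelector is a hypothetical helper written for this illustration; the "bucket" + index naming follows the bucket names used later in the article. Math.floorMod is used so that the index stays valid even after the counter overflows into negative values.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper that spreads entries evenly over a fixed number of buckets.
final class BucketSelector {
    private final AtomicInteger counter;
    private final int bucketCount;

    BucketSelector(int bucketCount, int start) {
        if (bucketCount <= 0) {
            throw new IllegalArgumentException("bucketCount must be positive");
        }
        this.bucketCount = bucketCount;
        this.counter = new AtomicInteger(start);
    }

    // Round-robin: floorMod keeps the index in [0, bucketCount) for negative counters too.
    String next() {
        int index = Math.floorMod(counter.getAndIncrement(), bucketCount);
        return "bucket" + index;
    }
}
```

A plain % operator would return a negative remainder once the counter wraps past Integer.MAX_VALUE, which would then fail as a list index.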
Task States
ready – executable.
delay – waiting for its scheduled time.
reserved – consumed but not yet completed (TTR in effect).
deleted – finished or removed.
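The four states can be read as a small state machine. The sketch below is an assumption layered on the list above: the article names the states but does not give an explicit transition table, so the JobState enum and its allowed transitions (delay to ready, ready to reserved, reserved back to delay when the TTR expires, and any live state to deleted) are illustrative only.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical transition table; the allowed moves are an assumption, not taken from the source.
enum JobState {
    DELAY, READY, RESERVED, DELETED;

    // States that may follow the current one.
    Set<JobState> next() {
        switch (this) {
            case DELAY:
                return EnumSet.of(READY, DELETED);    // timer fires, or the job is removed
            case READY:
                return EnumSet.of(RESERVED, DELETED); // a consumer pops it, or it is removed
            case RESERVED:
                return EnumSet.of(DELAY, DELETED);    // TTR expires, or the consumer finishes
            default:
                return EnumSet.noneOf(JobState.class); // DELETED is terminal
        }
    }

    boolean canMoveTo(JobState target) {
        return next().contains(target);
    }
}
```

A service method could call canMoveTo before updating a job's status and reject requests such as finishing a job that is already deleted.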
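API Interface
POST /delay/add – submits a job built from the Job fields and returns the created DelayJob.
GET /delay/pop?topic={topic} – returns a ready job for the given topic.
DELETE /delay/finish?jobId={id} – reports successful consumption of a job.
DELETE /delay/delete?jobId={id} – removes a job.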
Implementation
Job and DelayJob Entities
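import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import com.fasterxml.jackson.databind.ser.std.ToStringSerializer;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class Job implements Serializable {
    @JsonSerialize(using = ToStringSerializer.class) // emit the id as a string in JSON
    private Long id;          // unique identifier
    private String topic;     // business type
    private long delayTime;   // delay duration (ms)
    private long ttrTime;     // execution timeout (ms)
    private String message;   // payload
    private int retryCount;
    private JobStatus status;
}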
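import lombok.AllArgsConstructor;
import lombok.Data;
import java.io.Serializable;

@Data
@AllArgsConstructor
public class DelayJob implements Serializable {
    private long jobId;     // reference to Job.id
    private long delayDate; // absolute execution timestamp (ms)
    private String topic;

    // Build the bucket entry for a new job: execution time = now + delay.
    public DelayJob(Job job) {
        this.jobId = job.getId();
        this.delayDate = System.currentTimeMillis() + job.getDelayTime();
        this.topic = job.getTopic();
    }

    // Rebuild an entry from a ZSet tuple. The score is already the absolute
    // timestamp used when the entry was added, so it is taken as-is.
    public DelayJob(Object value, Double score) {
        this.jobId = Long.parseLong(String.valueOf(value));
        this.delayDate = score.longValue();
    }
}
JobPool (job metadata container)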
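import com.alibaba.fastjson.JSON; // assumed: fastjson, inferred from JSON.toJSONString
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.BoundHashOperations;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class JobPool {
    @Autowired
    private RedisTemplate redisTemplate;

    private static final String NAME = "job.pool"; // Redis hash key

    private BoundHashOperations getPool() {
        return redisTemplate.boundHashOps(NAME);
    }

    // Store the full job metadata under its id.
    public void addJob(Job job) {
        log.info("Add job: {}", JSON.toJSONString(job));
        getPool().put(job.getId(), job);
    }

    // Returns null when the job was never added or has been removed.
    public Job getJob(Long jobId) {
        Object o = getPool().get(jobId);
        return (o instanceof Job) ? (Job) o : null;
    }

    // Despite the name, this removes the Job metadata from the pool.
    public void removeDelayJob(Long jobId) {
        log.info("Remove job: {}", jobId);
        getPool().delete(jobId);
    }
}
DelayBucket (sorted‑set buckets)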
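import com.alibaba.fastjson.JSON; // assumed: fastjson, inferred from JSON.toJSONString
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.data.redis.core.BoundZSetOperations;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ZSetOperations;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils; // assumed: Spring's utility class
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

@Component
@Slf4j
public class DelayBucket {
    @Autowired
    private RedisTemplate redisTemplate;

    private static final AtomicInteger index = new AtomicInteger(0);

    @Value("${thread.size}")
    private int bucketsSize; // one bucket per timer thread

    private final List<String> bucketNames = new ArrayList<>();

    // Populates the bucket names (bucket0 .. bucketN-1) at startup.
    @Bean
    public List<String> createBuckets() {
        for (int i = 0; i < bucketsSize; i++) {
            bucketNames.add("bucket" + i);
        }
        return bucketNames;
    }

    // Round-robin bucket selection; floorMod stays non-negative after counter overflow.
    private String getThisBucketName() {
        int thisIndex = index.incrementAndGet();
        return bucketNames.get(Math.floorMod(thisIndex, bucketsSize));
    }

    private BoundZSetOperations getBucket(String bucketName) {
        return redisTemplate.boundZSetOps(bucketName);
    }

    // The execution timestamp is the ZSet score, so entries sort by due time.
    public void addDelayJob(DelayJob job) {
        log.info("Add delay job: {}", JSON.toJSONString(job));
        String bucketName = getThisBucketName();
        getBucket(bucketName).add(job, job.getDelayDate());
    }

    // Returns the entry with the lowest score (earliest due time), or null if the bucket is empty.
    public DelayJob getFirstDelayTime(Integer idx) {
        BoundZSetOperations bucket = getBucket(bucketNames.get(idx));
        Set<ZSetOperations.TypedTuple> set = bucket.rangeWithScores(0, 0);
        if (CollectionUtils.isEmpty(set)) {
            return null;
        }
        Object value = set.iterator().next().getValue();
        return (value instanceof DelayJob) ? (DelayJob) value : null;
    }

    public void removeDelayTime(Integer idx, DelayJob delayJob) {
        getBucket(bucketNames.get(idx)).remove(delayJob);
    }
}
ReadyQueue (per‑topic list)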
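import com.alibaba.fastjson.JSON; // assumed: fastjson, inferred from JSON.toJSONString
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.BoundListOperations;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class ReadyQueue {
    @Autowired
    private RedisTemplate redisTemplate;

    private static final String NAME = "process.queue"; // key prefix, one list per topic

    private String getKey(String topic) {
        return NAME + topic;
    }

    private BoundListOperations getQueue(String topic) {
        return redisTemplate.boundListOps(getKey(topic));
    }

    public void pushJob(DelayJob delayJob) {
        log.info("Push to ready queue: {}", delayJob);
        getQueue(delayJob.getTopic()).leftPush(delayJob);
    }

    // Pop from the opposite end of the push so the list behaves as a FIFO queue;
    // leftPush followed by leftPop would return the newest entry first.
    public DelayJob popJob(String topic) {
        Object o = getQueue(topic).rightPop();
        if (o instanceof DelayJob) {
            log.info("Pop from ready queue: {}", JSON.toJSONString((DelayJob) o));
            return (DelayJob) o;
        }
        return null;
    }
}
Timer Processing (bucket poller)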
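import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.stereotype.Component;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

@Component
public class DelayTimer implements ApplicationListener<ContextRefreshedEvent> {
    @Autowired private DelayBucket delayBucket;
    @Autowired private JobPool jobPool;
    @Autowired private ReadyQueue readyQueue;
    @Value("${thread.size}") private int length; // same property as the bucket count

    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) {
        // Fixed-size pool: one long-running handler per bucket.
        ExecutorService executor = new ThreadPoolExecutor(
                length, length, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>());
        for (int i = 0; i < length; i++) {
            // DelayJobHandler is not listed in this article; it scans bucket i.
            executor.execute(new DelayJobHandler(delayBucket, jobPool, readyQueue, i));
        }
    }
}
REST Controller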
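import com.alibaba.fastjson.JSON; // assumed: fastjson, inferred from JSON.toJSONString
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("delay")
public class DelayController {
    @Autowired private JobService jobService;

    // @RequestBody binds the JSON payload used in the testing procedure.
    @PostMapping("add")
    public String addDefJob(@RequestBody Job request) {
        DelayJob delayJob = jobService.addDefJob(request);
        return JSON.toJSONString(delayJob);
    }

    // Returns a ready job for the topic.
    @GetMapping("pop")
    public String getProcessJob(String topic) {
        Job process = jobService.getProcessJob(topic);
        return JSON.toJSONString(process);
    }

    // Called by the consumer after successful processing.
    @DeleteMapping("finish")
    public String finishJob(Long jobId) {
        jobService.finishJob(jobId);
        return "success";
    }

    // Removes a job.
    @DeleteMapping("delete")
    public String deleteJob(Long jobId) {
        jobService.deleteJob(jobId);
        return "success";
    }
}
Testing Procedure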
Use an HTTP client (e.g., Postman) to interact with the API:
POST http://localhost:8000/delay/add with a JSON body representing a Job. The service stores the job in the job pool and creates a DelayJob in a bucket. Logs show entries similar to:
{"delayTime":10000,"id":3,"message":"tag:testid:3","retryCount":0,"status":"DELAY","topic":"test","ttrTime":10000}
After the configured delay (e.g., 10 s), the timer moves the entry to the ready queue. Verify with logs that a DelayJob appears in the ready queue.
GET http://localhost:8000/delay/pop?topic=test retrieves the ready job, changes its status to RESERVED, and re‑inserts it into a bucket with a new execution timestamp based on TTR.
DELETE http://localhost:8000/delay/finish?jobId=3 or /delete removes the job from the job pool; subsequent bucket scans detect the missing metadata and clean up the stale DelayJob.
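The same requests can be issued from Java instead of Postman. The sketch below only builds the requests with the JDK's java.net.http API (Java 11+); DelayApiRequests is a hypothetical helper, the base URL is the one used in the steps above, and the add request assumes the endpoint accepts a JSON body as described.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;

// Hypothetical request builders for the endpoints exercised in the testing steps.
final class DelayApiRequests {
    private static final String BASE = "http://localhost:8000/delay";

    // POST /delay/add with the job serialized as JSON.
    static HttpRequest add(String jobJson) {
        return HttpRequest.newBuilder(URI.create(BASE + "/add"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(jobJson))
                .build();
    }

    // GET /delay/pop?topic=...; the topic is URL-encoded before it is appended.
    static HttpRequest pop(String topic) {
        String encoded = URLEncoder.encode(topic, StandardCharsets.UTF_8);
        return HttpRequest.newBuilder(URI.create(BASE + "/pop?topic=" + encoded))
                .GET()
                .build();
    }

    // DELETE /delay/finish?jobId=...
    static HttpRequest finish(long jobId) {
        return HttpRequest.newBuilder(URI.create(BASE + "/finish?jobId=" + jobId))
                .DELETE()
                .build();
    }
}
```

Each request can then be sent with HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()) while the service is running.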
Source code repository: https://gitee.com/daifyutils/springboot-samples
Code Ape Tech Column
Former Ant Group P8 engineer, pure technologist, sharing full‑stack Java, job interview and career advice through a column. Site: java-family.cn
