Implementing a Reliable Delay Queue with Redis and Go
This article explains how to build a reliable delayed message queue using Redis data structures and Go. It covers the business scenarios, the design requirements, the Redis keys involved, the Lua scripts that make state transitions atomic, and message lifecycle management, and it walks through the core source code of the implementation.
Many business scenarios require an action to be performed after a certain delay, such as automatically closing an unpaid order or sending an activation SMS after a new shop is created. A naive solution is to scan the database table periodically, but a long scan interval introduces unacceptable timing errors, while a short one consumes excessive database resources.
To address these issues, a delay queue must satisfy three core requirements: persistence (tasks survive service restarts), a retry mechanism (failed or timed‑out tasks are retried), and precise timing.
While professional message‑queue products like Pulsar or RocketMQ provide built‑in delayed delivery, this article demonstrates how to implement a similar mechanism using only Redis, which is already widely deployed.
The implementation stores each message in a separate Redis string key (identified by a UUID) and uses a sorted set in which the member is the message ID and the score is the delivery timestamp. Running ZRANGEBYSCORE over the score range from 0 to the current timestamp retrieves every message whose delivery time has arrived, and LPUSH moves those IDs onto a ready list for consumption.
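To make the sorted-set idea concrete, here is a minimal in-memory sketch that stands in for Redis with plain Go maps and slices. It is an illustration under that assumption, not the library's code: the `memQueue` type and its methods are hypothetical, `pending` plays the role of the sorted set (score = delivery Unix timestamp), and `ready` plays the list, with LPUSH modeled as inserting at the head and RPOP as removing the last element.

```go
package main

import "sort"

// memQueue is a hypothetical in-memory stand-in for two Redis keys:
// pending mimics the sorted set (member = message ID, score = delivery time),
// ready mimics the list (index 0 is the head, as seen by LPUSH/RPOP).
type memQueue struct {
	pending map[string]int64
	ready   []string
}

func newMemQueue() *memQueue {
	return &memQueue{pending: make(map[string]int64)}
}

// schedule mimics ZADD pendingKey deliverAt id.
func (q *memQueue) schedule(id string, deliverAt int64) {
	q.pending[id] = deliverAt
}

// pending2Ready mimics ZRANGEBYSCORE pendingKey 0 now, LPUSH readyKey ids...,
// then ZREMRANGEBYSCORE pendingKey 0 now. It returns how many IDs moved.
func (q *memQueue) pending2Ready(now int64) int {
	due := make([]string, 0)
	for id, score := range q.pending {
		if score <= now {
			due = append(due, id)
		}
	}
	// ZRANGEBYSCORE returns members by ascending score, ties broken lexicographically.
	sort.Slice(due, func(i, j int) bool {
		si, sj := q.pending[due[i]], q.pending[due[j]]
		if si != sj {
			return si < sj
		}
		return due[i] < due[j]
	})
	for _, id := range due {
		q.ready = append([]string{id}, q.ready...) // LPUSH: insert at the head
		delete(q.pending, id)                     // ZREMRANGEBYSCORE
	}
	return len(due)
}

// rpop mimics RPOP readyKey; ok is false when the list is empty.
func (q *memQueue) rpop() (id string, ok bool) {
	if len(q.ready) == 0 {
		return "", false
	}
	last := len(q.ready) - 1
	id = q.ready[last]
	q.ready = q.ready[:last]
	return id, true
}
```

Because LPUSH adds at the head and RPOP removes from the tail, messages scheduled earlier are consumed first. For example, scheduling "a" at 10, "b" at 5 and "c" at 20 and calling `pending2Ready(15)` moves two IDs; `rpop` then yields "b" before "a", while "c" stays pending.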
The queue relies on several Redis keys:
msgKey: a string key that holds the full message payload, identified by a UUID.
pendingKey: a sorted set where members are message IDs and scores are the scheduled delivery timestamps.
readyKey: a list that stores IDs of messages ready to be consumed.
unAckKey: a sorted set tracking messages that have been delivered but not yet acknowledged, with the score representing the retry time.
retryKey: a list of message IDs whose retry time has been reached.
garbageKey: a set used to temporarily hold messages that have exhausted their retry attempts.
retryCountKey: a hash mapping message IDs to the remaining retry count.
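One way to derive these keys from a queue name is sketched below. The naming scheme, the `dp` prefix and the `queueKeys` type are hypothetical (the article does not show the library's actual key format); the field names mirror those used in the code later in the article (unAckKey, retryCountKey, garbageKey, genMsgKey). Wrapping the queue name in braces makes it a Redis Cluster hash tag, so all keys of one queue hash to the same slot, which a multi-key Lua script requires when running in a cluster.

```go
package main

import "fmt"

// queueKeys holds the Redis key names for one queue. The "dp:{name}:..."
// format is a hypothetical choice for illustration, not the library's.
type queueKeys struct {
	name          string
	pendingKey    string // sorted set: id -> delivery timestamp
	readyKey      string // list of ids ready to consume
	unAckKey      string // sorted set: id -> retry timestamp
	retryKey      string // list of ids due for retry
	garbageKey    string // set of ids that exhausted their retries
	retryCountKey string // hash: id -> remaining retries
}

func newQueueKeys(name string) queueKeys {
	// The braces form a Redis Cluster hash tag: only the name inside them
	// is hashed, so every key of this queue lands in the same slot.
	prefix := fmt.Sprintf("dp:{%s}", name)
	return queueKeys{
		name:          name,
		pendingKey:    prefix + ":pending",
		readyKey:      prefix + ":ready",
		unAckKey:      prefix + ":unack",
		retryKey:      prefix + ":retry",
		garbageKey:    prefix + ":garbage",
		retryCountKey: prefix + ":retry:cnt",
	}
}

// genMsgKey returns the string key that stores one message's payload.
func (k queueKeys) genMsgKey(id string) string {
	return fmt.Sprintf("dp:{%s}:msg:%s", k.name, id)
}
```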
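Because Redis executes each Lua script atomically, every state transition is implemented as a Lua script, so no other client can observe a message halfway between two structures: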
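pending2ReadyScript
-- keys: pendingKey, readyKey
-- argv: currentTime
-- Fetch every message whose delivery time (score) is <= currentTime.
local msgs = redis.call('ZRangeByScore', KEYS[1], '0', ARGV[1])
if (#msgs == 0) then return end
-- Build a single "LPUSH readyKey id1 id2 ..." call.
local args2 = {'LPush', KEYS[2]}
for _,v in ipairs(msgs) do
    table.insert(args2, v)
end
redis.call(unpack(args2))
-- Remove the moved messages from the pending set.
redis.call('ZRemRangeByScore', KEYS[1], '0', ARGV[1])

ready2UnackScript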
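-- keys: readyKey/retryKey, unackKey
-- argv: retryTime
-- Pop the oldest message (LPUSH + RPOP gives FIFO order).
local msg = redis.call('RPop', KEYS[1])
if (not msg) then return end
-- Track it as delivered but unacknowledged; the score is when it becomes due for retry.
redis.call('ZAdd', KEYS[2], ARGV[1], msg)
return msg

unack2RetryScript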
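-- keys: unackKey, retryCountKey, retryKey, garbageKey
-- argv: currentTime
-- Find deliveries whose retry time has passed without an ack.
local msgs = redis.call('ZRangeByScore', KEYS[1], '0', ARGV[1])
if (#msgs == 0) then return end
local retryCounts = redis.call('HMGet', KEYS[2], unpack(msgs))
for i,v in ipairs(retryCounts) do
    local k = msgs[i]
    -- HMGET yields false for a missing field; treat that as no retries left
    -- instead of failing on a nil comparison.
    local count = tonumber(v)
    if count and count > 0 then
        -- Retries remain: decrement the counter and queue for redelivery.
        redis.call('HIncrBy', KEYS[2], k, -1)
        redis.call('LPush', KEYS[3], k)
    else
        -- Retries exhausted: hand the message over to garbage collection.
        redis.call('HDel', KEYS[2], k)
        redis.call('SAdd', KEYS[4], k)
    end
end
redis.call('ZRemRangeByScore', KEYS[1], '0', ARGV[1])

Message acknowledgment removes the entry from the unack set, deletes the corresponding msgKey, and clears the message's retry counter: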
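func (q *DelayQueue) ack(idStr string) error {
	ctx := context.Background()
	// Stop tracking the delivery so unack2Retry will not redeliver it.
	err := q.redisCli.ZRem(ctx, q.unAckKey, idStr).Err()
	if err != nil {
		return fmt.Errorf("remove from unack failed: %w", err)
	}
	// Best-effort cleanup of the payload and the retry counter.
	_ = q.redisCli.Del(ctx, q.genMsgKey(idStr)).Err()
	_ = q.redisCli.HDel(ctx, q.retryCountKey, idStr).Err()
	return nil
}

A negative acknowledgment (nack) sets the message's retry time in the unack set to the current moment, so the next run of unack2RetryScript moves it to the retry queue (consuming one retry, or sending it to garbageKey if none remain) without waiting for the timeout: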
func (q *DelayQueue) nack(idStr string) error {
	ctx := context.Background()
	// Overwrite the retry time with "now" so the message is due for retry at once.
	err := q.redisCli.ZAdd(ctx, q.unAckKey, &redis.Z{
		Member: idStr,
		Score:  float64(time.Now().Unix()),
	}).Err()
	if err != nil {
		return fmt.Errorf("negative ack failed: %w", err)
	}
	return nil
}

The core consumption loop runs once per second. Each pass invokes the Lua scripts to move messages between the pending, ready, unack, and retry structures, calls a user-provided callback for each fetched message, and acknowledges or negatively acknowledges it based on the result:
func (q *DelayQueue) consume() error {
	// Move due messages from pending to ready.
	if err := q.pending2Ready(); err != nil {
		return err
	}
	// Fetch from ready (at most q.fetchLimit messages) and process them.
	var fetchCount uint
	for {
		idStr, err := q.ready2Unack()
		if err == redis.Nil { // the ready list is empty
			break
		}
		if err != nil {
			return err
		}
		fetchCount++
		// q.callback (not shown) runs the user callback for this message.
		ack, err := q.callback(idStr)
		if err != nil {
			return err
		}
		if ack {
			err = q.ack(idStr)
		} else {
			err = q.nack(idStr)
		}
		if err != nil {
			return err
		}
		if fetchCount >= q.fetchLimit {
			break
		}
	}
	// Move timed-out and nacked messages to retry.
	if err := q.unack2Retry(); err != nil {
		return err
	}
	// Clean up messages that exceeded their retry limit.
	if err := q.garbageCollect(); err != nil {
		return err
	}
	// Process the retry queue similarly.
	// ... (omitted for brevity)
	return nil
}

Garbage collection removes messages that have exhausted their retry count, deleting both the message payloads and their entries in the garbageKey set:
func (q *DelayQueue) garbageCollect() error {
	ctx := context.Background()
	msgIds, err := q.redisCli.SMembers(ctx, q.garbageKey).Result()
	if err != nil && err != redis.Nil {
		return fmt.Errorf("smembers failed: %w", err)
	}
	if len(msgIds) == 0 {
		return nil
	}
	msgKeys := make([]string, 0, len(msgIds))
	members := make([]interface{}, 0, len(msgIds)) // SRem expects ...interface{}
	for _, idStr := range msgIds {
		msgKeys = append(msgKeys, q.genMsgKey(idStr))
		members = append(members, idStr)
	}
	if err = q.redisCli.Del(ctx, msgKeys...).Err(); err != nil {
		return fmt.Errorf("del msgs failed: %w", err)
	}
	// Remove only the IDs read above; IDs added meanwhile wait for the next pass.
	if err = q.redisCli.SRem(ctx, q.garbageKey, members...).Err(); err != nil {
		return fmt.Errorf("remove from garbage key failed: %w", err)
	}
	return nil
}

Finally, a minimal Go program shows how to use the library: create a queue, send delayed messages, and start consuming:
package main

import (
	"strconv"
	"time"

	"github.com/go-redis/redis/v8"
	"github.com/hdt3213/delayqueue"
)

func main() {
	redisCli := redis.NewClient(&redis.Options{Addr: "127.0.0.1:6379"})
	queue := delayqueue.NewQueue("example-queue", redisCli, func(payload string) bool {
		// Process the message; return true to ack it, false to have it retried.
		return true
	})
	// Send 10 messages, each delivered one hour from now and retried up to 3 times.
	for i := 0; i < 10; i++ {
		err := queue.SendDelayMsg(strconv.Itoa(i), time.Hour, delayqueue.WithRetryCount(3))
		if err != nil {
			panic(err)
		}
	}
	// Start consuming and block on the channel returned by StartConsume.
	done := queue.StartConsume()
	<-done
}

With these components, a simple yet reliable delay queue can be built entirely on Redis without additional middleware. It provides persistence (as durable as the Redis server's RDB/AOF settings allow), retry semantics, and at-least-once delivery. Because a message can be delivered more than once, the callback should be idempotent.
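To see how the scripts fit together, the sketch below replays the message lifecycle in memory: pending to ready to unack, then either ack, retry (while the retry count is positive), or garbage. It assumes Redis is replaced by Go maps and slices and that time is an integer the caller advances; `sim` and all of its methods are hypothetical and model the transitions described above rather than reproducing the library.

```go
package main

import "sort"

// sim is a hypothetical in-memory model of the queue's Redis keys; it is
// not the library's implementation. Times are plain integers (e.g. seconds).
type sim struct {
	pending    map[string]int64 // pendingKey: id -> delivery time
	unack      map[string]int64 // unAckKey: id -> retry time
	retryCount map[string]int   // retryCountKey: id -> retries left
	ready      []string         // readyKey; LPUSH + RPOP is FIFO, modeled as a queue
	retry      []string         // retryKey, modeled the same way
	garbage    []string         // garbageKey
	dropped    []string         // ids removed by garbage collection
	timeout    int64            // how long a delivery may stay unacknowledged
}

func newSim(timeout int64) *sim {
	return &sim{
		pending:    map[string]int64{},
		unack:      map[string]int64{},
		retryCount: map[string]int{},
		timeout:    timeout,
	}
}

func (s *sim) send(id string, deliverAt int64, retries int) {
	s.pending[id] = deliverAt
	s.retryCount[id] = retries
}

// takeDue mimics ZRANGEBYSCORE key 0 now followed by ZREMRANGEBYSCORE key 0 now.
func takeDue(set map[string]int64, now int64) []string {
	var ids []string
	for id, score := range set {
		if score <= now {
			ids = append(ids, id)
		}
	}
	sort.Slice(ids, func(i, j int) bool {
		if set[ids[i]] != set[ids[j]] {
			return set[ids[i]] < set[ids[j]]
		}
		return ids[i] < ids[j]
	})
	for _, id := range ids {
		delete(set, id)
	}
	return ids
}

// drain pops every id from list (ready2Unack), runs handler, then acks or nacks.
func (s *sim) drain(list *[]string, now int64, handler func(string) bool) []string {
	var delivered []string
	for len(*list) > 0 {
		id := (*list)[0]
		*list = (*list)[1:]
		s.unack[id] = now + s.timeout // delivered, awaiting ack
		delivered = append(delivered, id)
		if handler(id) {
			delete(s.unack, id) // ack
			delete(s.retryCount, id)
		} else {
			s.unack[id] = now // nack: due for retry right away
		}
	}
	return delivered
}

// unack2Retry mimics unack2RetryScript: retry while retries remain, else garbage.
func (s *sim) unack2Retry(now int64) {
	for _, id := range takeDue(s.unack, now) {
		if s.retryCount[id] > 0 {
			s.retryCount[id]--
			s.retry = append(s.retry, id)
		} else {
			delete(s.retryCount, id)
			s.garbage = append(s.garbage, id)
		}
	}
}

// consume runs one cycle in the same order as the article's consume().
func (s *sim) consume(now int64, handler func(string) bool) []string {
	s.ready = append(s.ready, takeDue(s.pending, now)...) // pending2Ready
	delivered := s.drain(&s.ready, now, handler)
	s.unack2Retry(now)
	s.dropped = append(s.dropped, s.garbage...) // garbageCollect
	s.garbage = nil
	return append(delivered, s.drain(&s.retry, now, handler)...)
}
```

With a handler that fails a message with one retry on its first attempt, that message is delivered twice in the same cycle, which is the at-least-once behavior described above; a message with zero retries that fails ends up garbage-collected. A message whose consumer crashed before acking would instead stay in the unack set until `now + timeout` and then follow the same retry path.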
Selected Java Interview Questions