Implementing a Reliable Delay Queue with Redis and Go
This article explains how to build a precise, persistent delay queue using Redis data structures and Lua scripts, demonstrates a Go client library with code examples for sending, consuming, acknowledging, and retrying delayed messages, and discusses the design requirements such as durability, retry mechanisms, and timing accuracy.
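Some tasks must run at a specific time in the future. An order that stays unpaid has to be handled once its timeout expires, and a newly created shop may need activation messages sent after a delay. Periodically scanning a database table for such records delays each task by up to one scan interval and puts heavy load on the database, so a more accurate and reliable delayed-task mechanism is needed.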
The required characteristics of a delay queue, ordered by importance, are:
Persistence – tasks must survive service restarts or crashes.
Retry mechanism – failed or timed‑out tasks should be retried.
Precise timing – delivery should be as close to the scheduled time as possible.
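In the Redis design, each message's content is stored in its own string key, named after the message's UUID, and several auxiliary structures track each message's state:
msgKey: the string key holding the message content.
pendingKey: a sorted set of message IDs waiting for delivery; the score is the delivery timestamp.
readyKey: a list of message IDs whose delivery time has arrived and that are ready for consumption.
unAckKey: a sorted set of message IDs that have been delivered but not yet acknowledged; the score is the time at which the message will be retried if no ack arrives.
retryKey, garbageKey, retryCountKey: structures that support retries and garbage collection. retryKey is a list of IDs waiting to be redelivered, retryCountKey is a hash mapping each ID to its remaining retry count, and garbageKey is a set of IDs that have used up their retries.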
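To make the role of each key concrete, here is a minimal Go sketch that models them in memory. It is a hypothetical illustration, not the library's code: the type memQueue, its methods, and the helpers due and lpush are invented names; maps stand in for sorted sets and hashes, slices stand in for lists, and scores are plain integer timestamps. Each method mirrors one of the Lua scripts or the ack/nack operations that follow.

```go
package main

import (
	"fmt"
	"sort"
)

// memQueue is a hypothetical in-memory model of the Redis keys, not library code.
type memQueue struct {
	msgs       map[string]string // msgKey: message ID -> payload
	pending    map[string]int64  // pendingKey (ZSET): ID -> delivery time
	ready      []string          // readyKey (LIST): LPUSH prepends, RPOP takes the last item
	unack      map[string]int64  // unAckKey (ZSET): ID -> retry deadline
	retry      []string          // retryKey (LIST): IDs waiting to be redelivered
	retryCount map[string]int    // retryCountKey (HASH): ID -> remaining retries
	garbage    map[string]bool   // garbageKey (SET): IDs with no retries left
}

func newMemQueue() *memQueue {
	return &memQueue{msgs: map[string]string{}, pending: map[string]int64{},
		unack: map[string]int64{}, retryCount: map[string]int{}, garbage: map[string]bool{}}
}

// due imitates ZRANGEBYSCORE key 0 now, ordering equal scores lexicographically as Redis does.
func due(zset map[string]int64, now int64) []string {
	var ids []string
	for id, score := range zset {
		if score >= 0 && score <= now {
			ids = append(ids, id)
		}
	}
	sort.Slice(ids, func(i, j int) bool {
		if zset[ids[i]] != zset[ids[j]] {
			return zset[ids[i]] < zset[ids[j]]
		}
		return ids[i] < ids[j]
	})
	return ids
}

// lpush imitates LPUSH: the new ID goes to the head of the list.
func lpush(list []string, id string) []string { return append([]string{id}, list...) }
func (q *memQueue) send(id, payload string, deliverAt int64, retries int) {
	q.msgs[id], q.pending[id], q.retryCount[id] = payload, deliverAt, retries
}

// pending2Ready moves every due ID from pending to the ready list.
func (q *memQueue) pending2Ready(now int64) {
	for _, id := range due(q.pending, now) {
		q.ready = lpush(q.ready, id)
		delete(q.pending, id)
	}
}

// ready2Unack imitates RPOP on list plus ZADD to unack; ok is false if list is empty.
func (q *memQueue) ready2Unack(list *[]string, retryAt int64) (id string, ok bool) {
	if len(*list) == 0 {
		return "", false
	}
	id = (*list)[len(*list)-1]
	*list = (*list)[:len(*list)-1]
	q.unack[id] = retryAt
	return id, true
}

// unack2Retry requeues expired IDs that have retries left; the others go to garbage.
func (q *memQueue) unack2Retry(now int64) {
	for _, id := range due(q.unack, now) {
		if q.retryCount[id] > 0 {
			q.retryCount[id]--
			q.retry = lpush(q.retry, id)
		} else {
			delete(q.retryCount, id)
			q.garbage[id] = true
		}
		delete(q.unack, id)
	}
}

// ack forgets the message; nack moves its retry deadline to "now".
func (q *memQueue) ack(id string) {
	delete(q.unack, id)
	delete(q.msgs, id)
	delete(q.retryCount, id)
}
func (q *memQueue) nack(id string, now int64) { q.unack[id] = now }

func main() {
	q := newMemQueue()
	q.send("a", "hello", 100, 1) // due at t=100, one retry allowed
	q.pending2Ready(100)         // due, so it moves to the ready list
	id, _ := q.ready2Unack(&q.ready, 110)
	q.nack(id, 105)                      // the callback rejects it at t=105
	q.unack2Retry(105)                   // one retry left: moves to the retry list
	id, _ = q.ready2Unack(&q.retry, 120) // redelivered from the retry list
	q.unack2Retry(120)                   // deadline passes without an ack
	fmt.Println(id, q.garbage["a"], len(q.unack)) // prints: a true 0
}
```

One design detail worth noticing: pending2Ready pushes IDs in ascending score order with LPUSH, and consumers take them from the other end with RPOP, so among messages that become ready together, the one due earliest is consumed first.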
Three Lua scripts move messages between these structures atomically, and two Go methods, ack and nack, update them after the callback runs:
pending2ReadyScript – moves messages whose delivery time has arrived from pendingKey to readyKey:
-- keys: pendingKey, readyKey
-- argv: currentTime
local msgs = redis.call('ZRangeByScore', KEYS[1], '0', ARGV[1])
if #msgs == 0 then return end
local args2 = {'LPush', KEYS[2]}
for _,v in ipairs(msgs) do table.insert(args2, v) end
redis.call(unpack(args2))
redis.call('ZRemRangeByScore', KEYS[1], '0', ARGV[1])
ready2UnackScript – pops a message from readyKey (or retryKey) and adds it to unAckKey with a retry timestamp:
-- keys: readyKey/retryKey, unackKey
-- argv: retryTime
local msg = redis.call('RPop', KEYS[1])
if not msg then return end
redis.call('ZAdd', KEYS[2], ARGV[1], msg)
return msg
unack2RetryScript – moves expired unacknowledged messages to the retry list, decrementing their remaining retry count, and moves messages with no retries left to garbageKey:
-- keys: unackKey, retryCountKey, retryKey, garbageKey
-- argv: currentTime
local msgs = redis.call('ZRangeByScore', KEYS[1], '0', ARGV[1])
if #msgs == 0 then return end
local retryCounts = redis.call('HMGet', KEYS[2], unpack(msgs))
for i, v in ipairs(retryCounts) do
  local k = msgs[i]
  -- HMGet yields false for a missing field; treat that as "no retries left"
  local remaining = tonumber(v)
  if remaining and remaining > 0 then
    redis.call('HIncrBy', KEYS[2], k, -1)
    redis.call('LPush', KEYS[3], k)
  else
    redis.call('HDel', KEYS[2], k)
    redis.call('SAdd', KEYS[4], k)
  end
end
redis.call('ZRemRangeByScore', KEYS[1], '0', ARGV[1])
ack – permanently removes a successfully processed message:
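func (q *DelayQueue) ack(idStr string) error {
	ctx := context.Background()
	// Removing the ID from unAckKey is what stops further retries.
	if err := q.redisCli.ZRem(ctx, q.unAckKey, idStr).Err(); err != nil {
		return fmt.Errorf("remove from unack failed: %v", err)
	}
	// Best-effort cleanup of the message body and its retry counter.
	_ = q.redisCli.Del(ctx, q.genMsgKey(idStr)).Err()
	_ = q.redisCli.HDel(ctx, q.retryCountKey, idStr).Err()
	return nil
}
nack – sets the message's score in unAckKey to the current time, so the next unack2Retry pass treats it as expired and moves it to the retry list (or to garbageKey if no retries remain):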
func (q *DelayQueue) nack(idStr string) error {
	ctx := context.Background()
	// Overwriting the score with "now" makes the retry deadline expire immediately.
	err := q.redisCli.ZAdd(ctx, q.unAckKey, &redis.Z{Member: idStr, Score: float64(time.Now().Unix())}).Err()
	if err != nil {
		return fmt.Errorf("negative ack failed: %v", err)
	}
	return nil
}
The core consume pass runs every second. It invokes the scripts in order, pulls ready messages, passes each one to the user-provided callback, acks or nacks it depending on the result, and then handles retries and garbage collection:
func (q *DelayQueue) consume() error {
	// Move messages whose delivery time has arrived into readyKey.
	if err := q.pending2Ready(); err != nil { return err }
	var fetchCount uint
	for {
		idStr, err := q.ready2Unack()
		if err == redis.Nil { break } // readyKey is empty
		if err != nil { return err }
		fetchCount++
		ack, err := q.callback(idStr)
		if err != nil { return err }
		if ack { err = q.ack(idStr) } else { err = q.nack(idStr) }
		if err != nil { return err }
		if fetchCount >= q.fetchLimit { break } // cap deliveries per pass
	}
	// Requeue expired unacknowledged messages, then run garbage collection.
	if err := q.unack2Retry(); err != nil { return err }
	if err := q.garbageCollect(); err != nil { return err }
	// repeat for retry queue …
	return nil
}
To start using the library, install it with:
go get github.com/hdt3213/delayqueue
Then run a simple program that registers a callback, sends delayed messages, and starts the consumer:
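package main

import (
	"strconv"
	"time"

	"github.com/go-redis/redis/v8"
	"github.com/hdt3213/delayqueue"
)

func main() {
	redisCli := redis.NewClient(&redis.Options{Addr: "127.0.0.1:6379"})
	// The callback receives the payload; returning true acks the message,
	// returning false nacks it so that it is retried.
	queue := delayqueue.NewQueue("example-queue", redisCli, func(payload string) bool {
		return true
	})
	// Send ten messages, each delivered one hour from now with up to 3 retries
	// (send errors are ignored here for brevity).
	for i := 0; i < 10; i++ {
		_ = queue.SendDelayMsg(strconv.Itoa(i), time.Hour, delayqueue.WithRetryCount(3))
	}
	// Block on the channel returned by StartConsume so that main does not exit.
	done := queue.StartConsume()
	<-done
}
With this design, the delay queue provides at-least-once delivery, persistence, and automatic retries, and it can run on multiple consumer instances without distributed locks, because each Lua script executes atomically inside Redis.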
Top Architect
Top Architect focuses on sharing practical architecture knowledge, covering enterprise, system, website, large‑scale distributed, and high‑availability architectures, plus architecture adjustments using internet technologies. We welcome idea‑driven, sharing‑oriented architects to exchange and learn together.