Implementing a Distributed Delayed Message Queue in Go Using Redis
Implementing a distributed delayed message queue in Go with Redis uses a List for ready jobs and a Sorted Set for delayed jobs, periodically migrating expired entries atomically via Lua or transactions, providing O(1) enqueue/dequeue performance, retry support, and scalable concurrency for multi‑instance systems.
Background: many business scenarios require delayed execution, such as automatic actions after expiration, polling for unfinished tasks, callback retries, etc. Local timers are problematic in distributed multi‑instance systems, leading to duplicate processing and lack of centralized control.
Solution: use a distributed delayed queue based on Redis. The implementation uses Redis List as the ready queue and Sorted Set to store delayed jobs with timestamps as scores.
Implementation principles:
1. Redis List provides FIFO via LPOP/RPUSH (O(1)).
2. Delayed messages are stored in a Sorted Set; the score is the execution timestamp.
3. A periodic poll reads expired members from the Sorted Set (ZRANGEBYSCORE) and moves them to the List.
Atomicity of message migration:
Three steps are required: query expired messages, remove them from the Sorted Set, and push them to the List. Two Redis mechanisms ensure atomicity:
Transaction : MULTI/EXEC groups commands into an atomic block.
Lua script : a single script runs atomically, guaranteeing that no other commands interleave.
Code example of the Lua script used for migration:
local val = redis.call('zrangebyscore', KEYS[1], '-inf', ARGV[1], 'limit', 0, 20)
if(next(val) ~= nil) then
redis.call('zremrangebyrank', KEYS[1], 0, #val - 1)
redis.call('rpush', KEYS[2], unpack(val, 1, #val))
end
return #valPerformance testing with memtier_benchmark shows O(1) latency for List operations and logarithmic complexity for Sorted Set operations (ZADD, ZREMRANGEBYRANK).
Message protocol definition (Go struct):
type Job struct {
Id string `msgpack:"1"` // task id
Topic string `msgpack:"2"` // message name
Delay int64 `msgpack:"3"` // delay time
Playload []byte `msgpack:"4"` // payload
Timestamp int64 `msgpack:"5"` // enqueue time
}Queue client wraps RPUSH and ZADD, providing Dispatch (immediate) and DispatchDelaySecond (delayed) methods.
func (c *QueueClient) Dispatch(topic string, payload []byte) error {
return c.queue.Push(&Job{Topic: topic, Playload: payload, Delay: 0, Timestamp: time.Now().Unix()})
}
func (c *QueueClient) DispatchDelaySecond(topic string, payload []byte, delaySec int) error {
return c.queue.DelayJob(&Job{Topic: topic, Playload: payload, Delay: int64(delaySec), Timestamp: time.Now().Unix()})
}The consumer runs a QueueServer with a set of TopicWorker goroutines, each processing a specific topic. Concurrency is controlled by a weighted semaphore. The server repeatedly calls GetReadyJob, which internally triggers migrateExpiredJobs, then processes jobs via the JobHandler interface.
type JobHandler interface {
Topic() string
Execute(context.Context, []byte) error
}Extensions include job retry and timeout mechanisms by adding TryCount and TryTimeOut fields to the Job struct.
Conclusion: The Go‑Redis implementation demonstrates a practical, high‑performance distributed delayed queue. Core ideas—using List for ready jobs, Sorted Set for delayed jobs, and atomic migration via Lua—are applicable to many languages and platforms.
Tencent Cloud Developer
Official Tencent Cloud community account that brings together developers, shares practical tech insights, and fosters an influential tech exchange community.
How this landed with the community
Was this worth your time?
0 Comments
Thoughtful readers leave field notes, pushback, and hard-won operational detail here.