Implementing Message Queues with Redis: Lists, Zsets, Pub/Sub, and Streams
This article explains how Redis can be used to build lightweight, high‑performance message queues by leveraging lists for simple queues, sorted sets for delayed delivery, Pub/Sub for multicast, and the Stream data structure for durable, scalable messaging, while also covering practical PHP implementations, acknowledgment, retry, graceful restart, and rate‑limiting techniques.
To decouple systems, smooth traffic spikes, and enable asynchronous processing, developers often turn to message queues; Redis offers a lightweight alternative for internal use cases where absolute reliability is not critical.
Data structures and commands : Redis supports several structures for queuing. Lists act as FIFO queues using LPUSH/RPUSH to enqueue and LPOP/RPOP (or their blocking variants BLPOP/BRPOP ) to dequeue. Example PHP consumer:
<?php
$redis = new Redis();
$redis->connect('127.0.0.1', 6379);
while ($message = $redis->rpop('queue_list_test')) {
var_dump($message);
// process then sleep 1s
sleep(1);
}Blocking dequeue can be done with BRPOP to avoid busy‑waiting:
<?php
$redis = new Redis();
$redis->connect('127.0.0.1', 6379);
while ($message = $redis->brpop('queue_list_test', 3600)) {
var_dump($message);
}Because the default PHP socket timeout (60 s) may abort the script, it is common to disable the timeout with ini_set('default_socket_timeout', -1) and use an infinite block time:
<?php
ini_set('default_socket_timeout', -1);
$redis = new Redis();
$redis->connect('127.0.0.1', 6379);
while ($message = $redis->brpop('queue_list_test', 0)) {
var_dump($message);
}Lists alone are unsafe for ack/retry; Redis provides RPOPLPUSH / LPOPLPUSH (and their blocking versions) to move a message to a backup list while processing, enabling manual acknowledgment.
Sorted sets (ZSET) enable delayed queues: the message is added with ZADD key score member where score is the future timestamp. Consumers retrieve ready messages with ZRANGEBYSCORE key -inf now .
Pub/Sub offers multicast but lacks persistence; if no subscriber is online, the message is lost, and a Redis restart discards all pending messages.
Streams (Redis 5.0+) provide a durable, Kafka‑like queue. Each entry has an ID timestamp-sequence . Streams support consumer groups, independent cursors, pending entry lists for at‑least‑once delivery, and can be used for both real‑time and delayed tasks.
Traditional Redis queue design combines a list for immediate jobs and a ZSET for delayed jobs. A helper method first migrates expired ZSET entries to the list (using a Lua script for atomicity) and then pops a job. Example Lua migration script:
/**
* Get the Lua script to migrate expired jobs back onto the queue.
* KEYS[1] - source ZSET (e.g., queues:foo:reserved)
* KEYS[2] - destination LIST (e.g., queues:foo)
* ARGV[1] - current UNIX timestamp
*/
local val = redis.call('zrangebyscore', KEYS[1], '-inf', ARGV[1])
if (next(val) ~= nil) then
redis.call('zremrangebyrank', KEYS[1], 0, #val - 1)
for i = 1, #val, 100 do
redis.call('rpush', KEYS[2], unpack(val, i, math.min(i+99, #val)))
end
end
return valProducers simply use LPUSH/RPUSH for immediate jobs and ZADD for delayed jobs, both atomic operations.
Acknowledgment and retry : after a consumer fetches a job, it is placed in an ACK ZSET with a retry timestamp. If the job is not acknowledged before the timeout, a background process moves it back to the ready queue. This guarantees at‑least‑once processing.
Graceful restart : Consumers listen for termination signals (e.g., SIGTERM, SIGUSR2) and set a flag to stop after completing the current job. PHP example using pcntl_async_signals and pcntl_signal :
class Worker {
public $shouldQuit = false;
public function daemon(string $queueName = '') {
$this->listenForSignals();
while (! $this->shouldQuit) {
// business logic
}
}
protected function listenForSignals() {
if ($this->supportsAsyncSignals()) {
pcntl_async_signals(true);
pcntl_signal(SIGTERM, function() { $this->shouldQuit = true; });
pcntl_signal(SIGUSR2, function() { $this->shouldQuit = true; });
pcntl_signal(SIGCONT, function() { $this->shouldQuit = true; });
}
}
}Failure handling : Consumers catch exceptions, optionally invoke a user‑defined failure handler, and log the error for later inspection.
Rate limiting : A leaky‑bucket algorithm can be implemented with a Lua script that increments a counter key, sets an expiry, and rejects requests exceeding a configured limit. Example Lua script:
-- Limit the speed of pulling messages from the queue
local times = redis.call('incr', KEYS[1])
if times == 1 then
redis.call('expire', KEYS[1], ARGV[1])
end
if times > tonumber(ARGV[2]) then
return 0
end
return 1By adjusting ARGV[1] (time window) and ARGV[2] (bucket capacity), the consumer’s throughput can be throttled.
In summary, building a robust Redis‑based message queue involves choosing the right data structure (list, ZSET, Stream), handling acknowledgment and retries, supporting graceful restarts, and optionally applying rate‑limiting, all while keeping the implementation lightweight and efficient.
Xueersi Online School Tech Team
The Xueersi Online School Tech Team, dedicated to innovating and promoting internet education technology.
How this landed with the community
Was this worth your time?
0 Comments
Thoughtful readers leave field notes, pushback, and hard-won operational detail here.