How to Build a High‑Performance Lightweight PHP Queue Using Redis Streams
This guide introduces a lightweight, high‑performance PHP queue library built on Redis 5.0+ Streams. It covers the library's features (high concurrency, delayed tasks, multi‑producer/consumer support, ACK handling, message replay, and audit mode) and walks through installation, configuration, and usage with code snippets.
Key Features
High Performance – Built on Redis 5.0+ Streams, supports high concurrency.
Delayed Tasks – Uses a Redis Sorted Set to schedule delays from seconds up to years.
Multi‑Producer/Consumer – Allows multiple producers and consumers to work simultaneously.
Message Persistence – Reliable storage of messages.
ACK Confirmation – Complete acknowledgment mechanism.
Smart Retry – Configurable retry count and delay strategy.
Message Replay – Ability to reprocess historical messages.
Message Audit – Read‑only mode to audit all messages.
PSR‑3 Logging – Implements the standard PSR‑3 logger interface.
Singleton Pattern – Prevents duplicate instance creation.
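The Sorted Set approach to delayed tasks listed above can be sketched roughly as follows. This is not the library's internal code: the function names, key names, and JSON member format are assumptions made for illustration, and `$redis` stands for a connected phpredis client. The idea is that a delayed message waits in a Sorted Set scored by its due time, and a scheduler periodically moves due members into the Stream.

```php
<?php
// Sketch only: helper names and key names are hypothetical, not the library's internals.
// $redis is expected to be a connected phpredis \Redis instance.

/** Park a message in the delayed Sorted Set, scored by its due time. */
function scheduleDelayed($redis, string $delayedKey, string $payload, int $delaySeconds, ?int $now = null): int
{
    $dueAt = ($now ?? time()) + $delaySeconds;
    // A unique id keeps identical payloads from collapsing into one set member.
    $member = json_encode(['id' => uniqid('', true), 'message' => $payload]);
    $redis->zAdd($delayedKey, $dueAt, $member);
    return $dueAt;
}

/** Move up to $batch due messages from the Sorted Set into the Stream. */
function promoteDue($redis, string $delayedKey, string $streamKey, int $batch = 100, ?int $now = null): int
{
    $maxScore = (string) ($now ?? time());
    $due = $redis->zRangeByScore($delayedKey, '-inf', $maxScore, ['limit' => [0, $batch]]);
    $moved = 0;
    foreach ($due as $member) {
        // zRem returns 1 only for the caller that actually removed the member,
        // so two schedulers running at once do not enqueue the same message twice.
        if ($redis->zRem($delayedKey, $member) === 1) {
            $data = json_decode($member, true);
            $redis->xAdd($streamKey, '*', ['message' => $data['message']]);
            $moved++;
        }
    }
    return $moved;
}
```

Removing the member before adding it to the Stream keeps the sketch short, but a crash between the two calls would lose the message. A production scheduler would more likely wrap both steps in a Lua script or a transaction.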
Environment Requirements
PHP >= 7.4
Redis >= 5.0
Composer >= 2.0
ext‑redis extension
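Before installing, it can help to confirm these requirements from PHP itself. The following is a minimal sketch; `checkRequirements()` is a hypothetical helper written for this article, not part of the library. The Redis server version is passed in as a string because reading it requires a live connection.

```php
<?php
/**
 * Returns a list of unmet requirements; an empty array means everything checked out.
 * Hypothetical helper for illustration only.
 */
function checkRequirements(string $phpVersion, bool $hasRedisExt, ?string $redisServerVersion = null): array
{
    $problems = [];
    if (version_compare($phpVersion, '7.4.0', '<')) {
        $problems[] = "PHP >= 7.4 required, found $phpVersion";
    }
    if (!$hasRedisExt) {
        $problems[] = 'ext-redis is not loaded';
    }
    // The server check is skipped when no version string is supplied.
    if ($redisServerVersion !== null && version_compare($redisServerVersion, '5.0.0', '<')) {
        $problems[] = "Redis >= 5.0 required, found $redisServerVersion";
    }
    return $problems;
}

// Check the current runtime. Once a phpredis connection exists, the server version
// can be read from $redis->info('server')['redis_version'] and passed as the third argument.
foreach (checkRequirements(PHP_VERSION, extension_loaded('redis')) as $problem) {
    echo "Requirement not met: $problem\n";
}
```

The Composer version is not checked here; `composer --version` on the command line covers that.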
Quick Installation
composer require tinywan/redis-stream
Quick Start
Basic Usage
Create a queue instance:
<?php
require_once __DIR__ . '/vendor/autoload.php';
use Tinywan\RedisStream\RedisStreamQueue;
$queue = RedisStreamQueue::getInstance();
Send a message:
<?php
$messageId = $queue->send('Hello, Redis Stream!');
echo "Message ID: $messageId\n";
Consume messages:
<?php
// Consume messages
$message = $queue->consume(function ($message) {
    echo "Processing: " . $message['message'] . "\n";
    return true; // Acknowledge
});
Using Producer and Consumer Classes
use Tinywan\RedisStream\RedisStreamQueue;
use Tinywan\RedisStream\Producer;
use Tinywan\RedisStream\Consumer;
$queue = RedisStreamQueue::getInstance();
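// Producer: send a JSON-encoded task, delayed by 10 seconds
$producer = new Producer($queue);
$payload = json_encode(['type' => 'email', 'data' => 'Task data']);
$messageId = $producer->send($payload, ['task_type' => 'email'], 10);
// Consumer: handleTask() stands for your own application function
$consumer = new Consumer($queue);
$consumer->run(function ($message) {
    $task = json_decode($message['message'], true);
    return handleTask($task['type'], $task['data']); // return true to acknowledge
});
Main Functions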
Delayed Messages
Supports delays ranging from seconds to years:
// Immediate execution
$queue->send('Immediate message');
// Delayed execution (30 seconds later)
$queue->send('Delayed message', [], 30);
// Scheduled execution at an absolute Unix timestamp (1 hour from now)
$timestamp = time() + 3600;
$queue->send('Scheduled message', [], $timestamp);
// One-day delay (86400 seconds)
$queue->send('Next day message', [], 86400);
Message Replay & Audit
Reprocess historical messages and audit in read‑only mode:
// Replay up to 10 messages, auto‑acknowledge
$count = $queue->replayMessages(function ($message) {
    echo "Replaying: " . $message['message'] . "\n";
    return true;
}, 10);
// Audit messages (does not affect state)
$count = $queue->auditMessages(function ($message) {
    echo "Auditing: " . $message['message'] . "\n";
    return true;
}, 20);
Position‑Based Consumption
Flexible control over consumption position:
// Read all messages from the beginning
$message = $queue->consume(null, '0-0');
// Read only new messages ('$' skips messages already in the stream)
$message = $queue->consume(null, '$');
// Start from a specific message ID
$message = $queue->consumeFrom('1758943564547-0');
Configuration
Redis Configuration
$redisConfig = [
    'host' => '127.0.0.1',
    'port' => 6379,
    'password' => null,
    'database' => 0,
    'timeout' => 5,
];
Queue Configuration
$queueConfig = [
    'stream_name' => 'redis_stream_queue',
    'consumer_group' => 'redis_stream_group',
    'consumer_name' => 'consumer_' . getmypid(),
    'block_timeout' => 5000,
    'retry_attempts' => 3,
    'retry_delay' => 1000,
    'delayed_queue_suffix' => '_delayed',
    'scheduler_interval' => 1,
];
Production Deployment
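In production, connection settings usually come from the environment rather than hard-coded arrays. The sketch below builds the Redis configuration from environment variables and falls back to the defaults shown above. The variable names (`REDIS_HOST` and so on) and both helper functions are assumptions for illustration; the library is not known to read them itself. It also derives a consumer name from the hostname, because inside containers the PID is often 1, so `'consumer_' . getmypid()` alone may collide across instances.

```php
<?php
/**
 * Builds the Redis connection array from an environment map, falling back to
 * the defaults shown above. Variable names are assumptions for this sketch.
 */
function redisConfigFromEnv(array $env): array
{
    $password = $env['REDIS_PASSWORD'] ?? '';
    return [
        'host'     => $env['REDIS_HOST'] ?? '127.0.0.1',
        'port'     => (int) ($env['REDIS_PORT'] ?? 6379),
        'password' => $password !== '' ? $password : null, // empty means no auth
        'database' => (int) ($env['REDIS_DB'] ?? 0),
        'timeout'  => (float) ($env['REDIS_TIMEOUT'] ?? 5),
    ];
}

/** Consumer name that stays unique across hosts and containers. */
function consumerName(string $host, int $pid): string
{
    return sprintf('consumer_%s_%d', $host, $pid);
}

// getenv() without arguments returns all environment variables as an array.
$redisConfig = redisConfigFromEnv(getenv());
$consumerName = consumerName((string) gethostname(), (int) getmypid());
```

With Docker's default settings the hostname is the container ID, so each container gets a distinct consumer name even when every worker runs as PID 1.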
Supervisor Configuration
[program:redis-stream-consumer]
command=php /path/to/your/project/examples/consumer.php
directory=/path/to/your/project
autostart=true
autorestart=true
user=www-data
redirect_stderr=true
stdout_logfile=/var/log/supervisor/redis-stream-consumer.log
Docker Deployment
FROM php:8.1-cli
RUN pecl install redis && docker-php-ext-enable redis
COPY --from=composer:latest /usr/bin/composer /usr/bin/composer
COPY . /app
WORKDIR /app
RUN composer install --no-dev --optimize-autoloader
CMD ["php", "examples/consumer.php"]
Advanced Features
Singleton Management
// Get status of all queue instances
$status = RedisStreamQueue::getInstancesStatus();
// Get connection‑pool status
$poolStatus = $queue->getConnectionPoolStatus();
Delayed Queue Management
// Retrieve delayed‑queue statistics
$stats = $queue->getDelayedQueueStats();
// Manually run the scheduler for a batch of 100 messages
$processedCount = $queue->runDelayedScheduler(100);
// Start the scheduler for 60 seconds
$queue->startDelayedScheduler(60);
Queue Monitoring
// Get overall queue status
$status = [
    'stream_length' => $queue->getStreamLength(),
    'pending_count' => $queue->getPendingCount(),
    'delayed_count' => $queue->getDelayedQueueLength(),
];
Running Examples
# Basic example
php examples/quickstart.php
# Producer example
php examples/producer.php
# Consumer example
php examples/consumer.php
# Run tests
./vendor/bin/phpunit
