How to Build a High‑Performance Lightweight PHP Queue Using Redis Streams

This guide introduces a lightweight, high‑performance PHP queue library built on Redis 5.0+ Streams, detailing its features such as high concurrency, delayed tasks, multi‑producer/consumer support, ACK handling, message replay, audit mode, and provides step‑by‑step installation, configuration, and usage examples with code snippets.

Open Source Tech Hub
Open Source Tech Hub
Open Source Tech Hub
How to Build a High‑Performance Lightweight PHP Queue Using Redis Streams

Key Features

High Performance – Built on Redis 5.0+ Streams, supports high concurrency.

Delayed Tasks – Uses Sorted Set to schedule delays from seconds up to years.

Multi‑Producer/Consumer – Allows multiple producers and consumers to work simultaneously.

Message Persistence – Reliable storage of messages.

ACK Confirmation – Complete acknowledgment mechanism.

Smart Retry – Configurable retry count and delay strategy.

Message Replay – Ability to reprocess historical messages.

Message Audit – Read‑only mode to audit all messages.

PSR‑3 Logging – Implements the standard PSR‑3 logger interface.

Singleton Pattern – Prevents duplicate instance creation.

Environment Requirements

PHP >= 7.4

Redis >= 5.0

Composer >= 2.0

ext‑redis extension

Quick Installation

composer require tinywan/redis-stream

Quick Start

Basic Usage

Create a queue instance:

<?php
require_once __DIR__ . '/vendor/autoload.php';
use Tinywan\RedisStream\RedisStreamQueue;

$queue = RedisStreamQueue::getInstance();

Send a message:

<?php
$messageId = $queue->send('Hello, Redis Stream!');
echo "Message ID: $messageId
";

Consume messages:

<?php
// Consume messages
$message = $queue->consume(function ($message) {
    echo "Processing: " . $message['message'] . "
";
    return true; // Acknowledge
});

Using Producer and Consumer Classes

use Tinywan\RedisStream\RedisStreamQueue;
use Tinywan\RedisStream\Producer;
use Tinywan\RedisStream\Consumer;

$queue = RedisStreamQueue::getInstance();

// Producer
$producer = new Producer($queue);
$messageId = $producer->send('Task data', ['task_type' => 'email'], 10); // delay 10 seconds

// Consumer
$consumer = new Consumer($queue);
$consumer->run(function ($message) {
    $task = json_decode($message['message'], true);
    return handleTask($task['type'], $task['data']);
});

Main Functions

Delayed Messages

Supports delays ranging from seconds to years:

// Immediate execution
$queue->send('Immediate message');

// Delayed execution (30 seconds later)
$queue->send('Delayed message', [], 30);

// Scheduled execution (1 hour later)
$timestamp = time() + 3600;
$queue->send('Scheduled message', [], $timestamp);

// Year‑level delay (next day)
$queue->send('Next day message', [], 86400);

Message Replay & Audit

Reprocess historical messages and audit in read‑only mode:

// Replay up to 10 messages, auto‑acknowledge
$count = $queue->replayMessages(function ($message) {
    echo "Replaying: " . $message['message'] . "
";
    return true;
}, 10);

// Audit messages (does not affect state)
$count = $queue->auditMessages(function ($message) {
    echo "Auditing: " . $message['message'] . "
";
    return true;
}, 20);

Position‑Based Consumption

Flexible control over consumption position:

// Read all messages from the beginning
$message = $queue->consume(null, '0-0');

// Read the latest messages
$message = $queue->consume(null, '$');

// Start from a specific message ID
$message = $queue->consumeFrom('1758943564547-0');

Configuration

Redis Configuration

$redisConfig = [
    'host' => '127.0.0.1',
    'port' => 6379,
    'password' => null,
    'database' => 0,
    'timeout' => 5,
];

Queue Configuration

$queueConfig = [
    'stream_name'       => 'redis_stream_queue',
    'consumer_group'   => 'redis_stream_group',
    'consumer_name'    => 'consumer_' . getmypid(),
    'block_timeout'     => 5000,
    'retry_attempts'    => 3,
    'retry_delay'       => 1000,
    'delayed_queue_suffix' => '_delayed',
    'scheduler_interval'   => 1,
];

Production Deployment

Supervisor Configuration

[program:redis-stream-consumer]
command=php /path/to/your/project/examples/consumer.php
directory=/path/to/your/project
autostart=true
autorestart=true
user=www-data
redirect_stderr=true
stdout_logfile=/var/log/supervisor/redis-stream-consumer.log

Docker Deployment

FROM php:8.1-cli
RUN pecl install redis && docker-php-ext-enable redis
COPY --from=composer:latest /usr/bin/composer /usr/bin/composer
COPY . /app
WORKDIR /app
RUN composer install --no-dev --optimize-autoloader
CMD ["php", "examples/consumer.php"]

Advanced Features

Singleton Management

// Get status of all queue instances
$status = RedisStreamQueue::getInstancesStatus();

// Get connection‑pool status
$poolStatus = $queue->getConnectionPoolStatus();

Delayed Queue Management

// Retrieve delayed‑queue statistics
$stats = $queue->getDelayedQueueStats();

// Manually run the scheduler for a batch of 100 messages
$processedCount = $queue->runDelayedScheduler(100);

// Start the scheduler for 60 seconds
$queue->startDelayedScheduler(60);

Queue Monitoring

// Get overall queue status
$status = [
    'stream_length' => $queue->getStreamLength(),
    'pending_count' => $queue->getPendingCount(),
    'delayed_count' => $queue->getDelayedQueueLength(),
];

Running Examples

# Basic example
php examples/quickstart.php

# Producer example
php examples/producer.php

# Consumer example
php examples/consumer.php

# Run tests
./vendor/bin/phpunit
Original Source

Signed-in readers can open the original source through BestHub's protected redirect.

Sign in to view source
Republication Notice

This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contactadmin@besthub.devand we will review it promptly.

BackendredisMessage QueuePHPQueueRedis Streams
Open Source Tech Hub
Written by

Open Source Tech Hub

Sharing cutting-edge internet technologies and practical AI resources.

0 followers
Reader feedback

How this landed with the community

Sign in to like

Rate this article

Was this worth your time?

Sign in to rate
Discussion

0 Comments

Thoughtful readers leave field notes, pushback, and hard-won operational detail here.