Concepts, Principles, and Implementation Methods of Message Queues
This article explains the fundamentals of message queues, their typical use cases such as decoupling and traffic shaping, compares implementation options like MySQL, Redis, and dedicated message systems, and provides detailed PHP code examples for flash‑sale and RabbitMQ integrations.
Message queues are middleware that use a queue structure, allowing messages to be stored without immediate consumption and processed sequentially by consumers or subscribers.
The basic workflow is illustrated with a diagram (image).
Typical application scenarios include redundancy, system decoupling, traffic shaping (peak‑shaving), and asynchronous communication.
Implementation options discussed are MySQL (reliable but slow), Redis (fast but slower for large messages), and dedicated message‑queue systems (reliable and professional).
Trigger mechanisms for processing messages include infinite loops (no fault recovery), scheduled tasks (balanced load but limited capacity), and daemon processes.
An example of decoupling an order and delivery system shows using scheduled tasks to update order status and batch updates.
For traffic shaping in a flash‑sale scenario, Redis list commands such as LPUSH, RPUSH, LPOP, RPOP, LTRIM, LLEN, LSET, LINDEX, and LRANGE are employed.
Code flow for the spike program writes requests to Redis, checks the list length, and pushes the request if the length is below a threshold:
// Spike.php (flash‑sale program)
if (Redis::llen('lottery') < 10) {
// success
Redis::lpush('lottery', $uid . '%' . microtime());
} else {
// failure
}The warehousing program continuously reads from the Redis list, processes each entry, and inserts it into the database, handling empty reads with a sleep interval:
// Warehousing.php (consumer)
while (true) {
$user = Redis::rpop('lottery');
if (!$user || $user == 'nil') {
sleep(2);
continue;
}
$user_arr = explode('%', $user);
$insert_user = [
'uid' => $user_arr[0],
'time' => $user_arr[1]
];
$res = DB::table('lottery_queue')->insert($insert_user);
if (!$res) {
Redis::lpush('lottery', $user);
}
}To prevent overselling under high concurrency, the article suggests using file locks or Redis distributed locks, loading goods into a Redis list, and popping items for each purchase.
// Load goods into Redis list
$goods_id = 2;
$sql = "select id,num from goods where id = $goods_id";
$res = DB::select($sql);
if (!empty($res)) {
Redis::del('lottery_goods' . $goods_id);
for ($i = 0; $i < $res['num']; $i++) {
Redis::lpush('lottery_goods' . $goods_id, $i);
}
LOG::info('Goods loaded into queue, count: ' . Redis::llen('lottery_goods' . $goods_id));
} // Flash‑sale processing
$count = Redis::rpop('lottery_goods' . $goods_id);
if (!$count) {
// sold out
...
}
$user_list = 'user_goods_id_' . $goods_id;
$user_status = Redis::sismember($user_list, $user_id);
if ($user_status) {
// already purchased
...
}
Redis::sadd($user_list, $uid);
$msg = 'User: ' . $uid . ' order: ' . $count;
Log::info($msg);
$sql = "update goods set num = num - 1 where id = $goods_id and num > 0;"; // prevent oversell
DB::update($sql);The article also introduces RabbitMQ architecture (producer, exchange, consumer) and provides simple PHP examples using the PhpAmqpLib library.
// Send.php (producer)
require __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->queue_declare('user_email', false, false, false, false);
$msg = new AMQPMessage('send email');
$channel->basic_publish($msg, '', 'user_email');
echo "[x] send email";
$channel->close();
$connection->close(); // Receive.php (consumer)
require __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->queue_declare('user_email', false, false, false, false);
$callback = function($msg) {
echo 'Received ' . $msg->body . "
";
};
$channel->basic_consume('user_email', '', false, true, false, false, $callback);
while ($channel->is_open()) {
$channel->wait();
}Overall, the article provides a comprehensive overview of message‑queue concepts, practical use cases, and concrete code implementations for building reliable asynchronous systems in backend development.
Signed-in readers can open the original source through BestHub's protected redirect.
This article has been distilled and summarized from source material, then republished for learning and reference. If you believe it infringes your rights, please contactand we will review it promptly.
php Courses
php中文网's platform for the latest courses and technical articles, helping PHP learners advance quickly.
How this landed with the community
Was this worth your time?
0 Comments
Thoughtful readers leave field notes, pushback, and hard-won operational detail here.
