1. 程式人生 > >用 Redis 實現 PHP 的簡單訊息佇列

用 Redis 實現 PHP 的簡單訊息佇列

參考:PHP高階程式設計之訊息佇列

訊息佇列就是在訊息的傳輸過程中,可以儲存訊息的容器。

常見用途:

  • 儲存轉發:非同步處理耗時的任務
  • 分散式事務:多個消費者消費同一個訊息佇列
  • 應對高併發:通過訊息佇列儲存任務,慢慢處理
  • 釋出訂閱:實現解耦

PHP 可以基於 Redis 的 List 資料型別實現簡單的訊息佇列,可以參考 php-resque。當然也可以使用更強大的 RabbitMQ。

實現方式

PHP 守護程序

PHP 業務程式碼:

<?php

class MyDaemon
{
    public $procNum
= 8; // 程序總數 // 啟動程序 public function run() { for ($i = 0; $i < $this->procNum; $i++) { $nPID = \pcntl_fork();//建立子程序 if ($nPID == 0) { //子程序 \Org\Util\MsgQ::init(); $this->work(); exit(0); }
} // 等待子程序執行完畢,避免殭屍程序 $n = 0; while ($n < $this->procNum) { $nStatus = -1; $nPID = \pcntl_wait($nStatus); if ($nPID > 0) { ++$n; } } } //業務程式碼 public function work() { $MsgData
= ""; while (true) { usleep(10000); // 10 ms 執行一次 $ret = MsgQ::BlockSubsribe("MyMsgName", $MsgData); // 業務程式碼 } }

訊息佇列(基於Redis)庫程式碼:

<?php

namespace Org\Util;

class MsgQ {
	public static $errCode = 0;
	public static $errMsg = "";
	public static $redis;
	private static $preFix = "MsgQ.";
	private static $timeOut = 10;

	private static $redisHost = '';
	private static $redisPort = '';
	private static $redisAuth = '';

	function __construct()
	{
		self::$redis = new \Redis();
		$ret = self::$redis->connect($redisHost,$redisPort,self::$timeOut);
		$ret = self::$redis->auth($redisAuth);
	}

	function __destruct()
	{
		if(self::$redis) {
			self::$redis->close();
		}
	}

	public static function init($timeOut = 0){
		if (!self::$redis) {
			self::$redis = new \Redis();
			if(!empty($timeOut)){
				self::$timeOut = $timeOut;
				ini_set('default_socket_timeout', 259200);
				$ret = self::$redis->connect($redisHost,$redisPort,self::$timeOut);
				$ret = self::$redis->auth($redisAuth);
			}
			else{
				self::$timeOut = 0;
				ini_set('default_socket_timeout', 259200);
				$ret = self::$redis->connect($redisHost,$redisPort,259200);
				$ret = self::$redis->auth($redisAuth);
			}
		}
	}

	public static function Publish($pubKey,$data){
		if(!self::PingAndConnect()){
			return false;
		}
		$ret = self::$redis->rPush(self::$preFix.$pubKey,$data);
		if ($ret === false){
			return false;
		}
		return true;
	}

	public static function GetListLen($pubKey,&$len){
		if(!self::PingAndConnect()){
			return false;
		}
		$len = 0;
		$ret = self::$redis->lLen(self::$preFix.$pubKey);
		if ($ret === false){
			return false;
		}
		$len = $ret;
		return true;
	}

	public static function Subsribe($pubKey,&$data){
		if(!self::PingAndConnect()){
			return false;
		}
		$ret = self::$redis->lPop(self::$preFix.$pubKey);
		if ($ret === false){
			return false;
		}
		$data = $ret;
		return true;
	}

	public static function BlockSubsribe($pubKey,&$data){
		if(!self::PingAndConnect()){
			return false;
		}
		try{
			$ret = self::$redis->blPop(array(self::$preFix.$pubKey),0);
		}
		catch(Exception $e){
			if(!self::PingAndConnect(true)){
				return false;
			}
			return false;
		}
		if ($ret === false){
			return false;
		}
		if ($ret === array()){
			return false;
		}
		$data = $ret[1];
		return true;
	}

	private static function PingAndConnect($isException = false){
		if (!self::$redis) {
			self::$redis = new \Redis();
			if (self::$timeOut == 0){
			     ini_set('default_socket_timeout', 259200);
				$ret = self::$redis->connect($redisHost,$redisPort,259200);
			}
			else{
				$ret = self::$redis->connect($redisHost,$redisPort,self::$timeOut);
			}
			if ($ret === false){
				return false;
			}
			$ret = self::$redis->auth($redisAuth);
			if ($ret === false){
				return false;
			}
		}
		else{
			if (self::$timeOut == 0 && !$isException){
				return true;
			}
			$ret = self::$redis->ping();
			if ($ret === false){
				if (self::$timeOut == 0){
				    ini_set('default_socket_timeout', 259200);
					$ret = self::$redis->connect($redisHost,$redisPort,259200);
				}
				else{
					$ret = self::$redis->connect($redisHost,$redisPort,self::$timeOut);
				}
				if ($ret === false){
					return false;
				}
				$ret = self::$redis->auth($redisAuth);
				if ($ret === false){
					return false;
				}
			}
		}
		return true;
	}
}

重啟守護程序的 shell 指令碼 restartprocess.sh

#!/bin/sh
if [ ! -n "$1" ]; then
	echo "input proc name"
	exit
else
	procname=$1
fi

pids=`(ps -ef | grep "$procname" | grep -v "grep" | grep -v $0) | awk '{print $2}'`

for pid in ${pids[*]}
do
	kill -9 $pid
done
cd /path/to/your/project/
setsid $procname &

啟動守護程序的命令:

restartprocess.sh "php index.php /path/to/your/MyDaemon/func/run"

Linux 定時任務

可以設定一分鐘或一秒鐘執行一次 PHP 指令碼。因為每次處理訊息的時間不固定,可能導致訊息積壓或伺服器負載過大。

手工執行指令碼

用於處理偶然需求,簡單。