1. 程式人生 > >PHP操作RabbitMQ的類 exchange、queue、route kye、bind

PHP操作RabbitMQ的類 exchange、queue、route kye、bind

null class 應用 取數 span date 特殊 json 功能

RabbitMQ是常見的消息中間件。也許是還是不夠了解的緣故,感覺功能還好吧。

講到隊列,大家腦子裏第一印象是下邊這樣的。

P生產者推送消息-->隊列-->C消費者取出消息

結構很簡單,但是RabbitMQ應該是為了豐富的功能吧,把“隊列”拆分了。

分成了:exchange(交換機)和queue(隊列)兩個部分

同時說明:

生產者推送消息只推到exchange,不知道會進入哪個queue。

exchange通過一個route key與queue綁定,這時才會知道消息具體落到了哪個queue裏。

而消費則獲取消息,是直接從隊列裏取的。

大概就是以上這個意思,問題是還有兩個特殊的說明:

1.消費者是無法訂閱或者獲取不存在的MessageQueue中信息。

2.消息被Exchange接受以後,如果沒有匹配的Queue,則會被丟棄。

簡單的理解就是,如果exchange和queue不綁定,生產者推送的消息到exchange會直接丟棄(丟失),同時consume也無法完成訂閱。

所以,這裏就有一個問題,無論是推送消息進隊列,還是訂閱消息消費,必須先定義好exchange和queue並通過route key綁定一起。

那麽到底是推送消息的時候定義並綁定呢?還是訂閱的時候定義並綁定呢?

根據那兩個特殊的說明理解,無論是誰定義綁定,都有可能會出現問題。

所以,最終就是推送和消費之前,都嘗試定義exchange和queue,並完成綁定。

推送消息,相對簡單,就一個publish指定exchange和route key就完成了。

消費消息,相對的復雜一點,有兩種方式:

1、推送(push)訂閱方式,使用consume方法訂閱隊列,只要隊列有消息就消費

2、拉取(poll)主動拉取,使用get方法,主動去從隊列一次拉取一條消息

這兩種情況,都有各自的應用場景,可以根據需要自行選擇。

額外提醒一點:盡量不要使用循環的方式調用get方法消費隊列,尤其是處理的消息很多的情況。

如果大量的消費隊列,建議直接使用consume方法。

還有一個情況,當消費者取出消息時,可以不對消息隊列做任何操作,也可以將取出的消息刪除。

畢竟,隊列裏的消息,消費後是需要刪除的,取到消息,發給隊列一個然虧,隊列就刪除該消息。

這個取出消息後的刪除,也分兩種情況:

1、一種是取出就刪除,consume和get兩個方法都有一個參數AMQP_AUTOACK自動反饋

2、另一種是取出後,並不會自動返回刪除,而是將取出來的消息處理之後確認沒有問題了,手動反饋給消息隊列

至於選擇哪種反饋方式,根據需求自行選擇。

關於RabbitMQ還有很多要說的,比如一個exchange可以綁定多個queue,多個exchange可以綁定一個queue(多對多的關系)

還可以根據exchange不同的模式,搭配不同的route key做不同的匹配。各種組合吧。

靈活應用起來功能還是很強大的,只是具體使用時需要仔細,因為一個不小心,不是丟失消息就是多出很多消息。

所知有限,在此不做特復雜的說明,下邊的例子也是一個簡單的完成一個簡單的隊列的操作。演示學習用。

更多的情況,就自行研究擴展。

RabbitMQ操作類(rabbitmq.class.php)

  1 <?php
  2 // rabbitmq 操作類
  3 class RabbitMQ
  4 {
  5     // 配置變量
  6     public $configs = array(
  7         ‘host‘ => ‘localhost‘, 
  8         ‘port‘ => ‘5672‘, 
  9         ‘login‘ => ‘guest‘, 
 10         ‘password‘ => ‘guest‘,
 11         ‘vhost‘ => ‘/‘
 12     );
 13     public $exchange_name = ‘ex_q_def‘;// 交換機名稱
 14     public $queue_name = ‘ex_q_def‘;// 隊列名稱
 15     public $route_key = ‘‘;// 路由key的名稱
 16     public $durable = true;// 持久化,默認true
 17     public $autodelete = false;// 自動刪除
 18 
 19     // 內部通用變量
 20     private $_conn = null;
 21     private $_exchange = null;
 22     private $_channel = null;
 23     private $_queue = null;
 24 
 25     // 構造函數
 26     public function __construct()
 27     {
 28         // 初始化隊列
 29         $this->init();
 30     }
 31 
 32     // 配置rabbitmq
 33     public function set_configs($configs)
 34     {
 35         // 初始化配置
 36         if (!is_array($configs)) {
 37             echo ‘configs is not array.‘;
 38         }
 39         if (!($configs[‘host‘] && $configs[‘port‘] && $configs[‘login‘] && $configs[‘password‘])) {
 40             echo ‘configs is empty.‘;
 41         }
 42         if (!isset($configs[‘vhost‘])) {// 沒有vhost元素,給出默認值
 43             $configs[‘vhost‘] = ‘/‘;
 44         } else {
 45             if (empty($configs[‘vhost‘])) {// 有vhost元素,但是值為空,給出默認值
 46                 $configs[‘vhost‘] = ‘/‘;
 47             }
 48         }
 49         $this->configs = $configs;
 50     }
 51 
 52     // 初始化rabbitmq
 53     private function init()
 54     {
 55         if (!$this->_conn) {
 56             $this->_conn = new AMQPConnection($this->configs);// 創建連接對象
 57             if (!$this->_conn->connect()) {
 58                 echo "Cannot connect to the broker \n ";
 59                 exit(0);
 60             }
 61         }
 62 
 63         // 創建channel
 64         $this->_channel = new AMQPChannel($this->_conn);
 65     }
 66 
 67     // 創建隊列(為了保證正常訂閱,避免消息丟失,生產者和消費則都要嘗試創建隊列:交換機和隊列通過路由綁定一起)
 68     public function create_queue($exchange_name=‘‘, $route_key=‘‘, $queue_name=‘‘)
 69     {
 70         if ($exchange_name != ‘‘) {
 71             // 隊列名參數可以省略,默認與交換機同名
 72             $this->exchange_name = $exchange_name;// 更新交換機名稱
 73             $this->queue_name = $exchange_name;// 更新隊列名稱
 74         }
 75         if ($route_key != ‘‘) $this->route_key = $route_key;// 更新路由
 76         if ($queue_name != ‘‘) $this->queue_name = $queue_name;// 獨立更新隊列名稱
 77 
 78         // 創建exchange交換機
 79         $this->_exchange = new AMQPExchange($this->_channel);// 創建交換機
 80         $this->_exchange->setType(AMQP_EX_TYPE_DIRECT);// 設置交換機模式為direct
 81         if ($this->durable) {
 82             $this->_exchange->setFlags(AMQP_DURABLE);// 設置是否持久化
 83         }
 84         if ($this->autodelete) {
 85             $this->_exchange->setFlags(AMQP_AUTODELETE);// 設置是否自動刪除
 86         }
 87         $this->_exchange->setName($this->exchange_name);// 設置交換機名稱
 88         $this->_exchange->declare();
 89 
 90         // 創建queue隊列
 91         $this->_queue = new AMQPQueue($this->_channel);
 92         if ($this->durable) {
 93             $this->_queue->setFlags(AMQP_DURABLE);// 設置是否持久化
 94         }
 95         if ($this->autodelete) {
 96             $this->_queue->setFlags(AMQP_AUTODELETE);// 設置是否自動刪除
 97         }
 98         $this->_queue->setName($this->queue_name);// 設置隊列名稱
 99         $this->_queue->declare();// 完成隊列的定義
100 
101         // 將queue和exchange通過route_key綁定在一起
102         $this->_queue->bind($this->exchange_name, $this->route_key);
103     }
104 
105     // 生產者,向隊列交換機發送消息
106     public function send($msg, $exchange_name=‘‘, $route_key=‘‘, $queue_name=‘‘)
107     {
108         $this->create_queue($exchange_name, $route_key, $queue_name);
109         // 消息處理
110         if (is_array($msg)) {
111             $msg = json_encode($msg);// 將數組類型轉換成JSON格式
112         } else {
113             $msg = trim(strval($msg));// 簡單處理一下要發送的消息內容
114         }
115 
116         // 生產者推送消息進隊列時,只能將消息推送到交換機exchange中
117         $this->_exchange->publish($msg, $this->route_key);
118     }
119 
120     // 消費者,從隊列中獲取數,消費隊列(訂閱)
121     public function run($fun_name, $exchange_name=‘‘, $route_key=‘‘, $queue_name=‘‘, $autoack=false)
122     {
123         if (!$fun_name) {// 沒有返回函數,或者隊列不存在
124             return false;
125         }
126         $this->create_queue($exchange_name, $route_key, $queue_name);
127         // 訂閱消息
128         while (true) {
129             if ($autoack) {
130                 $this->_queue->consume($fun_name, AMQP_AUTOACK);// 自動應答
131             } else {
132                 $this->_queue->consume($fun_name);// 需要手動應答
133             }
134         }
135     }
136 
137     // 消費者,從隊列中獲取數,消費隊列(主動獲取)
138     public function get($exchange_name=‘‘, $route_key=‘‘, $queue_name=‘‘, $autoack=false)
139     {
140         $this->create_queue($exchange_name, $route_key, $queue_name);
141         // 主動獲取消息
142         if ($autoack) {
143             $msg = $this->_queue->get(AMQP_AUTOACK);// 自動應答
144         } else {
145             $msg = $this->_queue->get();// 需要手動應答
146         }
147         return [‘msg‘=>$msg, ‘queue‘=>$this->_queue];
148     }
149 }
150 ?>

生產者推送(test_send.php)

 1 <?php
 2 require_once(‘rabbitmq.class.php‘);
 3 
 4 $rmq = new RabbitMQ;
 5 for ($i = 0; $i < 10; $i++) {
 6     echo ‘test_consume_‘ . $i .‘<br />‘;
 7     $rmq->send(‘test_consume_‘ . $i, ‘test_consume‘);
 8 }
 9 
10 for ($i = 0; $i < 10; $i++) {
11     echo ‘get_msg_‘.$i.‘<br />‘;
12     $rmq->send(‘get_msg_‘ . $i, ‘test_get‘);
13 }
14 
15 echo ‘send ok ! ‘ . date(‘Y-m-d H:i:s‘);
16 ?>

消費者consume(test_run.php)

 1 <?php
 2 require_once(‘rabbitmq.class.php‘);
 3 
 4 $rmq = new RabbitMQ;
 5 
 6 $s = $rmq->run(‘processMessage‘, ‘test_consume‘);
 7 
 8 function processMessage($envelope, $queue) {
 9     $msg = $envelope->getBody();
10     sleep(1);  //sleep1秒模擬任務處理
11     echo $msg."\n"; //處理消息
12     $queue->ack($envelope->getDeliveryTag()); //手動發送ACK應答
13 }
14 ?>

消費者get(test_get.php)

 1 <?php
 2 require_once(‘rabbitmq.class.php‘);
 3 
 4 $rmq = new RabbitMQ;
 5 
 6 $r = $rmq->get(‘test_get‘);
 7 
 8 echo $r[‘msg‘]->getBody();// 取到的消息
 9 $r[‘queue‘]->ack($r[‘msg‘]->getDeliveryTag());// 手動反饋,刪除消費的消息
10 ?>

訂閱consume可以起多個程序,隊列會輪詢平均的分到每一個訂閱裏。當然,前提是處理速度是一樣,並且都有反饋。

如果處理速度不同,哪個快 ,哪就會分配更多的消息。如果沒有反饋,默認只會推送3條消息,如果一直不給反饋,就不會再有推送了。

此時如果中斷這個沒有反饋的訂閱,因為隊列中沒有刪除,會再次分配到其他訂閱者哪裏繼續推送消費。

同樣的,如果隊列中有消息,隨時開啟新的訂閱,隨時就會分配到消費的消息。

PHP操作RabbitMQ的類 exchange、queue、route kye、bind