RabbitMQ 在 PHP 下的簡單使用 (一) -- 安裝 AMQP 擴展和 Direct Exchange 模式
阿新 • • 發佈:2019-03-16
定義 wamp 正則表達 board root 獲取默認 則表達式 oot ros Direct Exchange 模式
producer.php:
Windows 安裝 amqp 擴展
RabbitMQ 是基於 amqp(高級消息隊列協議) 協議的。使用 RabbitMQ 前必須為 PHP 安裝相應的 amqp 擴展。- 下載相應版本的 amqp 擴展:http://pecl.php.net/package/amqp,解壓縮文件。
-
將 php_amqp.dll 復制到 php 的擴展目錄 ext 下,修改配置文件 php.ini:
[amqp] extension=php_amqp.dll
-
將 rabbitmq.*.dll 文件復制到 php 的安裝目錄下,然後修改 Apache 配置文件 httpd.conf:
#[rabbitmq] LoadFile "F:\wamp64\bin\php\php7.0.10\rabbitmq.*.dll"
- 重啟服務器,查看 phpinfo,確認擴展信息。
Direct Exchange 模式
Direct Exchange 模式的交換機適合用於消息的單播發送. 交換機根據推送消息時的 routing key 和 隊列的 routing key 判斷消息應該推送到哪個隊列. 可以實現同一交換機上的消息, 根據 routing key 推送到不同的隊列中.
默認 Direct Exchange
此種模式下,使用 RabbitMQ 的默認 Exchange 即可,默認的 Exchange 是 Direct 模式。使用默認 Exchange 時,不需要對 Exchange 進行屬性設置和聲明,也不需要對 Queue 進行顯示綁定和設置 routing key。Queue 默認會綁定到默認 Exchange,以及默認 routing key 與 Queue 的名稱相同。
- 創建連接並發起連接
- 在連接上創建通道
- 在通道上獲取默認交換機
- 向交換機發送消息
1 header(‘Content-Type: text/html; charset=utf-8‘); 2 // 連接設置 3 $conConfig = [ 4 ‘host‘ => ‘127.0.0.1‘, 5 ‘port‘ => 5672, 6 ‘login‘ => ‘root‘, 7 ‘password‘ => ‘root‘, 8consumer.php:‘vhost‘ => ‘/‘ 9 ]; 10 try 11 { 12 // RabbitMQ 連接實例 13 $con = new AMQPConnection($conConfig); 14 // 發起連接 15 $con->connect(); 16 // 判斷連接是否仍然有效 17 if(!$con->isConnected()) 18 { 19 echo ‘連接失敗‘;die; 20 } 21 // 新建通道 22 $channel = new AMQPChannel($con); 23 // 使用RabbitMQ的默認Exchange 24 $exchange = new AMQPExchange($channel); 25 for($i = 1; $i < 6; $i++) 26 { 27 $message = [ 28 ‘name‘ => ‘默認交換機,消息-‘ . $i, 29 ‘info‘ => ‘Hello World!‘ 30 ]; 31 // 發送消息,為消息指定routing key,成功返回true,失敗false 32 $state = $exchange->publish(json_encode($message, JSON_UNESCAPED_UNICODE), ‘test.queue1‘); 33 if($state) 34 { 35 echo ‘Success‘ . PHP_EOL; 36 }else 37 { 38 echo ‘Fail‘ . PHP_EOL; 39 } 40 } 41 // 關閉連接 42 $con->disconnect(); 43 }catch(Exception $e) 44 { 45 echo $e->getMessage(); 46 }
- 創建連接並發起連接
- 在連接上創建通道
- 在通道上創建隊列並聲明隊列
- 從隊列獲取消息
1 header(‘Content-Type: text/html; charset=utf-8‘); 2 $conConfig = [ 3 ‘host‘ => ‘127.0.0.1‘, 4 ‘port‘ => 5672, 5 ‘login‘ => ‘root‘, 6 ‘password‘ => ‘root‘, 7 ‘vhost‘ => ‘/‘ 8 ]; 9 10 11 try 12 { 13 $con = new AMQPConnection($conConfig); 14 $con->connect(); 15 if(!$con->isConnected()) 16 { 17 echo ‘連接失敗‘;die; 18 } 19 20 21 $channel = new AMQPChannel($con); 22 23 24 $queue = new AMQPQueue($channel); 25 $queue->setName(‘test.queue1‘); 26 // 聲明隊列,不需要對Queue進行顯示綁定到交換機和指定Queue的routing key 27 $queue->declareQueue(); 28 $queue->consume(function($envelope, $queue) 29 { 30 echo $envelope->getBody() . PHP_EOL; 31 }, AMQP_AUTOACK); 32 33 34 $con->disconnect(); 35 }catch(Exception $e) 36 { 37 echo $e->getMessage(); 38 }
自定義 Direct Exchange
producer:header(‘Content-Type: text/html; charset=utf-8‘); // 連接設置 $conConfig = [ ‘host‘ => ‘127.0.0.1‘, ‘port‘ => 5672, ‘login‘ => ‘root‘, ‘password‘ => ‘root‘, ‘vhost‘ => ‘/‘ ]; try { // RabbitMQ 連接實例 $con = new AMQPConnection($conConfig); // 發起連接 $con->connect(); // 判斷連接結果,true成功,false失敗 if(!$con->isConnected()) { echo ‘連接失敗‘;die; } // 新建通道 $channel = new AMQPChannel($con); // 新建交換機 $exchange = new AMQPExchange($channel); // 交換機名稱 $exchange->setName(‘test.direct‘); // 交換機類型 $exchange->setType(‘direct‘); // 聲明交換機 $exchange->declareExchange(); for($i = 1; $i < 6; $i++) { $message = [ ‘name‘ => ‘direct交換機,消息-‘ . $i, ‘info‘ => ‘Hello World!‘ ]; // 發送消息,同時為消息指定routing key,成功返回true,失敗false $state = $exchange->publish(json_encode($message, JSON_UNESCAPED_UNICODE), ‘test.queue1‘); if($state) { echo ‘Success‘ . PHP_EOL; }else { echo ‘Fail‘ . PHP_EOL; } } // 關閉連接 $con->disconnect(); }catch(Exception $e) { echo $e->getMessage(); }consumer:
1 header(‘Content-Type: text/html; charset=utf-8‘); 2 $conConfig = [ 3 ‘host‘ => ‘127.0.0.1‘, 4 ‘port‘ => 5672, 5 ‘login‘ => ‘root‘, 6 ‘password‘ => ‘root‘, 7 ‘vhost‘ => ‘/‘ 8 ]; 9 10 11 try 12 { 13 $con = new AMQPConnection($conConfig); 14 $con->connect(); 15 if(!$con->isConnected()) 16 { 17 echo ‘連接失敗‘;die; 18 } 19 20 $channel = new AMQPChannel($con); 21 22 $exchange =new AMQPExchange($channel); 23 $exchange->setName("test.direct"); 24 $exchange->setType(‘direct‘); 25 $exchange->setFlags(AMQP_DURABLE); 26 $exchange->declareExchange(); 27 28 29 $queue = new AMQPQueue($channel); 30 $queue->setName(‘test.queue1‘); 31 // 聲明隊列,不需要對Queue進行顯示綁定到交換機和指定Queue的routing key 32 $queue->declareQueue(); 33 // 綁定隊列到指定交換機,並指定routing key,即分發規則,消息的routing key與隊列的綁定routing key匹配時才。routing key可以使用正則表達式 34 $queue->bind(‘test.direct‘, ‘/^q.*/‘); 35 $queue->consume(function($envelope, $queue) 36 { 37 echo $envelope->getBody() . PHP_EOL; 38 }, AMQP_AUTOACK); 39 40 41 $con->disconnect(); 42 }catch(Exception $e) 43 { 44 echo $e->getMessage(); 45 }
RabbitMQ 在 PHP 下的簡單使用 (一) -- 安裝 AMQP 擴展和 Direct Exchange 模式