1. 程式人生 > >RabbitMQ 在 PHP 下的簡單使用 (一) -- 安裝 AMQP 擴展和 Direct Exchange 模式

RabbitMQ 在 PHP 下的簡單使用 (一) -- 安裝 AMQP 擴展和 Direct Exchange 模式

定義 wamp 正則表達 board root 獲取默認 則表達式 oot ros

Windows 安裝 amqp 擴展

RabbitMQ 是基於 amqp(高級消息隊列協議) 協議的。使用 RabbitMQ 前必須為 PHP 安裝相應的 amqp 擴展。
  1. 下載相應版本的 amqp 擴展:http://pecl.php.net/package/amqp,解壓縮文件。
  2. 將 php_amqp.dll 復制到 php 的擴展目錄 ext 下,修改配置文件 php.ini:
    [amqp]
    extension=php_amqp.dll
  3. 將 rabbitmq.*.dll 文件復制到 php 的安裝目錄下,然後修改 Apache 配置文件 httpd.conf:
    #[rabbitmq]
    LoadFile "F:\wamp64\bin\php\php7.0.10\rabbitmq.*.dll"
  4. 重啟服務器,查看 phpinfo,確認擴展信息。

Direct Exchange 模式

Direct Exchange 模式的交換機適合用於消息的單播發送. 交換機根據推送消息時的 routing key 和 隊列的 routing key 判斷消息應該推送到哪個隊列. 可以實現同一交換機上的消息, 根據 routing key 推送到不同的隊列中.

默認 Direct Exchange

此種模式下,使用 RabbitMQ 的默認 Exchange 即可,默認的 Exchange 是 Direct 模式。使用默認 Exchange 時,不需要對 Exchange 進行屬性設置和聲明,也不需要對 Queue 進行顯示綁定和設置 routing key。Queue 默認會綁定到默認 Exchange,以及默認 routing key 與 Queue 的名稱相同。

producer.php:
  1. 創建連接並發起連接
  2. 在連接上創建通道
  3. 在通道上獲取默認交換機
  4. 向交換機發送消息
 1 header(‘Content-Type: text/html; charset=utf-8‘);
 2 // 連接設置
 3 $conConfig = [
 4   ‘host‘ => ‘127.0.0.1‘,
 5   ‘port‘ => 5672,
 6   ‘login‘ => ‘root‘,
 7   ‘password‘ => ‘root‘,
 8
  ‘vhost‘ => ‘/‘ 9 ]; 10 try 11 { 12   // RabbitMQ 連接實例 13   $con = new AMQPConnection($conConfig); 14   // 發起連接 15   $con->connect(); 16   // 判斷連接是否仍然有效 17   if(!$con->isConnected()) 18   { 19     echo ‘連接失敗‘;die; 20   } 21   // 新建通道 22   $channel = new AMQPChannel($con); 23   // 使用RabbitMQ的默認Exchange 24   $exchange = new AMQPExchange($channel); 25   for($i = 1; $i < 6; $i++) 26   { 27     $message = [ 28       ‘name‘ => ‘默認交換機,消息-‘ . $i, 29       ‘info‘ => ‘Hello World!‘ 30     ]; 31     // 發送消息,為消息指定routing key,成功返回true,失敗false 32     $state = $exchange->publish(json_encode($message, JSON_UNESCAPED_UNICODE), ‘test.queue1‘); 33     if($state) 34     { 35       echo ‘Success‘ . PHP_EOL; 36     }else 37     { 38       echo ‘Fail‘ . PHP_EOL; 39     } 40   } 41   // 關閉連接 42   $con->disconnect(); 43 }catch(Exception $e) 44 { 45   echo $e->getMessage(); 46 }
consumer.php:
  1. 創建連接並發起連接
  2. 在連接上創建通道
  3. 在通道上創建隊列並聲明隊列
  4. 從隊列獲取消息
 1 header(‘Content-Type: text/html; charset=utf-8‘);
 2 $conConfig = [
 3   ‘host‘ => ‘127.0.0.1‘,
 4   ‘port‘ => 5672,
 5   ‘login‘ => ‘root‘,
 6   ‘password‘ => ‘root‘,
 7   ‘vhost‘ => ‘/‘
 8 ];
 9  
10 
11 try
12 {
13   $con = new AMQPConnection($conConfig);
14   $con->connect();
15   if(!$con->isConnected())
16   {
17     echo ‘連接失敗‘;die;
18   }
19  
20 
21   $channel = new AMQPChannel($con);
22  
23 
24   $queue = new AMQPQueue($channel);
25   $queue->setName(‘test.queue1‘);
26   // 聲明隊列,不需要對Queue進行顯示綁定到交換機和指定Queue的routing key
27   $queue->declareQueue();
28   $queue->consume(function($envelope, $queue)
29   {
30     echo $envelope->getBody() . PHP_EOL;
31   }, AMQP_AUTOACK);
32  
33 
34   $con->disconnect();
35 }catch(Exception $e)
36 {
37   echo $e->getMessage();
38 }

自定義 Direct Exchange

producer:
header(‘Content-Type: text/html; charset=utf-8‘);
// 連接設置
$conConfig = [
  ‘host‘ => ‘127.0.0.1‘,
  ‘port‘ => 5672,
  ‘login‘ => ‘root‘,
  ‘password‘ => ‘root‘,
  ‘vhost‘ => ‘/‘
];
try
{
  // RabbitMQ 連接實例
  $con = new AMQPConnection($conConfig);
  // 發起連接
  $con->connect();
  // 判斷連接結果,true成功,false失敗
  if(!$con->isConnected())
  {
    echo ‘連接失敗‘;die;
  }
  // 新建通道
  $channel = new AMQPChannel($con);
  // 新建交換機
  $exchange = new AMQPExchange($channel);
  // 交換機名稱
  $exchange->setName(‘test.direct‘);
  // 交換機類型
  $exchange->setType(‘direct‘);
  // 聲明交換機
  $exchange->declareExchange();
  for($i = 1; $i < 6; $i++)
  {
    $message = [
      ‘name‘ => ‘direct交換機,消息-‘ . $i,
      ‘info‘ => ‘Hello World!‘
    ];
    // 發送消息,同時為消息指定routing key,成功返回true,失敗false
    $state = $exchange->publish(json_encode($message, JSON_UNESCAPED_UNICODE), ‘test.queue1‘);
    if($state)
    {
      echo ‘Success‘ . PHP_EOL;
    }else
    {
      echo ‘Fail‘ . PHP_EOL;
    }
  }
 
  // 關閉連接
  $con->disconnect();
}catch(Exception $e)
{
  echo $e->getMessage();
}
consumer:
 1 header(‘Content-Type: text/html; charset=utf-8‘);
 2 $conConfig = [
 3   ‘host‘ => ‘127.0.0.1‘,
 4   ‘port‘ => 5672,
 5   ‘login‘ => ‘root‘,
 6   ‘password‘ => ‘root‘,
 7   ‘vhost‘ => ‘/‘
 8 ];
 9  
10 
11 try
12 {
13   $con = new AMQPConnection($conConfig);
14   $con->connect();
15   if(!$con->isConnected())
16   {
17     echo ‘連接失敗‘;die;
18   }
19  
20   $channel = new AMQPChannel($con);
21  
22   $exchange =new AMQPExchange($channel);
23   $exchange->setName("test.direct");
24   $exchange->setType(‘direct‘);
25   $exchange->setFlags(AMQP_DURABLE);
26   $exchange->declareExchange();
27  
28 
29   $queue = new AMQPQueue($channel);
30   $queue->setName(‘test.queue1‘);
31   // 聲明隊列,不需要對Queue進行顯示綁定到交換機和指定Queue的routing key
32   $queue->declareQueue();
33   // 綁定隊列到指定交換機,並指定routing key,即分發規則,消息的routing key與隊列的綁定routing key匹配時才。routing key可以使用正則表達式
34   $queue->bind(‘test.direct‘, ‘/^q.*/‘);
35   $queue->consume(function($envelope, $queue)
36   {
37     echo $envelope->getBody() . PHP_EOL;
38   }, AMQP_AUTOACK);
39  
40 
41   $con->disconnect();
42 }catch(Exception $e)
43 {
44   echo $e->getMessage();
45 }

RabbitMQ 在 PHP 下的簡單使用 (一) -- 安裝 AMQP 擴展和 Direct Exchange 模式