1. 程式人生 > >RabbitMQ的安裝與基本使用

RabbitMQ的安裝與基本使用

表示 消息生產者 routing .cn error 回調 方法名 while bar

  運行環境:https://oneinstack.com/install/

在項目中,將一些無需即時返回且耗時的操作提取出來,進行了異步處理,而這種異步處理的方式大大的節省了服務器的請求響應時間,從而提高了系統的吞吐量。如發送短信、郵件、過濾非法關鍵字等等。它還可以用於RPC。

先看一張官方圖:

技術分享

一、概念: Broker:簡單來說就是消息隊列服務器實體。    Exchange:消息交換機,它指定消息按什麽規則,路由到哪個隊列。    Queue:消息隊列載體,每個消息都會被投入到一個或多個隊列。    Binding:綁定,它的作用就是把exchange和queue按照路由規則綁定起來。
   Routing Key:路由關鍵字,exchange根據這個關鍵字進行消息投遞。    vhost:虛擬主機,一個broker裏可以開設多個vhost,用作不同用戶的權限分離。    producer:消息生產者,就是投遞消息的程序。    consumer:消息消費者,就是接受消息的程序。    channel:消息通道,在客戶端的每個連接裏,可建立多個channel,每個channel代表一個會話任務。 二、安裝RabbitMQ Ubuntu: sudo apt-get install erlang sudo apt-get install rabbitmq-server
CentOS: 1.先安裝erlang yum -y install make gcc gcc-c++ kernel-devel m4 ncurses-devel openssl-devel yum -y install ncurses-devel yum install ncurses-devel wget http://www.erlang.org/download/otp_src_17.5.tar.gz tar -xzvf otp_src_17.5.tar.gz
cd otp_src_17.5 ./configure --prefix=/usr/local/erlang --with-ssl --enable-threads --enable-smp-support --enable-kernel-poll --enable-hipe make && make install 配置erlang環境 vi /etc/profile #在最後添加下文 ERL_HOME=/usr/local/erlang PATH=$ERL_HOME/bin:$PATH export ERL_HOME PATH 使環境變量生效 source /etc/profile 測試一下是否安裝成功,在控制臺輸入命令erl     安裝完後輸入“erl”以下提示即為安裝成功     
[root@cloud bin]# erl
Erlang R16B02 (erts-5.10.3) [source] [64-bit] [smp:2:2] [async-threads:10] [hipe] [kernel-poll:false]
 
Eshell V5.10.3  (abort with ^G)
1>
2.安裝rabbitmq wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.5.1/rabbitmq-server-3.5.1.tar.gz tar -zxvf rabbitmq-server-3.5.1.tar.gz cd rabbitmq-server-3.5.1 make make TARGET_DIR=/usr/local/rabbitmq SBIN_DIR=/usr/local/rabbitmq/sbin MAN_DIR=/usr/local/rabbitmq/man DOC_INSTALL_DIR=/usr/local/rabbitmq/doc install 報錯處理: /bin/sh: xmlto: command not found /bin/sh: line 2: xmlto: command not found 解決:yum install xmlto 三、管理命令   切換到安裝目錄,sbin文件下才能執行命令,如:cd /usr/local/rabbitmq/sbin/ 啟動:./rabbitmq-server start 關閉:./rabbitmqctl stop 狀態:./rabbitmqctl status 四、插件 啟動web管理插件,切換到安裝目錄,sbin文件下才能執行命令,如:cd /usr/local/rabbitmq/sbin/ ./rabbitmq-plugins enable rabbitmq_management 錯誤解決: Error: {cannot_write_enabled_plugins_file,"/etc/rabbitmq/enabled_plugins", enoent} mkdir /etc/rabbitmq 重新啟動輸入地址:localhost:15672,帳號默認為guest,密碼guest,此帳號默認只能在本機訪問。不建議打開遠程訪問。你可以創建一個帳戶,並設置可以遠程訪問的角色進行訪問。 如:./rabbitmqctl add_user luo 123456 ./rabbitmqctl set_user_tags luo administrator 五、用戶管理 默認的guest帳戶相當於root帳戶 rabbitmqctl add_user username password 添加帳戶 rabbitmqctl change_password username newpassword 修改密碼 rabbitmqctl delete_user username 刪除帳戶 rabbitmqctl list_users 列出所有帳戶 rabbitmqctl set_user_tags User Tag 設置角色(administrator、monitoring、policymaker、management、其它) 立即生效,不需重啟 六、安裝PHP擴展(amqp) 不一定非要安裝擴展,你也可以直接下載類庫:https://github.com/videlalvaro/php-amqplib。 https://pecl.php.net/get/amqp-1.3.0.tgz tar -zxvf amqp-1.3.0.tgz cd amqp-1.3.0 /usr/local/php/bin/phpize( 可使用find / -name phpize查找phpize路徑 )

./configure --with-php-config=/usr/local/php/bin/php-config --with-amqp

(可使用find / -name php-config查找php-config路徑) 在php.ini中extenstion部分寫入extension=amqp.so 最後phpinfo檢查是否成功 七、使用PHP與之交互 Produce端:
//設置連接屬性  
$connArgs = array(  
     ‘host‘           => ‘localhost‘,  
     ‘port‘           => ‘5672‘,  
     ‘login‘           => ‘guest‘,  
     ‘password‘ => ‘guest‘,  
     ‘vhost‘          => ‘/‘  
);  
  
//創建RabbitMQ連接  
$conn = new AMQPConnection($connArgs);  
if (!$conn->connect()) {  
    echo "Cannot connect to the broker <br/>\n ";  
}  
  
//創建channel  
$channel = new AMQPChannel($conn);  
  
//創建exchange  
$exchangeName = ‘exchange‘;  
$exchange = new AMQPExchange($channel);  
$exchange->setName($exchangeName);//創建名字  
$exchange->setType(AMQP_EX_TYPE_DIRECT);//設置為direct類型  
$exchange->setFlags(AMQP_DURABLE);//持久化交換機,當代理重啟動後依然存在,並包括它們中的完整數據  
$exchange->setFlags(AMQP_AUTODELETE);//對交換機而言,自動刪除標誌表示交換機將在沒有隊列綁定的情況下被自動刪除,如果從沒有隊列和其綁定過,這個交換機將不會被刪除.  
  
//發送消息  
$routingKey = ‘key_test‘;  
for($i=0;$i<10;$i++){  
     sleep(3);  
     $message = ‘Hello World ‘.$i;  
    $exchange->publish($message,$routingKey);  
}  
  
//斷開連接  
$conn->disconnect();  

Consumer端:
//設置連接屬性  
$connArgs = array(  
                    ‘host‘           => ‘localhost‘,  
                    ‘port‘           => ‘5672‘,  
                    ‘login‘           => ‘guest‘,  
                    ‘password‘ => ‘guest‘,  
                    ‘vhost‘          => ‘/‘  
);  
  
//創建RabbitMQ連接  
$conn = new AMQPConnection($connArgs);  
if (!$conn->connect()) {  
     echo ‘cannot connect to the broker‘;  
}  
  
//創建channel  
$channel = new AMQPChannel($conn);  
  
//設置隊列名稱  
$exchangeName = ‘exchange‘;  
$routingKey = ‘key_test‘;      
$queueName = ‘queue_test_1‘;  
$queue = new AMQPQueue($channel);  
$queue->setName($queueName);//設置名稱  
$queue->setFlags(AMQP_DURABLE);//持久化隊列,當代理重啟動後依然存在,並包括它們中的完整數據  
$queue->declare();//聲明此隊列  
$queue->bind(‘exchange‘,$routingKey);//使用某交換機,並綁定某路由關鍵字  
  
//消費數據方式一:非阻塞方式  
// while(true)  
// {  
//      sleep(1);  
//      $envelope = $queue->get(AMQP_AUTOACK) ;//第一個參數表示自動ACK應答  
//      if ($envelope){  
//           $messages = $envelope->getBody();  
//           echo $messages;  
//      }  
// }  
  
  
//消費數據方式二:以阻塞模式消費數據(推薦)  
while(true)  
{  
          $queue->consume(‘processMessage‘);//第一個參數表示要回調的方法名,第二個參數設置為AMQP_AUTOACK,表示自動ACK應答  
}  
/** 
* 定義回調方法 
*/  
function processMessage($envelope, $queue)  
{  
     $messages = $envelope->getBody();#獲取消息數據  
     echo $messages;  
     $queue->ack($envelope->getDeliveryTag()); //處理成功後,手動發送ACK應答  
     //$queue->nack($envelope->getDeliveryTag()); //處理不成功,手動發送NO-ACK應答,放回隊列中   
}  


八:特別說明: 1.如果某一次消費數據沒有ACK,則此條消息會記錄為Unacknowledged,如果對於某一節點有連續三條則RabbitMQ認為此節點有故障,則不會再對它進行分發.當它斷開後或一定時間後Unacknowledged狀態消息會重新放回交換機中。手動no-ack後此Message將會放回隊列中且不會再轉發給此節點。 2.對於計算密集型的工作,我們需要建立多個Consumer,對於多個Consumer,默認的分發機制是“公平分發”,將第n個發給第n個Consumer,超過個數取n的模。 3.Exchange中的類型有:direct, topic 和fanout。 direct:通過routingKey和exchange決定的那個唯一的queue可以接收消息。 topic :所有符合routingKey(此時可以是一個表達式)的routingKey所bind的queue可以接收消息,direct的區別是它的routingkey可 以模糊匹配,#代表一個或多個字符,*代表任何字符。一般為確保程序嚴謹性而使用direct。註意當使用RoutingKey為#,Exchange Type為topic的時候相當於使用fanout fanout:是廣播模式,所有bind到此exchange的queue都可以接收消息,即該一個Message可以對應多個Consumer,應用場景舉例:生產了了添加到購物車的Message,一個Consumer寫推薦商品日誌,一個Consumer寫購物車表。 4.兩個不相同的queue名的隊列綁定到同一exchange和rotingky上,此時相當於同一queue名的fanout廣播模式,兩個queue都會得都到 所有Message。如圖所示: 技術分享 5.RPC的實現流程: 技術分享

RabbitMQ的安裝與基本使用