1. 程式人生 > >rabbitmq - (消息隊列) 的基本原理介紹

rabbitmq - (消息隊列) 的基本原理介紹

可復用 word conn 大致 hpa 服務 ges bind nor

介紹

MQ全稱為Message Queue, 是一種分布式應用程序的的通信方法,它是消費-生產者模型的一個典型的代表,producer往消息隊列中不斷寫入消息,而另一端consumer則可以讀取或者訂閱隊列中的消息。RabbitMQ是MQ產品的典型代表,是一款基於AMQP協議可復用的企業消息系統

系統架構

Rabbitmq系統最核心的組件是Exchange和Queue,Exchange和Queue是在rabbitmq server(又叫做broker)端,producer和consumer在應用端。

原理大致圖(MQ:Message Queue):

技術分享圖片

Queue

消息隊列,提供了FIFO的處理機制,具有緩存消息的能力。rabbitmq中,隊列消息可以設置為持久化,臨時或者自動刪除。

  1. 設置為持久化的隊列,queue中的消息會在server本地硬盤存儲一份,防止系統crash,數據丟失
  2. 設置為臨時隊列,queue中的數據在系統重啟之後就會丟失
  3. 設置為自動刪除的隊列,當不存在用戶連接到server,隊列中的數據會被自動刪除

Exchange

Exchange類似於數據通信網絡中的交換機,提供消息路由策略。rabbitmq中,producer不是通過信道直接將消息發送給queue,而是先發送給Exchange。一個Exchange可以和多個Queue進行綁定,producer在傳遞消息的時候,會傳遞一個ROUTING_KEY,Exchange會根據這個ROUTING_KEY按照特定的路由算法,將消息路由給指定的queue。和Queue一樣,Exchange也可設置為持久化,臨時或者自動刪除。

Exchange有4種類型:direct(默認),fanout, topic, 和headers,不同類型的Exchange轉發消息的策略有所區別:

  1. Direct直接交換器,工作方式類似於單播,Exchange會將消息發送完全匹配ROUTING_KEY的Queue
  2. fanout廣播是式交換器,不管消息的ROUTING_KEY設置為什麽,Exchange都會將消息轉發給所有綁定的Queue
  3. topic主題交換器,工作方式類似於組播,Exchange會將消息轉發和ROUTING_KEY匹配模式相同的所有隊列,比如,ROUTING_KEY為user.stock的Message會轉發給綁定匹配模式為 * .stock,user.stock, * . * 和#.user.stock.#的隊列。( * 表是匹配一個任意詞組,#表示匹配0個或多個詞組)
  4. headers消息體的header匹配(ignore)

Binding

所謂綁定就是將一個特定的 Exchange 和一個特定的 Queue 綁定起來。Exchange 和Queue的綁定可以是多對多的關系

通信過程

假設P1和C1註冊了相同的Broker,Exchange和Queue。P1發送的消息最終會被C1消費。基本的通信流程大概如下所示:

  1. P1生產消息,發送給服務器端的Exchange
  2. Exchange收到消息,根據ROUTINKEY,將消息轉發給匹配的Queue1
  3. Queue1收到消息,將消息發送給訂閱者C1
  4. C1收到消息,發送ACK給隊列確認收到消息
  5. Queue1收到ACK,刪除隊列中緩存的此條消息

Consumer收到消息時需要顯式的向rabbit broker發送basic.ack消息或者consumer訂閱消息時設置auto_ack參數為true。在通信過程中,隊列對ACK的處理有以下幾種情況:

  1. 如果consumer接收了消息,發送ack,rabbitmq會刪除隊列中這個消息,發送另一條消息給consumer。
  2. 如果cosumer接受了消息, 但在發送ack之前斷開連接,rabbitmq會認為這條消息沒有被deliver,在consumer在次連接的時候,這條消息會被redeliver。
  3. 如果consumer接受了消息,但是程序中有bug,忘記了ack,rabbitmq不會重復發送消息。
  4. rabbitmq2.0.0和之後的版本支持consumer reject某條(類)消息,可以通過設置requeue參數中的reject為true達到目地,那麽rabbitmq將會把消息發送給下一個註冊的consumer。

php 生產者示例

先用 composer 加載 mq 拓展文件

{ 
  "require": { 
    "php-amqplib/php-amqplib": "2.7.*" //增加這行 
  } 
}

<?php

require vendor/autoload.php;

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$conf = [
        host => 127.0.0.1,
        port => 5672,
        user => kd_dev,
        pwd  => kd_dev,
        vhost => /,
];
$exchangeName = kd_sms_send_ex; //交換機名
$queueName    = kd_sms_send_q; //隊列名稱
$routingKey   = sms_send; //路由關鍵字(也可以省略)

$conn = new AMQPStreamConnection( //建立生產者與mq之間的連接
    $conf[host], $conf[port], $conf[user], $conf[pwd], $conf[vhost]
);
$channel = $conn->channel(); //在已連接基礎上建立生產者與mq之間的通道


$channel->exchange_declare($exchangeName, direct, false, true, false); //聲明初始化交換機
$channel->queue_declare($queueName, false, true, false, false); //聲明初始化一條隊列
$channel->queue_bind($queueName, $exchangeName, $routingKey); //將隊列與某個交換機進行綁定,並使用路由關鍵字

$msgBody = json_encode(["name" => "iGoo", "age" => 22]);
$msg = new AMQPMessage($msgBody, [content_type => text/plain, delivery_mode => 2]); //生成消息
$r   = $channel->basic_publish($msg, $exchangeName, $routingKey); //推送消息到某個交換機
$channel->close();
$conn->close();

php 消費者代碼示例

<?php

  $bindingkey
=‘sms_send‘;   

  
//連接RabbitMQ   $conn_args = array( host=>127.0.0.1 , port=> 5672, login=>kd_dev , password=> kd_dev,vhost =>/);   $conn = new AMQPConnection($conn_args);   $conn->connect();   
  
//設置queue名稱,使用exchange,綁定routingkey   $channel = new AMQPChannel($conn); // 聲明一個通道   $q = new AMQPQueue($channel); // 聲明一個隊列   $q->setName(‘kd_sms_send_q‘); // 路由名
  $q
->setFlags(AMQP_DURABLE);
  $q
->declare();
  $q
->bind(‘kd_sms_send_ex‘,$bindingkey); // 隊列綁定交換機

  //消息獲取   $messages = $q->get(AMQP_AUTOACK) ;   if ($messages){     var_dump(json_decode($messages->getBody(), true ));   }
  $conn
->disconnect();

rabbitmq - (消息隊列) 的基本原理介紹