1. 程式人生 > >rabbitmq - (訊息佇列) 的基本原理介紹

rabbitmq - (訊息佇列) 的基本原理介紹

介紹

MQ全稱為Message Queue, 是一種分散式應用程式的的通訊方法,它是消費-生產者模型的一個典型的代表,producer往訊息佇列中不斷寫入訊息,而另一端consumer則可以讀取或者訂閱佇列中的訊息。RabbitMQ是MQ產品的典型代表,是一款基於AMQP協議可複用的企業訊息系統

系統架構

Rabbitmq系統最核心的元件是Exchange和Queue,Exchange和Queue是在rabbitmq server(又叫做broker)端,producer和consumer在應用端。

原理大致圖(MQ:Message Queue):

Queue

訊息佇列,提供了FIFO的處理機制,具有快取訊息的能力。rabbitmq中,佇列訊息可以設定為持久化,臨時或者自動刪除。

  1. 設定為持久化的佇列,queue中的訊息會在server本地硬碟儲存一份,防止系統crash,資料丟失
  2. 設定為臨時佇列,queue中的資料在系統重啟之後就會丟失
  3. 設定為自動刪除的佇列,當不存在使用者連線到server,佇列中的資料會被自動刪除

Exchange

Exchange類似於資料通訊網路中的交換機,提供訊息路由策略。rabbitmq中,producer不是通過通道直接將訊息傳送給queue,而是先發送給Exchange。一個Exchange可以和多個Queue進行繫結,producer在傳遞訊息的時候,會傳遞一個ROUTING_KEY,Exchange會根據這個ROUTING_KEY按照特定的路由演算法,將訊息路由給指定的queue。和Queue一樣,Exchange也可設定為持久化,臨時或者自動刪除。

Exchange有4種類型:direct(預設),fanout, topic, 和headers,不同型別的Exchange轉發訊息的策略有所區別:

  1. Direct直接交換器,工作方式類似於單播,Exchange會將訊息傳送完全匹配ROUTING_KEY的Queue
  2. fanout廣播是式交換器,不管訊息的ROUTING_KEY設定為什麼,Exchange都會將訊息轉發給所有繫結的Queue
  3. topic主題交換器,工作方式類似於組播,Exchange會將訊息轉發和ROUTING_KEY匹配模式相同的所有佇列,比如,ROUTING_KEY為user.stock的Message會轉發給繫結匹配模式為 * .stock,user.stock, * . * 和#.user.stock.#的佇列。( * 表是匹配一個任意片語,#表示匹配0個或多個片語)
  4. headers訊息體的header匹配(ignore)

Binding

所謂繫結就是將一個特定的 Exchange 和一個特定的 Queue 繫結起來。Exchange 和Queue的繫結可以是多對多的關係

通訊過程

假設P1和C1註冊了相同的Broker,Exchange和Queue。P1傳送的訊息最終會被C1消費。基本的通訊流程大概如下所示:

  1. P1生產訊息,傳送給伺服器端的Exchange
  2. Exchange收到訊息,根據ROUTINKEY,將訊息轉發給匹配的Queue1
  3. Queue1收到訊息,將訊息傳送給訂閱者C1
  4. C1收到訊息,傳送ACK給佇列確認收到訊息
  5. Queue1收到ACK,刪除佇列中快取的此條訊息

Consumer收到訊息時需要顯式的向rabbit broker傳送basic.ack訊息或者consumer訂閱訊息時設定auto_ack引數為true。在通訊過程中,佇列對ACK的處理有以下幾種情況:

  1. 如果consumer接收了訊息,傳送ack,rabbitmq會刪除佇列中這個訊息,傳送另一條訊息給consumer。
  2. 如果cosumer接受了訊息, 但在傳送ack之前斷開連線,rabbitmq會認為這條訊息沒有被deliver,在consumer在次連線的時候,這條訊息會被redeliver。
  3. 如果consumer接受了訊息,但是程式中有bug,忘記了ack,rabbitmq不會重複傳送訊息。
  4. rabbitmq2.0.0和之後的版本支援consumer reject某條(類)訊息,可以通過設定requeue引數中的reject為true達到目地,那麼rabbitmq將會把訊息傳送給下一個註冊的consumer。

php 生產者示例

先用 composer 載入 mq 拓展檔案

{ 
  "require": { 
    "php-amqplib/php-amqplib": "2.7.*" //增加這行 
  } 
}

 

<?php

require 'vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$conf = [
        'host' => '127.0.0.1',
        'port' => 5672,
        'user' => 'kd_dev',
        'pwd'  => 'kd_dev',
        'vhost' => '/',
];
$exchangeName = 'kd_sms_send_ex'; //交換機名
$queueName    = 'kd_sms_send_q'; //佇列名稱
$routingKey   = 'sms_send'; //路由關鍵字(也可以省略)

$conn = new AMQPStreamConnection( //建立生產者與mq之間的連線
    $conf['host'], $conf['port'], $conf['user'], $conf['pwd'], $conf['vhost']
);
$channel = $conn->channel(); //在已連線基礎上建立生產者與mq之間的通道


$channel->exchange_declare($exchangeName, 'direct', false, true, false); //宣告初始化交換機
$channel->queue_declare($queueName, false, true, false, false); //宣告初始化一條佇列
$channel->queue_bind($queueName, $exchangeName, $routingKey); //將佇列與某個交換機進行繫結,並使用路由關鍵字

$msgBody = json_encode(["name" => "iGoo", "age" => 22]);
$msg = new AMQPMessage($msgBody, ['content_type' => 'text/plain', 'delivery_mode' => 2]); //生成訊息
$r   = $channel->basic_publish($msg, $exchangeName, $routingKey); //推送訊息到某個交換機
$channel->close();
$conn->close();

 php 消費者程式碼示例

<?php

  $bindingkey
='sms_send';   

  
//連線RabbitMQ   $conn_args = array( 'host'=>'127.0.0.1' , 'port'=> '5672', 'login'=>'kd_dev' , 'password'=> 'kd_dev','vhost' =>'/');   $conn = new AMQPConnection($conn_args);   $conn->connect();   
  
//設定queue名稱,使用exchange,繫結routingkey   $channel = new AMQPChannel($conn); // 宣告一個通道   $q = new AMQPQueue($channel); // 宣告一個佇列   $q->setName('kd_sms_send_q'); // 路由名
  $q
->setFlags(AMQP_DURABLE);
  $q
->declare();
  $q
->bind('kd_sms_send_ex',$bindingkey); // 佇列繫結交換機

  //訊息獲取   $messages = $q->get(AMQP_AUTOACK) ;   if ($messages){     var_dump(json_decode($messages->getBody(), true ));   }
  $conn
->disconnect();

 

轉自:https://www.cnblogs.com/jun-ma/p/4840869.html

通訊過程

假設P1和C1註冊了相同的Broker,Exchange和Queue。P1傳送的訊息最終會被C1消費。基本的通訊流程大概如下所示:

  1. P1生產訊息,傳送給伺服器端的Exchange
  2. Exchange收到訊息,根據ROUTINKEY,將訊息轉發給匹配的Queue1
  3. Queue1收到訊息,將訊息傳送給訂閱者C1
  4. C1收到訊息,傳送ACK給佇列確認收到訊息
  5. Queue1收到ACK,刪除佇列中快取的此條訊息

Consumer收到訊息時需要顯式的向rabbit broker傳送basic.ack訊息或者consumer訂閱訊息時設定auto_ack引數為true。在通訊過程中,佇列對ACK的處理有以下幾種情況:

  1. 如果consumer接收了訊息,傳送ack,rabbitmq會刪除佇列中這個訊息,傳送另一條訊息給consumer。
  2. 如果cosumer接受了訊息, 但在傳送ack之前斷開連線,rabbitmq會認為這條訊息沒有被deliver,在consumer在次連線的時候,這條訊息會被redeliver。
  3. 如果consumer接受了訊息,但是程式中有bug,忘記了ack,rabbitmq不會重複傳送訊息。
  4. rabbitmq2.0.0和之後的版本支援consumer reject某條(類)訊息,可以通過設定requeue引數中的reject為true達到目地,那麼rabbitmq將會把訊息傳送給下一個註冊的consumer。

php 生產者示例

先用 composer 載入 mq 拓展檔案

{ 
  "require": { 
    "php-amqplib/php-amqplib": "2.7.*" //增加這行 
  } 
}

 

<?php

require 'vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$conf = [
        'host' => '127.0.0.1',
        'port' => 5672,
        'user' => 'kd_dev',
        'pwd'  => 'kd_dev',
        'vhost' => '/',
];
$exchangeName = 'kd_sms_send_ex'; //交換機名
$queueName    = 'kd_sms_send_q'; //佇列名稱
$routingKey   = 'sms_send'; //路由關鍵字(也可以省略)

$conn = new AMQPStreamConnection( //建立生產者與mq之間的連線
    $conf['host'], $conf['port'], $conf['user'], $conf['pwd'], $conf['vhost']
);
$channel = $conn->channel(); //在已連線基礎上建立生產者與mq之間的通道


$channel->exchange_declare($exchangeName, 'direct', false, true, false); //宣告初始化交換機
$channel->queue_declare($queueName, false, true, false, false); //宣告初始化一條佇列
$channel->queue_bind($queueName, $exchangeName, $routingKey); //將佇列與某個交換機進行繫結,並使用路由關鍵字

$msgBody = json_encode(["name" => "iGoo", "age" => 22]);
$msg = new AMQPMessage($msgBody, ['content_type' => 'text/plain', 'delivery_mode' => 2]); //生成訊息
$r   = $channel->basic_publish($msg, $exchangeName, $routingKey); //推送訊息到某個交換機
$channel->close();
$conn->close();

 php 消費者程式碼示例

<?php

  $bindingkey
='sms_send';   

  
//連線RabbitMQ   $conn_args = array( 'host'=>'127.0.0.1' , 'port'=> '5672', 'login'=>'kd_dev' , 'password'=> 'kd_dev','vhost' =>'/');   $conn = new AMQPConnection($conn_args);   $conn->connect();   
  
//設定queue名稱,使用exchange,繫結routingkey   $channel = new AMQPChannel($conn); // 宣告一個通道   $q = new AMQPQueue($channel); // 宣告一個佇列   $q->setName('kd_sms_send_q'); // 路由名
  $q
->setFlags(AMQP_DURABLE);
  $q
->declare();
  $q
->bind('kd_sms_send_ex',$bindingkey); // 佇列繫結交換機

  //訊息獲取   $messages = $q->get(AMQP_AUTOACK) ;   if ($messages){     var_dump(json_decode($messages->getBody(), true ));   }
  $conn
->disconnect();