1. 程式人生 > >RabbitMQ 高階指南:從配置、使用到高可用叢集搭建

RabbitMQ 高階指南:從配置、使用到高可用叢集搭建

博主說:在專案中,通過 RabbitMQ,咱們可以將一些無需即時返回且耗時的操作提取出來,進行非同步處理,而這種非同步處理的方式大大的節省了伺服器的請求響應時間,從而提高了系統的吞吐量。

正文

3

1 RabbitMQ 簡介

1.1 介紹

  RabbitMQ 是一個由 erlang 開發的基於 AMQP(Advanced Message Queue)協議的開源實現。用於在分散式系統中儲存轉發訊息,在易用性、擴充套件性、高可用性等方面都非常的優秀,是當前最主流的訊息中介軟體之一。RabbitMQ 官網:http://www.rabbitmq.com

1.2 AMQP

  AMQP 是應用層協議的一個開放標準,為面向訊息的中介軟體設計。訊息中介軟體主要用於元件之間的解耦,訊息的傳送者無需知道訊息使用者的存在,同樣,訊息使用者也不用知道傳送者的存在。AMQP 的主要特徵是面向訊息、佇列、路由(包括點對點和釋出/訂閱)、可靠性、安全。

1.3 系統架構

4

訊息佇列的使用過程大概如下:

  • 客戶端連線到訊息佇列伺服器,開啟一個 channel;
  • 客戶端宣告一個 exchange,並設定相關屬性;
  • 客戶端宣告一個 queue,並設定相關屬性;
  • 客戶端使用 routing key,在 exchange 和 queue 之間建立好繫結關係;
  • 客戶端投遞訊息到 exchange,exchange 接收到訊息後,就根據訊息的 key 和已經設定的 binding,進行訊息路由,將訊息投遞到一個或多個佇列裡。

如上圖所示:AMQP 裡主要說兩個元件,Exchange 和 Queue。綠色的X就是 Exchange ,紅色的是 Queue ,這兩者都在 Server 端,又稱作 Broker,這部分是 RabbitMQ 實現的,而藍色的則是客戶端,通常有 Producer 和 Consumer 兩種型別。

1.4 幾個概念

  • P: 為 Producer,資料的傳送方;
  • C:為 Consumer,資料的接收方;
  • Exchange:訊息交換機,它指定訊息按什麼規則,路由到哪個佇列;
  • Queue:訊息佇列載體,每個訊息都會被投入到一個或多個佇列;
  • Binding:繫結,它的作用就是把 exchange 和 queue 按照路由規則繫結起來;
  • Routing Key:路由關鍵字,exchange 根據這個關鍵字進行訊息投遞;
  • vhost:虛擬主機,一個 broker 裡可以開設多個 vhost,用作不同使用者的許可權分離;
  • channel:訊息通道,在客戶端的每個連線裡,可建立多個 channel,每個 channel 代表一個會話任務。

2 RabbitMQ 安裝與配置

2.1 安裝

RabbitMQ 是建立在強大的 Erlang OTP 平臺上,因此安裝 RabbitMQ 之前要先安裝 Erlang.

注意

  • 現在先別裝最新的 3.6.3 ,本人在安裝完最新的版本,queue 佇列有問題,降到 3.6.2 則問題解決。
  • 預設安裝的RabbitMQ監聽埠是:5672

2.2 配置

(1)安裝完以後 erlang 需要手動設定ERLANG_HOME的系統變數。

輸入:set ERLANG_HOME=C:\Program Files\erl8.0

1

(2)啟用 RabbitMQ’s Management Plugin

使用 Rabbit MQ 管理外掛,可以更好的視覺化方式檢視 Rabbit MQ 伺服器例項的狀態,咱們可以在命令列中使用下面的命令啟用。

輸入rabbitmq-plugins.bat enable rabbitmq_management

2

同時,也使用 rabbitmqctl 控制檯命令(位於 rabbitmq_server-3.6.3\sbin>)來建立使用者、密碼、繫結許可權等。

(3)建立管理使用者

輸入:rabbitmqctl.bat add_user zhangweizhong weizhong1988

4

(4)設定管理員

輸入:rabbitmqctl.bat set_user_tags zhangweizhong administrator

5

(5)設定許可權

輸入:rabbitmqctl.bat set_permissions -p / zhangweizhong ".*" ".*" ".*"

6

(6)其它命令

  • 查詢使用者: rabbitmqctl.bat list_users
  • 查詢 vhosts: rabbitmqctl.bat list_vhosts
  • 啟動 RabbitMQ 服務: net stop RabbitMQ && net start RabbitMQ

以上這些,賬號、vhost、許可權、作用域等基本就設定完啦!

2.3 Rabbit MQ管理後臺

使用瀏覽器開啟http://localhost:15672訪問 RabbitMQ 的管理控制檯,使用剛才建立的賬號登陸系統即可。RabbitMQ 管理後臺,可以更好的視覺化方式檢視 RabbitMQ 伺服器例項的狀態。

7

2.4 建立 vhosts

建立 vhosts,在 admin 頁面,點選右側 Virtual Hosts:

8

將剛建立的 OrderQueue 分配給相關使用者:

9

其它建立 exchange 、queue 大家自己在後臺建立吧,這裡不再贅述。

3 C# 如何使用 RabbitMQ

3.1 客戶端

RabbitMQ.Client 是 RabbitMQ 官方提供的的客戶端,net 版本地址 為:http://www.rabbitmq.com/dotnet.html

EasyNetQ 是基於 RabbitMQ.Client 基礎上封裝的開源客戶端,使用非常方便,地址為:http://easynetq.com/

本篇使用示例程式碼下載地址:

demo 示例下載http://files.cnblogs.com/files/zhangweizhong/Weiz.RabbitMQ.RPC.rar

RabbitMQ 還有很多其它客戶端 API,都非常好用。我們一直用的都是 EasyNetQ,所以這裡的 demo 只介紹 EasyNetQ 客戶端實現。

3.2 專案結構

10

說明:前面咱們提到過,RabbitMQ 由 Producer(生成者) 和 Consumer(消費者)兩部分組成。Weiz.Consumer 就是 Consumer(消費者),Weiz. Producer 為 Producer(生成者),Weiz.MQ 為訊息佇列的通用處理類庫。

3.3 專案搭建

(1)Weiz.MQ 專案,訊息佇列的通用處理類庫,用於正在的訂閱和釋出訊息。

  • 通過 nuget 安裝專案 EasyNetQ 相關元件;
  • 增加 BusBuilder.cs 管道建立類,主要負責連結 RabbitMQ.
using System;
using System.Configuration;
using EasyNetQ;

namespace Weiz.MQ
{
    /// <summary>
    /// 訊息伺服器聯結器
    /// </summary>
    public class BusBuilder
    {
        public static IBus CreateMessageBus()
        {
            // 訊息伺服器連線字串
            // var connectionString = ConfigurationManager.ConnectionStrings["RabbitMQ"];
            string connString = "host=192.168.98.107:5672;virtualHost=OrderQueue;username=zhangweizhong;password=weizhong1988";
            if (connString == null || connString == string.Empty)
            {
                throw new Exception("messageserver connection string is missing or empty");
            }

            return RabbitHutch.CreateBus(connString);
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 增加 IProcessMessage 類,定義了一個訊息方法,用於訊息傳遞
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace Weiz.MQ
{
    public interface IProcessMessage
    {
        void ProcessMsg(Message msg);
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 增加 Message 類,定義了訊息傳遞的實體屬性欄位等資訊
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace Weiz.MQ
{
    public class Message
    {
        public string MessageID { get; set; }

        public string MessageTitle { get; set; }

        public string MessageBody { get; set; }

        public string MessageRouter { get; set; }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 增加 MQHelper 類,用於正在的訂閱和釋出訊息
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Configuration;

using EasyNetQ;

namespace Weiz.MQ
{
    public class MQHelper
    {
        /// <summary>
        /// 傳送訊息
        /// </summary>
        public static void Publish(Message msg)
        {
            //// 建立訊息bus
            IBus bus = BusBuilder.CreateMessageBus();

            try
            {
                bus.Publish(msg, x => x.WithTopic(msg.MessageRouter));
            }
            catch (EasyNetQException ex)
            {
                //處理連線訊息伺服器異常 
            }

            bus.Dispose();//與資料庫connection類似,使用後記得銷燬bus物件
        }

        /// <summary>
        /// 接收訊息
        /// </summary>
        /// <param name="msg"></param>
        public static void Subscribe(Message msg, IProcessMessage ipro)
        {
            //// 建立訊息bus
            IBus bus = BusBuilder.CreateMessageBus();

            try
            {
                bus.Subscribe<Message>(msg.MessageRouter, message => ipro.ProcessMsg(message), x => x.WithTopic(msg.MessageRouter));

            }
            catch (EasyNetQException ex)
            {
                //處理連線訊息伺服器異常 
            }
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53

(2)RabbitMQ 由 Producer(生成者)

  • 建立一個 aspx 頁面,增加如下程式碼
using System;
using System.Collections.Generic;
using System.Linq;
using System.Web;
using System.Web.UI;
using System.Web.UI.WebControls;

using Weiz.MQ;

namespace Weiz.Producer
{
    public partial class TestMQ : System.Web.UI.Page
    {
        protected void Page_Load(object sender, EventArgs e)
        {

        }

        protected void Button1_Click(object sender, EventArgs e)
        {
            Message msg = new Message();
            msg.MessageID = "1";
            msg.MessageBody = DateTime.Now.ToString();
            msg.MessageTitle = "1";
            msg.MessageRouter = "pcm.notice.zhangsan";
            MQHelper.Publish(msg);

        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30

(3)Weiz.Consumer 就是 Consumer(消費者)

  • 新增 OrderProcessMessage.cs
using System;
using System.Collections.Generic;
using System.Linq;
using System.Web;

namespace Weiz.Consumer
{
    public class OrderProcessMessage:MQ.IProcessMessage
    {
        public void ProcessMsg(MQ.Message msg)
        {
            Console.WriteLine(msg.MessageBody);
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • Program 增加如下程式碼
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Weiz.Consumer
{
    class Program
    {
        static void Main(string[] args)
        {
            OrderProcessMessage order = new OrderProcessMessage();
            MQ.Message msg = new MQ.Message();
            msg.MessageID = "1";
            msg.MessageRouter = "pcm.notice.zhangsan";

            MQ.MQHelper.Subscribe(msg, order);
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

3.4 執行

(1)啟動 Weiz.Consumer(消費者),啟動消費者,會自動在 RabbitMQ 伺服器上建立相關的 exchange 和 queue.

11

Consumer 消費者,使用的是 Subscribe(訂閱)的模式,所以,Weiz.Consumer 客戶端啟動後,會自動建立 connection,生成相關的 exchange 和 queue.

(2)啟動 Weiz. Producer 裡的 TestMQ.aspx 頁面,往佇列裡面寫一條訊息,訂閱的消費者立馬就能拿到這條訊息。

12

至此,C# 向 RabbitMQ 訊息佇列傳送訊息已經簡單完成。

4 四種 Exchange 模式

  AMQP 協議中的核心思想就是:生產者和消費者隔離,生產者從不直接將訊息傳送給佇列。生產者通常不知道是否一個訊息會被髮送到佇列中,只是將訊息傳送到一個交換機。先由 Exchange 來接收,然後 Exchange 按照特定的策略轉發到 Queue 進行儲存。同理,消費者也是如此。Exchange 就類似於一個交換機,轉發各個訊息分發到相應的佇列中。

  RabbitMQ 提供了四種 Exchange 模式:fanout、direct、topic、header. 由於 header 模式在實際使用中較少,因此本節只對前三種模式進行比較。

4.1 Fanout Exchange

13

所有傳送到 Fanout Exchange 的訊息都會被轉發到與該 Exchange 繫結(Binding)的所有 Queue 上。

Fanout Exchange 不需要處理 RouteKey,只需要簡單的將佇列繫結到 exchange 上,這樣傳送到 exchange 的訊息都會被轉發到與該交換機繫結的所有佇列上。類似子網廣播,每臺子網內的主機都獲得了一份複製的訊息。

所以,Fanout Exchange 轉發訊息是最快的。

/// <summary>
/// 生產者
/// </summary>
/// <param name="change"></param>
private static void ProducerMessage(MyMessage msg)
{
    var advancedBus = CreateAdvancedBus();

    if (advancedBus.IsConnected)
    {
        var exchange = advancedBus.ExchangeDeclare("user", ExchangeType.Fanout);

        advancedBus.Publish(exchange, "", false, new Message<MyMessage>(msg));
    } else {
         Console.WriteLine("Can't connect");
    }
}

/// <summary>
/// 消費者
/// </summary>
private static void ConsumeMessage()
{
    var advancedBus = CreateAdvancedBus();
    var exchange = advancedBus.ExchangeDeclare("user", ExchangeType.Fanout);

    var queue = advancedBus.QueueDeclare("user.notice.wangwu");
    advancedBus.Bind(exchange, queue, "user.notice.wangwu");
    advancedBus.Consume(queue, registration =>
    {
        registration.Add<MyMessage>((message, info) => { Console.WriteLine("Body: {0}", message.Body); });
     });
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33

4.2 Direct Exchange

14

所有傳送到 Direct Exchange 的訊息被轉發到 RouteKey 中指定的 Queue.

Direct 模式可以使用 RabbitMQ 自帶的 Exchange:default Exchange ,因此不需要將 Exchange 進行任何繫結(binding)操作 。訊息傳遞時,RouteKey 必須完全匹配,才會被佇列接收,否則該訊息會被拋棄。

/// <summary>
/// 生產者
/// </summary>
/// <param name="change"></param>
private static void ProducerMessage(MyMessage msg)
{
    var advancedBus = CreateAdvancedBus();

    if (advancedBus.IsConnected)
    {
        var queue = advancedBus.QueueDeclare("user.notice.zhangsan");

        advancedBus.Publish(Exchange.GetDefault(), queue.Name, false, new Message<MyMessage>(msg));
     } else {
          Console.WriteLine("Can't connect");
     }

}

/// <summary>
/// 消費者
/// </summary>
private static void ConsumeMessage()
{
    var advancedBus = CreateAdvancedBus();

    var exchange = advancedBus.ExchangeDeclare("user", ExchangeType.Direct);

    var queue = advancedBus.QueueDeclare("user.notice.lisi");

    advancedBus.Bind(exchange, queue, "user.notice.lisi");

    advancedBus.Consume(queue, registration =>
    {
         registration.Add<MyMessage>((message, info) =>
         {
             Console.WriteLine("Body: {0}", message.Body);
          });
     });
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40

4.3 Topic Exchange

15

所有傳送到 Topic Exchange 的訊息被轉發到所有關心 RouteKey 中指定 Topic 的 Queue 上,Exchange 將 RouteKey 和某 Topic 進行模糊匹配。此時佇列需要繫結一個 Topic,可以使用萬用字元進行模糊匹配,符號#匹配一個或多個詞,符號*匹配不多不少一個詞。因此log.#能夠匹配到log.info.oa,但是log.* 只會匹配到log.error.

所以,Topic Exchange 使用非常靈活。

/// <summary>
/// 生產者
/// </summary>
/// <param name="change"></param>
private static void ProducerMessage(MyMessage msg)
{
    //// 建立訊息bus
    IBus bus = CreateBus();

    try {
        bus.Publish(msg, x => x.WithTopic(msg.MessageRouter));
    } catch (EasyNetQException ex) {
          //處理連線訊息伺服器異常 
    }

    bus.Dispose();//與資料庫connection類似,使用後記得銷燬bus物件
}

/// <summary>
/// 消費者
/// </summary>
private static void ConsumeMessage(MyMessage msg)
{
    //// 建立訊息bus
    IBus bus = CreateBus();

    try {
        bus.Subscribe<MyMessage>(msg.MessageRouter, message => Console.WriteLine(msg.MessageBody), x => x.WithTopic("user.notice.#"));
     } catch (EasyNetQException ex) {
         //處理連線訊息伺服器異常 
     }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32

這個是 RabbitMQ 的實際使用的幾個場景,熟悉了這個,基本上 RabbitMQ 也就瞭解啦!官方教程:http://www.rabbitmq.com/tutorials/tutorial-one-dotnet.html

至此,RabbitMQ 的幾種 Exchange 模式已經介紹完了,實際使用過程中,咱們應該根據不同的場景,來使用不同的 exchange 模式。

5 RPC 遠端過程呼叫

  在這一節中,主要講述 RabbitMQ RPC. 其實,RabbitMQ RPC 就是通過訊息佇列(Message Queue)來實現 RPC 的功能,就是客戶端向服務端傳送定義好的 Queue 訊息,其中攜帶的訊息就應該是服務端將要呼叫的方法的引數 ,並使用 Propertis 告訴服務端將結果返回到指定的 Queue.

5.1 RabbitMQ RPC 的特點

  • Message Queue 把所有的請求訊息儲存起來,然後處理,和客戶端解耦;
  • Message Queue 引入新的結點,系統的可靠性會受 Message Queue 結點的影響;
  • Message Queue 是非同步單向的訊息。傳送訊息設計成是不需要等待訊息處理的完成。

所以對於有同步返回需求的,Message Queue 是個不錯的方向。

5.2 普通 PRC 的特點

  • 同步呼叫,對於要等待返回結果/處理結果的場景,RPC 是可以非常自然直覺的使用方式,當然 RPC 也可以是非同步呼叫;
  • 由於等待結果,客戶端會有執行緒消耗。

如果以非同步 RPC 的方式使用,客戶端執行緒消耗可以去掉,但不能做到像訊息一樣暫存訊息請求,壓力會直接傳導到服務端。

5.3 適用場合說明

  • 希望同步得到結果的場合,RPC 合適。
  • 希望使用簡單,則 RPC;RPC 操作基於介面,使用簡單,使用方式模擬本地呼叫。非同步的方式程式設計比較複雜。
  • 不希望客戶端受限於服務端的速度等,可以使用 Message Queue.

5.4 RabbitMQ RPC工作流程

16

基本概念

  Callback queue 回撥佇列,客戶端向伺服器傳送請求,伺服器端處理請求後,將其處理結果儲存在一個儲存體中。而客戶端為了獲得處理結果,那麼客戶在向伺服器傳送請求時,同時傳送一個回撥佇列地址reply_to.

  Correlation id 關聯標識,客戶端可能會發送多個請求給伺服器,當伺服器處理完後,客戶端無法辨別在回撥佇列中的響應具體和那個請求時對應的。為了處理這種情況,客戶端在傳送每個請求時,同時會附帶一個獨有correlation_id屬性,這樣客戶端在回撥佇列中根據correlation_id欄位的值就可以分辨此響應屬於哪個請求。

流程說明

  • 當客戶端啟動的時候,它建立一個匿名獨享的回撥佇列;
  • 在 RPC 請求中,客戶端傳送帶有兩個屬性的訊息:一個是設定回撥佇列的 reply_to 屬性,另一個是設定唯一值的correlation_id屬性;
  • 將請求傳送到一個 rpc_queue 佇列中;
  • 伺服器等待請求傳送到這個佇列中來,當請求出現的時候,它執行他的工作並且將帶有執行結果的訊息傳送給 reply_to 欄位指定的佇列;
  • 客戶端等待回撥佇列裡的資料。當有訊息出現的時候,它會檢查correlation_id屬性,如果此屬性的值與請求匹配,將它返回給應用。

5.5 完整程式碼

(1)建立兩個控制檯程式,作為 RPC Server 和 RPC Client,引用 RabbitMQ.Client

(2) RPC Server

class Program {
    static void Main(string[] args) {
        var factory = new ConnectionFactory() {
            HostName ="localhost",VirtualHost ="OrderQueue",UserName ="zhangweizhong",Password ="weizhong1988",Port =5672
        };
        using(var connection = factory.CreateConnection())
        using(var channel = connection.CreateModel())
        {
            channel.QueueDeclare(queue:"rpc_queue",
                durable:false,
                exclusive:false,
                autoDelete:false,
                arguments:null);
            channel.BasicQos(0, 1, false);
            var consumer = new QueueingBasicConsumer(channel);
            channel.BasicConsume(queue:"rpc_queue",
                noAck:false,
                consumer:consumer);
            Console.WriteLine(" [x] Awaiting RPC requests");

            while (true) {
                string response = null;
                var ea = (BasicDeliverEventArgs) consumer.Queue.Dequeue();

                var body = ea.Body;
                var props = ea.BasicProperties;
                var replyProps = channel.CreateBasicProperties();
                replyProps.CorrelationId = props.CorrelationId;

                try {
                    var message = Encoding.UTF8.GetString(body);
                    int n = int.Parse(message);
                    Console.WriteLine(" [.] fib({0})", message);
                    response = fib(n).ToString();
                } catch (Exception e) {
                    Console.WriteLine(" [.] " + e.Message);
                    response = "";
                } finally {
                    var responseBytes = Encoding.UTF8.GetBytes(response);
                    channel.BasicPublish(exchange:"",
                            routingKey:props.ReplyTo,
                            basicProperties:replyProps,
                            body:responseBytes);
                    channel.BasicAck(deliveryTag:ea.DeliveryTag,
                            multiple:false);
                }
            }
        }
    }

    /// <summary>
    /// Assumes only valid positive integer input.
    /// Don't expect this one to work for big numbers,
    /// and it's probably the slowest recursive implementation possible.
    /// </summary>
    private static int fib(int n) {
        if (n == 0 || n == 1) {
            return n;
        }

        Thread.Sleep(1000 * 10);

        return n;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65

(3)RPC Client

class Program {
    static void Main(string[] args) {
        for (int i = 0; i < 10; i++) {
            Stopwatch watch = new Stopwatch();

            watch.Start();

            var rpcClient = new RPCClient();

            Console.WriteLine(string.Format(" [x] Requesting fib({0})", i));

            var response = rpcClient.Call(i.ToString());

            Console.WriteLine(" [.] Got '{0}'", response);

            rpcClient.Close();

            watch.Stop();

            Console.WriteLine(string.Format(" [x] Requesting complete {0} ,cost {1} ms", i, watch.Elapsed.TotalMilliseconds));
        }

        Console.WriteLine(" complete!!!! ");


        Console.ReadLine();
    }
}

class RPCClient {
    private IConnection connection;
    private IModel channel;
    private string replyQueueName;
    private QueueingBasicConsumer consumer;

    public RPCClient() {
        var factory = new ConnectionFactory() {
            HostName ="localhost",VirtualHost ="OrderQueue",UserName ="zhangweizhong",Password ="weizhong1988",Port =5672
        };
        connection = factory.CreateConnection();
        channel = connection.CreateModel();
        replyQueueName = channel.QueueDeclare().QueueName;
        consumer = new QueueingBasicConsumer(channel);
        channel.BasicConsume(queue:replyQueueName,
                noAck:true,
                consumer:consumer);
    }

    public string Call(string message) {
        var corrId = Guid.NewGuid().ToString();
        var props = channel.CreateBasicProperties();
        props.ReplyTo = replyQueueName;
        props.CorrelationId = corrId;

        var messageBytes = Encoding.UTF8.GetBytes(message);
        channel.BasicPublish(exchange:"",
                routingKey:"rpc_queue",
                basicProperties:props,
                body:messageBytes);

        while (true) {
            var ea = (BasicDeliverEventArgs) consumer.Queue.Dequeue();
            if (ea.BasicProperties.CorrelationId == corrId) {
                return Encoding.UTF8.GetString(ea.Body);
            }
        }
    }

    public void Close() {
        connection.Close();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72

(4)分別執行 Server 和 Client

5

6

5.6 官方教程及原始碼下載地址

  • 參照 RabbitMQ 官方教程的 RPC,地址:http://www.rabbitmq.com/tutorials/tutorial-six-dotnet.html
  • 本文原始碼下載地址為:http://files.cnblogs.com/files/zhangweizhong/Weiz.RabbitMQ.RPC.rar

6 RabbitMQ 高可用叢集

  RabbitMQ 是用 erlang 開發的,叢集非常方便,因為 erlang 天生就是一門分散式語言,但其本身並不支援負載均衡。Rabbit 模式大概分為以下三種:單一模式、普通模式和映象模式。

單一模式:最簡單的情況,非叢集模式。

  沒什麼好說的。

普通模式:預設的叢集模式。

  對於 Queue 來說,訊息實體只存在於其中一個節點,A、B 兩個節點僅有相同的元資料,即佇列結構。當訊息進入 A 節點的 Queue 中後,consumer 從 B 節點拉取時,RabbitMQ 會臨時在 A、B 間進行訊息傳輸,把 A 中的訊息實體取出並經過 B 傳送給 consumer. 所以 consumer 應儘量連線每一個節點,從中取訊息。即對於同一個邏輯佇列,要在多個節點建立物理 Queue。否則無論 consumer 連 A 或 B,出口總在 A,會產生瓶頸。該模式存在一個問題就是當 A 節點故障後,B 節點無法取到 A 節點中還未消費的訊息實體。如果做了訊息持久化,那麼得等 A 節點恢復,然後才可被消費;如果沒有持久化的話,然後就沒有然後了……

映象模式:把需要的佇列做成映象佇列,存在於多個節點,屬於 RabbitMQ 的 HA 方案。

  該模式解決了上述問題,其實質和普通模式不同之處在於,訊息實體會主動在映象節點間同步,而不是在 consumer 取資料時臨時拉取。該模式帶來的副作用也很明顯,除了降低系統性能外,如果映象佇列數量過多,加之大量的訊息進入,叢集內部的網路頻寬將會被這種同步通訊大大消耗掉,所,在對可靠性要求較高的場合中適用。

6.1 叢集中的基本概念

  RabbitMQ 的叢集節點包括記憶體節點、磁碟節點。顧名思義記憶體節點就是將所有資料放在記憶體,磁碟節點將資料放在磁碟。不過,如前文所述,如果在投遞訊息時,打開了訊息的持久化,那即使是記憶體節點,資料還是安全的放在磁碟。

  一個 RabbitMQ 叢集中可以共享 user、vhost、queue、exchange 等,所有的資料和狀態都是必須在所有節點上覆制的,一個例外是那些當前只屬於建立它的節點的訊息佇列,儘管它們可見且可被所有節點讀取。 RabbitMQ 節點可以動態地加入到叢集中,一個節點它可以加入到叢集中,也可以從叢集環叢集進行一個基本的負載均衡。

叢集中有兩種節點

  • 記憶體節點:只儲存狀態到記憶體(一個例外的情況是:持久的 queue 的持久內容將被儲存到 disk);
  • 磁碟節點:儲存狀態到記憶體和磁碟,記憶體節點雖然不寫入磁碟,但是它執行比磁碟節點要好,叢集中,只需要一個磁碟節點來儲存狀態 就足夠了,如果叢集中只有記憶體節點,那麼不能停止它們,否則所有的狀態,訊息等都會丟失。

思路

  那麼具體如何實現 RabbitMQ 高可用,咱們先搭建一個普通叢集模式,在這個模式基礎上再配置映象模式實現高可用,Rabbit 叢集前增加一個反向代理,生產者、消費者通過反向代理訪問 RabbitMQ 叢集。

架構圖

7

上述圖裡是 3 個 RabbitMQ 執行在同一主機上,分別用不同的服務埠。當然咱們的實際生產中,多個 RabbitMQ 肯定是執行在不同的物理伺服器上,否則就失去了高可用的意義。

6.2 叢集模式配置

設計架構可以如下:在一個叢集裡,有 4 臺機器,其中 1 臺使用磁碟模式,另 2 臺使用記憶體模式。2 臺記憶體模式的節點,無疑速度更快,因此客戶端(consumer、producer)連線訪問它們。而磁碟模式的節點,由於磁碟 IO 相對較慢,因此僅作資料備份使用,另外一臺作為反向代理。

四臺伺服器 hostname 分別為:queue 、panyuntao1、panyuntao2、panyuntao3(ip:172.16.3.110)

配置 RabbitMQ 叢集非常簡單,只需要幾個命令,配置步驟如下:

Step 1:queue、panyuntao1、panyuntao2 做為 RabbitMQ 叢集節點,分別安裝 RabbitMq-Server ,安裝後分別啟動 RabbitMq-server。

啟動命令# Rabbit-Server start,安裝過程及啟動命令參見:http://www.cnblogs.com/flat_peach/archive/2013/03/04/2943574.html

Step 2:在安裝好的 3 臺節點伺服器中,分別修改/etc/hosts檔案,指定queue、panyuntao1、panyuntao2 的 hosts,如

172.16.3.32 queue

172.16.3.107 panyuntao1

172.16.3.108 panyuntao2
  • 1
  • 2
  • 3
  • 4
  • 5

還有 hostname 檔案也要正確,分別是 queue、panyuntao1、panyuntao2,如果修改 hostname 建議在安裝 rabbitmq 前修改。請注意 RabbitMQ 叢集節點必須在同一個網段裡,如果是跨廣域網效果就差。

Step 3:設定每個節點 Cookie。RabbitMQ 的叢集是依賴於 erlang 的叢集來工作的,所以必須先構建起 erlang 的叢集環境。Erlang 的叢集中各節點是通過一個 magic cookie 來實現的,這個 cookie 存放在/var/lib/rabbitmq/.erlang.cookie中,檔案是 400 的許可權。所以必須保證各節點 cookie 保持一致,否則節點之間就無法通訊。

-r--------. 1 rabbitmq rabbitmq 20 3月 5 00:00 /var/lib/rabbitmq/.erlang.cookie
  • 1

將其中一臺節點上的.erlang.cookie值複製下來儲存到其他節點上,或者使用scp的方法也可,但是要注意檔案的許可權和屬主屬組。咱們這裡將 queue 中的 cookie 複製到 panyuntao1、panyuntao2 中,先修改下 panyuntao1、panyuntao2 中的.erlang.cookie許可權:

#chmod 777  /var/lib/rabbitmq/.erlang.cookie 
  • 1

將 queue 的/var/lib/rabbitmq/.erlang.cookie這個檔案,拷貝到 panyuntao1、panyuntao2 的同一位置(反過來亦可),該檔案是叢集節點進行通訊的驗證金鑰,所有節點必須一致,拷完後重啟下 RabbitMQ. 複製好後別忘記還原.erlang.cookie的許可權,否則可能會遇到錯誤:

#chmod 400 /var/lib/rabbitmq/.erlang.cookie 
  • 1

設定好 cookie 後先將三個節點的 RabbitMQ 重啟:

# rabbitmqctl stop
# rabbitmq-server start
  • 1
  • 2

Step 4:停止所有節點 RabbitMQ 服務,然後使用 detached 引數獨立執行,這步很關鍵,尤其增加節點停止節點後再次啟動遇到無法啟動都可以參照這個順序。

queue# rabbitmqctl stop
panyuntao1# rabbitmqctl stop
panyuntao2# rabbitmqctl stop

queue# rabbitmq-server -detached
panyuntao1# rabbitmq-server -detached
panyuntao2# rabbitmq-server -detached
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

分別檢視下每個節點:

queue# rabbitmqctl cluster_status
Cluster status of node [email protected] ...
[{nodes,[{disc,[[email protected]]}]},
{running_nodes,[[email protected]]},
{partitions,[]}]
...done.

panyuntao1# rabbitmqctl cluster_status 
Cluster status of node [email protected]
[{nodes,[{disc,[[email protected]]}]},
{running_nodes,[[email protected]]},
{partitions,[]}]
...done.

panyuntao2# rabbitmqctl cluster_status
Cluster status of node [email protected]
[{nodes,[{disc,[[email protected]]}]},
{running_nodes,[[email protected]]},
{partitions,[]}]
...done.
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

Step 5:將 panyuntao1、panyuntao2 作為記憶體節點與 queue 連線起來,在 panyuntao1 上,執行如下命令

panyuntao1# rabbitmqctl stop_app
panyuntao1# rabbitmqctl join_cluster --ram [email protected]   
panyuntao1# rabbitmqctl start_app

panyuntao2# rabbitmqctl stop_app
panyuntao2# rabbitmqctl join_cluster --ram [email protected]
panyuntao2# rabbitmqctl start_app
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

上述命令先停掉 RabbitMQ 應用,然後呼叫 cluster 命令,將 panyuntao1 連線到,使兩者成為一個叢集,最後重啟 RabbitMQ 應用。在這個 cluster 命令下,panyuntao1、panyuntao2 是記憶體節點,queue 是磁碟節點(RabbitMQ啟動後,預設是磁碟節點)。

queue 如果要使 panyuntao1 或 panyuntao2 在叢集裡也是磁碟節點,join_cluster命令去掉--ram引數即可。

 #rabbitmqctl join_cluster [email protected] 
  • 1

只要在節點列表裡包含了自己,它就成為一個磁碟節點。在 RabbitMQ 叢集裡,必須至少有一個磁碟節點存在。

Step 6:在 queue、panyuntao1、panyuntao2 上,執行cluster_status命令檢視叢集狀態。

[[email protected] ~]# rabbitmqctl cluster_status
Cluster status of node [email protected] ...
[{nodes,[{disc,[[email protected]]},{ram,[[email protected],[email protected]]}]},
{running_nodes,[[email protected],[email protected],[email protected]]},
{partitions,[]}]
...done.

[[email protected] rabbitmq]# rabbitmqctl cluster_status
Cluster status of node [email protected] ...
[{nodes,[{disc,[[email protected]]},{ram,[[email protected],[email protected]]}]},
{running_nodes,[[email protected],[email protected],[email protected]]},
{partitions,[]}]
...done.

[[email protected] rabbitmq]# rabbitmqctl cluster_status
Cluster status of node [email protected] ...
[{nodes,[{disc,[[email protected]]},{ram,[[email protected],[email protected]]}]},
{running_nodes,[[email protected],[email protected],[email protected]]},
{partitions,[]}]
...done.
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

這時咱們可以看到每個節點的叢集資訊,分別有兩個記憶體節點和一個磁碟節點。

Step 7:往任意一臺叢集節點裡寫入訊息佇列,會複製到另一個節點上,咱們會看到兩個節點的訊息佇列數一致。

[email protected] :~# rabbitmqctl list_queues -p hrsystem

Listing queues …
test_queue 10000
…done.

[email protected] :~# rabbitmqctl list_queues -p hrsystem
Listing queues …
test_queue 10000
…done.

[email protected]:~# rabbitmqctl list_queues -p hrsystem
Listing queues …
test_queue 10000
…done.
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

其中,-p引數為vhost名稱。

這樣RabbitMQ叢集就正常工作了,

這種模式更適合非持久化佇列,只有該佇列是非持久的,客戶端才能重新連線到叢集裡的其他節點,並重新建立佇列。假如該佇列是持久化的,那麼唯一辦法是將故障節點恢復起來。

為什麼 RabbitMQ 不將佇列複製到叢集裡每個節點呢?這與它的叢集的設計本意相沖突,叢集的設計目的就是增加更多節點時,能線性的增加效能(CPU、記憶體)和容量(記憶體、磁碟)。理由如下:

  1. storage space: If every cluster node had a full copy of every queue, adding nodes wouldn’t give you more storage capacity. For example, if one node could store 1GB of messages, adding two more nodes would simply give you two more copies of the same 1GB of messages.

  2. performance: Publishing messages would require replicating those messages to every cluster node. For durable messages that would require triggering disk activity on all nodes for every message. Your network and disk load would increase every time you added a node, keeping the performance of the cluster the same (or possibly worse).

當然 RabbitMQ 新版本叢集也支援佇列複製(有個選項可以配置)。比如在有五個節點的叢集裡,可以指定某個佇列的內容在 2 個節點上進行儲存,從而在效能與高可用性之間取得一個平衡。

6.3 映象模式配置

上面配置 RabbitMQ 預設叢集模式,但並不保證佇列的高可用性,儘管交換機、繫結這些可以複製到叢集裡的任何一個節點,但是佇列內容不會複製,雖然該模式解決一部分節點壓力,但佇列節點宕機直接導致該佇列無法使用,只能等待重啟,所以要想在佇列節點宕機或故障也能正常使用,就要複製佇列內容到叢集裡的每個節點,需要建立映象佇列。

下面,咱們看看如何映象模式來解決複製的問題,從而提高可用性 。

Step 1:增加負載均衡器

關於負載均衡器,商業的比如 F5 的 BIG-IP,Radware 的 AppDirector 是硬體架構的產品,可以實現很高的處理能力。但這些產品昂貴的價格會讓人止步,所以咱們還有軟體負載均衡方案。網際網路公司常用的軟體 LB 一般有 LVS、HAProxy、Nginx 等。LVS 是一個核心層的產品,主要在第四層負責資料包轉發,使用較複雜。HAProxy 和 Nginx 是應用層的產品,但 Nginx 主要用於處理 HTTP,所以這裡選擇 HAProxy 作為 RabbitMQ 前端的 LB.

HAProxy 的安裝使用非常簡單,在 CentOS 下直接yum install haproxy,然後更改/etc/haproxy/haproxy.cfg檔案即可,檔案內容大概如下:

13

負載均衡器會監聽 5672 埠,輪詢咱們的兩個記憶體節點172.16.3.107172.16.3.108的 5672 埠,172.16.3.32為磁碟節點,只做備份不提供給生產者、消費者使用,當然如果我們伺服器資源充足情況也可以配置多個磁碟節點,這樣磁碟節點除了故障也不會影響,除非同時出故障。

Step 2:配置策略

使用 Rabbit 映象功能,需要基於 RabbitMQ 策略來實現,政策是用來控制和修改群集範圍的某個 vhost 佇列行為和 Exchange 行為。在 cluster 中任意節點啟用策略,策略會自動同步到叢集節點:

# rabbitmqctl set_policy -p hrsystem ha-allqueue"^" '{"ha-mode":"all"}'
  • 1

這行命令在 vhost 名稱為 hrsystem 建立了一個策略,策略名稱為 ha-allqueue,策略模式為 all 即複製到所有節點,包含新增節點,策略正則表示式為“^”表示所有匹配所有佇列名稱。例如:

rabbitmqctl set_policy -p hrsystem ha-allqueue "^message" '{"ha-mode":"all"}'
  • 1

注意"^message"這個規則要根據自己修改,這個是指同步"message"開頭的佇列名稱,咱們配置時使用的應用於所有佇列,所以表示式為"^",官方set_policy說明參見:

set_policy [-p vhostpath] {name} {pattern} {definition} [priority]
  • 1

ha-mode

14

step3

建立佇列時需要指定ha引數,如果不指定x-ha-prolicy的話將無法複製。下面C#程式碼片段:

using ( var bus = RabbitHutch.CreateBus(ConfigurationManager .ConnectionStrings["RabbitMQ"].ToString())) {
    bus.Subscribe< TestMessage>("word_subscriber" , message => RunTable(message),x=>x.WithArgument("x-ha-policy" , "all"));
    Console.WriteLine("Subscription Started. Hit any key quit" );
    Console.ReadKey();
}
  • 1
  • 2
  • 3
  • 4
  • 5

客戶端使用負載伺服器172.16.3.110(panyuntao3)傳送訊息,佇列會被複制到所有節點,當然策略也可以配置制定某幾個節點,這時任何節點故障 、或者重啟將不會影響我們正常使用某個佇列。到這裡,咱們就完成了 RabbitMQ 的高可用配置(所有節點都宕機那沒有辦法了)。使用 RabbitMQ 管理端可以看到叢集映象模式中對列狀態:

15

溫馨提示:本文略有修改,如果大家對原文感興趣的話,可以進入部落格「章小魚」閱讀原文。