1. 程式人生 > >使用EasyNetQ組件操作RabbitMQ消息隊列服務

使用EasyNetQ組件操作RabbitMQ消息隊列服務

而不是 color arch 我們 定義 -i 隨筆 easy htm

RabbitMQ是一個由erlang開發的AMQP(Advanved Message Queue)的開源實現,是實現消息隊列應用的一個中間件,消息隊列中間件是分布式系統中重要的組件,主要解決應用耦合,異步消息,流量削鋒等問題。實現高性能,高可用,可伸縮和最終一致性架構。是大型分布式系統不可缺少的中間件。EasyNetQ則是基於官方.NET組件RabbitMQ.Client 的又一層封裝,使用起來更加方便。本篇隨筆主要大概介紹下RabbitMQ的基礎知識和環境的準備,以及使用EasyNetQ的相關開發調用。

1、RabbitMQ基礎知識

AMQP,即Advanced Message Queuing Protocol,高級消息隊列協議,是應用層協議的一個開放標準,為面向消息的中間件設計。消息中間件主要用於組件之間的解耦,消息的發送者無需知道消息使用者的存在,反之亦然。

AMQP的主要特征是面向消息、隊列、路由(包括點對點和發布/訂閱)、可靠性、安全。
RabbitMQ 是一個開源的AMQP實現,服務器端用Erlang語言編寫,支持多種客戶端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用於在分布式系統中存儲轉發消息,在易用性、擴展性、高可用性等方面表現不俗。

RabbitMQ的特點強大的應用程序消息傳遞;使用方便;運行在所有主要操作系統上;支持大量開發人員平臺;開源商業支持。消息隊列的模式有兩種模式:P2P(Point to Point),P2P模式包含三個角色:消息隊列(Queue),發送者(Sender),接收者(Receiver)。每個消息都被發送到一個特定的隊列,接收者從隊列中獲取消息。隊列保留著消息,直到他們被消費或超時。Publish/Subscribe(Pub/Sub),包含三個角色主題(Topic),發布者(Publisher),訂閱者(Subscriber) 。多個發布者將消息發送到Topic,系統將這些消息傳遞給多個訂閱者。

EasyNetQ 的目標是提供一個使.NET中的RabbitMQ盡可能簡單的庫。在EasyNetQ中消息應由.NET類型表示,消息應通過其.NET類型進行路由。EasyNetQ按消息類型進行路由。發布消息時,EasyNetQ會檢查其類型,並根據類型名稱,命名空間和裝配體給出一個路由密鑰。在消費方面,用戶訂閱類型。訂閱類型後,該類型的消息將路由到訂戶。默認情況下,EasyNetQ使用Newtonsoft.Json庫將.NET類型序列化為JSON。這具有消息是人類可讀的優點,因此您可以使用RabbitMQ管理應用程序等工具來調試消息問題。

EasyNetQ是在RabbitMQ.Client庫之上提供服務的組件集合。這些操作可以像序列化,錯誤處理,線程編組,連接管理等。它們由mini-IoC容器組成。您可以輕松地用自己的實現替換任何組件。因此,如果您希望XML序列化而不是內置的JSON,只需編寫一個ISerializer的實現並將其註冊到容器。

以下是官方提供的一個結構圖,這個結構圖可以很好的解析該組件的結構:

技術分享圖片

2、RabbitMQ的環境準備

本處主要介紹在Windows系統中安裝RabbitMQ。

1. 下載安裝erlang

下載地址 http://www.erlang.org/downloads(根據操作系統選擇32還64位)

2. 下載安裝rabbitmq-server

下載地址 http://www.rabbitmq.com/install-windows.html

下載後獲得兩個安裝文件,按照順序安裝即可

技術分享圖片

安裝erlang環境後,一般會添加了一個ERLANG_HOME的系統變量,指向erlang的安裝目錄路徑,如下所示(一般都添加了,確認下

技術分享圖片

安裝RabbitMQ後,在程序裏面可以看到

技術分享圖片

我們使用它的命令行來啟動RabbitMQ的服務

查看安裝是否成功命令 :rabbitmqctl status

技術分享圖片

安裝成功,在瀏覽器中輸入 http://127.0.0.1:15672/,可以看到如下界面,使用默認的賬號密碼均為guest登陸進行管理

技術分享圖片

guest 賬號是管理員賬號,可以添加Exchanges,Queues,Admin。但我們一般不使用guest賬號,可以選擇用命令來添加賬號和權限,也可以使用管理界面進行添加相應的內容。

例如我添加相應的用戶賬號

技術分享圖片

一般我們還需要添加虛擬機,默認的虛擬機為/,我這裏添加了一個虛擬機myvhost。

技術分享圖片

然後綁定賬號到虛擬機上即可。

技術分享圖片

3、EasyNetQ組件的使用

EasyNetQ組件的使用方式比較簡單,跟很多組件都類似,例如:建立連接,進行操作做等等,對於EasyNetQ組件也是如此。

在.NET中使用EasyNetQ,要求至少基於 .NET4.5的框架基礎上進行開發,可以直接在VS項目上使用NuGet的程序包進行添加EasyNetQ的引用。

技術分享圖片

一般添加引用後,至少包含了下面圖示的幾個引用DLL。

技術分享圖片

1)創建連接:

使用EasyNetQ連接RabbitMQ,是在應用程序啟動時創建一個IBus對象,並且,在應用程序關閉時釋放該對象。

RabbitMQ連接是基於IBus接口的,當IBus中的方法被調用,連接才會開啟。創建一個IBus對象的方法如下:

var bus = RabbitHutch.CreateBus(“host=myServer;virtualHost=myVirtualHost;username=admin;password=123456”);

與RabbitMQ服務器的延遲連接由IBus接口表示,創建連接的方式連接字符串由格式為key = value的鍵/值對組成,每一個用分號(;)分隔。

  • host,host=localhost 或者host =192.168.1.102或者host=my.rabbitmq.com,如果用到集群配置的話,那麽可以用逗號將服務地址隔開,例如host=a.com,b.com,c.com
  • virtualHost,虛擬主機,默認為‘/‘
  • username,用戶登錄名
  • password,用戶登錄密碼
  • requestedHeartbeat,心跳設置,默認是10秒
  • prefetchcount,默認是50
  • pubisherConfirms,默認為false
  • persistentMessages,消息持久化,默認為true
  • product,產品名
  • platform,平臺
  • timeout,默認為10秒

一般我們在代碼裏面測試的話,簡化連接代碼如下所示。

 //初始化bus對象
 bus = RabbitHutch.CreateBus("host=localhost");

2關閉連接:

bus.Dispose();

要關閉連接,只需簡單地處理總線,這將關閉EasyNetQ使用的連接,渠道,消費者和所有其他資源。

如果我們在Winform窗體裏面初始化一個IBus對象,那麽在窗體關閉的時候,關閉這個接口即可。

        private void FrmPublisher_FormClosed(object sender, FormClosedEventArgs e)
        {
            //關閉IBus接口
            if(bus != null)
            {
                bus.Dispose();
            }
        }

3發布消息:

EasyNetQ支持最簡單的消息模式是發布和訂閱。發布消息後,任意消費者可以訂閱該消息,也可以多個消費者訂閱。並且不需要額外配置。首先,如上文中需要先創建一個IBus對象,然後,在創建一個可序列化的.NET對象。調用Publish方法即可。

var message = new MyMessage { Text = "Hello Rabbit" };
bus.Publish(message);

4訂閱消息:

EasyNetQ提供了消息訂閱,當調用Subscribe方法時候,EasyNetQ會創建一個用於接收消息的隊列,不過與消息發布不同的是,消息訂閱增加了一個參數,subscribe_id.代碼如下:

bus.Subscribe<MyMessage>("my_subscription_id", msg => Console.WriteLine(msg.Text));

第一個參數是訂閱id,另外一個是delegate參數,用於處理接收到的消息。這裏要註意的是,subscribe_id參數很重要,假如開發者用同一個subscribeid訂閱了同一種消息類型兩次或者多次,RabbitMQ會以輪訓的方式給每個訂閱的隊列發送消息。接收到之後,其他隊列就接收不到該消息。如果用不同的subscribeid訂閱同一種消息類型,那麽生成的每一個隊列都會收到該消息。

需要註意的是,在收到消息處理消息時候,不要占用太多的時間,會影響消息的處理效率,所以,遇到占用長時間的處理方法,最好用異步處理。

為了測試發布和訂閱消息,我們可以建立幾個不同的項目來進行測試,如發布放在一個Winform項目,訂閱放在一個Winform項目,另外一個項目放置共享的消息對象定義,如下所示。

技術分享圖片

定義消息對象類如下所示。

    /// <summary>
    /// 定義的MQ消息類型
    /// </summary>
    public class TextMessage
    {
        public string Text { get; set; }
    }

然後在發布消息的Winform項目上創建一個處理的窗體,並添加如下代碼。

namespace MyRabbitMQ.Publisher
{
    /// <summary>
    /// 測試RabbitMQ消息隊列的發布
    /// </summary>
    public partial class FrmPublisher : DevExpress.XtraEditors.XtraForm
    {
        //構建一個IBus公用接口對象
        private IBus bus = null;

        public FrmPublisher()
        {
            InitializeComponent();

            //初始化bus對象
            bus = RabbitHutch.CreateBus("host=localhost");
            //對指定消息類型進行回應
            bus.Respond<MyRequest, MyResponse>(request => new MyResponse { Text = "Responding to: "+ request.Text});

            //收到消息後輸出到控制臺上顯示
            bus.Receive("my.queue", x => x
            .Add<MyMessage>(message => Console.WriteLine(message.ToJson()))
            .Add<MyOtherMessage>(message => Console.WriteLine(message.ToJson())));
        }

技術分享圖片

發布消息的處理代碼,如下代碼所示。

        private void btnSend_Click(object sender, EventArgs e)
        {
            if (bus != null)
            {
                bus.Publish(new TextMessage
                {
                    Text = this.txtContent.Text
                });
            }
        }

然後在創建一個類似窗體,用來訂閱消息的處理窗體,如下所示代碼和窗體。

namespace MyRabbitMQ.Subcriber
{   
    /// <summary>
    /// 測試RabbitMQ消息隊列的訂閱
    /// </summary>
    public partial class FrmSubcriber : DevExpress.XtraEditors.XtraForm
    {
        //構建一個IBus公用接口對象
        private IBus bus = null;

        public FrmSubcriber()
        {
            InitializeComponent();

            //初始化bus對象
            bus = RabbitHutch.CreateBus("host=localhost");
            if(bus != null)
            {
                //訂閱一個消息,並對接收到的消息進行處理,展示在控件上
                bus.Subscribe<TextMessage>("test", (msg) =>
                {
                    StringBuilder sb = new StringBuilder();
                    sb.AppendLine(msg.Text + "," + DateTime.Now.ToString());
                    sb.AppendLine(this.txtContent.Text);

                    this.txtContent.Invoke(new MethodInvoker(delegate()
                    {
                        this.txtContent.Text = sb.ToString();
                    }));
                });
            }

            //使用消息發送接口發送消息
            bus.Send("my.queue", new MyMessage { Text = "Hello Widgets!" });
            bus.Send("my.queue", new MyOtherMessage { Text = "Hello wuhuacong!" });
        }

技術分享圖片

發送請求獲取響應的代碼如下所示。

        private void btnRequest_Click(object sender, EventArgs e)
        {
            //定義請求消息的對象
            var request = new MyRequest()
            {
                Text = string.Format("請求消息,{0}", DateTime.Now)
            };

            //異步獲取請求消息的結果並進行處理,展示應答消息在窗體中的
            var task = bus.RequestAsync<MyRequest, MyResponse>(request);
            task.ContinueWith(response =>
            {
                StringBuilder sb = new StringBuilder();
                sb.AppendLine(response.Result.Text);
                sb.AppendLine(this.txtContent.Text);
                this.txtContent.Invoke(new MethodInvoker(delegate()
                {
                    this.txtContent.Text = sb.ToString();
                }));
            });
        }

兩個項目聯合進行測試如下界面所示。

技術分享圖片

發布者多次發送消息的情況下,訂閱者中,會進行消息的輪訓處理,也就是進行均勻分配。

5)消息發送(Send)和接收(Receive)

與Publish/Subscribe略有不同的是,Send/Receive 可以自己定義隊列名稱。

//發送端代碼
bus.Send("my.queue", new MyMessage{ Text = "Hello Widgets!" });

//接收端代碼
bus.Receive<MyMessage>("my.queue", message => Console.WriteLine("MyMessage: {0}", message.Text));

並且,也可以在同一個隊列上發送不同的消息類型,Receive方法可以這麽寫:

bus.Receive("my.queue", x => x
    .Add<MyMessage>(message => deliveredMyMessage = message)
    .Add<MyOtherMessage>(message => deliveredMyOtherMessage = message));

如果消息到達隊列,但是沒有發現相應消息類型的處理時,EasyNetQ會發送一條消息到error隊列,並且,帶上一個異常信息:No handler found for message type <message type>。與Subscribe類型,如果在同一個隊列,同一個消息類型,多次調用Receive方法時,消息會通過輪詢的形式發送給每個Receive端。

6)遠程過程調用:

var request = new TestRequestMessage {Text = "Hello from the client! "};
bus.Request<TestRequestMessage, TestResponseMessage>(request, response => 
    Console.WriteLine("Got response: ‘{0}‘", response.Text));

7RPC服務器:

bus.Respond<TestRequestMessage, TestResponseMessage>(request => 
    new TestResponseMessage{ Text = request.Text + " all done!" });

8記錄器:

var logger = new MyLogger() ;
var bus = RabbitHutch.CreateBus(“my connection string”, x => x.Register<IEasyNetQLogger>(_ => logger));

9路由:

Publish方法,可以加一個topic參數。

bus.Publish(message, "X.A");

 消息訂閱方可以通過路由來過濾相應的消息。

  * 匹配一個字符

  #匹配0個或者多個字符

  所以 X.A.2 會匹配到 "#", "X.#", "*.A.*" 但不會匹配 "X.B.*" 或者 "A". 當消息訂閱需要用到topic時候,需要調用Subscribe的重載方法

bus.Subscribe("my_id", handlerOfXDotStar, x => x.WithTopic("X.*"));
bus.Subscribe("my_id", handlerOfStarDotB, x => x.WithTopic("*.B"));

上述這種方式,會將消息輪詢發送給兩個訂閱者,如果只需要一個訂閱者的話,可以這麽調用:

bus.Subscribe("my_id", handler, x => x.WithTopic("X.*").WithTopic("*.B"));

RabbitMQ具有非常好的功能,基於主題的路由,允許訂閱者基於多個標準過濾消息。*(星號)匹配一個字。#(哈希)匹配為零個或多個單詞。

RabbitMQ的應用場景,一般在快速處理訂單,以及異步的多任務處理中可以得到很好的體現,下面是幾個應用場景。

郵件和短消息的處理

技術分享圖片

訂單的解耦處理

技術分享圖片

RabbitMQ的服務器架構

技術分享圖片

3、RabbitMQ查詢狀態出現錯誤的處理

安裝成功之後使用rabbitmqctl status命令之後出現如下錯誤。

Status of node rabbit@WUHUACONG ...
Error: unable to perform an operation on node ‘rabbit@WUHUACONG‘. Please see diagnostics information and suggestions below.

Most common reasons for this are:

 * Target node is unreachable (e.g. due to hostname resolution, TCP connection or firewall issues)
 * CLI tool fails to authenticate with the server (e.g. due to CLI tool‘s Erlang cookie not matching that of the server)
 * Target node is not running

In addition to the diagnostics info below:

 * See the CLI, clustering and networking guides on http://rabbitmq.com/documentation.html to learn more
 * Consult server logs on node rabbit@WUHUACONG

DIAGNOSTICS
===========

attempted to contact: [rabbit@WUHUACONG]

rabbit@WUHUACONG:
  * connected to epmd (port 4369) on WUHUACONG
  * epmd reports node ‘rabbit‘ uses port 25672 for inter-node and CLI tool traffic
  * TCP connection succeeded but Erlang distribution failed

  * Authentication failed (rejected by the remote node), please check the Erlang cookie


Current node details:
 * node name: rabbitmqcli100@WUHUACONG
 * effective user‘s home directory: C:\Users\Administrator
 * Erlang cookie hash: RgaUM2coc+rxIhJrfLS7Jw==

這個問題出現比較常見,主要原因是兩個目錄的.erlang.cookie文件內容不一樣。

要確保.erlang.cookie文件的一致性,不知道什麽原因導致了C:\Users\{UserName}\.erlang.cookie和默認情況下C:\WINDOWS\System32\config\systemprofile\.erlang.cookie不一致了,將Windows目錄下的拷貝到用戶目錄下就可以了。

反正無論如何,兩個地址的Cookie內容一致就可以了,然後重啟下RabbitMQ服務器即可正常運行,並可以正常獲取它的狀態信息。

使用EasyNetQ組件操作RabbitMQ消息隊列服務