1. 程式人生 > >RabbitMQ客戶連線池的實現

RabbitMQ客戶連線池的實現

目前RabbitMQ官方給的出的客戶端傳送訊息的Demo的都是基於短連線來做的,例如:

ConnectionFactory cf = new ConnectionFactory();
cf.Uri = serverAddress;
using (IConnection conn = cf.CreateConnection()){
    using (IModel ch = conn.CreateModel()) {
        if (exchange != ""){
            ch.ExchangeDeclare(exchange, exchangeType);
        }
        ch.BasicPublish
(exchange,routingKey,null,Encoding.UTF8.GetBytes(message)); } }

我們剛開始也是採用這種方式來實現的,但做壓力測試時,發現這種每次新建Connection和新建Channel是非常耗時的,在大併發下,一般都要8毫秒左右,慢的話,好多都是幾十毫秒,為此,我專門查了資料,得出如下結論:

1、Rabbit Client提供的連線方式介紹:

RabbitMQ官方提供了:
Connection物件,就是一個TCP連線物件。
Channels物件,虛擬連線。虛擬連線建立在上面Connection物件的TCP連線中。資料流動都是在Channel中進行的。每個Connection物件的虛擬連線也是有限的,如果單個Connnection的Channel物件超出指定範圍了,也會有效能問題,另外一個TCP連線上的多個虛擬連線,實際在傳輸資料時,傳輸資料的虛擬連線還是獨佔了TCP連線,其它虛擬連線在排隊等待。

2、RabbitMQ官方推薦客戶端連線使用方式:

在一個Connection物件上建立多個Channel,然後程式傳送資料時,分別共享使用建立好的Channel,但使用具體的單個Channel時,需要保障單個Chanel的執行緒獨佔性使用,不要讓多個執行緒同時在使用某一個Channel,這樣會導致併發錯誤。使用單個Channel時,加一個鎖即可解決,具體官方文件說明和程式碼示例如下:
Sharing Channels Between Threads
As a rule of thumb, IModel instances should not be used by more than one thread simultaneously: application code should maintain a clear notion of thread ownership for IModel instances. If more than one thread needs to access a particular IModel instances, the application should enforce mutual exclusion itself. One way of achieving this is for all users of an IModel to lock the instance itself:

IModel ch = RetrieveSomeSharedIModelInstance();
lock (ch) {
ch.BasicPublish(…);
}
這裡寫圖片描述

3、RabbitMQ Client客戶端目前存在的問題。

3.1、Connection物件建立、銷燬的耗時問題,管理問題:

Connection物件建立的是TCP連線,TCP連線的建立和銷燬本身就是很耗時,解決的辦法就是建立一個Connection物件之後,一直不關閉,在傳輸資料時,共享這一個Connection物件就行了,MQ官方也是推薦一個Connection物件上建立多個Chnanel來實現快速資料傳輸,具體實現方式,建立一個靜態的Connection物件不就搞定了,但這種方式問題是,如果我需要連線多個MQ伺服器呢,如果我這邊根據業務進行了劃分,不同的業務資料,需要分別傳輸到不同的MQ伺服器上,如訂單資料和聊天資料,是分別發到不同的MQ伺服器上面,如果只是有一個靜態的Connection物件,怎麼可能同時連線2臺伺服器呢,另外的問題,如果現在客戶端傳送資料量較大,一個Connection實在是傳輸不過來,而且Connection雖說是可以給大家共享傳輸,但具體傳輸時,還是某一個具體傳輸Channel會獨佔整個Connection中的TCP連線,這樣傳輸量一大,Connection忙不過來,還是會導致擁塞的發生。
解決辦法:建立Connection池,如果不同的伺服器,會分別有不同的Connection物件,如果一個Connection物件傳輸不過來,會有多個Connection物件同時在傳輸資料。連線池在建立多個連線之後,如果某個連線閒置時間超過指定的時間,則連線池會進行單個連線的dispose和remove動作,將連線先銷燬,再從連線池中移除,確保不會長期佔用無效連線。

3.2、Channel物件的建立、銷燬的耗時問題,管理問題:

Channel物件,即Connection物件上的TCP連線上的軟連線,我們程式具體使用的就是Channel物件進行資料傳輸,我自己記錄了Channel物件建立和銷燬的耗時,也是非常長的,為什麼Channel物件的建立和銷燬會非常耗時,我仔細查了MQ Clinet的原始碼,發現建立和銷燬Channel時,MQ Client分別向MQ伺服器傳送了2次資料,用於分別通知MQ伺服器,當前這個Channel接下來資料傳輸時用的資料協議,報文格式,以及其它通訊相關的資訊。銷燬時,又重新向MQ伺服器傳送了資料,通知MQ伺服器,斷開這個Channel,並且釋放MQ伺服器上面關於這個Channel所佔用的資源。在平常情況下2次TCP資料傳輸,一般要耗時1毫秒左右,還可以接受,但在高併發下,2次TCP資料傳輸,則會很耗時,而且如果MQ伺服器壓力比較大,遲遲不響應客戶端請求,則客戶端會等待以及整個耗時會更長。而且MQ官方也是推薦共享使用Channel,而不是每次都建立和銷燬Channel。
現在問題來了,我怎麼才能實現共享Channel,我查了MQ Client原始碼,Connection物件中確實有集合在存放所有的Channel,但居然沒有提供方法讓我來使用和管理這裡面的Chanel,
具體的原因不詳,有興趣的朋友可以自己在查一下MQ Client的原始碼。另外如果我建立多個Channel之後,如果不再使用的Channel在閒置時間超過指定的時間之後,如何銷燬呢,另外我查了原始碼,如果我們不設定Connection物件Channel池的長度,則一個Connection物件的Channel數量可以無限增加,因為Channel在傳輸時,實際上是獨佔TCP連線,如果Channel無限增加的話,會導致這個TCP擁塞,如果我設定了Channel池的長度,則我建立Channel的數量超過Channel池的長度,則MQ Client直接丟擲異常,提示Channel池長度越界(這是我從MQ Client原始碼中查到的),這樣的話,我建立Channel時,需要判斷Channel池的長度,防止越界,基於這些問題,我再在外面開發了一個Channel池,用於建立和管理單個Connection物件的Channel物件。如果Chnanel的池數量達到指定數量時,則會新建Connection物件,在新的Connection物件中建立Channel,如果Channel的閒置時間到達指定時間,則會在後臺銷燬這個Channel物件,如果一個Connection物件的的Channel全部被銷燬了並且Connection物件的閒置時間也到達了,則Connection物件也會被銷燬。

具體示例圖如下:

效能測試結果如下:我這邊自己用200執行緒,傳送了20萬個和60萬個訊息,做了壓力測試,另外還有心跳功能和閒置超過指定時間,主動dispose的功能。
Connection池數量:4,Channel池數量:15 總耗時:218.731秒,平均每秒傳送數量:913
這裡寫圖片描述