1. 程式人生 > >RabbitMQ及其.NET客戶端——幾個小例子

RabbitMQ及其.NET客戶端——幾個小例子

一、簡單生產者-消費者(使用direct交換器)

1、生產者

 

var factory = new ConnectionFactory();//例項化一個工廠
factory.HostName = "localhost";
factory.UserName = "honnnnl";
factory.Password = "honnnnl";

using (var connection = factory.CreateConnection())//用工廠建立聯結器
using (var channel = connection.CreateModel()) //建立通道
{
//在Rabbit服務上宣告訊息佇列。如果不存在,自動建立。
channel.QueueDeclare( queue: "test", //訊息佇列名稱 durable: false,//訊息佇列是否持久化 exclusive: false,//訊息佇列是否被本次連線connection獨享。(本次連線connection建立的通道可以共用).排外的queue在當前連線被斷開的時候會自動消失(清除)無論是否設定了持久化. autoDelete: false,//訊息佇列是否自動刪除。也就是說queue會清理自己,但是是在最後一個connection斷開的時候。 arguments: null);//引數對 //建立一條訊息,並轉為位元組陣列。 string message = "
Hello World"; var body = Encoding.UTF8.GetBytes(message); //釋出訊息。。 交換器,路由鍵, channel.BasicPublish("", "test", null, body);//注意路由鍵在用direct交換器時,要指定為佇列名 Console.WriteLine($"傳送: {message}"); }

 

2、消費者

var factory = new ConnectionFactory();//例項化聯結器建立工廠
factory.HostName = "localhost";
factory.UserName 
= "honnnnl"; factory.Password = "honnnnl"; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { channel.QueueDeclare("test", false, false, false, null); var consumer = new EventingBasicConsumer(channel);//例項化一個事件型消費者 //訂閱消費者接收訊息的事件 consumer.Received += (model, ea) => { //獲取並解析資料 var body = ea.Body; var message = Encoding.UTF8.GetString(body); Console.WriteLine($"收到: {message}"); }; //通道開始消費。。 訊息佇列名稱, 是否自動回覆響應, 消費者 channel.BasicConsume(queue: "test", autoAck: true, consumer: consumer); }

 

二、簡單任務佇列

1、任務釋出者

主要程式碼與第一節的生產者程式碼一樣。。只不過需要將發給工作者執行的任務放到訊息裡。

 

2、工作者

主要程式碼與第一節的消費者程式碼一樣。。只不過工作者要解析任務,執行任務。

預設RabbitMQ會將每個訊息按照順序依次分發給下一個消費者(工作者)。所以每個消費者接收到的訊息個數大致是平均的。 這種訊息分發的方式稱之為輪詢(round-robin)。

使用工作佇列的一個好處就是它能夠並行處理佇列。如果任務釋出的快工作者處理的慢,堆積了很多工,我們只需要新增更多的工作者(workers)——再開啟幾個工作者程序就可以了,擴充套件很簡單。

 

四、訊息響應

1、為什麼、如何進行訊息確認

當處理一個比較耗時得任務的時候,也許想知道消費者(consumers)是否執行到一半就掛掉。在上面的例子中,當RabbitMQ將訊息傳送給消費者(consumers)之後,馬上就會將該訊息從佇列中移除。此時,如果把處理這個訊息的工作者(worker)停掉,正在處理的這條訊息就會丟失。同時,所有傳送到這個工作者的還沒有處理的訊息都會丟失。

我們不想丟失任何任務訊息。如果一個工作者(worker)掛掉了,我們希望該訊息會重新發送給其他的工作者(worker)。

為了防止訊息丟失,RabbitMQ提供了訊息響應(acknowledgments)機制。也就是說消費者接收到的每一條訊息都必須進行確認。要麼消費者呼叫BasicAck()方法顯式地向RabbitMQ傳送一個確認,要麼當初在呼叫BasicConsume()開始消費訊息佇列時,將autoAct引數設定為true。總之消費者通過一個ack(響應),告訴RabbitMQ已經收到並處理了某條訊息,然後RabbitMQ才會釋放並刪除這條訊息。

  方式(1)寫程式碼呼叫BasicAck()方法

//在工作者程式碼,處理訊息後
channel.BasicAck(ea.DeliveryTag, false);//響應給RabbitMQ服務:收到並處理了訊息。

  方式(2)呼叫BasicConsume()時,將autoAct引數設定為true

  當autoAct設定為true時,一旦消費者接收到訊息,RabbitMQ自動視為其完成訊息確認。

 

如果消費者(consumer)掛掉前沒有傳送響應,RabbitMQ就會認為訊息沒有被完全處理,然後重新發送給其他消費者(consumer)。這樣,即使工作者(workers)偶爾的掛掉,也不會丟失訊息。

訊息是沒有超時這個概念的;當工作者與它斷開連的時候,RabbitMQ會重新發送訊息。這樣在處理一個耗時非常長的訊息任務的時候就不會出問題了。

訊息響應預設是開啟的。在之前的例子中使用了autoAck=True標識把它關閉。現在是時候移除這個標識了,當工作者(worker)完成了任務,就傳送一個響應。

1 //以下在工作者程式碼中
2 //通道開始消費。。     訊息佇列名稱,    是否自動回覆響應,      消費者
3 channel.BasicConsume(queue: "test", autoAck: false, consumer: consumer);

 

2、注意

(1)需要記住,消費者對訊息的確認(消費者對RabbitMQ)和告訴生產者訊息已經被接收(RabbitMQ對生產者),這兩件事毫不相干。

(2)如果消費者收到一條訊息,然後在確認之前從RabbitMQ斷開連線(或者從佇列上取消訂閱),RabbitMQ會認為這條訊息沒有分發成功,然後重新分發給下一個訂閱的消費者。如果你的應用程式(消費者)崩潰了,這種機制可以確保訊息會被髮送給另一個消費者進行處理。

  另一方面假如你的應用程式(消費者)存在bug,比如忘記確認訊息,RabbitMQ將不會再向該消費者傳送更多訊息(但不影響發給其他消費者)。這是因為在上一條訊息被確認前,RabbitMQ會認為這個消費者並沒有準備好接收下一條訊息。你可以好好利用這一點,如果處理訊息內容非常耗時,則你的應用程式應該處理完成再確認。這樣可以防止RabbitMQ持續不斷的訊息湧向你的應用程式(消費者)而導致過載。。。

 

3、拒絕消費訊息

你的消費者在收到訊息後,如果遇到某種問題無法處理訊息,但仍希望其他消費者處理它。你可以呼叫BasicReject()方法拒收該訊息,方法引數requeue設定為true,RabbitMQ會將訊息重新發送給下一個訂閱的消費者。如果設定為false,RabbitMQ立即從訊息佇列中移除它(成為死信),而不會把它發給新的消費者。

 

如果你的消費者收到一條訊息,檢測到訊息格式錯誤(意味著其他消費者也不可能解析處理),實際上你應該直接BasicAck()確認,而不做處理(應該記錄到錯誤日誌中或者報錯)。這麼幹(直接BasicAck()確認)確認比較節省大家時間、資源。(就不要再你拒收,再讓MQ發給其他消費者,其他消費者也得拒收... 這不是浪費時間麼。)

 

4、死信佇列dead letter

當你的消費者呼叫BasicReject()方法拒收訊息,且方法引數requeue設定為false時,RabbitMQ將訊息從訊息佇列中移除,並存入死信佇列(拒收或未送達的訊息)。死信佇列提供給消費者發現問題的途徑。

 

 

五、訊息持久化

前面已經搞定了即使消費者down掉,任務也不會丟失,但是,如果RabbitMQ Server停掉了,那麼這些訊息還是會丟失。當RabbitMQ Server 關閉或者崩潰,那麼裡面儲存的佇列和訊息預設是不會儲存下來的。

如果要讓RabbitMQ儲存住訊息,需要在兩個地方同時設定:需要保證佇列和訊息都是持久化的。

首先,要保證RabbitMQ不會丟失佇列,所以要做如下設定:

bool durable = true;
channel.QueueDeclare("hello", durable, false, false, null);
雖然在語法上是正確的,但是在目前階段是不正確的,因為我們之前已經定義了一個非持久化的hello佇列。RabbitMQ不允許我們使用不同的引數重新定義一個已經存在的同名佇列,如果這樣做就會報錯。現在,定義另外一個不同名稱的佇列:

bool durable = true;
channel.queueDeclare("task_queue", durable, false, false, null);
queueDeclare 這個改動需要在傳送端和接收端同時設定。

現在保證了task_queue這個訊息佇列即使在RabbitMQ Server重啟之後,佇列不會丟失。 下面需要保證訊息也是持久化的, 這可以通過設定IBasicProperties.SetPersistent 為true來實現:

var properties = channel.CreateBasicProperties();
properties.SetPersistent(true);
需要注意的是,將訊息設定為持久化並不能完全保證訊息不丟失。雖然他告訴RabbitMQ將訊息儲存到磁碟上,但是在RabbitMQ接收到訊息和將其儲存到磁碟上這之間仍然有一個小的時間視窗。 RabbitMQ 可能只是將訊息儲存到了快取中,並沒有將其寫入到磁碟上。持久化是不能夠一定保證的,但是對於一個簡單任務佇列來說已經足夠。如果需要訊息佇列持久化的強保證,可以使用publisher confirms功能。

 

 

六、 公平分發

你可能也注意到了,分發機制不是那麼優雅。預設狀態下,RabbitMQ將第n個Message分發給第n個消費者。當然n是取餘後的。它不管消費者是否還有unacked Message,只是按照這個預設機制進行分發。

那麼如果有個消費者工作比較重,那麼就會導致有的消費者基本沒事可做,有的消費者卻是毫無休息的機會。那麼,RabbitMQ是如何處理這種問題呢?

通過 BasicQos 方法設定prefetchCount = 1。這樣RabbitMQ就會使得每個Consumer在同一個時間點最多處理一個Message。換句話說,在接收到該Consumer的ack前,他它不會將新的Message分發給它。Qos即服務質量 。設定方法如下:

channel.BasicQos(0, 1, false);

注意,這種方法可能會導致queue滿。當然,這種情況下你可能需要新增更多的消費者,或者建立更多的virtualHost來細化你的設計。