1. 程式人生 > >【c#】RabbitMQ學習文檔(七)C# API

【c#】RabbitMQ學習文檔(七)C# API

[] 檢索 並發 IT 重載 線上 request 基本屬性 and

今天這篇博文是我翻譯的RabbitMQ的最後一篇文章了,介紹一下RabbitMQ的C#開發的接口。好了,言歸正傳吧。

Net/C# 客戶端 API簡介

主要的命名空間,接口和類

定義核心的API的接口和類被定義在RabbitMQ.Client這個命名空間下面:

所以要想使用RabbitMQ的功能,需要以下代碼

using RabbitMQ.Client;

核心API的接口和類

IModel:表示一個符合AMQP 0-9-1 協議的通道,並且提供了很多的操作方法

IConnection:表示一個符合AMQP 0-9-1協議的連接對象,用戶和RabbitMQ 服務端的連接


ConnectionFactory:可以創建一個IConnection對象的實例。

IBasicConsumer:表示一個消息的消費者,或者是使用者。

其他有用的接口和類

DefaultBasicConsumer:通常用作消費者的基類,如果要編寫自己的消費者程序,可以從該類繼承。

除RabbitMQ.Client之外的公共命名空間

RabbitMQ.Client.Events:作為客戶端庫一部分的各種事件和事件處理程序。包括EventingBasicConsumer,一個基於C#事件處理程序構建的消費者實現。

RabbitMQ.Client.Exceptions: 對用戶可見的一些異常對象。


所有其他命名空間都保留用於庫的私有實現細節,盡管私有命名空間的成員通常可以使用該庫的應用程序使用,以便允許開發人員實現其在庫實現中發現的故障或設計錯誤的解決方法。 應用程序不能依賴於在庫的版本中保持穩定的私有命名空間中出現的任何類,接口,成員變量等。

創建到代理的連接


要連接到RabbitMQ,需要實例化一個ConnectionFactory並將其配置為使用所需的主機名,虛擬主機和憑據。 然後使用ConnectionFactory.CreateConnection()打開一個連接。 以下兩個代碼片段連接到hostName上的RabbitMQ節點:

技術分享圖片
ConnectionFactory factory = new
ConnectionFactory(); // "guest"/"guest" by default, limited to localhost connections factory.UserName = user; factory.Password = pass; factory.VirtualHost = vhost; factory.HostName = hostName; IConnection conn = factory.CreateConnection(); ConnectionFactory factory = new ConnectionFactory(); factory.Uri = "amqp://user:pass@hostName:port/vhost"; IConnection conn = factory.CreateConnection();
技術分享圖片


由於.NET客戶端使用比其他客戶端更嚴格的AMQP 0-9-1 URI規範解釋,因此在使用URI時必須小心。 特別是,主機部分不能被忽略,具有空名稱的虛擬主機不可尋址。 所有出廠屬性都有默認值。 如果屬性在創建連接之前保持未分配,則將使用屬性的默認值:

技術分享圖片
Username
    "guest"
   Password
    "guest"
   Virtual host
    "/"
   Hostname
    "localhost"
   port
    5672 是針對一般而言的連接, 5671 是針對 TLS 連接的 
技術分享圖片


IConnection接口可以打開一個通道:

 IModel channel = conn.CreateModel();


通道Channel用於接收和發送消息

使用消息交換機和隊列


客戶端應用程序將與消息交換機和消息隊列(AMQP 0-9-1的高級構建塊)配合工作。 【消息交換機】和【消息隊列】在使用之前必須先聲明他們。 聲明任何類型的對象只是確保其中一個名稱存在,如有必要,創建它。 繼續前面的例子,以下代碼聲明一個消息交換機和一個隊列,然後將它們綁定在一起。

model.ExchangeDeclare(exchangeName, ExchangeType.Direct);
   model.QueueDeclare(queueName, false, false, false, null);
   model.QueueBind(queueName, exchangeName, routingKey, null);


這將聲明了以下兩個對象:

【1】、非持久、非自動刪除的、交換類型為“direct”的消息交換機;

【2】、非持久、非自動刪除、非排他的消息隊列

可以通過使用附加參數來定制消息交換機。 然後,上面的代碼使用給定的路由鍵將隊列綁定到消息交換機。 請註意,許多通道API(IModel)方法重載。 ExchangeDeclare方便的簡單形式使用合理的默認值。 還有更多的表單具有更多的參數,可以根據需要修改這些默認值,並在需要時進行完全控制。 在整個API中使用這種“短版本,長版本”模式。

發布消息


要將消息發布到消息交換機,請使用IModel.BasicPublish,如下所示:

 byte[] messageBodyBytes = System.Text.Encoding.UTF8.GetBytes("Hello, world!");
  model.BasicPublish(exchangeName, routingKey, null, messageBodyBytes);


為了精細控制,您可以使用重載變量來指定強制標誌,或指定消息屬性:

技術分享圖片
byte[] messageBodyBytes = System.Text.Encoding.UTF8.GetBytes("Hello, world!");
  IBasicProperties props = model.CreateBasicProperties();
  props.ContentType = "text/plain";
  props.DeliveryMode = 2;
  model.BasicPublish(exchangeName,
                   routingKey, props,
                   messageBodyBytes);
技術分享圖片


這將發送一個帶有發送模式為2(持久性)和內容類型是“text / plain”的消息。 有關可用消息屬性的更多信息,請參閱IBasicProperties接口的定義。

在以下示例中,我們使用自定義標頭發布消息:

技術分享圖片
byte[] messageBodyBytes = System.Text.Encoding.UTF8.GetBytes("Hello, world!");

   IBasicProperties props = model.CreateBasicProperties();
   props.ContentType = "text/plain";
   props.DeliveryMode = 2;
   props.Headers = new Dictionary<string, object>();
   props.Headers.Add("latitude",  51.5252949);
   props.Headers.Add("longitude", -0.0905493);

   model.BasicPublish(exchangeName,
                   routingKey, props,
                   messageBodyBytes);
技術分享圖片


下面的示例設置消息過期:

技術分享圖片
byte[] messageBodyBytes = System.Text.Encoding.UTF8.GetBytes("Hello, world!");

   IBasicProperties props = model.CreateBasicProperties();
   props.ContentType = "text/plain";
   props.DeliveryMode = 2;
   props.Expiration = "36000000"

   mode.BasicPublish(exchangeName,
                  routingKey, props,
                  messageBodyBytes);
技術分享圖片

獲取個人消息(“拉API”)

要檢索單個消息,請使用IModel.BasicGet。 返回的值是BasicGetResult的實例,可以從中提取基本屬性和消息體:

技術分享圖片
bool noAck = false;
BasicGetResult result = channel.BasicGet(queueName, noAck);
if (result == null) {
    // No message available at this time.
} else {
    IBasicProperties props = result.BasicProperties;
    byte[] body = result.Body;
    ...
技術分享圖片


由於noAck = false,您還必須調用IModel.BasicAck來確認您已成功接收並處理該消息:

  ...
    // acknowledge receipt of the message
    channel.BasicAck(result.DeliveryTag, false);
}


請註意,使用此API獲取消息效率相對較低。 如果您希望RabbitMQ將消息推送給客戶端,請參閱下一節。

通過訂閱檢索郵件(“推送API”)

接收消息的另一種方法是使用IBasicConsumer接口建立訂閱。 然後,消息將在其到達時自動發送,而不必主動請求。 實現【消費者】的一種方法是使用便利類EventingBasicConsumer,它將傳送和其他【消費者】生命周期事件以C#事件:

技術分享圖片
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (ch, ea) =>
                {
                    var body = ea.Body;
                    // ... process the message
                    channel.BasicAck(ea.DeliveryTag, false);
                };
String consumerTag = channel.BasicConsume(queueName, false, consumer);
技術分享圖片


另一個選項是根據子類DefaultBasicConsumer,覆蓋方法,或者直接實現IBasicConsumer。 你一般會想實現核心方法IBasicConsumer.HandleBasicDeliver。 更復雜的消費者將需要實施進一步的方法。 特別地,HandleModelShutdown使通道/連接關閉。 【消費者】還可以實現HandleBasicCancelOk以獲得取消通知。 DefaultBasicConsumer的ConsumerTag屬性可用於檢索服務器生成的使用者標簽,如果沒有提供給原始的IModel.BasicConsume調用。 您可以使用IModel.BasicCancel取消活躍的消費者:

channel.BasicCancel(consumerTag);


在調用API方法時,您總是通過【消費者】標簽來引用【消費者】,【消費者】標簽可以是客戶端或服務器生成的,如AMQP 0-9-1規範文檔中所述。

【消費者】的並發註意事項


在當前實現中,每個IConnection實例都由一個從套接字讀取的後臺線程支持,並將生成的事件分派到應用程序。 如果啟用了心跳,則從3.5.0版開始,它們將以.NET定時器方式實現。 因此,通常使用此庫的應用程序中至少有兩個線程處於活動狀態:

應用程序線程

包含應用程序邏輯,並調用IModel方法來執行協議操作。

I / O活動線程

隱藏並由IConnection實例完全管理。

在應用程序中可以看到線程模型的本質的一個地方是在庫中的應用程序註冊的任何回調。 這種回調包括:

任何IBasicConsumer方法
在IModel上的BasicReturn事件
IConnection,IModel等各種關機事件

【消費者】回調和訂閱

從版本3.5.0開始,應用程序回調處理程序可以調用阻塞操作(如IModel.QueueDeclare或IModel.BasicCancel)。 IBasicConsumer回調同時被調用。 但是,每通道操作命令被保留。 換句話說,如果消息A和B按照同一個頻道的順序傳送,則按照此順序進行處理。 如果消息A和B在不同的信道上傳送,則可以以任何順序(或並行)處理消息。 由【.NET運行時】提供的、默認的TaskScheduler的任務分發器中調用【消費者】的回調

使用自定義任務計劃程序

通過設置ConnectionFactory.TaskScheduler可以使用自定義任務調度程序:

技術分享圖片
public class CustomTaskScheduler:TaskScheduler
{
   // ...
}

var cf = new ConnectionFactory();
cf.TaskScheduler = new CustomTaskScheduler();
技術分享圖片


例如,這可以用於通過自定義TaskScheduler來限制並發程度。

在多線程中共享通道Chanel


作為經驗法則,IModel實例不應同時被多個線程使用:應用程序代碼應該保持對IModel實例的線程所有權的清晰認識。 如果多個線程需要訪問特定的IModel實例,應用程序應該強制執行互斥。 實現此目的的一種方法是為IModel的所有用戶鎖定實例本身:

IModel ch = RetrieveSomeSharedIModelInstance();
lock (ch) {
  ch.BasicPublish(...);
}


IModel操作錯誤一系列的癥狀包括但不限於,

在線上發送無效幀序列(例如,如果同時運行多個BasicPublish操作,則發生)和/或從類RpcContinuationQueue中的方法拋出NotSupportedException異常,提示“管道的請求是被禁止的【Pipelining of requests forbidden】”(在同時運行多個AMQP 0-9-1同步操作(如ExchangeDeclare)的情況下發生)。

處理不可路由的消息

如果發布了一個設置了“強制”標誌的消息,但未能送達,則代理將消息返回給發送的客戶端(通過basic.return AMQP 0-9-1命令)。 要獲得此類通知,客戶端可以訂閱IModel.BasicReturn事件。 如果沒有附加事件的監聽器,則返回的消息將被靜默地丟棄。

model.BasicReturn +=
  new RabbitMQ.Client.Events.BasicReturnEventHandler(...);


如果客戶端把一條標識為“mandatory”的消息發送到了類型為【Direct】的【消息交換機Exchange】,但是這個exchange還沒有綁定到一個消息隊列的時候,BasicReturn事件就會被觸發

從RabbitMQ斷開連接

要斷開連接,只需關閉通道和連接:

channel.Close(200, "Goodbye");
conn.Close();


請註意,關閉通道是很好的做法,但不是必要的 - 當底層連接關閉時,它將自動完成。 在某些情況下,您可能希望連接上的最後一個打開的通道關閉的時候連接也關閉, 要實現此目的,請將IConnection.AutoClose屬性設置為true,但僅在創建第一個通道後:

IConnection conn = factory.CreateConnection(...);
IModel channel = conn.CreateModel();
conn.AutoClose = true;


當AutoClose為true時,最後關閉的通道也將導致連接關閉。 如果在創建任何通道之前設置為true,則連接將在那時關閉。

好了,暫時翻譯到此吧,其實還有一些沒翻譯完,暫時就不翻譯了,有時間再繼續。

再把原文的地址貼出來,想看完整的可以通過連接瀏覽。地址如下:http://www.rabbitmq.com/dotnet-api-guide.html

【c#】RabbitMQ學習文檔(七)C# API