1. 程式人生 > >RabbitMQ.Client API (.NET)中文文件

RabbitMQ.Client API (.NET)中文文件

連線到代理(Connecting to a Broker要連線到RabbitMQ的,有必要例項化(例示)一個連線工廠和其配置為使用所需的主機,虛擬主機和證書(證書)。然後使用ConnectionFactory.CreateConnection()開啟的連線。下面兩段程式碼連線到主機名RabbitMQ的節點:
  1. ConnectionFactory factory =newConnectionFactory(); factory.UserName= user;// "gue
  2. factory.Password= pass;
  3. factory.VirtualHost= vhost;
  4. factory.HostName
    = hostName;
  5. IConnection conn = factory.CreateConnection();
  1. ConnectionFactory factory =newConnectionFactory();
  2. factory.Uri="amqp://user:[email protected]:port/vhost";
  3. IConnection conn = factory.CreateConnection();
由於.NET客戶端使用AMQP 0-9-1 URI規格比其他客戶的嚴格的解釋,必須小心使用URI時服用。特別是,主機部分不能被省略,並且與空名稱虛擬主機不可定址(可定址的)。所有出廠的屬性都有預設值。如果該屬性保持建立連線之前未分配將用於一個屬性的預設值: 使用者名稱     “guest” 密碼     “guest” 虛擬主機     “/” 主機名     “localhost” 埠     5672定期連線,5671連線使用TLS 然後IConnection介面可用於開啟一個通道:
  1. IModel
    channel= conn.CreateModel();
通道現在可以被用來發送和接收訊息,如在隨後的章節中描述。
使用交換機(Exchanges)和佇列(Queues)客戶端應用程式在交換機和佇列中工作,AMQP 0-9-1的高層次的積木工作。這些都必須先“申明”,然後才能使用它們。宣告任一型別的物件只是確保該名稱的一個存在,如果有必要創造它。繼續前面的例子,下面的程式碼宣告一個交換機和佇列,然後繫結在一起。
  1. model.ExchangeDeclare(exchangeName,ExchangeType.Direct);
  2. model.QueueDeclare(queueName,false,false
    ,false,null);
  3. model.QueueBind(queueName, exchangeName, routingKey,null);
這將啟用一下物件:   1: “直連”(direct)型別的非永續性(non-durable),非自動刪除(non-autodelete)交換機(exchange)   2:非持久,不自動刪除,非排他性(non-exclusive)的佇列       交換機可通過使用額外的引數定製。上面的程式碼將佇列繫結到指定路由的交換機。請注意,許多通道API(IModel)方法被過載。 ExchangeDeclare的便利縮寫形式使用合理的預設值。也有較長的形式與更多的引數,讓你根據需要過載這些預設值,充分控制在需要的地方。這種“短版,長版”的格局在整個API使用。傳送訊息(Publishing Messages)使用 IModel.BasicPublish 傳送訊息到交換機,如下:
  1. byte[] messageBodyBytes =System.Text.Encoding.UTF8.GetBytes("Hello, world!");
  2. model.BasicPublish(exchangeName, routingKey,null, messageBodyBytes);
為了更好的控制,可以使用過載變數指定的強制性標誌,或指定的訊息屬性:
  1. byte[] messageBodyBytes =System.Text.Encoding.UTF8.GetBytes("Hello, world!");
  2. IBasicProperties props = model.CreateBasicProperties();
  3. props.ContentType="text/plain";
  4. props.DeliveryMode=2;
  5. model.BasicPublish(exchangeName,routingKey,props,messageBodyBytes);

以持續性的互動模式傳送文字訊息,有關訊息屬性的詳細資訊請檢視IBasicProperties介面的定義。

在下面的例子中,我們傳送定義了Header(頭)的訊息:

12345678910byte[] messageBodyBytes = System.Text.Encoding.UTF8.GetBytes("Hello, world!");IBasicProperties props = model.CreateBasicProperties();props.ContentType = "text/plain";props.DeliveryMode = 2;props.Headers = new Dictionary<string, object>();props.Headers.Add("latitude",  51.5252949);props.Headers.Add("longitude", -0.0905493);model.BasicPublish(exchangeName, routingKey,props, messageBodyBytes);

下面的示例程式碼設定訊息過期時間:

12345678byte[] messageBodyBytes=System.Text.Encoding.UTF8.GetBytes("Hello, world!");IBasicProperties props = model.CreateBasicProperties();props.ContentType = "text/plain";props.DeliveryMode = 2;props.Expiration = "36000000"mode.BasicPublish(exchangeName,routingKey,props,messageBodyBytes);

獲取單條訊息(Fetching Individual Messages ("pull API"))

使用IModel.BasicGet獲取單條訊息,從訊息的Header(屬性)和訊息主體可以獲取到BasicGetResult的例項

12345678bool noAck = false;BasicGetResult result = channel.BasicGet(queueName, noAck);if (result == null) {// No message available at this time.} else {IBasicProperties props = result.BasicProperties;byte[] body = result.Body;...

上面的  noAck=false 你也可以使用  IModel.BasicAck 來確認成功的接受並處理了訊息。

1234...// acknowledge receipt of the messagechannel.BasicAck(result.DeliveryTag, false);}

注意:使用該API獲取訊息是效率較低。如果你想使用RabbitMQ將郵件推送到客戶端,請參閱下一節。 

通過訂閱檢索訊息(Retrieving Messages By Subscription ("push API"))

接收訊息的另一種方法是使用IBasicConsumer介面建立訂閱。該訊息將在到達時被自動推送,而不必進行主動請求。實現消費者的一種方法是使用的方便(convenience)類EventingBasicConsumer,其中排程交付和其他消費的生命週期事件為C#事件:

12345678var consumer = new EventingBasicConsumer(channel);consumer.Received += (ch, ea) =>{var body = ea.Body;// ... process the messagech.BasicAck(ea.DeliveryTag, false);};String consumerTag=channel.BasicConsume(queueName,false,consumer);

另一種選擇是繼承DefaultBasicConsumer類,重寫必要的方法,或者直接實現IBasicConsumer。通常要實現核心方法IBasicConsumer.HandleBasicDeliver。更復雜的消費者將需要實施進一步的方法。特別是,HandleModelShutdown使 channel/connection關閉。消費者還可以實現HandleBasicCancelOk通知取消的訊息。 在沒有被提交給原始IModel.BasicConsume情況下,DefaultBasicConsumer的ConsumerTag屬性可用於檢索伺服器生成的消費者標籤。您可以使用IModel.BasicCancel主動取消消費者:

1channel.BasicCancel(consumerTag);

當呼叫API方法,你總是通過消費者標籤提交到他們自己的消費者,它可以是客戶端或伺服器生成的,詳見AMQP規範0-9-1檔案中解釋。

消費者的併發性考慮(Concurrency Considerations for Consumers)

每個IConnection例項,在當前實現中,由單個後臺執行緒從Socket中讀取並排程所得事件給應用程式的支援。如果啟用心跳,必須3.5.0版本,它們用.NET的定時器來實現的。通常,因此,在使用這種庫的應用程式至少需要啟用兩個執行緒:

1:應用程式執行緒(the application thread

包含應用程式的邏輯,呼叫 IModel 的方法執行協議操作。

2:活動的 I/O 執行緒

通過IConnection的例項隱藏和完全管理

在任何 回撥的應用程式和庫中,執行緒模型對應用程式是可見的。這樣的回撥包括:

1、任何IBasicConsumer方法

2、在IModel的BasicReturn事件

3、任何對IConnection了各種關閉事件,IModel等。

消費者回調和訂閱(Consumer Callbacks and Ordering)

從版本3.5.0應用回撥處理程式可以呼叫阻塞操作(如IModel.QueueDeclare或IModel.BasicCancel)。IBasicConsumer回撥併發呼叫。然而,每個通道的操作順序將予以保留。換句話說,如果訊息A和B在該順序輸送在同一通道上,它們將被以該順序進行處理。如果訊息A和B分別在不同的通道輸送,它們可以以任何順序進行處理(或並行)。消費者回調在派往由.NET執行庫提供的預設的TaskScheduler任務呼叫。

使用自定義計劃任務(Using a Custom Task Scheduler)

我們可以通過設定ConnectionFactory.TaskScheduler使用自定義的任務排程程式:

1234567public class CustomTaskScheduler : TaskScheduler{// ...}var cf = new ConnectionFactory();cf.TaskScheduler = new CustomTaskScheduler();

此處的例子,可以用來限制與一個自定義的TaskScheduler併發程度。

執行緒之間共享通道(Sharing Channels Between Threads)

根據經驗,IModel例項不應由多個執行緒同時使用:應用程式程式碼應該為IModel例項維護一個清晰的執行緒所有權概念。如果多個執行緒需要訪問特定的IModel例項,應用程式應該實施互斥本身。 實現這一點的一種方式是對於IModel的所有使用者鎖定例項本身

  1. IModel ch =RetrieveSomeSharedIModelInstance();
  2. lock(ch){
  3. ch.BasicPublish(...);
  4. }
不正確序列化的IModel操作包括但不限於,如下:

1、 線上路上傳送的無效幀序列(例如,如果同時執行多於一個BasicPublish操作,則發生),和/或NotSupportedExceptions從RpcContinuationQueue類中的方法丟擲,引發“禁止請求的管道”(在同時執行多個AMQP 0-9-1同步操作(如ExchangeDeclare)的情況下)。

處理不可路由的訊息(Handling Unroutable Messages

如果釋出的訊息具有設定的“mandatory”標誌,但不能傳遞,代理將返回給傳送客戶端(通過basic.return AMQP 0-9-1命令)。 為了通知這樣的返回,客戶可以訂閱IModel.BasicReturn事件。 如果沒有連線到事件的偵聽器,則返回的訊息將被靜默刪除。

  1. model.BasicReturn+=
  2. newRabbitMQ.Client.Events.BasicReturnEventHandler(...);
例如,如果客戶端釋出了一條“強制”標誌設定為未繫結到佇列的“direct”型別交換的訊息,則BasicReturn事件將觸發。斷開與RabbitMQ的連線(Disconnecting from RabbitMQ)

要斷開連線,只需關閉通道和連線:

  1. channel.Close(200,"Goodbye");
  2. conn.Close();
注意,關閉頻道被認為是良好的做法,但不是絕對必要的 - 它將在底層連線關閉時自動完成。 在某些情況下,您可能希望連線在連線上的最後一個開啟通道關閉後自動關閉。 要實現這一點,請將IConnection.AutoClose屬性設定為true,但僅在建立第一個通道後:
  1. IConnection conn = factory.CreateConnection(...);
  2. IModel channel = conn.CreateModel();
  3. conn.AutoClose=true;
當AutoClose為true時,最後關閉的通道也將導致連線關閉。 如果在建立任何通道之前將其設定為true,則連線將在此時關閉。從網路故障自動恢復(Automatic Recovery From Network Failures)連線恢復(Connection Recovery)

客戶端和RabbitMQ節點之間的網路連線可能失敗。 RabbitMQ .NET / C#客戶端支援自動恢復連線和拓撲(queues, exchanges, bindings, and consumers)。 許多應用程式的自動恢復過程遵循以下步驟:

     1、重新連線 (Reconnect)

     2、還原連線偵聽器(Restore connection listeners)

     3、重新開啟通道(Re-open channels)

     4、還原頻道偵聽器(Restore channel listeners

     5、恢復通道basic.qos設定,釋出者確認和事務設定(Restore channel basic.qos setting, publisher confirms and transaction settings

拓撲恢復包括對每個通道執行的以下操作:

     1、重新宣告交易(除了預定義的交易)(Re-declare exchanges (except for predefined ones)

     2、重新宣告佇列(Re-declare queues)

     3、恢復所有繫結(Recover all bindings)

     4、恢復所有消費者(Recover all consumers)

要啟用自動連線恢復,請將ConnectionFactory.AutomaticRecoveryEnabled設定為true:

  1. ConnectionFactory factory =newConnectionFactory();
  2. factory.AutomaticRecoveryEnabled=true;
  3. // connection that will recover automatically
  4. IConnection conn = factory.CreateConnection();
如果恢復由於異常(例如RabbitMQ節點仍然不可達)失敗,將在固定的時間間隔(預設為5秒)後重試。 間隔可以配置:
  1. ConnectionFactory factory =newConnectionFactory();
  2. // attempt recovery every 10 seconds
  3. factory.NetworkRecoveryInterval=TimeSpan.FromSeconds(10);
拓撲恢復

拓撲恢復涉及恢復queues, exchanges, bindings, and consumers。 預設情況下啟用它,但可以禁用:

  1. ConnectionFactory factory =newConnectionFactory();
  2. Connection conn = factory.CreateConnection();
  3. factory.AutomaticRecoveryEnabled=true;
  4. factory.TopologyRecoveryEnabled=false;
手動確認和自動恢復當使用手動確認時,可能與RabbitMQ節點的網路連線在訊息傳遞和確認之間失敗。 在連線恢復後,RabbitMQ將重置所有通道上的交付標籤。 這意味著使用舊的傳遞標記的basic.ack,basic.nack和basic.reject將導致通道異常。 為了避免這種情況,RabbitMQ .NET客戶端跟蹤和更新傳遞標記,使它們在恢復之間單調增長。 IModel.BasicAck,IModel.BasicNack和IModel.BasicReject然後將調整後的交付標籤轉換為RabbitMQ使用的標籤。 不會發送過期交貨標籤的確認。 使用手動確認和自動恢復的應用程式必須能夠處理重新遞送。使用AMQP 0-9-1的常見方法(Common ways of working with AMQP 0-9-1

當使用RabbitMQ構建分散式系統時,會有一些不同的訊息模式反覆出現。在本節中,我們將介紹一些最常見的編碼模式和互動風格:

    點對點訊息:遠端過程呼叫(RPC)和指向特定接收器的非同步訊息。

    事件廣播:一對多互動;隱含地指向一組感興趣的接收者的訊息的傳輸,以及零個或多個可能的響應的收集。

    責任轉移:選擇網路中的哪個部分負責任何給定的訊息。

    訊息傳輸:至少一次和最多一次訊息傳遞。

    在與外部資源互動時保持原子性和冪等性。

有限庫支援也可用於處理這些模式,在RabbitMQ.Client.MessagePatterns名稱空間:

    訂閱提供了從伺服器接收訊息的高階介面。

    SimpleRpcServer構建在Subscription上以實現RPC或單向服務。

    SimpleRpcClient構建在Subscription上,與遠端服務互動。

將來的RabbitMQ .NET客戶端庫版本將包括改進對最常見的訊息傳遞模式及其變體的高階支援。

點對點訊息(Point-to-point Messaging

當訊息的釋出者具有特定的接收應用時,例如,當通過AMQP伺服器使得RPC樣式的服務可用時,或者當工作流鏈中的應用接收到訊息時,發生點對點訊息傳遞模式工作項,並將轉換後的工作項傳送給其後繼者。

同步,客戶機 - 伺服器遠端過程呼叫(RPC)

為了執行請求/響應RPC,

    一些解決服務的手段必須可用

    一些接收答覆的方法必須可用

    將請求訊息與回覆訊息相關聯的一些裝置必須可用

定址服務(Addressing the service

由於AMQP訊息是使用一對交換名稱和路由金鑰釋出的,因此這足以用於定址服務。使用簡單的交換名/路由 - 金鑰組合允許多種不同的方式來實現服務,同時向客戶端呈現相同的介面。例如,服務可以被實現為從佇列消耗的單個程序和在內部的負載均衡,或者其可以是從單個佇列消耗的多個程序,被遞送請求迴圈式,從而在沒有特殊編碼的情況下進行負載均衡服務邏輯。訊息也可以定址到服務請求佇列

    直接,使用AMQP預設交換(“”);要麼

    間接地通過使用服務特定交換,其使得路由金鑰免費用於諸如方法選擇或附加服務特定定址資訊的目的;要麼

    間接地,通過使用由多個服務共享的交換,其中服務名稱在路由金鑰中編碼。

使用預設交換之外的交換允許其他應用程式接收每個請求訊息的副本,這對於監視,審計,日誌記錄和除錯是有用的。

確保服務例項正在偵聽

AMQP 0-9-1釋出操作(IModel.BasicPublish)提供了交付標誌“強制性”,可用於確保客戶端傳送請求時的服務可用性。如果不能將請求路由到佇列,則設定“mandatory”標誌會導致返回請求。返回的訊息顯示為basic.return命令,通過IModel上用於釋出訊息的IModel.BasicReturn事件使其可見。

由於已釋出的訊息通過basic.return方法返回到客戶端,而basic.return是非同步否定確認事件,因此不能將特定訊息的basic.return作為傳遞的確認:使用傳遞標誌只提供了提高杆的方法,而不是完全消除故障。

另外,訊息被標記為“強制性”並且成功地入隊在一個或多個佇列上的事實不能保證其最終接收:最平常地,佇列可以在訊息被處理之前被刪除,但是其他情況,例如使用noAck標誌的訊息消費者,也可以使得“強制”提供的保證有條件。

或者,您可以使用釋出商確認。通過呼叫IModel.ConfirmSelect將通道設定為確認模式會導致代理在通過傳遞到就緒消費者或持久儲存到磁碟來處理每個訊息後傳送Basic.Ack。一旦通過IModel.BasicAcks事件處理程式確認成功處理的訊息,代理就承擔了該訊息的責任,客戶端可以考慮處理訊息。注意,代理還可以通過傳送回Basic.Nack來否定確認訊息。在這種情況下,如果通過IModel.BasicNacks事件處理程式拒絕訊息,客戶端應該假定訊息丟失或以其他方式無法投遞。此外,請注意,不可路由的訊息 - 釋出為不存在佇列的強制性訊息 - 都是Basic.Return和Basic.Ack'ed。

接收回復(Receiving Replies

AMQP 0-9-1內容頭(IBasicProperties)包含一個稱為ReplyTo的欄位,可用於告知服務在何處釋出對接收到的RPC請求的答覆。在當前的RabbitMQ客戶端庫中,ReplyTo頭中的字串使用最廣泛的格式是一個簡單的佇列名稱,儘管傳遞通過應用程式特定規則加入的交換名稱和路由鍵也是一個選項。服務例項將其答覆釋出到指定的目的地,並且請求客戶端應該安排接收如此定址的訊息,使用在適當繫結的佇列上的BasicGet或BasicConsume。

將接收到的應答與傳送的請求相關聯

IBasicProperties包含一個名為CorrelationId的欄位,在AMQP 0-9-1中是一個非結構化字串,可用於將請求匹配到回覆。應答訊息應具有與附加到請求訊息的相同的相關標識。

非同步,單向訊息傳遞(Asynchronous, one-way messaging

在某些情況下,簡單的請求 - 回覆互動模式不適合您的應用程式。 在這些情況下,感興趣的互動模式可以從非同步,單向,點對點訊息構造。 如果應用程式要響應同步,RPC樣式請求和非同步單向請求,它應該使用ReplyTo的值來決定請求它的互動樣式:如果ReplyTo存在並且非空, 請求可以假定是一個RPC樣式的呼叫; 否則,應假定它是單向訊息。 CorrelationId欄位可以用於將多個相關訊息分組在一起,就像對於RPC樣式的情況一樣,但是更通常地將任意數量的訊息繫結在一起。

點對點的確認模式(Acknowledgment modes for point-to-point)

當從伺服器接收訊息時,AMQP可以以兩種模式之一操作:自動確認模式(當BasicGet,BasicConsume或Subscription建構函式上設定noAck標誌時)或手動確認模式。選擇正確的確認模式對於您的應用程式很重要:

    自動確認模式意味著當伺服器在網路上傳輸訊息時,伺服器將內部將訊息標記為已成功傳遞。以自動確認模式傳送的訊息通常不會重新傳送到任何其他接收器。

    手動確認模式意味著在將訊息標記為已成功傳送之前,伺服器將等待接收的肯定確認。如果在伺服器接收到確認之前關閉了交付的手動確認模式下的通道(IModel),則將重新排隊。

一般來說,

    如果服務處於手動確認模式,則它不應該確認請求訊息,直到它回覆它;請參閱下面有關與外部資源互動的部分。

    客戶端可以使用自動確認模式,這取決於請求訊息的重傳的結果。

庫支援點對點訊息傳遞(Library support for point-to-point messaging)

RabbitMQ .NET客戶端庫包括涉及點對點訊息傳遞的常見任務的基本支援。

SimpleRpcServer

類RabbitMQ.Client.MessagePatterns.SimpleRpcServer實現同步RPC樣式的請求處理以及非同步訊息處理。使用者應該繼承SimpleRpcServer,覆蓋一個或多個以“Handle”開頭的方法。 SimpleRpcServer例項具有請求排程迴圈MainLoop,它將請求解釋為RPC樣式的請求,如果請求的IBasicProperties的ReplyTo欄位為非空且非空,則需要回復。具有缺少或空的ReplyTo欄位的請求被視為單向。當處理了RPC樣式的請求時,將答覆傳送到ReplyTo地址。答覆地址首先與描述上面給出的類似URI的語法的正則表示式相匹配;如果匹配,則使用類似URI的語法的元件作為回覆地址,如果不匹配,則將整個字串用作簡單佇列名稱,並將回覆傳送到預設交換(“”)一個等於ReplyTo字串的路由鍵。

SimpleRpcClient

類RabbitMQ.Client.MessagePatterns.SimpleRpcClient實現與SimpleRpcServers或類似的互動的程式碼。 RPC風格的互動是用Call方法執行的。 (私人)訂閱設定為從服務接收回復,並且ReplyTo欄位設定為指向訂閱。請求的CorrelationId欄位被初始化為新的GUID。非同步/單向互動被簡單地傳遞到IModel.BasicPublish而不修改:它是由呼叫者在非同步情況下設定CorrelationId。該類目前不支援在已釋出的請求訊息上設定“mandatory”標誌,也不支援處理由於設定該標誌而可能產生的任何BasicReturn事件。從內部訂閱檢索答覆的程式碼當前無法處理多個同時未解決的RPC請求,因為它要求答覆以與傳送請求相同的順序到達。在解除此限制之前,不要嘗試管理通過SimpleRpcClient的單個例項傳送的請求。另請參見可覆蓋的受保護方法SimpleRpcClient.RetrieveReply。使用SimpleRpcClient的基本模式如下:

  1. using (IConnection conn =newConnectionFactory()
  2. .CreateConnection(args[0])){
  3. using (IModel ch = conn.CreateModel()){
  4. SimpleRpcClient client =newSimpleRpcClient(ch,/* ... */);
  5. // in the line above, the "..." indicates the parameters
  6. // used to specify the address to use to route messages
  7. // to the service.
  8. // The next three lines are optional:
  9. client.TimeoutMilliseconds=5000;// defaults to infinity
  10. client.TimedOut+=newEventHandler(TimedOutHandler);
  11. client.Disconnected+=newEventHandler(DisconnectedHandler);
  12. byte[] replyMessageBytes = client.Call(requestMessageBytes);
  13. // other useful overloads of Call() and Cast() are
  14. // available. See the code documentation of SimpleRpcClient
  15. // for full details.
  16. }
  17. }
請注意,單個SimpleRpcClient例項可以執行許多(順序)Call()和Cast()請求! 建議單個SimpleRpcClient重複用於多個服務請求,只要請求是嚴格順序的。

事件廣播(Event Broadcasting)

當應用程式希望在不知道每個感興趣方的地址的情況下向應用程式池指示狀態改變或其他通知時,發生事件廣播模式。 對某個事件子集感興趣的應用程式使用交換和佇列繫結來配置哪些事件被路由到其自己的專用佇列。

通常,事件將通過主題交換廣播,但是直接交換雖然不太靈活,但是有時對於其有限模式匹配能力足夠的應用可以執行得更好。

釋出事件(Publishing events

要釋出事件,首先確保交換存在,然後確定適當的路由金鑰。 例如,對於股票,一個鍵如“stock.ibm.nyse”可能是合適的; 對於其他應用程式,其他主題層次結構將自然出現。 主題交換常用。 然後釋出訊息。 例如:

  1. using (IConnection conn =newConnectionFactory()
  2. .CreateConnection(args[0])){
  3. using (IModel ch = conn.CreateModel()){
  4. IBasicProperties props = ch.CreateBasicProperties();
  5. FillInHeaders(props);// or similar
  6. byte[] body =ComputeBody(props);// or similar
  7. ch.BasicPublish("exchangeName",
  8. "chosen.routing.key",
  9. props,
  10. body);
  11. }
  12. }
請參閱RabbitMQ.Client.IModel類中的BasicPublish的各種過載的文件。訂閱(Subscription)RabbitMQ.Client.MessagePatterns.Subscription類實現了大多數接收訊息(包括,特別是廣播事件)的樣板,包括消費者宣告和管理,但不包括佇列和交換宣告和佇列繫結。例如,
  1. // "IModel ch" in scope.
  2. Subscription sub =newSubscription(ch,"STOCK.IBM.#");
  3. foreach(BasicDeliverEventArgs e in sub){
  4. // handle the message contained in e ...
  5. // ... and finally acknowledge it
  6. sub.Ack(e);
  7. }
將使用IModel.BasicConsume在佇列上啟動一個消費者。它假定佇列和任何繫結以前已經宣告。應該為每個接收的事件呼叫Subscription.Ack(),無論是否使用自動確認模式,因為Subscription內部知道是否需要確認的實際網路訊息,並以有效的方式為您處理只要在你的程式碼中總是呼叫Ack()。有關完整的詳細資訊,請參閱Subscription類的程式碼文件。使用自定義消費者檢索事件(Retrieving events with a custom consumer)有時,使用Subscription的高階方法是足夠的。 然而,其他時候,需要使用定製消費者。 這種檢索事件的方法是將佇列繫結到與適當的路由 - 金鑰模式規範相關的交換。 例如,假設我們的應用程式想要在佇列“MyApplicationQueue”上檢索有關IBM的所有價格:         
  1. // "IModel ch" in scope.
  2. ch.ExchangeDeclare("prices","topic");
  3. ch.QueueDeclare("MyApplicationQueue",false,true,true,null);
  4. ch.QueueBind("MyApplicationQueue","prices",
  5. "STOCK.IBM.#",false,null);
然後使用BasicGet或BasicConsume從“MyApplicationQueue”消耗訊息。 一個更完整的例子在ApiOverview章節。

事件廣播的確認模式(Acknowledgment modes for event broadcasting

與用於點對點訊息傳遞的相同的自動確認/手動確認決定可用於廣播事件的消費者,但是互動的模式引入不同的權衡:

    對於高容量訊息傳遞,其中偶爾可接受的是不接收一個感興趣的訊息,自動確認模式是有意義的

    對於其中滿足我們的訂閱的每個訊息需要被遞送的情況,手動確認是適當的

有關詳細資訊,請參閱下面的可靠郵件傳輸部分。還要注意,只要為每個接收的訊息呼叫Subscription.Ack(),類Subscription就會負責確認和各種確認模式。

可靠的訊息傳輸(Reliable message transfer)

訊息可以在具有不同服務質量(QoS)水平的端點之間傳輸。一般來說,不能完全排除故障,但重要的是要了解各種交付故障模式,以瞭解從故障中恢復的種類,以及可能恢復的情況。重申:不可能完全排除故障。可以做的最好是縮小可能發生故障的條件,並且當檢測到故障時通知系統操作員。

至少一次遞送

該QoS水平確保訊息被傳遞到其最終目的地至少一次。也就是說,接收器可以接收訊息的多個副本。如果對於給定訊息,副作用僅發生一次是重要的,則應該使用至多一次遞送。

要實施至少一次投放(At-least-once delivery)

    像往常一樣釋出訊息,在其上具有一些相關識別符號和回覆地址,使得接收方可以確認對傳送方的接收。當接收到訊息時,將確認訊息傳送回傳送者。如果訊息是RPC請求,則RPC應答訊息隱式地是對請求的接收的確認。

    或者,不是手動實現往返邏輯,而是客戶端可以使用釋出者確認。通過在通道上啟用確認模式,客戶端請求代理確認或否定確認從該點開始在該通道上傳送的所有訊息。請參閱“責任轉移”中有關如何使用確認的說明。

決定郵件重發策略可能很困難。一些簡單的重新發送策略是:

    如果您的連線丟失或在您收到收據確認之前發生其他崩潰,請重新發送

    如果您在幾秒鐘內沒有收到確認,則超時並重新發送。請確保每次重新發送的超時時間加倍,以幫助避免與重試相關的拒絕服務和網路擁塞。

最多一次傳送(At-most-once delivery)

對於最多一次傳遞,只需釋出訊息,一次,照常。不需要相關識別符號。在使用應用程式中接收訊息,注意交貨時的Redelivered標誌。 Redelivered標誌只有在伺服器認為它提供第一次訊息訊息時才會清除。如果之前已進行任何交貨嘗試,則重新送達標誌將被設定。 Redelivered標誌是一個非常有限的資訊,只給出最多一次的語義。

用多節點RabbitMQ叢集編碼(Coding with multi-node RabbitMQ clusters

在需要連續服務的情況下,可以通過一些仔細的程式設計和用於故障轉移的熱備份叢集的可用