1. 程式人生 > >官網指南-RabbitMQ-Java Client API Guide

官網指南-RabbitMQ-Java Client API Guide

概述

RabbitMQ Java client 將com.rabbitmq.client作為其頂層包. 關鍵類和介面有:

  • Channel
  • Connection
  • ConnectionFactory
  • Consumer

協議操作可通過Channel介面來進行.Connection用於開啟channels,註冊connection生命週期事件處理, 並在不需要時關閉connections.
Connections是通過ConnectionFactory來初始化的,在ConnectionFactory中,你可以配置不同的connection設定,如:虛擬主機和使用者名稱等等.

Connections 和 Channels

核心API類是Connection和Channel, 它們代表對應AMQP 0-9-1 connection 和 channel. 在使用前,可像下面這樣來匯入:

import com.rabbitmq.client.Connection; 
import com.rabbitmq.client.Channel;

連線到broker

下面的程式碼會使用給定的引數連線到AMQP broker:

ConnectionFactory factory = new ConnectionFactory(); 
factory.setUsername(userName); 
factory.setPassword(password); 
factory.setVirtualHost(virtualHost); 
factory.setHost(hostName); 
factory.setPort(portNumber); 
Connection conn = factory.newConnection(); 

也可以使用URIs 來設定連線引數:

ConnectionFactory factory = new ConnectionFactory(); 
factory.setUri("amqp://userName:[email protected]:portNumber/virtualHost"); 
Connection conn = factory.newConnection(); 

Connection 介面可用來開啟一個channel:

Channel channel = conn.createChannel(); 

channel現在可用來發送和接收訊息,正如後續章節中描述的一樣.

要斷開連線,只需要簡單地關閉channel和connection:

channel.close(); conn.close();

關閉channel被認為是最佳實踐,但在這裡不是嚴格必須的 - 當底層連線關閉的時候,channel也會自動關閉.

使用 Exchanges 和 Queues

採用交換器和佇列工作的客戶端應用程式,是AMQP高級別構建模組。在使用前,必須先宣告.宣告每種型別的物件都需要確保名稱存在,如果有必要須進行建立.

繼續上面的例子,下面的程式碼聲明瞭一個交換器和一個佇列,然後再將它們進行繫結.

channel.exchangeDeclare(exchangeName, "direct", true); 
String queueName = channel.queueDeclare().getQueue(); 
channel.queueBind(queueName, exchangeName, routingKey);

這實際上會宣告下面的物件,它們兩者都可以可選引數來定製. 在這裡,它們兩個都沒有特定引數。

  1. 一個型別為direct,且持久化,非自動刪除的交換器
  2. 採用隨機生成名稱,且非持久化,私有的,自動刪除佇列

上面的函式然後使用給定的路由鍵來繫結佇列和交換器.

注意,當只有一個客戶端時,這是一種典型宣告佇列的方式:它不需要一個已知的名稱,其它的客戶端也不會使用它(exclusive),並會被自動清除(autodelete).
如果多個客戶端想共享帶有名稱的佇列,下面的程式碼應該更適合:

channel.exchangeDeclare(exchangeName, "direct", true); 
channel.queueDeclare(queueName, true, false, false, null); 
channel.queueBind(queueName, exchangeName, routingKey);

這實際上會宣告:

  1. 一個型別為direct,且持久化,非自動刪除的交換器
  2. 一個已知名稱,且持久化的,非私有,非自動刪除佇列

注意,Channel API 的方法都是過載的。這些 exchangeDeclare, queueDeclare 和queueBind 都使用的是預設行為.
這裡也有更多引數的長形式,它們允許你按需覆蓋預設行為,允許你完全控制。

發由訊息

要向交換器中釋出訊息,可按下面這樣來使用Channel.basicPublish方法:

byte[] messageBodyBytes = "Hello, world!".getBytes(); 
channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);

為了更好的控制,你可以使用過載方法來指定mandatory標誌,或使用預先設定的訊息屬性來發送訊息:

channel.basicPublish(exchangeName, routingKey, mandatory, MessageProperties.PERSISTENT_TEXT_PLAIN,messageBodyBytes);

這會使用分發模式2(持久化)來發送訊息, 優先順序為1,且content-type 為"text/plain".你可以使用Builder類來構建你自己的訊息屬性物件:

channel.basicPublish(exchangeName, routingKey,new AMQP.BasicProperties.Builder().contentType("text/plain").deliveryMode(2).priority(1).userId("bob").build()),messageBodyBytes);

下面的例子使用自定義的headers來發布訊息:

Map<String, Object> headers = new HashMap<String, Object>(); 
headers.put("latitude",  51.5252949); 
headers.put("longitude", -0.0905493);  
channel.basicPublish(exchangeName, routingKey,new AMQP.BasicProperties.Builder().headers(headers).build()),messageBodyBytes);

下面的例子使用expiration來發布訊息:

channel.basicPublish(exchangeName, routingKey,new AMQP.BasicProperties.Builder().expiration("60000").build()),messageBodyBytes);

BasicProperties is an inner class of the autogenerated holder class AMQP.

Invocations of Channel#basicPublish will eventually block if a resource-driven alarm is in effect.

Channels 和併發考慮(執行緒安全性)

Channel 例項不能在多個執行緒間共享。應用程式必須在每個執行緒中使用不同的channel例項,而不能將同個channel例項在多個執行緒間共享。 有些channl上的操作是執行緒安全的,有些則不是,這會導致傳輸時出現錯誤的幀交叉。
在多個執行緒共享channels也會干擾Publisher Confirms.

通過訂閱來來接收訊息

import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer;

接收訊息最高效的方式是用Consumer介面來訂閱。當訊息到達時,它們會自動地進行分發,而不需要顯示地請求。

當在呼叫Consumers的相關方法時, 個別訂閱總是通過它們的consumer tags來確定的, consumer tags可通過客戶端或服務端來生成,參考 the AMQP specification document
同一個channel上的消費者必須有不同的consumer tags.

實現Consumer的最簡單方式是繼承便利類DefaultConsumer.子類可通過在設定訂閱時,將其傳遞給basicConsume呼叫:

boolean autoAck = false; 
channel.basicConsume(queueName, autoAck, "myConsumerTag",new DefaultConsumer(channel) {          
@Override          
publicvoid handleDelivery(String consumerTag,Envelope envelope,AMQP.BasicProperties properties,byte[] body)throws IOException{              
String routingKey = envelope.getRoutingKey();              
String contentType = properties.getContentType();              
long deliveryTag = envelope.getDeliveryTag();              
// (process the message components here ...)              
channel.basicAck(deliveryTag, false);          
}      
});

在這裡,由於我們指定了autoAck = false,因此消費者有必要應答分發的訊息,最便利的方式是在handleDelivery 方法中處理.

更復雜的消費者可能需要覆蓋更多的方法,實踐中,handleShutdownSignal會在channels和connections關閉時呼叫,handleConsumeOk 會在其它消費者之前

呼叫

,傳遞consumer tag(不明白,要研究)。

消費者可實現handleCancelOk 和 handleCancel方法來接收顯示和隱式取消操作通知。

你可以使用Channel.basicCancel來顯示地取消某個特定的消費者:

channel.basicCancel(consumerTag);

passing the consumer tag.

消費者回調是在單獨執行緒上處理的,這意味著消費者可以安全地在Connection或Channel, 如queueDeclare, txCommit, basicCancel或basicPublish上呼叫阻塞方法。

每個Channel都有其自己的dispatch執行緒.對於一個消費者一個channel的大部分情況來說,這意味著消費者不會阻擋其它的消費者。如果在一個channel上多個消費者,則必須意識到長時間執行的消費者可能阻擋此channel上其它消費者回調排程.

獲取單個訊息

要顯示地獲取一個訊息,可使用Channel.basicGet.返回值是一個GetResponse例項, 在它之中,header資訊(屬性) 和訊息body都可以提取:

boolean autoAck = false; 
GetResponse response = channel.basicGet(queueName, autoAck); 
if (response == null) {     
// No message retrieved. 
} else {     
AMQP.BasicProperties props = response.getProps();     
byte[] body = response.getBody();     
long deliveryTag = response.getEnvelope().getDeliveryTag();     ...

因為autoAck = false,你必須呼叫Channel.basicAck來應答你已經成功地接收了訊息:

channel.basicAck(method.deliveryTag, false); // acknowledge receipt of the message }

處理未路由訊息

如果釋出訊息時,設定了"mandatory"標誌,但如果訊息不能路由的話,broker會將其返回到傳送客戶端 (通過 AMQP.Basic.Return 命令).

要收到這種返回的通知, clients可實現ReturnListener介面,並呼叫Channel.setReturnListener.如果channel沒有配置return listener,那麼返回的訊息會默默地丟棄。

channel.setReturnListener(new ReturnListener() {     
    publicvoid handleBasicReturn(int replyCode,String replyText,String exchange,String routingKey,AMQP.BasicProperties properties,byte[] body)     throws IOException {
         ...     
    } 
});

 return listener將被呼叫,例如,如果client使用"mandatory"標誌向未繫結佇列的direct型別交換器傳送了訊息.

關閉協議

AMQP client 關閉概述

AMQP 0-9-1 connection和channel 使用相同的方法來管理網路故障,內部故障,以及顯示本地關閉.

AMQP 0-9-1 connection  和 channel 有如下的生命週期狀態:

  • open: 準備要使用的物件
  • closing: 物件已顯示收到收到本地關閉通知, 並向任何支援的底層物件發出關閉請求,並等待其關閉程式完成
  • closed: 物件已收到所有底層物件的完成關閉通知,最終將執行關閉操作

這些物件總是以closed狀態結束的,不管基於什麼原因引發的關閉,比如:應用程式請求,內部client library故障, 遠端網路請求或網路故障.

AMQP connection 和channel 物件會持有下面與關閉相關的方法:

  • addShutdownListener(ShutdownListener 監聽器)和removeShutdownListener(ShutdownListener 監聽器),用來管理監聽器,當物件轉為closed狀態時,將會觸發這些監聽器.注意,在已經關閉的物件上新增一個ShutdownListener將會立即觸發監聽器
  • getCloseReason(), 允許同其互動以瞭解物件關閉的理由
  • isOpen(), 用於測試物件是否處於open狀態
  • close(int closeCode, String closeMessage), 用於顯示通知物件關閉

可以像這樣來簡單使用監聽器:

import com.rabbitmq.client.ShutdownSignalException; 
import com.rabbitmq.client.ShutdownListener;  
connection.addShutdownListener(new ShutdownListener() {     
public void shutdownCompleted(ShutdownSignalException cause)     {         ...     } }
);

關閉環境資訊

可通過顯示呼叫getCloseReason()方法或通過使用ShutdownListener類中的業務方法的cause引數來從ShutdownSignalException中獲取關閉原因的有用資訊.

ShutdownSignalException 類提供方法來分析關閉的原因.通過呼叫isHardError()方法,我們可以知道是connection錯誤還是channel錯誤.getReason()會返回相關cause的相關資訊,這些引起cause的方法形式-要麼是AMQP.Channel.Close方法,要麼是AMQP.Connection.Close (或者是null,如果是library中引發的異常,如網路通訊故障,在這種情況下,可通過getCause()方法來獲取資訊).

public void shutdownCompleted(ShutdownSignalException cause) { if (cause.isHardError()) {
Connection conn = (Connection)cause.getReference();
if (!cause.isInitiatedByApplication()) {
Method reason = cause.getReason(); ... } ... }
else { Channel ch = (Channel)cause.getReference(); ... } }

原子使用isOpen()方法

channel和connection物件的isOpen()方法不建議在在生產程式碼中使用,因為此方法的返回值依賴於shutdown cause的存在性. 下面的程式碼演示了竟爭條件的可能性:

public void brokenMethod(Channel channel) {     if (channel.isOpen())     {         // The following code depends on the channel being in open state.         // However there is a possibility of the change in the channel state         // between isOpen() and basicQos(1) call         ...         channel.basicQos(1);     } }

相反,我們應該忽略這種檢查,並簡單地嘗試這種操作.如果程式碼執行期間,connection的channel關閉了,那麼將丟擲ShutdownSignalException,這就表明物件處於一種無效狀態了.當broker意外關閉連線時,我們也應該捕獲由SocketException引發的IOException,或者當broker清理關閉時,捕獲ShutdownSignalException.

public void validMethod(Channel channel) {     try {         ...         channel.basicQos(1);     } catch (ShutdownSignalException sse) {         // possibly check if channel was closed         // by the time we started action and reasons for         // closing it         ...     } catch (IOException ioe) {         // check why connection was closed         ...     } }

高階連線選項

Consumer執行緒池

Consumer 執行緒預設是通過一個新的ExecutorService執行緒池來自動分配的(參考下面的Receiving).如果需要在newConnection() 方法中更好地控制ExecutorService,可以使用定製的執行緒池.下面的示例展示了一個比正常分配稍大的執行緒池:

ExecutorService es = Executors.newFixedThreadPool(20); Connection conn = factory.newConnection(es); 

Executors 和 ExecutorService 都是java.util.concurrent包中的類.

當連線關閉時,預設的ExecutorService將會被shutdown(), 但使用者自定義的ExecutorService (如上面所示)將不會被shutdown(). 提供自定義ExecutorService的Clients必須確保最終它能被關閉(通過呼叫它的shutdown() 方法), 否則池中的執行緒可能會阻止JVM終止.

同一個executor service,可在多個連線之間共享,或者連續地在重新連線上重用,但在shutdown()後,則不能再使用.

使用這種特性時,唯一需要考慮的是:在消費者回調的處理過程中,是否有證據證明有嚴重的瓶頸. 如果沒有消費者執行回撥,或很少,預設的配置是綽綽有餘. 開銷最初是很小的,分配的全部執行緒資源也是有界限的,即使偶爾可能出現一陣消費活動.

使用Host列表

可以傳遞一個Address陣列給newConnection(). Address只是 com.rabbitmq.client 包中包含host和port元件的簡單便利類. 例如:

Address[] addrArr = new Address[]{ new Address(hostname1, portnumber1)                                  , new Address(hostname2, portnumber2)}; Connection conn = factory.newConnection(addrArr); 

將會嘗試連線hostname1:portnumber1, 如果不能連線,則會連線hostname2:portnumber2,然後會返回陣列中第一個成功連線(不會丟擲IOException)上broker的連線.這完全等價於在factory上重複呼叫factory.newConnection()方法來設定host和port, 直到有一個成功返回.

如果提供了ExecutorService(在factory.newConnection(es, addrArr)中使用),那麼執行緒池將與第一個成功連線相關聯.

心跳超時

參考Heartbeats guide 來了解更多關於心跳及其在Java client中如何配置的更多資訊.

自定義執行緒工廠

像Google App Engine (GAE)這樣的環境會限制執行緒直接例項化. 在這樣的環境中使用RabbitMQ Java client, 需要配置一個定製的ThreadFactory,即使用合適的方法來例項化執行緒,如: GAE's ThreadManager. 下面是Google App Engine的相關程式碼.

import com.google.appengine.api.ThreadManager;  ConnectionFactory cf = new ConnectionFactory(); cf.setThreadFactory(ThreadManager.backgroundThreadFactory()); 

網路故障時自動恢復

Connection恢復

clients和RabbitMQ節點之間的連線可發生故障. RabbitMQ Java client 支援連線和拓撲(queues, exchanges, bindings, 和consumers)的自動恢復. 大多數應用程式的連線自動恢復過程會遵循下面的步驟:

  1. 重新連線
  2. 恢復連線監聽器
  3. 重新開啟通道
  4. 恢復通道監聽器
  5. 恢復通道basic.qos 設定,釋出者確認和事務設定

拓撲恢復包含下面的操作,每個通道都會執行下面的步驟:

  1. 重新宣告交換器(except for predefined ones)
  2. 重新宣告佇列
  3. 恢復所有繫結
  4. 恢復所有消費者

要啟用自動連線恢復,須使用factory.setAutomaticRecoveryEnabled(true):

ConnectionFactory factory = new ConnectionFactory(); factory.setUsername(userName); factory.setPassword(password); factory.setVirtualHost(virtualHost); factory.setHost(hostName); factory.setPort(portNumber); factory.setAutomaticRecoveryEnabled(true); // connection that will recover automatically Connection conn = factory.newConnection();

如果恢復因異常失敗(如. RabbitMQ節點仍然不可達),它會在固定時間間隔後進行重試(預設是5秒). 時間間隔可以進行配置:

ConnectionFactory factory = new ConnectionFactory(); // attempt recovery every 10 seconds factory.setNetworkRecoveryInterval(10000);

當提供了address列表時,將會在所有address上逐個進行嘗試:

ConnectionFactory factory = new ConnectionFactory();  Address[] addresses = {new Address("192.168.1.4"), new Address("192.168.1.5")}; factory.newConnection(addresses);

恢復監聽器

可在可恢復連線和通道上註冊一個或多個恢復監聽器. 當啟用了連線恢復時,ConnectionFactory#newConnection 和 Connection#createChannel 的連線已實現了com.rabbitmq.client.Recoverable,並提供了兩個方法:

  • addRecoveryListener
  • removeRecoveryListener

注意,在使用這些方法時,你需要將connections和channels強制轉換為Recoverable.

釋出影響

當連線失敗時,使用Channel.basicPublish方法傳送的訊息將會丟失. client不會保證在連線恢復後,訊息會得到分發.要確保釋出的訊息到達了RabbitMQ,應用程式必須使用Publisher Confirms 

拓撲恢復

拓撲恢復涉及交換器,佇列,繫結以及消費者恢復.預設是啟用的,但也可以禁用:

ConnectionFactory factory = new ConnectionFactory();  Connection conn = factory.newConnection(); factory.setAutomaticRecoveryEnabled(true); factory.setTopologyRecoveryEnabled(false);

手動應答和自動恢復

當使用手動應答時,在訊息分發與應答之間可能存在網路連線中斷. 在連線恢復後,RabbitMQ會在所有通道上重設delivery標記. 也就是說,使用舊delivery標記的basic.ackbasic.nack, 以及basic.reject將會引發channel exception. 為了避免發生這種情況, RabbitMQ Java client可以跟蹤,更新,以使它們在恢復期間單調地增長. Channel.basicAck, Channel.basicNack, 以及Channel.basicReject 然後可以轉換這些 delivery tags,並且不再發送過期的delivery tags. 使用手動應答和自動恢復的應用程式必須負責處理重新分發.

未處理異常

關於connection, channel, recovery, 和consumer lifecycle 的異常將會委派給exception handler,Exception handler是實現了ExceptionHandler介面的任何物件. 預設情況下,將會使用DefaultExceptionHandler例項,它會將異常細節輸出到標準輸出上.

可使用ConnectionFactory#setExceptionHandler來覆蓋原始handler,它將被用於由此factory建立的所有連線:

ConnectionFactory factory = new ConnectionFactory(); cf.setExceptionHandler(customHandler);         

Exception handlers 應該用於異常日誌.

Google App Engine上的RabbitMQ Java Client

在Google App Engine (GAE) 上使用RabbitMQ Java client,必須使用一個自定義的執行緒工廠來初始化執行緒,如使用GAE's ThreadManager. 此外,還需要設定一個較小的心跳間隔(4-5 seconds) 來避免InputStream 上讀超時:

ConnectionFactory factory = new ConnectionFactory(); cf.setRequestedHeartbeat(5);         

警告和限制

為了能使拓撲恢復, RabbitMQ Java client 維持了宣告佇列,交換器,繫結的快取. 快取是基於每個連線的.某些RabbitMQ的特性使得客戶端不能觀察到拓撲的變化,如,當佇列因TTL被刪除時. RabbitMQ Java client 會嘗試在下面的情況中使用快取實體失效:

  • 當佇列被刪除時.
  • 當交換器被刪除時.
  • 當繫結被刪除時.
  • 當消費者在自動刪除佇列上退出時.
  • 當佇列或交換器不再繫結到自動刪除的交換器上時.

然而, 除了單個連線外,client不能跟蹤這些拓撲變化. 依賴於自動刪除佇列或交換器的應用程式,正如TTL佇列一樣 (注意:不是訊息TTL!), 如果使用了自動連線恢復,在知道不再使用或要刪除時,必須明確地刪除這些快取實體,以淨化 client-side 拓撲cache. 這些可通過Channel#queueDelete, Channel#exchangeDelete,Channel#queueUnbind, and Channel#exchangeUnbind來進行.

RPC (Request/Reply) 模式

為了便於程式設計, Java client API提供了一個使用臨時回覆佇列的RpcClient類來提供簡單的RPC-style communication.

此類不會在RPC引數和返回值上強加任何特定格式. 它只是簡單地提供一種機制來向帶特定路由鍵的交換器傳送訊息,並在回覆佇列上等待響應.

import com.rabbitmq.client.RpcClient;  
RpcClient rpc = new RpcClient(channel, exchangeName, routingKey);

(其實現細節為:請求訊息使用basic.correlation_id唯一值欄位來發送訊息,並使用basic.reply_to來設定響應佇列的名稱.)

一旦你建立此類的例項,你可以使用下面的任意一個方法來發送RPC請求:

byte[] primitiveCall(byte[] message); 
String stringCall(String message) Map mapCall(Map message) Map mapCall(Object[] keyValuePairs)

primitiveCall 方法會將原始byte陣列轉換為請求和響應的訊息體. stringCall只是一個primitiveCall的簡單包裝,將訊息體視為帶有預設字符集編碼的String例項.

mapCall 變種稍為有些複雜: 它會將原始java值包含在java.util.Map中,並將其編碼為AMQP 0-9-1二進位制表示形式,並以同樣的方式來解碼response. (注意:在這裡,對一些值物件型別有所限制,具體可參考javadoc.)

所有的編組/解組便利方法都使用primitiveCall來作為傳輸機制,其它方法只是在它上面的做了一個封裝.

posted on 2016-06-04 00:37 胡小軍 閱讀(9614) 評論(1)  編輯  收藏 所屬分類: RabbitMQ