RabbitMQ開發庫的完整API文件(翻譯)
背景
我的Android端需要實時的接收RabbitMQ推送過來的訊息,故找了對應的文件檢視用法。這裡特地翻譯出來方便自己和諸位博友日後學習分享。
譯文連結
我的譯文
Java客戶端API指南
該指南涵蓋了RabbitMQ Java客戶端API。然而,這並不是一個教程。這些都可以在
5.x版本系列的庫需要JDK 8,用於編譯和執行時。在Android上,這意味著只支援Android 7.0或更高的版本。4.x釋出系列支援JDK 6和Android7.0之前的版本。
該庫是開源的,並且有以下三種授權:
這意味著使用者可以考慮在上面的三種授權列表中的任何許可下使用該庫。例如,使用者可以選擇Apache Public License 2.0,並將該客戶端包含到一個商業產品中。在GPLv2下獲得許可的程式碼庫可以選擇GPLv2,等等。
API引用(JavaDoc)是單獨可用的。
還有一些命令列工具,這些工具曾經與Java客戶端一起使用。
客戶端API與AMQP 0-9-1協議規範進行了密切的建模,並提供了額外的抽象以方便使用。
概述
RabbitMQ Java客戶端使用 com.RabbitMQ.client 作為它的top-level package。關鍵類和介面是:
Channel
Connection
ConnectionFactory
Consumer
- 1
- 2
- 3
- 4
協議操作通過 Channel 介面可用。Connection 用於開啟channels,註冊連線生命週期事件處理程式,以及關閉不再需要的連線。Connections 通過ConnectionFactory例項化,這是您配置各種連線設定的方式,例如vhost或使用者名稱。
Connections and Channels
核心API類是Connection 和Channel,分別表示AMQP 0-9-1連線和通道。這些是使用前需要匯入的:
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
- 1
- 2
連線到一個代理
下面的程式碼使用給定的引數(主機名、埠號等)連線到AMQP代理。
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername(userName);
factory.setPassword(password);
factory.setVirtualHost(virtualHost);
factory.setHost(hostName);
factory.setPort(portNumber);
Connection conn = factory.newConnection();
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
對於在本地執行的RabbitMQ伺服器,所有這些引數都有合理的預設值
或者,也可以使用uri:
ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://userName:[email protected]:portNumber/virtualHost");
Connection conn = factory.newConnection();
- 1
- 2
- 3
所有這些引數對於執行在本地的RabbitMQ伺服器都有合理的預設值。
然後可以使用 Connection 介面開啟通道:
Channel channel = conn.createChannel();
- 1
該通道現在可以用於傳送和接收訊息,後續部分會有講解。
要斷開連線,只需關閉通道和連線:
channel.close();
conn.close();
- 1
- 2
請注意,關閉通道可能被認為是良好的習慣,但在這裡並不是絕對必要的,因為當底層連線關閉後,任何情況下通道都會自動的關閉
使用 Exchanges and Queues(佇列)
客戶端應用程式與交換機和佇列一起工作,即AMQP的高階構建塊。這些必須在使用之前被“宣告”。宣告任何型別的物件只會確保其中一個名稱存在,並在必要時建立它。
繼續之前的例子,下面的程式碼聲明瞭一個交換機和一個佇列,然後將它們繫結在一起。
channel.exchangeDeclare(exchangeName, "direct", true);
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, exchangeName, routingKey);
- 1
- 2
- 3
這將主動宣告以下的物件,這兩個物件都可以通過使用額外的引數進行定製。這裡,物件有一些特殊的引數體現:
一個持久的、非自動刪除的“直接”型別的exchange(交換機)
有一個已知名稱的、非持久的、專有的、自動刪除的佇列
上面的函式呼叫然後用給定的路由鍵將佇列繫結到交換機。
注意,當只有一個客戶端希望使用它時,這將是宣告佇列的一種典型方式:它不需要一個已知的名稱,其他客戶端不能獨佔佇列,並且佇列會自動清理(自動刪除)。如果有幾個客戶端想要共享一個已知名稱的佇列,那麼這段程式碼將是所需要的:
channel.exchangeDeclare(exchangeName, "direct", true);
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
- 1
- 2
- 3
- 4
- 5
這些會主動宣告:
一個持久的、非自動刪除的“直接”型別的exchange(交換機)
有一個已知名稱的、持久的、非專有的(這裡理解為可共享的)、非自動刪除的佇列
請注意,所有這些 Channel API方法都是過載的。對於exchangeDeclare, queueDeclare 和 queueBind 這些方便的簡寫形式都是使用了合理的預設。還有更多引數的更長的形式,可以讓您根據需要覆蓋這些預設值,以便在需要的時候提供絕對控制權。
這種“簡寫形式、長形式(這裡理解為帶更多引數的一種形式)”模式在客戶端的API中使用。
釋出訊息(Publishing messages)
要將訊息釋出到交換機中,請使用 Channel.basicPublish 如下:
byte[] messageBodyBytes = "Hello, world!".getBytes();
channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);
- 1
- 2
為了獲得良好的控制,您可以使用過載的變式來指定 mandatory 標誌,或者使用預先設定的訊息屬性來發送訊息:
channel.basicPublish(exchangeName, routingKey, mandatory,
MessageProperties.PERSISTENT_TEXT_PLAIN,
messageBodyBytes);
- 1
- 2
- 3
- 4
傳送一條訊息,mode是2(即持久化的),priority 為1,content-type 為 “text/plain”。您可以構建自己的訊息屬性物件,使用一個Builder類,您可以使用您喜歡的屬性,例如:
channel.basicPublish(exchangeName, routingKey,
new AMQP.BasicProperties.Builder().contentType("text/plain")
.deliveryMode(2)
.priority(1).userId("bob")
.build()),
messageBodyBytes);
舉個例子, 發一個帶自定義頭部的訊息:
Map<String, Object> headers = new HashMap<String, Object>();
headers.put("latitude", 51.5252949);
headers.put("longitude", -0.0905493);
- 1
- 2
- 3
channel.basicPublish(exchangeName, routingKey,
new AMQP.BasicProperties.Builder()
.headers(headers)
.build()),
messageBodyBytes);
- 1
- 2
- 3
- 4
- 5
這個例子釋出了一個expiration的訊息:
channel.basicPublish(exchangeName, routingKey,
new AMQP.BasicProperties.Builder()
.expiration("60000")
.build()),
messageBodyBytes);
- 1
- 2
- 3
- 4
- 5
我們還沒有說明所有的可能性。
注意,BasicProperties是自動生成的holder類AMQP的內部類。
如果資源驅動的警報生效,那麼 Channel#basicPublish 的呼叫最終會被阻塞。
通道和併發性考慮事項(執行緒安全)
根據已有經驗,線上程之間共享 Channel 例項是可以避免的。應用程式應該選擇一個 Channel 對應一個執行緒,而不是在多個執行緒之間共享同一個 Channel 。
雖然通道上的一些操作是可以同時呼叫的,但有些操作是不安全的,並且會導致不正確的幀間交錯問題,雙確認問題等等。
共享通道上的併發釋出可能會導致連線上不正確的幀,從而觸發連線級別的協議異常和連線關閉。因此,它需要在應用程式程式碼中顯式同步( Channel#basicPublish 必須在關鍵部分中呼叫)。線上程之間共享通道也會干擾釋出者的確認。我們強烈建議在共享通道上避免併發釋出。
在共享通道時,在一個執行緒中消費,在另一個執行緒中釋出可以是安全的。
Server-pushed deliveries (見下面的部分)是同時發出的,保證每個通道的順序被保留。
對每個連線,分派機制使用java.util.concurrent.ExecutorService。可以提供一個自定義執行器,它將由一個ConnectionFactory的ConnectionFactory#setSharedExecutor 呼叫所產生的所有連線共享。
當使用手動確認時,重要的是要考慮哪些執行緒做了確認。如果它與接收到的執行緒不同(例如,Consumer#handleDelivery委託給不同的執行緒)將 multiple 引數設定為 true 是不安全的,並將導致雙重確認,因此會出現通道級協議異常,從而關閉通道。因此一次只確認一條資訊可以是安全的。
通過訂閱接收訊息(”Push API”)
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
- 1
- 2
接收訊息的最有效方式是使用 Consumer 介面設定訂閱。訊息將在到達時自動傳送,而不用顯式地請求。在呼叫與Consumers有關的API方法時,個人訂閱總是與它們的消費者標籤相關聯。消費者標籤是消費者的唯一識別符號,它可以是客戶端,也可以是伺服器生成的。為了讓RabbitMQ生成一個節點範圍的惟一標記,使用 Channel#basicConsume 方法重寫,它不需要攜帶一個消費者標記引數,或者傳遞一個空字串給消費者標記,並使用 Channel#basicConsume 方法返回的值。消費者標籤可被用來取消消費者。
不同的消費者例項必須具有不同的消費者標記。對連線上的重複的消費者標籤是強烈反對的,當消費者被監控時,會導致自動連線恢復和令人困惑的監控資料的問題。
實現Consumer 的最簡單方法是例項化 DefaultConsumer 類。這個子類的一個物件可以通過basicConsume的呼叫來設定訂閱:
boolean autoAck = false;
channel.basicConsume(queueName, autoAck, "myConsumerTag",
new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)
throws IOException
{
String routingKey = envelope.getRoutingKey();
String contentType = properties.getContentType();
long deliveryTag = envelope.getDeliveryTag();
// (這裡進行訊息元件的處理 ...)
channel.basicAck(deliveryTag, false);
}
});
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
在這裡,因為我們指定了autoAck=false,有必要確認訊息傳遞給消費者,最方便的是在 handleDelivery 方法中完成,如上所示。
更復雜的Consumers需要覆蓋更多的方法。特別地,當通道和連線關閉時,handleShutdownSignal被呼叫,而handleConsumeOk方法會在任何其他回撥之前傳遞消費者標籤給Consumer。
消費者還可以實現handleCancelOk 和handleCancel 方法,分別通知顯式和隱式取消。
通過消費者標籤,你可以用Channel.basicCancel明確地取消一個特定的Consumer :channel.basicCancel(consumerTag);
就像釋出者一樣,考慮消費者的併發性風險也是很重要的。
對使用者的回撥被分派到執行緒池中,該執行緒池與例項化Channel的執行緒分開。這意味著消費者可以安全地呼叫Connection 或Channel上的阻塞方法,例如 Channel#queueDeclare 或者 Channel#basicCancel。
每個Channel 都有自己的分派執行緒,對於一個Channel 中一個Consumer 最常見的使用情況,這意味著Consumers不支援其他Consumers。如果一個通道都有多個消費者,那麼一個長時間執行的消費者可能會持有該通道上的其他消費者的回撥的引用。
關於併發性和併發性危害安全的其他主題,請參閱併發性考慮(執行緒安全)部分
恢復個人訊息(“Pull API”)
要顯式地檢索訊息,請使用Channel.basicGet。返回值是GetResponse的一個例項,從這個例項中可以提取頭資訊(屬性)和訊息體:
boolean autoAck = false;
GetResponse response = channel.basicGet(queueName, autoAck);
if (response == null) {
// No message retrieved.
} else {
AMQP.BasicProperties props = response.getProps();
byte[] body = response.getBody();
long deliveryTag = response.getEnvelope().getDeliveryTag();
...
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
而且由於設定了autoAck =false,您還必須呼叫Channel.basicAck 方法確認你確實已經成功地收到了這樣的資訊:
...
channel.basicAck(method.deliveryTag, false); // 確認收到的訊息
- 1
- 2
處理 unroutable(無法傳送的) 訊息
如果訊息釋出時帶有“強制”標誌,但是不能被路由,代理將把它返回給傳送客戶端(通過AMQP.Basic.Return 命令)。
收到返回通知,客戶端可以實現ReturnListener 介面或者呼叫Channel.addReturnListener。如果客戶端沒有為特定的通道配置一個返回監聽,那麼相關的返回訊息將被預設丟棄。
channel.addReturnListener(new ReturnListener() {
public void handleReturn(int replyCode,
String replyText,
String exchange,
String routingKey,
AMQP.BasicProperties properties,
byte[] body)
throws IOException {
...
}
});
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
例如,如果客戶端將帶有“mandatory”標誌的訊息釋出到一個沒有繫結到佇列的“direct”型別的交換機上,則將呼叫一個返回監聽。
關閉協議
AMQP客戶端關閉的概述
AMQP 0-9-1連線和通道共享相同的通用方法來管理網路故障、內部故障和顯式的本地關閉。
AMQP 0-9-1連線和通道具有以下生命週期狀態:
open: 物件已經準備好使用了
closing:該物件已被顯式地通知在本地關閉,並向任何支援底層物件的物件發出了關閉請求,並等待它們的關閉過程完成。
closed:該物件從任何較低級別的物件中接收了所有的關閉完成通知,結果已經關閉了它自己。
- 1
- 2
- 3
- 4
這些物件總是處於關閉狀態,不管導致閉包的原因是什麼,比如應用程式請求、內部客戶機庫故障、遠端網路請求或網路故障。
AMQP連線和通道物件擁有以下與關閉相關的方法:
removeShutdownListener(ShutdownListener listener)
addShutdownListener(ShutdownListener listener)
- 1
- 2
- 3
管理任何監聽器,當物件轉換到關閉狀態時將被觸發。注意,向已經關閉的物件新增一個ShutdownListener將立即觸發偵聽器
getCloseReason(),允許調查物件關閉的原因是什麼
isOpen(),用於測試物件是否處於開放狀態
close(int closeCode, String closeMessage), 顯式地通知物件關閉
- 1
- 2
- 3
- 4
- 5
監聽器的簡單用法如下:
import com.rabbitmq.client.ShutdownSignalException;
import com.rabbitmq.client.ShutdownListener;
connection.addShutdownListener(new ShutdownListener() {
public void shutdownCompleted(ShutdownSignalException cause)
{
...
}
});
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
關於關閉情況的一些資訊
可以檢索ShutdownSignalException,其中包含關於關閉原因的所有可用資訊,或者通過顯式呼叫getCloseReason()方法,或者使用ShutdownListener類的服務(ShutdownSignalException起因)方法中的原因引數。
ShutdownSignalException類提供了分析關閉原因的方法。通過呼叫isHardError()方法,我們可以獲得資訊,無論它是連線還是通道錯誤,而getReason()將返回關於該原因的資訊,這是一個AMQP方法,即AMQP.Channel.Close或AMQP.Connection.Close (如果原因在庫中是某個異常,將返null,比如網路通訊失敗,在這種情況下,可以用getCause()來檢索異常)。
public void shutdownCompleted(ShutdownSignalException cause)
{
if (cause.isHardError())
{
Connection conn = (Connection)cause.getReference();
if (!cause.isInitiatedByApplication())
{
Method reason = cause.getReason();
...
}
...
} else {
Channel ch = (Channel)cause.getReference();
...
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
isOpen()方法的原子性和使用
對於生產程式碼,不建議使用通道和連線物件的isOpen()方法,因為方法返回的值依賴於關閉原因。下面的程式碼說明了競態條件的可能性:
public void brokenMethod(Channel channel)
{
if (channel.isOpen())
{
// The following code depends on the channel being in open state.
// However there is a possibility of the change in the channel state
// between isOpen() and basicQos(1) call
...
channel.basicQos(1);
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
相反,我們通常應該忽略這種檢查,並簡單地嘗試所需的操作。如果在程式碼執行期間,連線的通道被關閉,則會丟擲一個ShutdownSignalException,指示該物件處於無效狀態。我們還應該捕獲由SocketException引起的IOException,當代理關閉連線時,或者當代理啟動清理關閉時,或者出現ShutdownSignalException。
public void validMethod(Channel channel)
{
try {
...
channel.basicQos(1);
} catch (ShutdownSignalException sse) {
// possibly check if channel was closed
// by the time we started action and reasons for
// closing it
...
} catch (IOException ioe) {
// check why connection was closed
...
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
高階連線選項
消費者執行緒池
在預設情況下,消費者執行緒(see Receivingbelow)將自動在一個新的ExecutorService執行緒池中分配 。如果需要在newConnection()方法中提供一個ExecutorService用以更好的控制,那麼就需要使用這個執行緒池下面是一個比通常分配的更大的執行緒池的例子:
ExecutorService es = Executors.newFixedThreadPool(20);
Connection conn = factory.newConnection(es);
- 1
- 2
Executors 和ExecutorService 類都在java.util.concurrent 包中。
當連線關閉時,預設的ExecutorService將會關閉,但是使用者提供的ExecutorService(如上面的es)不會關閉。提供定製的ExecutorService的客戶端必須確保它最終被關閉(通過呼叫它的shutdown() 方法),否則池的執行緒可能會阻止JVM終止。
相同的執行器服務可以在多個連線之間共享,也可以在重新連線上進行序列重用,但在shutdown()之後不能使用。
只有當有證據表明在處理消費者回調過程中存在嚴重的瓶頸時,才應該考慮使用該特性。如果沒有執行或很少的消費者回調,那麼預設的分配就足夠了。開銷最初是最小的,並且分配的執行緒資源是有界的,即使偶爾會出現消費活動的爆發。
使用主機列表
可以將一個Address 陣列傳遞給newConnection()。Address 只是com.rabbitmq.client 包中的一個方便類,帶有主機和埠元件。舉個列子:
Address[] addrArr = new Address[]{ new Address(hostname1, portnumber1)
, new Address(hostname2, portnumber2)};
Connection conn = factory.newConnection(addrArr);
- 1
- 2
- 3
如果它不能連線到hostname2:portnumber2,將嘗試連線到hostname1:portnumber1。直到地址陣列中的某個地址第一次成功連線之後返回。這完全等同於在工廠中反覆地設定主機和埠,每次呼叫factory.newconnection(),直到其中一個成功。
如果還提供了一個ExecutorService(使用factory.newConnection(es, addrArr)形式)執行緒池將與(第一個)成功連線相關聯。
如果您想要對主機進行更多的控制,請檢視服務發現的支援。
使用AddressResolver介面的服務發現
在3.6.6版本中,可以讓AddressResolver的實現在建立連線時選擇在哪裡進行連線:
Connection conn = factory.newConnection(addressResolver);
- 1
AddressResolver介面如下:
public interface AddressResolver {
List<Address> getAddresses() throws IOException;
}
- 1
- 2
- 3
- 4
- 5
就像一個主機列表一樣,返回的第一個地址將首先被嘗試,然後是第二個地址,如果客戶端不能連線到第一個地址,等等。
如果還提供了一個ExecutorService(使用factory.newConnection(es, addressResolver)形式)執行緒池將與(第一個)成功連線相關聯。
AddressResolver是實現自定義服務發現邏輯的完美場所,這在動態基礎結構中尤其有用。與自動恢復相結合,客戶端可以自動連線到第一次啟動時甚至不啟動的節點。關聯和負載平衡是自定義AddressResolver可能有用的其他場景。
Java客戶端附帶了以下實現(請參閱javadoc以獲得詳細資訊):
DnsRecordIpAddressResolver:給定主機名,返回其IP地址(針對平臺DNS伺服器的解析)。這對於簡單的基於dns負載平衡或故障轉移非常有用。
DnsSrvRecordAddressResolver:給定服務的名稱,返回主機名/埠對。搜尋是作為一個DNS SRV請求實現的。這在使用類似於HashiCorp Consul的服務註冊時非常有用。
心跳超時
請參閱Heartbeats guide,瞭解關於heartbeats的更多資訊,以及如何在Java客戶機中配置它們。
自定義執行緒工廠
像Google App Engine(GAE)這樣的環境可以限制直接的執行緒例項化。要在這樣的環境中使用RabbitMQ Java客戶端,有必要配置一個自定義的ThreadFactory,該工廠使用適當的方法來例項化執行緒,例如GAE的ThreadManager。下面是Google App Engine的一個例子:
import com.google.appengine.api.ThreadManager;
ConnectionFactory cf = new ConnectionFactory();
cf.setThreadFactory(ThreadManager.backgroundThreadFactory());
- 1
- 2
- 3
- 4
對Java非阻塞IO的支援
4.0版本的Java客戶端的為Java非阻塞IO提供支援(a.k.a Java NIO)。NIO不應該比阻塞IO更快,它只允許更容易地控制資源(在本例中是執行緒)。
使用預設的阻塞IO模式,每個連線都使用一個執行緒從網路套接字讀取。使用NIO模式,您可以控制網路套接字讀/寫的執行緒數量。
如果您的Java程序使用許多連線(數十或數百),請使用NIO模式。您應該使用較少的執行緒,而不是預設的阻塞模式。使用適當數量的執行緒集,您不應該嘗試任何效能的下降,特別是如果連線不是很忙的話。
必須顯式地啟用NIO:
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.useNio();
- 1
- 2
NIO模式可以通過NioParams類進行配置:
connectionFactory.setNioParams(new NioParams().setNbIoThreads(4));
- 1
- 2
NIO模式使用合理的預設值,但是您可能需要根據您自己的工作負載來更改它們。其中一些設定是:使用的IO執行緒總數、緩衝區的大小、用於IO迴圈的服務執行器、記憶體中的寫佇列的引數(寫請求在被髮送到網路之前入隊)。請閱讀Javadoc以獲得詳細資訊和預設值。
從網路故障中自動恢復
連線恢復
客戶端和RabbitMQ節點之間的網路連線可能會失敗。RabbitMQ Java客戶端支援連線和拓撲(佇列、交換、繫結和消費者)的自動恢復。許多應用程式的自動恢復過程遵循以下步驟:
- Reconnect(重新連線)
- Restore connection listeners(恢復連線監聽器)
- Re-open channels(重新開啟通道)
- Restore channel listeners(恢復通道偵聽器)
- Restore channel basic.qos setting, publisher confirms and transaction settings(恢復基本頻道。qos設定、釋出者確認和事務設定)
拓撲恢復包括以下操作,為每個通道執行
- 1
- Re-declare exchanges (except for predefined ones)(重新宣告交換(除了預定義的交換))
- Re-declare queues(重新宣告佇列)
- Recover all bindings(恢復所有繫結)
- Recover all consumers(恢復所有消費者)
在4.0.0版本的Java客戶端中,預設情況下自動恢復是啟用的(因此也可以進行拓撲恢復)。
使用factory.setAutomaticRecoveryEnabled(boolean)
方法可以禁用或啟用自動連線恢復。下面的程式碼片段展示瞭如何顯式地啟用自動恢復(例如,在4.0.0之前的Java客戶端):
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername(userName);
factory.setPassword(password);
factory.setVirtualHost(virtualHost);
factory.setHost(hostName);
factory.setPort(portNumber);
factory.setAutomaticRecoveryEnabled(true);
// connection that will recover automatically
Connection conn = factory.newConnection();
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
如果由於異常(例如RabbitMQ節點仍然不能到達)而恢復失敗,那麼在固定的時間間隔之後將重新嘗試(預設為5秒)。可以配置間隔時間:
ConnectionFactory factory = new ConnectionFactory();
// attempt recovery every 10 seconds(每10秒嘗試一次恢復)
factory.setNetworkRecoveryInterval(10000);
- 1
- 2
- 3
當提供一個地址列表時,列表會被打亂,所有的地址都會被嘗試,一個接著一個:
ConnectionFactory factory = new ConnectionFactory();
Address[] addresses = {new Address("192.168.1.4"), new Address("192.168.1.5")};
factory.newConnection(addresses);
- 1
- 2
- 3
- 4
恢復監聽器
可以在可恢復的連線和通道上註冊一個或多個恢復偵聽器。當啟用連線恢復,連線將會返回通過實現com.rabbitmq.client.Recoverable包下的兩個方法ConnectionFactory#newConnection 和Connection#createChannel 。提供兩種具有相當描述性名稱的方法:
addRecoveryListener
removeRecoveryListener
- 1
- 2
注意,為了使用這些方法,您現在需要將連線和通道轉換為Recoverable
對釋出的影響
當連線關閉時,使用Channel.basicPublish釋出的訊息將丟失。客戶端不會在連線恢復後將它們儲入佇列。為了確保釋出的訊息能夠到達RabbitMQ應用程式,需要使用釋出者確認和對連線失敗做出解釋說明。
拓撲恢復
拓撲恢復涉及到交換、佇列、繫結和消費者的恢復。當啟用自動恢復功能時,預設啟用它。因此,在Java客戶端4.0.0中預設啟用拓撲恢復。
如果需要,可以顯式地禁用拓撲恢復:
ConnectionFactory factory = new ConnectionFactory();
Connection conn = factory.newConnection();
// enable automatic recovery (e.g. Java client prior 4.0.0)
// 啟用自動恢復(例如,在4.0.0之前的Java客戶端)
factory.setAutomaticRecoveryEnabled(true);
// disable topology recovery
// 禁用拓撲復蘇
factory.setTopologyRecoveryEnabled(false);
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
故障檢測和恢復限制
自動連線恢復有許多限制和專門的設計決策,應用程式開發人員需要知道這些決策。
當連線關閉或丟失時,它需要時間來檢測。因此,有一個時間視窗,其中庫和應用程式都不知道連線失敗的有效性。在此期間釋出的任何訊息都將被序列化,並像往常一樣被寫入TCP套接字。他們向代理的交付只能通過釋出者確認:AMQP 0-9-1的釋出完全是非同步的。
當一個連線啟用了自動恢復的連線時,當一個套接字或輸入/輸出操作錯誤被檢測到時,在預設情況下,恢復將在預設情況下啟動,預設為5秒。該設計假設,即使許多網路故障是短暫的,並且通常是短暫的,但它們不會立即消失。連線恢復嘗試將在相同的時間間隔內繼續,直到成功開啟新連線。
當連線處於恢復狀態時,任何在其通道上嘗試的釋出都將被一個異常拒絕。針對外出的訊息,客戶端當下不執行任何內部緩衝。當恢復成功時,應用程式開發人員負責跟蹤這些訊息並重新發布它們。釋出者確認是一個協議擴充套件,它應該被那些不能承受訊息損失的釋出者使用。
當通道由於通道級異常而關閉時,連線恢復將不會啟動。此類異常通常表示應用程式級別的問題。該library無法對何時發生這種情況作出明智的決定。
即使在連線恢復啟動後,關閉通道也無法恢復。這包括顯式關閉通道和上面的通道級異常情況。
手動確認和自動恢復
當使用手動確認時,在訊息傳遞和確認之間可能會出現與RabbitMQ節點的網路連線失敗情況。在連線恢復之後,RabbitMQ將在所有通道上重新設定傳輸標記。這意味著basic.ack、basic.nack和basic.reject 使用舊的交付標記將導致通道異常。為了避免這種情況,RabbitMQ Java客戶端保持跟蹤和更新發送標記,使它們在恢復之間單調地增長。basicack()、channel.basicnack()和channel.basicreject()然後將調整後的交付標籤翻譯成RabbitMQ使用的標籤。使用過期的交付標籤將不會被髮送,使用手動確認和自動恢復的應用必須能夠處理重新交付的情況。
未處理異常
與連線、通道、恢復和消費者生命週期相關的未處理異常被委託給異常處理程式。異常處理程式是實現了ExceptionHandler 介面的物件。預設情況下,使用了DefaultExceptionHandler的例項。它將異常細節輸出到標準輸出。
可以使用ConnectionFactory#setExceptionHandler來覆蓋處理程式。它將用於工廠建立的所有連線:
ConnectionFactory factory = new ConnectionFactory();
cf.setExceptionHandler(customHandler);
- 1
- 2
異常處理程式應該用於異常日誌記錄。
度量和監控
在版本4.0.0中,客戶端收集執行時指標(如已釋出的訊息的數量)。度量指標集合是可選的,並在ConnectionFactory級別上設定,使用setmetricscollsetter(metricscollsetter)方法。這個方法需要一個度量標準的例項,這個例項在客戶端程式碼的幾個地方被呼叫。
客戶端支援Micrometer(版本4.3)和下拉向導的標準。
以下是收集到的資料:
Number of open connections (開啟的連線數量)
Number of open channels (開啟的通道數量)
Number of published messages (已經發布了的訊息)
Number of consumed messages (消費了的訊息數量)
Number of acknowledged messages (確認了的訊息數量)
Number of rejected messages (被拒絕的資訊)
- 1
- 2
- 3
- 4
- 5
- 6
對於與訊息相關的度量標準,不管是測微計和Dropwizard指標都提供了計數,但也包括平均速率、最後五分鐘速率等。他們還支援用於監視和報告的常用工具(JMX, Graphite, Ganglia, Datadog, etc)。請參閱下面的專用部分,瞭解更多細節。
請注意以下有關度量標準的收集:
不要忘記在使用Micrometer 或Dropwizard 度量時,將適當的依賴項(in Maven, Gradle, 或者作為 JAR 檔案)新增到JVM的classpath中。這些是可選的依賴項,並且不會被Java客戶端自動引用。您還可能需要根據所使用的reporting backend(s)來新增其他依賴項。
指標集合是可擴充套件的。為特定的需求實現一個自定義的MetricsCollector是被鼓勵的。
MetricsCollector是在ConnectionFactory級別設定的,但是可以在不同的例項之間共享。
度量收集不支援事務。例如,如果在事務中傳送一個確認資訊,然後事務被回滾,那麼確認就會被計入客戶端指標(但顯然不是由代理進行的)。注意,確認實際上被髮送到代理,然後被事務回滾取消,因此客戶端指標在傳送的確認中是正確的。作為總結,不要使用客戶度量來作為關鍵的業務邏輯,它們不能保證是完全準確的。它們的目的是簡化關於執行系統的推理,使操作更加高效。
- 1
- 2
- 3
- 4
Micrometer support
您可以以下方式啟用Micrometer指標收集 :
ConnectionFactory connectionFactory = new ConnectionFactory();
MicrometerMetricsCollector metrics = new MicrometerMetricsCollector();
connectionFactory.setMetricsCollector(metrics);
...
metrics.getPublishedMessages(); // 獲取 Micrometer's Counter 物件
- 1
- 2
- 3
- 4
- 5
Micrometer 支援several reporting backends: Netflix Atlas, Prometheus, Datadog, Influx, JMX, 等待等。
你通常會把MeterRegistry 的例項傳遞給MicrometerMetricsCollector。
下面是一個使用JMX的示例:
JmxMeterRegistry registry = new JmxMeterRegistry();
MicrometerMetricsCollector metrics = new MicrometerMetricsCollector(registry);
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setMetricsCollector(metrics);
- 1
- 2
- 3
- 4
- 5
Dropwizard指標支援
你可以採用以下方式使用Dropwizard的資料收集:
ConnectionFactory connectionFactory = new ConnectionFactory();
StandardMetricsCollector metrics = new StandardMetricsCollector();
connectionFactory.setMetricsCollector(metrics);
...
metrics.getPublishedMessages(); // 得到指標的計量物件
- 1
- 2
- 3
- 4
- 5
- 6
Dropwizard度量支援多個several reporting backends:console, JMX, HTTP, Graphite, Ganglia, etc。
你通常會把MetricsRegistry的例項傳遞給StandardMetricsCollector。
MetricRegistry registry = new MetricRegistry();
StandardMetricsCollector metrics = new StandardMetricsCollector(registry);
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setMetricsCollector(metrics);
JmxReporter reporter = JmxReporter
.forRegistry(registry)
.inDomain("com.rabbitmq.client.jmx")
.build();
reporter.start();
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
谷歌應用引擎上的RabbitMQ Java客戶端
在Google App Engine(GAE)上使用RabbitMQ Java客戶端需要使用一個自定義執行緒工廠,該工廠使用GAE的ThreadManager例項化執行緒(見上文)。此外,還需要設定一個低心跳間隔(4-5秒),以避免在GAE上執行低InputStream讀取超時:
ConnectionFactory factory = new ConnectionFactory();
cf.setRequestedHeartbeat(5);
- 1
- 2
警告和限制
為了使拓撲恢復成為可能,RabbitMQ Java客戶端維護已宣告的佇列、交換和繫結的快取。快取是每個連線。某些RabbitMQ特性使得客戶端不可能觀察到某些拓撲更改,例如,由於TTL而刪除了一個佇列。RabbitMQ Java客戶機嘗試在下面的情況下使快取項無效:
When queue is deleted.(當佇列被刪除)
When exchange is deleted.(當交換被刪除)
When binding is deleted.(當繫結被刪除)
When consumer is cancelled on an auto-deleted queue.(當消費者被自動刪除的佇列取消時。)
When queue or exchange is unbound from an auto-deleted exchange.(當佇列或交換從自動刪除的交換中釋放時。)
- 1
- 2
- 3
- 4
- 5
但是,客戶端無法跟蹤這些拓撲更改,而不僅僅是單個連線。依賴於自動刪除佇列或交換的應用程式,以及佇列TTL(注意:不是訊息TTL!),以及使用自動連線恢復,應該顯式地刪除那些不被使用或刪除的實體,以清除客戶端拓撲快取。這是由Channel#queueDelete, Channel#exchangeDelete, Channel#queueUnbind和Channel#exchangeUnbind 在RabbitMQ 3.3.x中具有冪等性的。(刪除不存在的內容並不會導致異常)。
RPC(請求/應答)模式
作為一種程式設計便利,Java客戶端API提供了一個類RpcClient,它使用臨時應答佇列,通過AMQP 0-9-1提供簡單rpc樣式的通訊設施。
該類不會在RPC引數和返回值上強加任何特定的格式。它簡單地提供了一種機制,將訊息傳送到給定的交換器,並使用特定的路由鍵,並等待應答佇列上的響應。
import com.rabbitmq.client.RpcClient;
RpcClient rpc = new RpcClient(channel, exchangeName, routingKey);
- 1
- 2
- 3
這個類如何使用AMQP 0-9-1的實現細節如下:請求訊息以basic.correlation_id 欄位集的方式傳送到該RpcClient例項的惟一值,並將basic.reply_to 設定為應答佇列的名稱。
一旦您建立了這個類的一個例項,您就可以使用它來發送RPC請求,使用以下方法:
byte[] primitiveCall(byte[] message);
String stringCall(String message)
Map mapCall(Map message)
Map mapCall(Object[] keyValuePairs)
- 1
- 2
- 3
- 4
(primitiveCall 方法將原始位元組陣列作為請求和響應體進行傳輸。方法stringCall 是圍繞primitiveCall的一個簡單的便利包裝器,將訊息體作為預設字元編碼中的字串例項。)
mapCall 的變體稍微複雜一點:它們將一個包含普通Java值的 java.util.Map 編碼為AMQP 0-9-1二進位制表表示,並以同樣的方式解碼響應。(注意,這裡有一些限制,針對什麼型別的值可以在這裡使用——請參閱javadoc以獲得詳細資訊)
所有 marshalling/unmarshalling 便利方法使用 primitiveCall 作為傳輸機制,並提供包裝層之上。
TLS的支援
可以使用TLS加密客戶端和代理之間的通訊。還支援客戶端和伺服器身份驗證(也稱為對等驗證)。下面是使用Java客戶端的加密最簡單的方法:
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5671);
factory.useSslProtocol();
- 1
- 2
- 3
- 4
- 5
- 6
注意,在上面的示例中,客戶端預設不強制執行任何伺服器身份驗證(對等證書鏈驗證),“trust all certificates”TrustManager 被使用。這對於本地開發來說很方便,但容易發生中間人攻擊,因此不建議生產。要在RabbitMQ中瞭解更多關於TLS支援的資訊,請參閱“TLS指南”。如果您只想配置Java客戶端(特別是對等驗證和信任管理器部分),請閱讀TLS指南的適當部分。
================================================================
轉自:https://blog.csdn.net/csdnzouqi/article/details/78926603
- 概述
- Connections and Channels
- 連線到一個代理
- 使用 Exchanges and Queues佇列
- 釋出訊息Publishing messages
- 通道和併發性考慮事項執行緒安全
- 通過訂閱接收訊息Push API
- 恢復個人訊息Pull API
- 處理 unroutable無法傳送的 訊息
- 關閉協議
- 高階連線選項
- 從網路故障中自動恢復
- 未處理異常
- 度量和監控
- 谷歌應用引擎上的RabbitMQ Java客戶端
-
相關推薦
RabbitMQ開發庫的完整API文件(翻譯)
背景 譯文連結 我的譯文 概述 Connections and Channels 連線到一個代理 使用 Exchanges and Queues佇列 釋出訊息Publish
《RabbitMQ開發庫的完整API文件》翻譯
背景 譯文連結 我的譯文 概述 Connections and Channels 連線到一個代理 使用 Exchanges and Queues佇列 釋出訊息Publishing messages 通道
CNTK API文件翻譯(19)——藝術風格轉變
本教程展示瞭如何將一張圖片的風格轉換成另外一種。這讓我們可以將一張原始照片渲染成世界名畫的風格。 與建立一個好看的圖片不同,在本教程中你講學習如何在CNTK中載入一個已經訓練好的VGG模型,如何基於輸入變數獲取對應的梯度,以及如何在不使用CNTK的時候使用梯度
CNTK API文件翻譯(9)——使用自編碼器壓縮MNIST資料
在本期教程之前需要先完成第四期教程。 介紹 本教程介紹自編碼器的基礎。自編碼器是一種用於高效編碼的無監督學習人工神經網路,換句話說,自編碼器用於通過機器學習學來的演算法而不是人寫的演算法進行有損資料壓縮。由此而來,使用自編碼器編碼的目的是訓練出一套資料表
CNTK API文件翻譯(17)——多對多神經網路處理文字資料(1)
(本期教程需要翻譯的內容實在是太多了,將其分割成兩期,本期主要講理論和模型建立,下期主要講訓練、測試、優化等) 背景和簡介 本教程將帶你過一遍多對多神經網路基礎,以及如何在CNTK中實現它。具體來說,我們將實現一個多對多模型用來實現字音轉換。我們首先會介
CNTK API文件翻譯(25)——後記
這篇不是翻譯,是我自己寫的後記,CNTK API文件翻系列譯完結。 CNTK是微軟的一個深度學習套件,他的存在主要是可以讓開發人員不用知道里面的各種演算法的細節,就能使用訓練深度神經網路模型。他提供了已經封裝好的元件來給我們使用:你不需要知道隨機梯度下降下降演
CNTK API文件翻譯(18)——多對多神經網路處理文字資料(2)
(本期教程需要翻譯的內容實在是太多了,將其分割成兩期,上期主要講理論和模型建立,本期主要講訓練、測試、優化等) 訓練 在我們開始訓練之前,我們將定義訓練封裝器、貪婪解碼封裝器以及用於訓練模型的準則函式。首先是訓練封裝器。 def create_mo
CNTK API文件翻譯(23)——使用CTC標準訓練聲學模型
本教程假定所有讀者都完成了前10期教程,並且對聲學建模的資料形式有基礎的瞭解。本教程介紹了CNTK種可以用於訓練以CTC(Connectionist Temporal Classification)訓練準則為例的語音識別深度神經網路的模組。 介紹 CNT
CNTK API文件翻譯(20)——GAN處理MSIST資料基礎
完成本期教程需要完成本系列的第四篇教程。 介紹 生成模型在深度學習的半監督或者非監督學習領域引起了廣泛的專注,這些領域傳統上都是使用判別模型的。生成模型的思想是線收集某個研究領域巨量的資料,然後訓練得到一個可以生成這樣的資料集的模型。這是一個需要大量訓練
CNTK API文件翻譯(3)——前饋神經網路
這個教程的目的是為了讓你熟悉使用CNTK元件來進行分類任務,如果你看了本系列的邏輯迴歸部分或者熟悉機器學習,可以跳過介紹部分。 介紹(見上期,本期略) 前饋神經網路模型 本次使用的資料集和上期邏輯迴歸教程使用的資料集一樣,不過這期的模型會結合多個邏
java API文件翻譯
概述 軟體包 類 使用 樹 已過時 索引 幫助 JavaTM 2 Platform Standard Ed. 6 上一個 下一個 框架 無框架 所有類 JavaTM 2 Platform Standard Edition
Django Rest 與 React(Django2.1 + coverage測試 + xadmin + 線上api文件)-翻譯實踐強化版
原文: www.valentinog.com/blog/tutori… 翻譯版實踐教程: Django Rest 與 React(Django2.1 加 一點小測試 加一點譯者的小額外功能) 最終構建了一個有後臺管理 + 提供api服務 + Mysql資料庫 + 線上api文件的Lead系統。
CNTK API文件翻譯(8)——使用Pandas和金融資料進行時序資料基本分析
本期將帶來使用CNTK處理時間序列資料的教程。本教程中會展示怎樣為深度學習演算法準備時間資料、訓練神經網路和評估神經網路。具體來說,我們會探究預測交易性開放式指數基金(Exchange-traded Funds,EFI)的分類是否靠譜,進而通過這種簡單的分類來決
CNTK API文件翻譯(10)——使用LSTM預測時間序列資料
本篇教程展示如何用CNTK構建LSTM來進行時間序列資料的數值預測。 目標 我們使用一個連續函式的模擬資料集(本例使用正弦曲線)。對於函式y=sin(t),我們使用符合這個函式的N個值來預測之後的M個值。 在本教程中我們將使用基於LSTM的模型。L
CNTK API文件翻譯(2)——邏輯迴歸
這篇教程的目標人群是機器學習和CNTK的新手。在這篇教程中,你將訓練一個簡單但是強大的機器學習模型,這個模型被廣泛用於各個行業的應用中。這個訓練可以使用CNTK庫,通過擴充計算資源(CPU、GPU)來推廣到大量的資料集。 介紹 問題: 一個腫瘤醫院提供了
CNTK API文件翻譯(11)——使用LSTM預測時間序列資料(物聯網資料)
在上一期我們開發了一個簡單的LSTM神經網路來預測時序資料的值。在本期我們要把這模型用在真實世界的物聯網資料上。作為示例,我們會根據之前幾天觀測到的資料預測太陽能電池板的日產電量。 太陽能發電量預測是一個重要且艱難的問題。太陽能產電量的預測還與天氣預測密切相關
CNTK API文件翻譯(15)——自然語言理解
本教程展示瞭如何實現一個遞迴神經網路來處理文字,為航空出行資訊服務(ATIS)資料提供分詞標記任務(將不同的詞分到各自的類中,分類由訓練資料集提供)。我們從文字線型降維開始,然後訓練和使用LSTM神經網路。這將被擴充套件到相鄰的單詞並且雙向執行。最後我們將完成一
Unity開始android開發之旅——官方文件翻譯
本譯文只是自己檢視官方文件時的學習之作,大家以批判的方式來看待。 開始android開發之旅 為andorid作業系統開發遊戲使用的方式和IOS開發類似。但是,相較於IOS開發,android開發有一個嚴重的問題就是對於所有的android裝置來說,他們的硬體並不是完全標
CNTK API文件翻譯(12)——CNTK進階
這篇教程展示了CNTK中一些比較高階的特性,目標讀者是完成了之前教程或者是使用過其他機器學習元件的人。如果你是完完全全的新手,請先看我們之前的十多期教程。 歡迎來到CNTK。深度神經網路正在重新定義計算機程式設計。在指令式程式設計、函式式變成和申明式變成之外,
【Android應用開發】RecycleView API 翻譯 (文件翻譯)
Returns whether RecyclerView is currently computing a layout.If this method returns true, it means that RecyclerView is in a lockdown state and any attempt