1. 程式人生 > >RabbitMQ的Java客戶端API指南

RabbitMQ的Java客戶端API指南

目前RabbitMQ官方的Java客戶端版本已升至5.0.0,5.x系列的版本需要JDK 8支援,4.x系列的版本支援JDK 6。 1、綜述 RabbitMQ Java客戶端使用com.rabbitmq.client作為頂層包。有4個關鍵類和介面:Channel、Connection、ConnectionFactory、Consumer。其中,Channel提供了各種AMQP 0-9-1的協議操作;Connection用於建立通道、註冊連線的生命週期事件處理器、關閉連線;ConnectionFactory用於獲取Connection,並提供連線配置功能,如配置連線的虛擬主機、使用者名稱等。 2、連線到訊息代理 [1] 新建ConnectionFactory [2] 設定ConnectionFactory,如主機、埠、使用者名稱、密碼、虛擬主機等 [3] 獲取Connection [4] 建立通道 [5] 利用通道進行各種協議操作... [6] 關閉通道 [7] 關閉連線 說明:在設定ConnectionFactory時,可呼叫方法或採用URI形式進行設定。另外,當連線關閉時,會自動關閉通道,所以在關閉連線前關閉通道不是嚴格必須的,但通常是一種好的實踐。 3、使用交換器和佇列 在使用交換器和佇列前必須進行宣告,宣告是為了確保在使用前這些交換器或佇列存在,如果不存在的話會建立它們。 [m] exchangeDeclare [m] queueDeclare [m] queueBind 上述三種方法都提供了若干不同形引數目的過載版本,這種設計在整個客戶端API中會經常遇到。 4、釋出訊息 [m] basicPublish [m] AMQP.BasicProperties#builder 說明:如果訊息代理髮生“資源不足告警”(記憶體使用過大或磁碟空間不足),那麼Channel#basicPublish會阻塞。 5、通道和併發 根據經驗,不應該線上程間共享Channel例項,應該一個執行緒一個Channel。 多個執行緒在同一Channel上併發釋出訊息時,可能產生不正確的幀交錯,由此產生連線層協議異常並導致連線關閉。執行緒間共用一個Channel也會干擾生產者確認。所以應該避免多個執行緒在同一Channel上併發釋出訊息。但如果共享同一通道的兩個執行緒,一個是生產者、一個是消費者,則是安全的。 訊息伺服器以push方式投遞訊息是併發進行的,並提供每個通道上訊息接收順序保證(和訊息傳送順序相同)。訊息伺服器的併發投遞是通過給每個連線配置一個ExecutorService實現的,也可以通過設定ConnectionFactory來自定義執行器,自定義的執行器會在從該連線工廠獲取的連線間共享。 當消費者手動確認時,要注意是哪個執行緒進行確認。如果不是接收到訊息的執行緒進行確認,則批量確認可能產生重複確認,由此產生通道層協議異常並導致通道關閉。一次只確認一個訊息的則是安全的。 說明:RabbitMQ具有記憶體和磁碟告警功能,設計告警的目的是為了阻塞生產者、讓消費者繼續消費。然而,AMQP允許生成者和消費者在同一個通道上操作、也允許在同一連線的不同通道上操作,這種設計是不完美的。在實際中,對大部分應用而言不會造成大問題,因為告警調節只是表現為延遲。不過,其他設計也是允許的,建議對生產者和消費者使用不同的連線。 6、訂閱訊息(push API) 消費者訂閱訊息時,用消費者標籤(tag)來區分不同的訂閱,所以Consumer介面中的方法都必須傳入消費者標籤字串。消費者標籤可以由客戶端產生,也可以由服務端提供。如果希望使用服務端產生的消費者標籤(單個節點內具有唯一性),使用不帶消費者標籤引數的Channel#basicConsume方法或傳入一個空字串作為消費者標籤引數,從Channel#basicConsume方法的返回值中就能獲取到(服務端產生的)消費者標籤。可以用該標籤來取消消費者。 不同的消費者例項必須使用不同的消費者標籤。如果在一個連線上使用重複的消費者標籤,可能導致自動連線恢復有問題並且在監視消費者時不易區分。 子類化(用起來很便利的類)DefaultConsumer即可實現一個Consumer,將該匿名內部類例項傳入Channel#basicConsume方法即可建立一個訂閱。Consumer中的方法都是回撥方法,即當某些事件發生時就會被自動呼叫,包括:handleDelivery、handleShowdonwSingle、handleConsumeOk、handleCacelOk、handleCancle等。可以通過Channel#basicCancel取消某個消費者。 在Consumer的回撥方法中,可以執行Connection或Channel的阻塞方法,因為對Consumer的回撥是線上程池中執行的,不是在例項化Channel的執行緒中執行的。每一個Channel都有自己的分發執行緒。一般情況下一個通道上一個消費者,這樣的話就不會影響其他消費者。如果一個通道上有多個消費者,當其中某個消費者耗時較長的話,會影響對其他消費者上回調(事件處理任務)的分發。 7、獲取單個訊息(pull API) Channel#basicGet,支援自動確認和手動確認。 8、處理不可路由的訊息 如果釋出訊息時設定了 mandatory標識(flag),但是不能被路由,訊息代理會將其退回給生產者。為了能夠得到退回通知,生產者需要在通道上新增一個ReturnListener物件(Channel#addReturnListener),不然該訊息就會被無聲地丟掉。 9、關閉協議 在AMQP 0-9-1協議中,連線和通道採用相同的方式來處理網路失敗、內部失敗以及明確的請求關閉。它們都具有3種生命週期狀態:open、closing、closed。無論是應用請求關閉、客戶端庫錯誤、網路請求關閉還是網路錯誤等,這些物件最終都會處於closed狀態。 在AMQP連線和通道物件中,具有如下與關閉相關的方法: [m] addShutdownListener/removeShutdownListener [m] getCloseReason [m] isOpen [m] close ShutdownSignalException物件提供了分析關閉原因的方法。該物件可以通過getCloseReason獲取,或者通過ShutdownListener#shutdownCompleted的引數進行訪問。 在生產環境程式碼中應該忽略isOpen方法,因為容易發生競態條件(race conditions),使用者程式碼無法實現原子性。 10、高階連線選項 在ConnectionFactory和Connection類中提供了一些方法用於自定義連線行為,滿足高階定製需求。 * 定製消費者執行緒池 * 使用主機列表 * 支援服務發現 * 設定心跳檢測超時 * Java NIO支援 11、網路異常時自動恢復 當RabbitMQ節點和客戶端之間的網路連線發生異常時,客戶端可以自動恢復連線和拓撲(佇列、交換器、繫結和消費者)。自動恢復過程如下: [1] 重連 [2] 恢復連線上的監聽器 [3] 重新開啟通道 [4] 恢復通道上的監聽器 [5] 恢復通道的basic.qos設定,釋出者確認和事務設定 拓撲恢復(對每個通道進行)過程如下: [1] 重新宣告交換器 [2] 重新宣告佇列 [3] 恢復所有繫結 [4] 恢復所有消費者 如果自動恢復失敗(如RabbitMQ節點仍未正常工作),客戶端就會每隔一段時間重試,該時間間隔固定,預設為5秒。如果建立連線時使用了主機列表,那麼會將列表打亂並挨個重試。如果打開了自動連線恢復功能,可以在連線和通道上註冊恢復監聽器來處理恢復事件。 當連線斷開時,使用Channel#basicPublish釋出的訊息會丟失。在連線恢復後,客戶端並不會將它們重新入隊進行再次釋出。要想確保釋出的訊息到達RabbitMQ訊息代理,需要使用釋出者確認並對連線失敗做相應處理。 當連線斷開時,需要過一段時間才能偵測到。所以存在一個時間視窗,Java客戶端和應用程式都沒有意識到連線斷開。在這個時間視窗中,釋出的訊息仍然會被序列化並寫入TCP套接字。它們要想投遞成只能靠生成者確認(屬於協議擴充套件)來保證,因為在AMQP 0-9-1中將訊息釋出完全設計為非同步方式。 當開啟了自動恢復功能的連線檢測到套接字或I/O操作錯誤,就會在一段時間後(預設5秒)開始恢復。之所以這樣設計,是考慮到雖然很多網路失敗是瞬態的且一般只存在很短的時間,但並不會馬上消失。自動連線恢復會以固定的時間間隔進行重試直到建立新連線。 當連線處於正在恢復狀態時,在該連線的通道上進行訊息釋出會丟擲異常。此時客戶端不會對這些訊息進行任何快取,應該由應用開發者來記錄這些訊息並在恢復成功後重新發布。如果生產者不能承受訊息丟失的風險,就應該使用生產者確認來確保訊息釋出成功。 如果因為通道級異常造成通道關閉,不會觸發自動連線恢復。這樣的異常一般說明是應用級有問題,Java客戶端庫無法做出一個知情的決定。 當使用消費者(手動)確認時,在訊息傳送到消費者後、確認完成前,從消費者到RabbitMQ節點的網路連線可能會斷開。在自動連線恢復後,RabbitMQ會重置所有通道上的投遞標籤,此時使用老的投遞標籤進行basic.ack、basic.nack和basic.reject會產生通道異常。為了避免異常發生,RabbitMQ會記錄和更新投遞標籤確保在恢復前後它們是單調增加的。Channel#basicAck、Channel#basicNack和Chanel#basicReject會將調整後的投遞標籤轉換為RabbitMQ使用的投遞標籤。使用老投遞標籤的確認不會被髮送。使用消費者確認和自動恢復的消費者必須能夠處理二次投遞。 12、未處理的異常 關於連線、通道、恢復和消費者生命週期的未處理異常都交由異常處理器完成,使用ConnectionFactory#setExceptionHandler設定。