1. 程式人生 > >RabbitMQ客戶端開發嚮導

RabbitMQ客戶端開發嚮導

Ⅰ、高層介面

  • ConnectionFactory
  • Connection
  • Channel
  • Consumor

Ⅱ、操作流程及API

【一】建立連線工廠ConnectionFactory

ConnectionFactory factory = new ConnectionFactory();

​ 我們可以為actory設定各種引數來進行連線初始化

factory.setUsername("guest");//設定伺服器登入賬號
factory.setPassword("guest");//設定伺服器登入密碼
factory.setHost("127.0.0.1");//設定伺服器IP
factory.setPort("15427");//設定伺服器埠
factory.setVirtualHost("/");//設定虛擬主機

【二】建立連線Connection

Connection connection = factory.newConnection();

【三】建立通道Channel

Channel channel = connection.createChannel();

知識點:

​ 連線和通道的關係:連線是客戶端與伺服器開啟的TCP連線,由於TCP連線的開啟和銷燬都需要消耗大量的效能,所以我們在連線的基礎上使用了通道的概念,對連線進行邏輯再分,每個連線都可以建立多個通道,這些通道共用一個連線。

​ 通道就是RabbitMQ中最實用的元件了,幾乎所有針對交換器、佇列、生產者、消費者的操作全部都在這裡面執行。

【四】建立交換器Exchange

Exchange.DeclareOK exchangeDeclare(
    String exchange, // 宣告交換器名稱
    String type, // 宣告交換器型別
    boolean durable, // 宣告是否持久化
    boolean autoDelete, // 宣告是否自動刪除
    boolean internal, // 宣告是否內建
    Map<String, Object> arguments // 建立交換器的其他引數
    ) throws IOException
Exchange.DeclareOK exchangeDeclare(
    String exchange, // 宣告交換器名稱
    BuiltinExchangeType type, // 宣告交換器型別
    boolean durable, // 宣告是否持久化
    boolean autoDelete, // 宣告是否自動刪除
    boolean internal, // 宣告是否內建
    Map<String, Object> arguments // 建立交換器的其他引數
    ) throws IOException

​ 建立交換器的方法有上面兩種,這兩種的區別僅在於宣告交換器的型別type引數的型別不同,第一種常用。其實上面的方法每個都有很多個過載的方法,採用一些預設的引數。這裡只列舉引數最全的方法,來介紹其各個引數的意義。

知識點:交換器型別

RabbitMQ的交換器擁有四種類型,分別為fanoutdirecttopicheadersfanout型別的交換器會將訊息路由到所有與其繫結的佇列中,類似於廣播;direct型別是預設的交換器型別,它只會將訊息路由到指定的佇列中,這個佇列的繫結key必須與訊息的路由key完全一致;topic型別的交換器是最常使用的交換器型別,是一種模糊匹配的交換器,它會將訊息路由到所有繫結key與訊息路由key可匹配的佇列上;至於headers型別的交換器並不常用,因為其效能較差,並不實用。

​ 這裡BuiltinExchangeType型別是一個列舉,它裡面定義了這四種 型別的列舉值:FANOUTDIRECTTOPICHEADERS

知識點:交換器持久化

​ 開啟交換器的持久化,那麼交換器在建立好之後會持久化到磁碟,一般對於生產環境中長期使用的交換器,最好開啟持久化功能,用以提升RabbitMQ的可用性(高可用要點之一),伺服器異常宕機的情況下可以硬體恢復。

知識點:交換器自動刪除

​ 交換器在不再使用的情況下是可以自動刪除的,只要在建立交換器的時候設定autoDelete屬性為true即可開啟自動刪除功能,預設為false

​ 自動刪除功能必須要在交換器曾經繫結過佇列或者交換器的情況下,處於不再使用的時候才會自動刪除,如果是剛剛建立的尚未繫結佇列或者交換器的交換器或者早已建立只是未進行佇列或者交換器繫結的交換器是不會自動刪除的。

​ 不再使用的交換器指的即使那些曾經繫結過佇列或者交換器,現在已經沒有任何繫結佇列或者交換器的情況下的交換器。

知識點:內建交換器

​ 內建交換器是一種特殊的交換器,這種交換器不能直接接收生產者傳送的訊息,只能作為類似於佇列的方式繫結到另一個交換器,來接收這個交換器中路由的訊息,內建交換器同樣可以繫結佇列和路由訊息,只是其接收訊息的來源與普通交換器不同。

​ 交換器的建立可以在程式碼中實現,也可以不再程式碼中實現。推薦在程式碼中實現,因為當要建立的交換器已存在於RabbitMQ伺服器中時,是不會再次建立的,而是直接返回建立成功。

【五】繫結交換器exchangeBind

Exchange.bindOK exchangeBind(
    String destination, // 指定目標交換器
    String source, // 指定源交換器
    String routingKey, // 指定繫結Key
    Map<String, Object> arguments // 其他一些結構化引數
    ) throws IOException

知識點:交換器繫結

​ 交換器一般用來被佇列繫結,但其實它也可以被另一個交換器繫結,繫結其實就是將二者關聯起來,繫結兩個交換器,就是將兩個交換器關聯起來,這裡面有一個主動方,一個被動方,主動方是要執行繫結的交換器(目標交換器),被動方是被繫結的交換器(源交換器),繫結的過程其實就是將目標交換器的繫結key送給源交換器,這時其實可以將目標交換器看成是一個佇列,將自己繫結到源交換器上,依靠的也是繫結key,不過這裡的繫結key是沒明確的路由Key,而非topic類似的模糊key。源交換器會將目標交換器當做一個佇列進行看待,將接收到的路由key與目標交換器繫結key完全匹配訊息路由到這個目標交換器中。

​ 這個方法同樣有過載的方法,來預設化一些引數。

【六】建立佇列Queue

Queue.DeclareOK queueDeclare(
    String queue, // 宣告佇列名稱
    boolean durable, // 宣告是否持久化
    boolean exclusive, // 宣告是否排他
    boolean autoDelete, // 宣告是否自動刪除
    Map<String, Object> arguments // 設定佇列的其他一些引數
    ) throws IOException

知識點:佇列持久化

​ 建立佇列的時候也可以設定是否支援持久化到磁碟,生產環境中我們一般都會將其設定為持久化,這也保證了伺服器宕機的情況下重啟後,佇列可以恢復,以保證資料安全(高可用要點之二)

知識點:排他佇列

​ 建立佇列的時候有一個排他引數exclusive,排他佇列只對首次建立該佇列的通道所在的連線可見,並且該連線內的所有通道都可以訪問這個排他佇列,在這個連線斷開之後,該佇列自動刪除,由此可見這個佇列可以說是綁到連線上的,對同一伺服器的其他連線不可見。

​ 這種排他優先於持久化,即使設定了佇列持久化,在連線斷開後,該佇列也會自動刪除。

​ 非排他佇列不依附於連線而存在,同一伺服器上的多個連線都可以訪問這個佇列。

知識點:佇列自動刪除

​ 佇列的自動刪除類似於交換器的自動刪除,都必須是曾經使用過的佇列才能執行自動刪除,如果是建立之後根本就沒有用過的佇列是不會觸發自動刪除的。

​ 這個方法同樣有過載的方法,來預設化一些引數。

【七】繫結佇列queueBind

Queue.BindOK queueBind(
    String queue, // 指定佇列名稱
    String exchange, // 指定交換器名稱
    String routingKey, // 宣告繫結key
    Map<String,Object> arguments // 定義繫結的一些引數
    ) throws IOException

​ 這裡routingKey的值需要由指定的交換器的型別累決定使用什麼形式的key,如果是fanout型別的交換器,會忽略routingKey的值,如果是direct型別的交換器,使用明確的繫結Key,如果是topic型別的交換器,則使用帶有匹配符*#的模糊key。

【八】釋出訊息basicPublish

void basicPublish(
    String exchange, // 指定目的交換器
    String routingKey, // 宣告訊息的路由key
    boolean mandatory, // 是否為無法路由的訊息進行返回處理
    boolean immediate, // 是否對路由到無消費者佇列的訊息進行返回處理
    BasicProperties props, // 訊息的一些基本屬性設定
    byte[] body // 訊息體
    ) throws IOException

知識點:mandatory

​ 訊息釋出的時候設定訊息的mandatory屬性用於設定訊息在傳送到交換器之後無法路由到佇列的情況對訊息的處理方式,設定為true表示將訊息返回到生產者,否則直接丟棄訊息。

​ 上述無法路由的情況可以是在無法找到匹配訊息路由key的佇列,導致訊息無法路由到佇列中

​ 當mandatory=true時,出現無法路由訊息被返回,那麼返回的訊息又回到生產者,怎麼接收呢,這就要靠返回監聽器ReturnListener

知識點:ReturnListener

​ 我們在設定訊息的mandatory=true的時候,就需要對返回的訊息進行處理了,我們可以在通道中新增返回監聽器來監聽返回的訊息,一旦監聽到這些訊息,我們就著手對其進行再處理,一般我們會進行重發。

channel.addReturnListener(new ReturnListener(){
    @Override
    publish void handleReturn(
        int repayCode, // 迴應碼
        String repayText, // 迴應內容
        String exchange, // 來源交換器
        String routingKey, // 訊息的路由key
        AMQP.BasicProperties basicProperties, // 訊息的其他屬性
        byte[] body // 訊息體
    ) throws IOException {
        // do some thing to handle the return message
    }
});

【九】消費訊息basicConsumer、basicGet

1、推送訊息basicConsume

String basicConsume(
    String queue, // 指定佇列名稱
    boolean autoAck, // 是否自動迴應
    String consumerTag, // 宣告消費者標籤,區分不同的消費者
    boolean noLocal, // 是否不能將訊息推送給同一個連線內的消費者
    boolean exclusive, // 是否排他
    Map<String, Object> arguments, // 設定消費者的其他引數 
    Consumer callback // 設定消費者回調函式,處理訊息
    ) throws IOException

知識點:自動迴應

​ 推送訊息時設定訊息自動迴應,那麼訊息在推送給消費者後就會自動迴應伺服器,伺服器收到回回應就會刪除這條訊息,這樣無法保證訊息能被正確處理,因為消費者雖然收到了訊息,但它可能無法處理、或者拒絕等,這時伺服器中訊息卻已被刪除,導致訊息丟失。(高可用要點之三)

​ 一般情況下我們將其設定為false,然後在回撥函式中訊息處理完畢之後手動進行迴應。

void basicAck(
    long deliveryTag, // 指定推送標識編號
    boolean multiple // 是否批量回應
    ) throws IOException

注意:推送標識

deliveryTag表示的是推送編號,是一個單調遞增正整數編號,它用來標識channel中一次訊息推送,與訊息綁在一起。

​ 當一個消費者向伺服器註冊basicConsume之後,伺服器就會使用basic.delivery給消費者推送訊息,deliveryTag就用來標識這樣一個推送。但要注意,它只在當前通道channel內有效。

​ 消費者受到訊息的時候同時會收到這個推送標識,當要回應、拒絕訊息的時候就需要帶著這個標識。

注意:批量回應

multiple引數為boolean值,用來表示是否進行批量回應,當值為true時表示進行批量回應,它會對推送編號小於給定編號的所有訊息進行迴應。false表示只回應指定編號的訊息。

知識點:推送範圍

noLocal這個引數表示是否可以將訊息推送給與訊息釋出者同一連線內的消費者,如果noLocal=false表示可以推送,noLocal=true則表示不能推送,那麼就只能推送到其他連線中的消費者。

知識點:排他

知識點:回撥函式

​ 回撥函式主要用於定義針對訊息的處理邏輯,一般採用如下方式定義:

Consumer consumer = new DefaultConsumer(channel){
    @Override
    public void handleDelivery(
        String consumerTag, // 消費者標籤,可用於消費者驗證傳送目標的正確性
        Envelope envelope, // AMQP操作引數,可以獲取其中引數進行訊息處理
        Amqp.BasicProperties properties, // 訊息的基本屬性
        Byte[] body // 訊息體,用於存放推送的訊息
    ){
        // do something to handle deleveried message
    }
}

知識點:AMQP操作引數

EnvelopeRabbitMQ定義的用於封裝AMQP操作引數的類,裡面主要封裝了四個引數:

private final long _deliveryTag;// 推送編號
private final boolean _redeliver; // 是否重發標籤
private final String _exchange; // 當前操作對應的交換器
private final String _routingKey; // 關聯的路由Key

​ 其中_redeliber=true表示這是一個失敗的basicAck之後的訊息的重新推送。這裡面的這四個引數我們都可以在消費者客戶端處理訊息的時候使用。

​ 其實basicConsume是消費者訂閱方法,目的在於訂閱某個佇列,是訊息推送的前提,真正的訊息推送並沒有在Channel類中實現,因為推送操作是由RabbitMQ伺服器自動發起的,不需要生產者或者消費者手動觸發,所以也就沒用提供介面。

​ 真正的訊息推送是由RabbitMQ伺服器的Basic.delivery方法實現的。

2、拉取訊息basicGet

GetResponse basicGet(
    String queue, // 指定佇列名稱
    boolean autoAck // 是否自動迴應
    ) throws IOException;

​ 拉取訊息就是指消費者主動從伺服器獲取訊息,每次只能獲取一條訊息。推送訊息是被動的獲取。這裡的autoAck和之前推送的設定一樣,一般設定為false,表示不主動迴應,採用手動迴應(高可用要點之三)

【十】拒絕訊息basicReject、basicNack

1、拒絕一個訊息basicReject

void basicReject(
    long deliveryTag, // 訊息推送編號
    boolean requeue // 是否重新入隊
    ) throws IOException;

2、拒絕多個訊息basicNack

void basicNack(
    long deliveryTag, // 訊息推送編號
    boolean multiple, // 是否批量拒絕
    boolean requeue // 是否重新入隊
    )throws IOException;

知識點:重新入隊

​ 當一個訊息推送到某一個消費者,這個消費者無法處理時它進行了拒絕操作,如果指定requeue值為true,表示被拒絕的訊息還可以重新發送到佇列,可被繼續推送到其他消費者,如果設定為false,那麼這條訊息會被立刻從佇列刪除。

​ 如果將這兩個方法的requeue引數設定為false,那麼可以啟用死信佇列功能,因為這樣的話,返回的訊息會變成死信,如果伺服器中設定有死信交換器DLX,並且已關聯到該佇列,那麼這個訊息就會被髮送到死信交換器,從而被路由到繫結的死信佇列中得以保留,我們可以通過排查這些訊息來進行伺服器優化。

知識點:批量拒絕

​ 訊息的單個拒絕與批量拒絕使用的不是同一個方法,批量拒絕的方法basicNack中有個決定是否批量拒絕的引數mutiple,如果設定為false,表示不執行批量拒絕,那麼它的效果等同於basicReject方法,如果設定為true,表示拒絕小於指定推送編號的所有未被當前消費者消費的訊息。

【十一】取消消費者basicCancel

void basicCancel(String consumerTag // 指定要取消的消費者標籤
    ) throws IOException;

【十二】關閉連線

channel.close();// 關閉通道
connection.close();// 關閉TCP連線