1. 程式人生 > >RabbitMQ 客戶端API使用

RabbitMQ 客戶端API使用

目錄

RabbitMQ Java客戶端使用com.rabbitmq.client作為頂級包名,關鍵的Class和Interface有Channel、Connection、ConnectionFactory、Consumer等。AMQP協議層面的操作通過Channel介面實現。Connection是用來開啟Channel(通道)的,可以註冊事件處理器,也可以在應用結束時關閉連線。與RabbitMQ相關的開發工作,基本上也是圍繞Connection和Channel這兩個類展開的。

          這裡按照一個完整的運轉流程進行講解,詳細內容有:連線、交換器/佇列的建立與繫結、傳送訊息、消費訊息、消費訊息的確認和關閉連線。

一、連線RabbitMQ伺服器

   連線ActiveMQ伺服器的方式一:

        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(IP_ADDRESS);
        factory.setPort(PORT);
        factory.setVirtualHost(virtualHost); //虛擬訊息伺服器host
        factory.setUsername("root");
        factory.setPassword("root");
        Connection connection = factory.newConnection(); //建立連線

使用uri來連線ActiveMQ方式二:

        ConnectionFactory factory = new ConnectionFactory();
        factory.setUri("amqp://username:[email protected]:portNumber/virtualHost");
        Connection connection = factory.newConnection(); //建立連線
        Channel channel = connection.createChannel();//建立通道

注意點:

      Connection可以用來建立多個Channel例項,但是Channel例項不能線上程問共享,應用程式應該為每一個執行緒開闢一個Channel。某些情況下Channel的操作可以併發執行,但是在其他情況下會導致在網路上出現錯誤的通訊幀交錯,同時也會影響傳送方確認(publisher confirm)機制的執行(後面介紹),所以多執行緒問共享Channel例項是非執行緒安全的。

     Channel或者Connection中有個isOpen方法可以用來檢測其是否己處於開啟狀態(),但是我們一般不使用這個方法,一般用CreateXXX,newXX方法後,我們預設的認為Connection,Channel已經成功的開啟了,如果在使用Channel的時候其己經處於關閉狀態,那麼程式會丟擲一個com.rabbitmq.client.ShutdownSignalException,我們只需捕獲這個異常即可。當然同時也要試著捕獲IOExceptio口或者SocketException,以防Connection意外關閉。

二、使用交換器和佇列

        交換器和佇列是AMQP中high level層面的構建模組,應用程式需確保在使用它們的時候就已經存在了,在使用之前需要先宣告(declare)它們,然後再bind起來,這其中涉及到了bindingKey,程式碼:   

        Connection connection = factory.newConnection(); //建立連線
        Channel channel = connection.createChannel();//建立通道
        //建立一個type="direct" 、持久化,非自動刪除的交換器
        channel.exchangeDeclare(EXCHANGE_NAME,"direct",true,false,null);
        //建立一個持久化,非排他的,非自動刪除的佇列
        channel.queueDeclare(QUEUE_NAME,true,false,false,null);
        //將交換器與佇列通過路由繫結
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,ROUTING_KEY);

1.宣告查詢刪除交換器

       1.1 宣告交換器方法如下:   

 DeclareOk exchangeDeclare(String exchange, String type, boolean durable, 
    boolean autodelete,boolean internal, Map<String, Object> arguments) throws IOException;

引數詳細說明如下所述:

  • exchange:交換器的名稱。
  • type:交換器的型別,常見的如fanout、direct、topic,header 詳情看上一章
  • durable:設定是否持久化。durable設定為true表示持久化,反之是非持久化。持久化可以將交換器存檔,在伺服器重啟的時候不會丟失相關資訊。
  • autoDelete:設定是否自動刪除。autoDelete設定為true則表示自動刪除。自動刪除的前提是至少有一個佇列或者交換器與這個交換器繫結,之後所有與這個交換器繫結的佇列或者交換器都與此解綁。注意不能錯誤地把這個引數理解為:"當與此交換器連線的客戶端都斷開時,RabbitMQ會自動刪除本交換器"。
  • internal:設定是否是內建的。如果設定為true,則表示是內建的交換器,客戶端程式無法直接傳送訊息到這個交換器中,只能通過交換器路由到交換器這種方式。
  • argument:其他一些結構化引數,比如alternate-exchange(有alternate­exchange的詳情後面介紹)。

相關的過載方法:

其中沒有帶的引數,布林值預設是false,Map引數為null

    DeclareOk exchangeDeclare(String exchange, String type) throws IOException;
    DeclareOk exchangeDeclare(tring exchange, String type, boolean durable) throws IOException;
    DeclareOk exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete, Map<String, Object> arguments) throws IOException;
    DeclareOk exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments) throws IOException;

     1.2 與上面相對應的把第二引數變為BuiltinExchangeType type,這是封裝的列舉type,也可以通過這個來生命交換器型別,原始碼如下:

public enum BuiltinExchangeType {
    DIRECT("direct"),
    FANOUT("fanout"),
    TOPIC("topic"),
    HEADERS("headers");
    private final String type;
    private BuiltinExchangeType(String type) {
        this.type = type;
    }
    public String getType() {
        return this.type;
    }
}

       1.3 還有宣告為noWait的方法,這個nowait引數指的是AMQP中Exchange.Declare命令的引數,意思是不需要伺服器返回,注意這個方法的返回值是void,而普通的exchangeDeclare方法的返回值是Exchange.DeclareOk,意思是在客戶端聲明瞭一個交換器之後,需要等待伺服器的返回(伺服器會返回Exchange.Declare-Ok這個AMQP命令)。針對"exchangeDeclareNoWait不需要伺服器任何返回值"這一點,考慮這樣一種情況,在宣告完一個交換器之後(實際伺服器還並未完成交換器的建立),那麼此時客戶端緊接著使用這個交換器,必然會發生異常。如果沒有特殊的緣由和應用場景,並不建議使用這個方法。

public void exchangeDeclareNoWait(String exchange, String type, boolean durable, 
    boolean autoDelete, boolean internal, Map<String, Object> arguments) throws IOException 
      

       1.4 檢測交換器是否存在的方法,這個方法在實際應用過程中還是非常有用的,它主要用來檢測相應的交換器是否存在。如果存在則正常返回:如果不存在則丟擲異常:404  channel exception,同時Channel也會被關閉。 

 public DeclareOk exchangeDeclarePassive(String name)

    1.5 刪除交換器的方法,方法中exchange表示交換器的名稱,而ifUnused用來設定是否在交換器沒有被使用的情況下刪除。如果isUnused設定為true,則只有在此交換器沒有被使用的情況下才會被刪除:如果設定false,則無論如何這個交換器都要被刪除,預設是false的。    

    DeleteOk exchangeDelete(String exchange, boolean ifUnused) throws IOException;
    void exchangeDeleteNoWait(String exchange, boolean ifUnused) throws IOException;
    DeleteOk exchangeDelete(String exchange) throws IOException; //預設為false

2. 宣告查詢刪除佇列

         2.1 這裡只有兩個方法:

 Queue.DeclareOk queueDeclare() throws IOException;

 Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, 
    boolean autoDelete, Map<String, Object> arguments) throws IOException;  

       不帶任何引數的queueDeclare方法預設建立一個由RabbitMQ命名的(類似這種amq.gen-LhQzlgv3DOv8PIDabOXA名稱,這種佇列也稱之為匿名佇列〉、排他的(true)、自動刪除的(true)、非持久化(false)的佇列,如下所示獲取佇列的名字:

String queueName = channel.queueDeclare().getQueue( );

方法的引數詳細說明如下:

  • queue:佇列的名稱
  • durable:設定是否持久化。為true則設定佇列為持久化。持久化的佇列會存檔,在伺服器重啟的時候可以保證不丟失相關資訊
  • exclusive:設定是否排他。為t r u e則設定佇列為排他的。如果一個佇列被宣告為排他佇列,該佇列僅對首次宣告它的連線可見,並在連線斷開時自動刪除。這裡需要注意三點:排他佇列是基於連線(Connection )可見的,同一個連線的不同通道( Channel )是可以同時訪問同一連線建立的排他佇列;"首次"是指如果一個連線己經聲明瞭一個排他佇列,其他連線是不允許建立同名的排他佇列的,這個與普通佇列不同:即使該佇列是持久化的,一旦連線關閉或者客戶端退出,該排他佇列都會被自動刪除,這種佇列適用於一個客戶端同時傳送和讀取訊息的應用場景
  • autoDelete:設定是否自動刪除。為t r u e則設定佇列為自動刪除。自動刪除的前提是:至少有一個消費者連線到這個佇列,之後所有與這個佇列連線的消費者都斷開時,才會自動刪除。不能把這個引數錯誤地理解為:"當連線到此佇列的所有客戶端斷開時,這個佇列自動刪除",因為生產者客戶端建立這個佇列,或者沒有消費者客戶端與這個佇列連線時,都不會自動刪除這個佇列
  • argurnents:設定佇列的其他一些引數,如x-message-ttl、x-expires、x-max-length、x-max-length-bytes、x-dead-letter-exchange、x-dead-­letter-routing-key ,x-max-priority等

注意點:

       生產者和消費者都能夠使用queueDeclare來宣告一個佇列,但是如果消費者在同一個通道上訂閱了另一個佇列,就無法再宣告隊列了。必須先取消訂閱,然後將通道置為"傳輸"模式,才能宣告佇列

  2.2  這裡也有一個nowait宣告queue的方法,返回值void和exchangeDeclareNoWait類似, 表示不需要服務端的任何返回。同樣也需要注意,在呼叫完queueDeclareNoWait方法之後,緊接著使用宣告的佇列時有可能會發生異常情況。

public void queueDeclareNoWait(String queue, boolean durable, boolean exclusive, 
       boolean autoDelete, Map<String, Object> arguments)

2.3  同樣這裡還有個queueDeclarePassive的方法,也比較常用。這個方法用來檢測相應的佇列是否存在。如果存在則正常返回,如果不存在則丟擲異常:404  channel  exception,同時Channel也會被關閉。方法定義如下:

public queueDeclarePassive(String queue)

2.4 刪除佇列的方法

  • 其中queue表示佇列的名稱,
  • ifUnused以參考交換器(ifUnused用來設定是否在交換器沒有被使用的情況下刪除。如果isUnused設定為true,則只有在此交換器沒有被使用的情況下才會被刪除:如果設定false,則無論如何這個交換器都要被刪除,預設是false的)
  •  ifEmpty設定為t r u e表示在佇列為空(佇列裡面沒有任何訊息堆積)的情況下才能夠刪除。
    public com.rabbitmq.client.AMQP.Queue.DeleteOk queueDelete(String queue) throws IOException {
        return this.queueDelete(queue, false, false);
    }

    public com.rabbitmq.client.AMQP.Queue.DeleteOk queueDelete(String queue, boolean ifUnused, boolean ifEmpty) throws IOException {
        this.deleteRecordedQueue(queue);
        return this.delegate.queueDelete(queue, ifUnused, ifEmpty);
    }

    public void queueDeleteNoWait(String queue, boolean ifUnused, boolean ifEmpty) throws IOException {
        this.deleteRecordedQueue(queue);
        this.delegate.queueDeleteNoWait(queue, ifUnused, ifEmpty);
    }

3. 佇列繫結交換器和解綁 queueBind

     3.1 繫結佇列和交換器    

       方法中涉及的引數詳解:

  • queue:佇列名稱
  • exchange:交換器的名稱
  • routingKey:用來繫結佇列和交換器的路由鍵
  • argument:定義繫結的一些引數
public Queue.BindOk queueBind(String queue, String exchange, String routingKey, Map<String, Object> arguments)

public AMQP.Queue.BindOk queueBind(String queue, String exchange, String routingKey) 

public void queueBindNoWait(String queue, String exchange, String routingKey, Map<String, Object> arguments)

       3.2 將繫結的佇列和交換器解綁

 public Queue.UnbindOk queueUnbind(String queue, String exchange, String routingKey, Map<String, Object> arguments)

 public Queue.UnbindOk queueUnbind(String queue, String exchange, String routingKey) 

4.交換器和交換器繫結exchangeBind

         這裡的destination表示和佇列相連的exchange交換器的名字,source表示和訊息釋出者相連的交換器名字。

public BindOk exchangeBind(String destination, String source, String routingKey, Map<String, Object> arguments) 

public BindOk exchangeBind(String destination, String source, String routingKey)

void  exchangeBindNoWait(String destination, String source, String routingKey, Map<String, Object> arguments)  throws  IOException

      例子:     

生產者傳送訊息至交換器source中,交換器source根據路由鍵找到與其匹配的另一個交換器destination,井把訊息轉發到destination中,進而儲存在destination繫結的佇列queue中,如下圖。

channel.exchangeDeclare("source","direct",false,true,null) ;
channel.exchangeDeclare("destination","fanout",false,true ,null );
channel.exchangeBind("destination","source","exKey");
channel.queueDeclare("queue", false, false, true, null );
channel.queueBind("queue","destination")
channel.basicPublish("source","exKey", null ,"exToExDemo".getBytes( )) ;

5.傳送訊息

方法用basicPublish如下:

 void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body);

void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body);

void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body);

上面的引數解析如下:

  • exchange:交換器的名稱,指明訊息需要傳送到哪個交換器中。如果設定為空字串,則訊息會被髮送到RabbitMQ預設的交換器中。
  • routingKey:路由鍵,交換器根據路由鍵將訊息儲存到相應的佇列之中。
  • props:訊息的基本屬性集,其包含14個屬性成員,分別有contentType、contentEncoding、headers(Map<String,Object>)、deliveryMode、priority、correlationld、replyTo、expiration、messageld、timestamp、type、userld、appld、clusterld。其中常用的幾種都在上面的示例中進行了演示。
  • byte[] body:訊息體(payload),真正需要傳送的訊息。
  • mandatory和immediate的詳細內容後面講解

常用的訊息傳送例子:

 //傳送持久化資訊
String message = "hello world";
channel.basicPublish(EXCHANGE_NAME,ROUTING_KEY,MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());

這裡的PERSISTENT_TEXT_PLAIN如下所示:
 public static final BasicProperties PERSISTENT_TEXT_PLAIN = new BasicProperties("text/plain",
(String)null, (Map)null, 2, 0, (String)null, (String)null, (String)null, (String)null, (Date)null, (String)null, (String)null, (String)null, (String)null);

5.1 上面這行程式碼傳送了一條訊息,這條訊息的投遞模式(deliveryMode)設定為2,即訊息會被持久化(即存入磁碟)在伺服器中。同時這條訊息的優先順序(priority)設定為1,content-type為"text/plain ",可以自己設訊息的屬性:   

 channel.basicPublish(EXCHANGE_NAME,ROUTING_KEY,
                new AMQP.BasicProperties().builder()
                        .contentType("text/plain")
                        .deliveryMode(2)
                        .priority(1)
                        .userId("hidden")
                        .build(),
                message.getBytes());

5.2 也可以傳送帶有headers的訊息:

  Map<String,Object> headers = new HashMap<>();
        headers.put("location","here");
        headers.put("time","today");
        channel.basicPublish(EXCHANGE_NAME,ROUTING_KEY,
                new AMQP.BasicProperties().builder().headers(headers).build(),message.getBytes());

5.3 帶有過期(expiration)資訊的訊息:

 channel.basicPublish(EXCHANGE_NAME,ROUTING_KEY,
                new AMQP.BasicProperties().builder().expiration("60000").build(),message.getBytes());

6.消費訊息

     RabbitMQ的消費模式分兩種:推(Push)模式和拉(Pull)模式。推模式採用Basic.Consume進行消費,而拉模式則是呼叫Basic.Get進行消費。

       6.1 推模式push

 可以通過持續訂閱的方式來消費,使用到的類Consumer,DefaultConsumer,接受訊息一般通過實現Consumer介面或者繼承DefaultConsumer類,當呼叫Consumer相關的API方法時,不同的訂閱需要制定不同的消費者標籤(consumerTag)來區分彼此,在同一個channel中的消費者也需要通過唯一的消費標籤用來做區分,關鍵消費程式碼如下:

        boolean autoAck = false;
        channel.basicQos(64);//設定客戶端最多接受違背ack的資訊的個數
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("recieve message :"+new String(body));
                try {
                    TimeUnit.SECONDS.sleep(1);  //休眠一秒在返回確定資訊
                }catch (InterruptedException e){
                    e.printStackTrace();
                }
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
       //這裡的自動應答被關閉,通過手動確認收到訊息後在做應答,這個可以有效的防止訊息不必要的丟失。
        channel.basicConsume(QUEUE_NAME,autoAck,"myConsumerTag",consumer);  

相關的消費方法:

    String basicConsume(String queue, Consumer callback) throws IOException;

    String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;

    String basicConsume(String queue, boolean autoAck, String consumerTag, Consumer callback) throws IOException;

    String basicConsume(String queue, boolean autoAck, Map<String, Object> arguments, Consumer callback) throws IOException;

    String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String, Object> arguments, Consumer callback) throws IOException;

對應的引數說明如下所述:

  • queue:佇列的名稱;
  • autoAck:設定是否自動確認。建議設成false,即不自動確認,預設不設定也是false;
  • consumerTag:消者標籤,用來區分多個消費者,不設定預設“”
  • noLocal:設定為true則表示不能將同一個Connection中生產者傳送的訊息傳送給這個Connection中的消費者,預設是false
  • exclusive:設定是否排他,預設是false
  • arguments:設定消費者的其他引數,預設是null
  • callback:設定消費者的回撥函式。用來處理RabbitMQ推送過來的訊息,比如DefaultConsumer,使用時需要客戶端重寫(override )其中的方法。

對於消費者需要消費訊息一般都是重寫handleDelivery方法

public interface Consumer {
    //這個方法會在其他方法呼叫前,返回消費者標籤
    void handleConsumeOk(String consumerTag);
    //取消訂閱的時候呼叫
    void handleCancelOk(String consumerTag);

    void handleCancel(String consumerTag) throws IOException;
    //重寫這個方法會在channel和Connection關閉時候呼叫
    void handleShutdownSignal(String consumerTag, ShutdownSignalException sig);

    void handleRecoverOk(String consumerTag);

    void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException;
}

      我們可以通過channel.basicCancel方法來顯式地取消一個消費者的訂閱,channel.basicCancel(consumerTag);這行程式碼執行會首先觸發handleConsumerOk方法,然後呼叫handleDelivery方法,最後才出發handleCancelOk方法。

       和生產者一樣,消費者客戶端同樣需要考慮執行緒安全的問題。消費者客戶端的這些callback會被分配到與Channel不同的執行緒池上,這意味著消費者客戶端可以安全地呼叫這些阻塞方法,比如channel.queueDeclare、channel.basicCancel等。

      每個Channel都擁有自己獨立的執行緒。最常用的做法是一個Channel對應一個消費者,也就是意味著消費者彼此之間沒有任何關聯。當然也可以在一個Channel中維持多個消費者,但是要注意一個問題,如果Channel中的一個消費者一直在執行,那麼其他消費者的callback會被"耽擱"。

       6.2 拉模式

     拉模式的消費方式,通過channel.basicGet方法可以單條地獲取訊息,其返回值是GetResponse,Channel類的basicGet方法沒有其他過載方法,只有:

 GetResponse basicGet(String queueName, boolean autoAck) throws IOException;

其中中queueName代表隊列的名稱,如果設定autoAck為false,那麼同樣需要呼叫channel.basicAck來確認訊息己被成功接收。

例子:    

GetResponse  response  =  channel.basicGet(QUEUE_NAME,  false) ;
System.out.println(new  String(response.getBody()));
channel.basicAck(response.getEnvelope().getDeliveryTag(), false);

注意點:推模式和拉模式的對比。Basic.Consume將通道(Channel)直為接收模式,直到取消佇列的訂閱為止。在接收模式期間,RabbitMQ會不斷地推送訊息給消費者,當然推送訊息的個數還是會受到Basic.Qos的限制.如果只想從佇列獲得單條訊息而不是持續訂閱,建議還是使用Basic . Get進行消費.但是不能將Basic.Get放在一個迴圈裡來代替Basic.Consume,這樣做會嚴重影響RabbitMQ的效能.如果要實現高吞吐量,消費者理應使用Basic.Consume方法。

7.消費端的確認和拒絕

       7.1 消費資訊的確認

            為了保證訊息從佇列可靠地達到消費者,RabbitMQ提供了訊息確認機制(message acknowledgement )。消費者在訂閱佇列時,可以指定autoAck引數,當autoAck等於false時,RabbitMQ會等待消費者顯式地回覆確認訊號後才從記憶體(或者磁碟)中移去訊息(實質上是先打上刪除標記,之後再刪除)。當autoAck等於t r u e時,RabbitMQ會自把傳送出去的訊息置為確認,然後從記憶體(或者磁碟)中刪除,而不管消費者是否真正地消費到了這些訊息。

採用訊息確認機制後,只要設定autoAck引數為false,消費者就有足夠的時間處理訊息(任務),不用擔心處理訊息過程中消費者程序掛掉後訊息丟失的問題,因為RabbitMQ會一直等待持有訊息直到消費者顯式呼叫Basic.Ack命令為止。當autoAck引數置為false,對於RabbitMQ服務端而言,佇列中的訊息分成了兩個部分:一部分是等待投遞給消費者的訊息:一部分是己經投遞給消費者,但是還沒有收到消費者確認訊號的訊息。如果RabbitMQ一直沒有收到消費者的確認訊號,並且消費此訊息的消費者己經斷開連線,則RabbitMQ會安排該訊息重新進入佇列,等待投遞給下一個消費者,當然也有可能還是原來的那個消費者。          

        boolean autoAck = false;  
        channel.basicQos(64);//設定客戶端最多接受違背ack的資訊的個數
        Consumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("recieve message :"+new String(body));
                try {
                    TimeUnit.SECONDS.sleep(1);  //休眠一秒在返回確定資訊
                }catch (InterruptedException e){
                    e.printStackTrace();
                }
                //這裡在消費資訊後手動確認已經獲取到資訊了,然後Broker好刪除訊息
                channel.basicAck(envelope.getDeliveryTag(),false);
            }
        };
        //這裡設定的自動應答會false
        channel.basicConsume(QUEUE_NAME,autoAck,"myConsumerTag",consumer);

  7.2 訊息的拒絕

    //拒絕一條訊息
    void basicReject(long deliveryTag, boolean requeue) throws IOException;
    //拒絕批量訊息
    void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;

        其中deliveryTag是訊息的編號,是一個64位的長整型數值。

      如果requeue引數設定為true,RabbitMQ會重新將這條訊息存入佇列,以便可以傳送給下一個訂閱的消費者;如果requeue引數設定為false,則RabbitMQ立即會把訊息從佇列中移除,而不會把它傳送給新的消費者。

      multiple引數設定為false則表示拒絕編號為deliveryTag的這一條訊息,這時候basicNack和basicReject方法一樣;multiple引數設定為true則表示拒絕deliveryTag編號之前所有未被當前消費者確認的訊息。

注意要點:將channel.basicReject或者channel.basicNack中的requeue設直為f a l s e,可以啟用"死信佇列"的功能。死信佇列可以通過檢測被拒絕或者未送達的訊息來追蹤問題,後面講。

7.3 訊息的重新獲取

    RecoverOk basicRecover() throws IOException;

    RecoverOk basicRecover(boolean requeue) throws IOException;

      這個channel.basicRecover方法用來請求RabbitMQ重新發送還未被確認的訊息。如果requeue引數設定為true,則未被確認的訊息會被重新加入到佇列中,這樣對於同一條訊息來說,可能會被分配給與前不同的消費者。如果requeue引數設定為false,那麼同一條訊息會被分配給與之前相同的消費者。預設情況下,如果不設定requeue這個引數,相當於channel.basicRecover(true),即requeue預設為true。

8.關閉連線

channel.close();
conn.close()  

顯式地關閉Channel個好習慣,但這不是必須的,在Connection關閉的時候,Channel也會自動關閉。

      AMQP協議中的Connection和Channel採用同樣的方式來管理網路失敗、內部錯誤和顯式地關閉連線。Connection和Channel所具備的生命週期如下所述:

  •  Open:開啟狀態,代表當前物件可以使用。
  • Closing:正在關閉狀態。當前物件被顯式地通知呼叫關閉方法(shutdown),這樣就產生了一個關閉請求讓其內部物件進行相應的操作,並等待這些關閉操作的完成。
  •  Closed:已經關閉狀態。當前物件已經接收到所有的內部物件已經完成關閉動作的通知,並且其也關閉了自身。Connection和Channel最終都是會成為losed的狀態,不論是程式正常呼叫的關閉方法,或者是客戶端的異常,再或者是發生了網路異常。

Connection和Channel中,與關閉相關的方法有:

addShutdownListener(ShutdownListenerlistener) ;

removeShutdownListener(ShutdownListner listener ) 

當Connection或者Channel的狀態轉變為Closed的時候會呼叫ShutdownListener。而且如果將一個ShutdownListener註冊到一個己經處於Closed狀態的物件(這裡特指Connection和Channel物件)時,會立刻呼叫ShutdownListener。

getCloseReason方法可以讓你知道物件關閉的原因;

isOpen方法檢測物件當前是否處於開啟狀態;

close(int  closeCode ,  StringcloseMessage方法顯式地通知當前物件執行關閉操作。

可以根據這個新增關閉監聽器,檢測失敗的原因,異常情況,例如:

     connection.addShutdownListener(new ShutdownListener() {
            @Override
            public void shutdownCompleted(ShutdownSignalException e) {
                if (e.isHardError()){  //這個可以知道是connection異常還是channel異常
                    Connection conn = (Connection) e.getReference();
                    if (!e.isInitiatedByApplication()){
                        Method reason = e.getReason();
                        ...
                    }
                }else {
                    Connection conn = (Connection) e.getReference();
                }

            }
        });