RabbitMQ 客戶端API使用
目錄
RabbitMQ Java客戶端使用com.rabbitmq.client作為頂級包名,關鍵的Class和Interface有Channel、Connection、ConnectionFactory、Consumer等。AMQP協議層面的操作通過Channel介面實現。Connection是用來開啟Channel(通道)的,可以註冊事件處理器,也可以在應用結束時關閉連線。與RabbitMQ相關的開發工作,基本上也是圍繞Connection和Channel這兩個類展開的。
這裡按照一個完整的運轉流程進行講解,詳細內容有:連線、交換器/佇列的建立與繫結、傳送訊息、消費訊息、消費訊息的確認和關閉連線。
一、連線RabbitMQ伺服器
連線ActiveMQ伺服器的方式一:
ConnectionFactory factory = new ConnectionFactory(); factory.setHost(IP_ADDRESS); factory.setPort(PORT); factory.setVirtualHost(virtualHost); //虛擬訊息伺服器host factory.setUsername("root"); factory.setPassword("root"); Connection connection = factory.newConnection(); //建立連線
使用uri來連線ActiveMQ方式二:
ConnectionFactory factory = new ConnectionFactory();
factory.setUri("amqp://username:[email protected]:portNumber/virtualHost");
Connection connection = factory.newConnection(); //建立連線
Channel channel = connection.createChannel();//建立通道
注意點:
Connection可以用來建立多個Channel例項,但是Channel例項不能線上程問共享,應用程式應該為每一個執行緒開闢一個Channel。某些情況下Channel的操作可以併發執行,但是在其他情況下會導致在網路上出現錯誤的通訊幀交錯,同時也會影響傳送方確認(publisher confirm)機制的執行(後面介紹),所以多執行緒問共享Channel例項是非執行緒安全的。
Channel或者Connection中有個isOpen方法可以用來檢測其是否己處於開啟狀態(),但是我們一般不使用這個方法,一般用CreateXXX,newXX方法後,我們預設的認為Connection,Channel已經成功的開啟了,如果在使用Channel的時候其己經處於關閉狀態,那麼程式會丟擲一個com.rabbitmq.client.ShutdownSignalException,我們只需捕獲這個異常即可。當然同時也要試著捕獲IOExceptio口或者SocketException,以防Connection意外關閉。
二、使用交換器和佇列
交換器和佇列是AMQP中high level層面的構建模組,應用程式需確保在使用它們的時候就已經存在了,在使用之前需要先宣告(declare)它們,然後再bind起來,這其中涉及到了bindingKey,程式碼:
Connection connection = factory.newConnection(); //建立連線
Channel channel = connection.createChannel();//建立通道
//建立一個type="direct" 、持久化,非自動刪除的交換器
channel.exchangeDeclare(EXCHANGE_NAME,"direct",true,false,null);
//建立一個持久化,非排他的,非自動刪除的佇列
channel.queueDeclare(QUEUE_NAME,true,false,false,null);
//將交換器與佇列通過路由繫結
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,ROUTING_KEY);
1.宣告查詢刪除交換器
1.1 宣告交換器方法如下:
DeclareOk exchangeDeclare(String exchange, String type, boolean durable,
boolean autodelete,boolean internal, Map<String, Object> arguments) throws IOException;
引數詳細說明如下所述:
- exchange:交換器的名稱。
- type:交換器的型別,常見的如fanout、direct、topic,header 詳情看上一章。
- durable:設定是否持久化。durable設定為true表示持久化,反之是非持久化。持久化可以將交換器存檔,在伺服器重啟的時候不會丟失相關資訊。
- autoDelete:設定是否自動刪除。autoDelete設定為true則表示自動刪除。自動刪除的前提是至少有一個佇列或者交換器與這個交換器繫結,之後所有與這個交換器繫結的佇列或者交換器都與此解綁。注意不能錯誤地把這個引數理解為:"當與此交換器連線的客戶端都斷開時,RabbitMQ會自動刪除本交換器"。
- internal:設定是否是內建的。如果設定為true,則表示是內建的交換器,客戶端程式無法直接傳送訊息到這個交換器中,只能通過交換器路由到交換器這種方式。
- argument:其他一些結構化引數,比如alternate-exchange(有alternateexchange的詳情後面介紹)。
相關的過載方法:
其中沒有帶的引數,布林值預設是false,Map引數為null
DeclareOk exchangeDeclare(String exchange, String type) throws IOException;
DeclareOk exchangeDeclare(tring exchange, String type, boolean durable) throws IOException;
DeclareOk exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete, Map<String, Object> arguments) throws IOException;
DeclareOk exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments) throws IOException;
1.2 與上面相對應的把第二引數變為BuiltinExchangeType type,這是封裝的列舉type,也可以通過這個來生命交換器型別,原始碼如下:
public enum BuiltinExchangeType {
DIRECT("direct"),
FANOUT("fanout"),
TOPIC("topic"),
HEADERS("headers");
private final String type;
private BuiltinExchangeType(String type) {
this.type = type;
}
public String getType() {
return this.type;
}
}
1.3 還有宣告為noWait的方法,這個nowait引數指的是AMQP中Exchange.Declare命令的引數,意思是不需要伺服器返回,注意這個方法的返回值是void,而普通的exchangeDeclare方法的返回值是Exchange.DeclareOk,意思是在客戶端聲明瞭一個交換器之後,需要等待伺服器的返回(伺服器會返回Exchange.Declare-Ok這個AMQP命令)。針對"exchangeDeclareNoWait不需要伺服器任何返回值"這一點,考慮這樣一種情況,在宣告完一個交換器之後(實際伺服器還並未完成交換器的建立),那麼此時客戶端緊接著使用這個交換器,必然會發生異常。如果沒有特殊的緣由和應用場景,並不建議使用這個方法。
public void exchangeDeclareNoWait(String exchange, String type, boolean durable,
boolean autoDelete, boolean internal, Map<String, Object> arguments) throws IOException
1.4 檢測交換器是否存在的方法,這個方法在實際應用過程中還是非常有用的,它主要用來檢測相應的交換器是否存在。如果存在則正常返回:如果不存在則丟擲異常:404 channel exception,同時Channel也會被關閉。
public DeclareOk exchangeDeclarePassive(String name)
1.5 刪除交換器的方法,方法中exchange表示交換器的名稱,而ifUnused用來設定是否在交換器沒有被使用的情況下刪除。如果isUnused設定為true,則只有在此交換器沒有被使用的情況下才會被刪除:如果設定false,則無論如何這個交換器都要被刪除,預設是false的。
DeleteOk exchangeDelete(String exchange, boolean ifUnused) throws IOException;
void exchangeDeleteNoWait(String exchange, boolean ifUnused) throws IOException;
DeleteOk exchangeDelete(String exchange) throws IOException; //預設為false
2. 宣告查詢刪除佇列
2.1 這裡只有兩個方法:
Queue.DeclareOk queueDeclare() throws IOException;
Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive,
boolean autoDelete, Map<String, Object> arguments) throws IOException;
不帶任何引數的queueDeclare方法預設建立一個由RabbitMQ命名的(類似這種amq.gen-LhQzlgv3DOv8PIDabOXA名稱,這種佇列也稱之為匿名佇列〉、排他的(true)、自動刪除的(true)、非持久化(false)的佇列,如下所示獲取佇列的名字:
String queueName = channel.queueDeclare().getQueue( );
方法的引數詳細說明如下:
- queue:佇列的名稱
- durable:設定是否持久化。為true則設定佇列為持久化。持久化的佇列會存檔,在伺服器重啟的時候可以保證不丟失相關資訊
- exclusive:設定是否排他。為t r u e則設定佇列為排他的。如果一個佇列被宣告為排他佇列,該佇列僅對首次宣告它的連線可見,並在連線斷開時自動刪除。這裡需要注意三點:排他佇列是基於連線(Connection )可見的,同一個連線的不同通道( Channel )是可以同時訪問同一連線建立的排他佇列;"首次"是指如果一個連線己經聲明瞭一個排他佇列,其他連線是不允許建立同名的排他佇列的,這個與普通佇列不同:即使該佇列是持久化的,一旦連線關閉或者客戶端退出,該排他佇列都會被自動刪除,這種佇列適用於一個客戶端同時傳送和讀取訊息的應用場景
- autoDelete:設定是否自動刪除。為t r u e則設定佇列為自動刪除。自動刪除的前提是:至少有一個消費者連線到這個佇列,之後所有與這個佇列連線的消費者都斷開時,才會自動刪除。不能把這個引數錯誤地理解為:"當連線到此佇列的所有客戶端斷開時,這個佇列自動刪除",因為生產者客戶端建立這個佇列,或者沒有消費者客戶端與這個佇列連線時,都不會自動刪除這個佇列
- argurnents:設定佇列的其他一些引數,如x-message-ttl、x-expires、x-max-length、x-max-length-bytes、x-dead-letter-exchange、x-dead-letter-routing-key ,x-max-priority等
注意點:
生產者和消費者都能夠使用queueDeclare來宣告一個佇列,但是如果消費者在同一個通道上訂閱了另一個佇列,就無法再宣告隊列了。必須先取消訂閱,然後將通道置為"傳輸"模式,才能宣告佇列
2.2 這裡也有一個nowait宣告queue的方法,返回值void和exchangeDeclareNoWait類似, 表示不需要服務端的任何返回。同樣也需要注意,在呼叫完queueDeclareNoWait方法之後,緊接著使用宣告的佇列時有可能會發生異常情況。
public void queueDeclareNoWait(String queue, boolean durable, boolean exclusive,
boolean autoDelete, Map<String, Object> arguments)
2.3 同樣這裡還有個queueDeclarePassive的方法,也比較常用。這個方法用來檢測相應的佇列是否存在。如果存在則正常返回,如果不存在則丟擲異常:404 channel exception,同時Channel也會被關閉。方法定義如下:
public queueDeclarePassive(String queue)
2.4 刪除佇列的方法
- 其中queue表示佇列的名稱,
- ifUnused以參考交換器(ifUnused用來設定是否在交換器沒有被使用的情況下刪除。如果isUnused設定為true,則只有在此交換器沒有被使用的情況下才會被刪除:如果設定false,則無論如何這個交換器都要被刪除,預設是false的)
- ifEmpty設定為t r u e表示在佇列為空(佇列裡面沒有任何訊息堆積)的情況下才能夠刪除。
public com.rabbitmq.client.AMQP.Queue.DeleteOk queueDelete(String queue) throws IOException {
return this.queueDelete(queue, false, false);
}
public com.rabbitmq.client.AMQP.Queue.DeleteOk queueDelete(String queue, boolean ifUnused, boolean ifEmpty) throws IOException {
this.deleteRecordedQueue(queue);
return this.delegate.queueDelete(queue, ifUnused, ifEmpty);
}
public void queueDeleteNoWait(String queue, boolean ifUnused, boolean ifEmpty) throws IOException {
this.deleteRecordedQueue(queue);
this.delegate.queueDeleteNoWait(queue, ifUnused, ifEmpty);
}
3. 佇列繫結交換器和解綁 queueBind
3.1 繫結佇列和交換器
方法中涉及的引數詳解:
- queue:佇列名稱
- exchange:交換器的名稱
- routingKey:用來繫結佇列和交換器的路由鍵
- argument:定義繫結的一些引數
public Queue.BindOk queueBind(String queue, String exchange, String routingKey, Map<String, Object> arguments)
public AMQP.Queue.BindOk queueBind(String queue, String exchange, String routingKey)
public void queueBindNoWait(String queue, String exchange, String routingKey, Map<String, Object> arguments)
3.2 將繫結的佇列和交換器解綁
public Queue.UnbindOk queueUnbind(String queue, String exchange, String routingKey, Map<String, Object> arguments)
public Queue.UnbindOk queueUnbind(String queue, String exchange, String routingKey)
4.交換器和交換器繫結exchangeBind
這裡的destination表示和佇列相連的exchange交換器的名字,source表示和訊息釋出者相連的交換器名字。
public BindOk exchangeBind(String destination, String source, String routingKey, Map<String, Object> arguments)
public BindOk exchangeBind(String destination, String source, String routingKey)
void exchangeBindNoWait(String destination, String source, String routingKey, Map<String, Object> arguments) throws IOException
例子:
生產者傳送訊息至交換器source中,交換器source根據路由鍵找到與其匹配的另一個交換器destination,井把訊息轉發到destination中,進而儲存在destination繫結的佇列queue中,如下圖。
channel.exchangeDeclare("source","direct",false,true,null) ;
channel.exchangeDeclare("destination","fanout",false,true ,null );
channel.exchangeBind("destination","source","exKey");
channel.queueDeclare("queue", false, false, true, null );
channel.queueBind("queue","destination")
channel.basicPublish("source","exKey", null ,"exToExDemo".getBytes( )) ;
5.傳送訊息
方法用basicPublish如下:
void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body);
void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body);
void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body);
上面的引數解析如下:
- exchange:交換器的名稱,指明訊息需要傳送到哪個交換器中。如果設定為空字串,則訊息會被髮送到RabbitMQ預設的交換器中。
- routingKey:路由鍵,交換器根據路由鍵將訊息儲存到相應的佇列之中。
- props:訊息的基本屬性集,其包含14個屬性成員,分別有contentType、contentEncoding、headers(Map<String,Object>)、deliveryMode、priority、correlationld、replyTo、expiration、messageld、timestamp、type、userld、appld、clusterld。其中常用的幾種都在上面的示例中進行了演示。
- byte[] body:訊息體(payload),真正需要傳送的訊息。
- mandatory和immediate的詳細內容後面講解
常用的訊息傳送例子:
//傳送持久化資訊
String message = "hello world";
channel.basicPublish(EXCHANGE_NAME,ROUTING_KEY,MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
這裡的PERSISTENT_TEXT_PLAIN如下所示:
public static final BasicProperties PERSISTENT_TEXT_PLAIN = new BasicProperties("text/plain",
(String)null, (Map)null, 2, 0, (String)null, (String)null, (String)null, (String)null, (Date)null, (String)null, (String)null, (String)null, (String)null);
5.1 上面這行程式碼傳送了一條訊息,這條訊息的投遞模式(deliveryMode)設定為2,即訊息會被持久化(即存入磁碟)在伺服器中。同時這條訊息的優先順序(priority)設定為1,content-type為"text/plain ",可以自己設訊息的屬性:
channel.basicPublish(EXCHANGE_NAME,ROUTING_KEY,
new AMQP.BasicProperties().builder()
.contentType("text/plain")
.deliveryMode(2)
.priority(1)
.userId("hidden")
.build(),
message.getBytes());
5.2 也可以傳送帶有headers的訊息:
Map<String,Object> headers = new HashMap<>();
headers.put("location","here");
headers.put("time","today");
channel.basicPublish(EXCHANGE_NAME,ROUTING_KEY,
new AMQP.BasicProperties().builder().headers(headers).build(),message.getBytes());
5.3 帶有過期(expiration)資訊的訊息:
channel.basicPublish(EXCHANGE_NAME,ROUTING_KEY,
new AMQP.BasicProperties().builder().expiration("60000").build(),message.getBytes());
6.消費訊息
RabbitMQ的消費模式分兩種:推(Push)模式和拉(Pull)模式。推模式採用Basic.Consume進行消費,而拉模式則是呼叫Basic.Get進行消費。
6.1 推模式push
可以通過持續訂閱的方式來消費,使用到的類Consumer,DefaultConsumer,接受訊息一般通過實現Consumer介面或者繼承DefaultConsumer類,當呼叫Consumer相關的API方法時,不同的訂閱需要制定不同的消費者標籤(consumerTag)來區分彼此,在同一個channel中的消費者也需要通過唯一的消費標籤用來做區分,關鍵消費程式碼如下:
boolean autoAck = false;
channel.basicQos(64);//設定客戶端最多接受違背ack的資訊的個數
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("recieve message :"+new String(body));
try {
TimeUnit.SECONDS.sleep(1); //休眠一秒在返回確定資訊
}catch (InterruptedException e){
e.printStackTrace();
}
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
//這裡的自動應答被關閉,通過手動確認收到訊息後在做應答,這個可以有效的防止訊息不必要的丟失。
channel.basicConsume(QUEUE_NAME,autoAck,"myConsumerTag",consumer);
相關的消費方法:
String basicConsume(String queue, Consumer callback) throws IOException;
String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;
String basicConsume(String queue, boolean autoAck, String consumerTag, Consumer callback) throws IOException;
String basicConsume(String queue, boolean autoAck, Map<String, Object> arguments, Consumer callback) throws IOException;
String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String, Object> arguments, Consumer callback) throws IOException;
對應的引數說明如下所述:
- queue:佇列的名稱;
- autoAck:設定是否自動確認。建議設成false,即不自動確認,預設不設定也是false;
- consumerTag:消者標籤,用來區分多個消費者,不設定預設“”;
- noLocal:設定為true則表示不能將同一個Connection中生產者傳送的訊息傳送給這個Connection中的消費者,預設是false;
- exclusive:設定是否排他,預設是false;
- arguments:設定消費者的其他引數,預設是null;
- callback:設定消費者的回撥函式。用來處理RabbitMQ推送過來的訊息,比如DefaultConsumer,使用時需要客戶端重寫(override )其中的方法。
對於消費者需要消費訊息一般都是重寫handleDelivery方法
public interface Consumer {
//這個方法會在其他方法呼叫前,返回消費者標籤
void handleConsumeOk(String consumerTag);
//取消訂閱的時候呼叫
void handleCancelOk(String consumerTag);
void handleCancel(String consumerTag) throws IOException;
//重寫這個方法會在channel和Connection關閉時候呼叫
void handleShutdownSignal(String consumerTag, ShutdownSignalException sig);
void handleRecoverOk(String consumerTag);
void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException;
}
我們可以通過channel.basicCancel方法來顯式地取消一個消費者的訂閱,channel.basicCancel(consumerTag);這行程式碼執行會首先觸發handleConsumerOk方法,然後呼叫handleDelivery方法,最後才出發handleCancelOk方法。
和生產者一樣,消費者客戶端同樣需要考慮執行緒安全的問題。消費者客戶端的這些callback會被分配到與Channel不同的執行緒池上,這意味著消費者客戶端可以安全地呼叫這些阻塞方法,比如channel.queueDeclare、channel.basicCancel等。
每個Channel都擁有自己獨立的執行緒。最常用的做法是一個Channel對應一個消費者,也就是意味著消費者彼此之間沒有任何關聯。當然也可以在一個Channel中維持多個消費者,但是要注意一個問題,如果Channel中的一個消費者一直在執行,那麼其他消費者的callback會被"耽擱"。
6.2 拉模式
拉模式的消費方式,通過channel.basicGet方法可以單條地獲取訊息,其返回值是GetResponse,Channel類的basicGet方法沒有其他過載方法,只有:
GetResponse basicGet(String queueName, boolean autoAck) throws IOException;
其中中queueName代表隊列的名稱,如果設定autoAck為false,那麼同樣需要呼叫channel.basicAck來確認訊息己被成功接收。
例子:
GetResponse response = channel.basicGet(QUEUE_NAME, false) ;
System.out.println(new String(response.getBody()));
channel.basicAck(response.getEnvelope().getDeliveryTag(), false);
注意點:推模式和拉模式的對比。Basic.Consume將通道(Channel)直為接收模式,直到取消佇列的訂閱為止。在接收模式期間,RabbitMQ會不斷地推送訊息給消費者,當然推送訊息的個數還是會受到Basic.Qos的限制.如果只想從佇列獲得單條訊息而不是持續訂閱,建議還是使用Basic . Get進行消費.但是不能將Basic.Get放在一個迴圈裡來代替Basic.Consume,這樣做會嚴重影響RabbitMQ的效能.如果要實現高吞吐量,消費者理應使用Basic.Consume方法。
7.消費端的確認和拒絕
7.1 消費資訊的確認
為了保證訊息從佇列可靠地達到消費者,RabbitMQ提供了訊息確認機制(message acknowledgement )。消費者在訂閱佇列時,可以指定autoAck引數,當autoAck等於false時,RabbitMQ會等待消費者顯式地回覆確認訊號後才從記憶體(或者磁碟)中移去訊息(實質上是先打上刪除標記,之後再刪除)。當autoAck等於t r u e時,RabbitMQ會自把傳送出去的訊息置為確認,然後從記憶體(或者磁碟)中刪除,而不管消費者是否真正地消費到了這些訊息。
採用訊息確認機制後,只要設定autoAck引數為false,消費者就有足夠的時間處理訊息(任務),不用擔心處理訊息過程中消費者程序掛掉後訊息丟失的問題,因為RabbitMQ會一直等待持有訊息直到消費者顯式呼叫Basic.Ack命令為止。當autoAck引數置為false,對於RabbitMQ服務端而言,佇列中的訊息分成了兩個部分:一部分是等待投遞給消費者的訊息:一部分是己經投遞給消費者,但是還沒有收到消費者確認訊號的訊息。如果RabbitMQ一直沒有收到消費者的確認訊號,並且消費此訊息的消費者己經斷開連線,則RabbitMQ會安排該訊息重新進入佇列,等待投遞給下一個消費者,當然也有可能還是原來的那個消費者。
boolean autoAck = false;
channel.basicQos(64);//設定客戶端最多接受違背ack的資訊的個數
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("recieve message :"+new String(body));
try {
TimeUnit.SECONDS.sleep(1); //休眠一秒在返回確定資訊
}catch (InterruptedException e){
e.printStackTrace();
}
//這裡在消費資訊後手動確認已經獲取到資訊了,然後Broker好刪除訊息
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
//這裡設定的自動應答會false
channel.basicConsume(QUEUE_NAME,autoAck,"myConsumerTag",consumer);
7.2 訊息的拒絕
//拒絕一條訊息
void basicReject(long deliveryTag, boolean requeue) throws IOException;
//拒絕批量訊息
void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;
其中deliveryTag是訊息的編號,是一個64位的長整型數值。
如果requeue引數設定為true,RabbitMQ會重新將這條訊息存入佇列,以便可以傳送給下一個訂閱的消費者;如果requeue引數設定為false,則RabbitMQ立即會把訊息從佇列中移除,而不會把它傳送給新的消費者。
multiple引數設定為false則表示拒絕編號為deliveryTag的這一條訊息,這時候basicNack和basicReject方法一樣;multiple引數設定為true則表示拒絕deliveryTag編號之前所有未被當前消費者確認的訊息。
注意要點:將channel.basicReject或者channel.basicNack中的requeue設直為f a l s e,可以啟用"死信佇列"的功能。死信佇列可以通過檢測被拒絕或者未送達的訊息來追蹤問題,後面講。
7.3 訊息的重新獲取
RecoverOk basicRecover() throws IOException;
RecoverOk basicRecover(boolean requeue) throws IOException;
這個channel.basicRecover方法用來請求RabbitMQ重新發送還未被確認的訊息。如果requeue引數設定為true,則未被確認的訊息會被重新加入到佇列中,這樣對於同一條訊息來說,可能會被分配給與前不同的消費者。如果requeue引數設定為false,那麼同一條訊息會被分配給與之前相同的消費者。預設情況下,如果不設定requeue這個引數,相當於channel.basicRecover(true),即requeue預設為true。
8.關閉連線
channel.close();
conn.close()
顯式地關閉Channel個好習慣,但這不是必須的,在Connection關閉的時候,Channel也會自動關閉。
AMQP協議中的Connection和Channel採用同樣的方式來管理網路失敗、內部錯誤和顯式地關閉連線。Connection和Channel所具備的生命週期如下所述:
- Open:開啟狀態,代表當前物件可以使用。
- Closing:正在關閉狀態。當前物件被顯式地通知呼叫關閉方法(shutdown),這樣就產生了一個關閉請求讓其內部物件進行相應的操作,並等待這些關閉操作的完成。
- Closed:已經關閉狀態。當前物件已經接收到所有的內部物件已經完成關閉動作的通知,並且其也關閉了自身。Connection和Channel最終都是會成為losed的狀態,不論是程式正常呼叫的關閉方法,或者是客戶端的異常,再或者是發生了網路異常。
Connection和Channel中,與關閉相關的方法有:
addShutdownListener(ShutdownListenerlistener) ;
removeShutdownListener(ShutdownListner listener )
當Connection或者Channel的狀態轉變為Closed的時候會呼叫ShutdownListener。而且如果將一個ShutdownListener註冊到一個己經處於Closed狀態的物件(這裡特指Connection和Channel物件)時,會立刻呼叫ShutdownListener。
getCloseReason方法可以讓你知道物件關閉的原因;
isOpen方法檢測物件當前是否處於開啟狀態;
close(int closeCode , StringcloseMessage方法顯式地通知當前物件執行關閉操作。
可以根據這個新增關閉監聽器,檢測失敗的原因,異常情況,例如:
connection.addShutdownListener(new ShutdownListener() {
@Override
public void shutdownCompleted(ShutdownSignalException e) {
if (e.isHardError()){ //這個可以知道是connection異常還是channel異常
Connection conn = (Connection) e.getReference();
if (!e.isInitiatedByApplication()){
Method reason = e.getReason();
...
}
}else {
Connection conn = (Connection) e.getReference();
}
}
});