十一、訊息對佇列和消費確認
訊息到佇列
訊息傳送後,如何確定到訊息是否到達了相應的佇列?RabbitMQ預設在傳送訊息時,如果不能根據交換器型別和路由鍵找到相應的佇列,訊息將直接丟棄。而要想知道訊息是否達到佇列或沒到佇列卻不想訊息丟失,RabbitMQ提供有解決方案:
設定
mandatory
引數- 為
true
時,交換器無法根據自身的型別和路由鍵找到一個符合條件的佇列,訊息返回給生產者; - 為
false
時,交換器無法根據自身的型別和路由鍵找到一個符合條件的佇列,訊息直接丟棄。RabbitMQ預設。
可以通過呼叫
channel.addReturnListener
來新增ReturnListener
public class Send { final static String EXCHANGE = "mandatoryParam"; public static void main(String[] args) { Connection connection = null; Channel channel; try { connection = ConnectionUtils.getConnection(); channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.DIRECT); //訊息傳送時,設定mandatory引數為true channel.basicPublish(EXCHANGE, "", true, null, "test".getBytes("utf-8")); //監聽訊息是否路由到佇列,沒路由到將接收到返回的訊息 channel.addReturnListener(new ReturnListener() { @Override public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("沒找到符合的佇列而退回的訊息 = " + new String(body, "utf-8")); } }); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); }finally { try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } } }
當傳送訊息不能成功路由到佇列時,ReturnListener會監聽到返回的"test"訊息並列印輸出。
- 為
設定immediate引數
訊息關聯的佇列上有消費者,就投遞;如果所有匹配的佇列上都沒有消費者,則將訊息返回給生產者。
- 為
true
時,如果所有匹配的佇列上都沒有消費者,則將訊息返回給生產者。 - 為
false
時,佇列有沒有消費者都投遞。
RabbitMQ3.0版本開始已經不支援對imediate引數的支援(建議採用TTL和DLX),瞭解即可。
- 為
備份交換器
生產者在傳送訊息的時候如果不設定mandatory引數,那麼訊息在未被路由的情況下將會丟失。
如果設定了mandatory引數,那麼需要新增ReturnListener的程式設計邏輯,生產者的程式碼將變得複雜。
如果既不想複雜化生產者的程式設計邏輯,又不想訊息丟失,那麼可以使用備份交換器,這樣可以將未被路由到佇列的訊息儲存在RabbitMQ中,再在需要的時候去處理這些訊息。
備份交換器的使用有兩種方法:
在呼叫
channel.exchangeDeclare
方法時新增alternate-exchange
引數來實現訊息傳送客戶端程式碼:
public class Send { final static String exchange = "正常交換器"; final static String exchange2 = "備份交換器"; final static String queue = "正常佇列"; final static String queue2 = "備份佇列"; public static void main(String[] args) { Connection connection = null; Channel channel; try { connection = ConnectionUtils.getConnection(); channel = connection.createChannel(); //要使用備份交換器exchange2,必須要設定該引數 Map<String, Object> param = new HashMap<>(); param.put("alternate-exchange", exchange2); //建立正常交換器,訊息正常路由到佇列 channel.exchangeDeclare(exchange, BuiltinExchangeType.DIRECT, true, false, param); channel.queueDeclare(queue, true, false, false, null); channel.queueBind(queue, exchange, exchange); //建立備份路由器,訊息不能正常路由到佇列後,則訊息傳送到該備份路由器從而路由到備份佇列 channel.exchangeDeclare(exchange2, BuiltinExchangeType.FANOUT, true, false, null); channel.queueDeclare(queue2, true, false, false , null); channel.queueBind(queue2, exchange2, ""); //測試不能正常路由到佇列 channel.basicPublish(exchange, "無效的路由鍵", MessageProperties.PERSISTENT_TEXT_PLAIN, "備份交換器".getBytes("utf-8")); } catch (IOException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); }finally { try { connection.close(); } catch (IOException e) { e.printStackTrace(); } } } }
如上面傳送訊息程式碼所示,傳送到交換器的訊息因為RoutingKey不匹配不能正常路由到佇列,如果沒有設定
mandatory
引數或備份交換器,訊息將會直接丟棄。而現在使用了備份交換器,訊息將重新發送到備份交換器,備份交換器再路由到備份佇列中,從而訊息不會丟失。對於備份交換器,和普通交換器一樣,支援交換器四大型別。建議使用
fanout
型別,因為訊息被髮送到備份交換器的路由鍵和從生產者發出的路由鍵是一樣的,如果使用direct
型別,則備份交換器和備份佇列的路由鍵和生產者指定的路由鍵一樣,但如果生產者因為RoutingKey錯誤導致不能路由到正常佇列,這時,備份路由器就不能預知到生產者錯誤路由鍵和備份路由器的路由鍵匹配而不能路由到備份佇列也會導致訊息丟失。而使用fanout
型別,即使生產者錯誤的路由鍵也能路由到備份佇列中。也可以通過策略(Policy,後面說明使用)的方法實現。
命令列輸入:
rabbitmqctl set_policy AE "^正常交換器$" '{"alternate-exchange":"備份交換器"}'
如果兩者同時使用,則前者比後者優先順序高,會覆蓋掉Policy的設定。
即使設定了備份交換器,以下幾種情況仍然會導致訊息:
- 設定的備份交換器不存在
- 備份交換器沒有繫結任何佇列
- 備份交換器沒有任何匹配的佇列
如果備份交換器和
mandatory
引數一起使用,mandatory
引數將無效。
消費確認
在上面程式中,如果RabbitMQ一旦向消費客戶端傳送訊息後,立即將其標記刪除。在這時,剛好消費客戶端接收到還沒處理,突然因為其他原因而死掉,此時就出現訊息會永久丟失的情況。
為了確保訊息不會丟失,要使用RabbitMQ的訊息確認。消費者發出ack(nowledgement)告訴RabbitMQ已接收並處理,RabbitMQ再刪除。
RabbitMQ訊息確認:如果消費者死亡(通道關閉、連線關閉或TCP連線丟失等)而不傳送確認給RabbitMQ,RabbitMQ將理解訊息未完全處理並將重新進入佇列,訊息將重新發送,並且該訊息不會超時失效。
要使用訊息確認,要開啟手動訊息確認。在前面的工作佇列的例項,在channel.basicConsume(QUEUE, true, consumer);
中,autoAck = true
為自動訊息確認,需要設定autoAck = false
關閉自動確認來使用手動訊息確認。
設定autoAck = true
後,設定訊息確認有三個方法:
channel.basicAck(long deliveryTag, boolean requeue):用於訊息成功接收
引數:
- deliveryTag:RabbitMQ傳送訊息的標籤,是單調遞增的正整數,可通過
envelope.getDeliveryTag()
獲取 - requeue:
- 為
true
時,RabbitMQ將自動確認所有指定小於deliveryTag
的訊息,例如:在Channel上傳遞過來的deliveryTag
標籤有5
、6
、7
、8
,當消費者先接收到標籤8
並確認,則小於8
的有5
、6
、7
都全部確認。 - 為
false
時,對於上面的例子,只有8
確認,其他不會確認。
- 為
- deliveryTag:RabbitMQ傳送訊息的標籤,是單調遞增的正整數,可通過
channel.basicNack(long deliveryTag, boolean multiple,boolean requeue):用於拒絕接收(批量)
引數:
deliveryTag:RabbitMQ傳送訊息的標籤,是單調遞增的正整數,可通過
envelope.getDeliveryTag()
獲取multiple:
為
true
,則表示拒絕deliveryTag
編號以下所有未被當前消費者確認的訊息。為
false
,則表示拒絕編號為deliveryTag
的這一條訊息,這時候basicNack
和basicReject
方法一樣;
requeue:被拒絕的訊息是否重新入佇列。true時,重新入佇列重發給下一個消費者;false時,立即把訊息刪除,不再會發給消費者
channel.basicReject(long deliveryTag, boolean requeue):用於拒絕接收(單條)
引數意義同上
訊息成功確認
生產者客戶端傳送訊息
/**
* @author Hayson
* @date 2018/11/23 13:39
* @description rabbitmq生產者傳送多條訊息
*/
public class Send {
final static String QUEUE = "helloWorld";
public static void main(String[] args) throws IOException, TimeoutException {
send();
}
public static void send() throws IOException, TimeoutException {
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE, false, false, false, null);
for (int i = 0; i < 5; i++) {
String message = "Hello World! " + i;
//傳送訊息,指定傳送交換器(""則為自帶預設交換器)
channel.basicPublish("", QUEUE, null, message.getBytes("UTF-8"));
System.out.println("傳送訊息:" + message);
}
channel.close();
connection.close();
}
}
消費者客戶端成功接收訊息:
/**
* @author Hayson
* @date 2018/11/23 13:41
* @description rabbitmq消費者接收訊息2
*/
public class Receiver2 {
final static String QUEUE = "helloWorld";
public static void main(String[] args) throws IOException, TimeoutException {
recevier();
}
public static void recevier() throws IOException, TimeoutException {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE, false, false, false, null);
//關閉自動確認
boolean autoAck = false;
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag,
Envelope envelope,
AMQP.BasicProperties properties,
byte[] body)throws IOException {
//獲取標籤
long deliveryTag = envelope.getDeliveryTag();
//為false, 確認只接收當前deliveryTag
channel.basicAck(deliveryTag, false);
//為true, 確認接收所有小於等於deliveryTag的訊息
//channel.basicAck(deliveryTag, true);
String message = new String(body, "UTF-8");
System.out.println("接收到訊息:" + message);
}
};
//關閉自動確認
channel.basicConsume(QUEUE, autoAck, consumer);
//channel.close();
//connection.close();
}
}
上面消費者客戶端程式碼中,autoAck = false
和multiple = false
表示確認了成功接收了當前deliveryTag的訊息。autoAck = false
和multiple = true
表示確認接收所有小於等於deliveryTag的訊息。
如果設定了autoAck = false
,而不確認接收訊息時,例如註釋上面消費者客戶端程式碼channel.basicAck(deliveryTag, false);
後,在消費者接收到訊息後不確認返回給生產者時,生產者不會刪除佇列中當前的訊息並標記為unAcked
:
訊息拒絕確認
對於上面消費者客戶端程式碼,修改如下:
boolean autoAck = false; //繼承DefaultConsumer類來實現消費,獲取訊息 Consumer consumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)throws IOException { long deliveryTag = envelope.getDeliveryTag(); // 消費者拒絕當前標籤訊息確認,並重新進入佇列傳送 //channel.basicNack(deliveryTag, false, true); // 消費者拒絕當前標籤訊息確認,並刪除佇列訊息 channel.basicNack(deliveryTag, false, false); String message = new String(body, "UTF-8"); System.out.println("接收到訊息:" + message); } }; channel.basicConsume(QUEUE, autoAck, consumer);
上面訊息拒絕確認使用了channel.basicNack方法,當然也可以使用channel.basicReject方法。消費者拒絕訊息後,如果標記了重新進入佇列傳送,有可能會重新發送給當前消費者,也有可能傳送給其他消費者。如果拒絕後不需要的訊息,可以標記直接刪除。