(五) RabbitMQ實戰教程(面向Java開發人員)之RabbitMQ異常處理
阿新 • • 發佈:2018-12-25
RabbitMQ異常處理
使用JAVA客戶端整合RabbitMQ進行的許多操作都會丟擲異常,我們可以自定義異常處理器進行處理,比如我們希望在RabbitMQ消費訊息失敗時記錄一條日誌,又或者在訊息消費失敗時傳送一則通知等操作
RabbitMQ Java Client
1.建立連線工具類 並設定異常處理器
public class ChannelUtils {
public static Channel getChannelInstance(String connectionDescription) {
try {
ConnectionFactory connectionFactory = getConnectionFactory();
Connection connection = connectionFactory.newConnection(connectionDescription);
return connection.createChannel();
} catch (Exception e) {
throw new RuntimeException("獲取Channel連線失敗");
}
}
private static ConnectionFactory getConnectionFactory() {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.56.128" );
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("roberto");
connectionFactory.setPassword("roberto");
connectionFactory.setAutomaticRecoveryEnabled(true);
connectionFactory.setNetworkRecoveryInterval(10000 );
Map<String, Object> connectionFactoryPropertiesMap = new HashMap();
connectionFactoryPropertiesMap.put("principal", "RobertoHuang");
connectionFactoryPropertiesMap.put("description", "RGP訂單系統V2.0");
connectionFactoryPropertiesMap.put("emailAddress", "[email protected]");
connectionFactory.setClientProperties(connectionFactoryPropertiesMap);
// 設定自定義異常處理器
connectionFactory.setExceptionHandler(new DefaultExceptionHandler() {
@Override
public void handleConsumerException(Channel channel, Throwable exception, Consumer consumer, String consumerTag, String methodName) {
System.out.println("----------訊息消費異常處理----------");
System.out.println("訊息消費異常日誌記錄:" + exception.getMessage());
super.handleConsumerException(channel, exception, consumer, consumerTag, methodName);
}
});
return connectionFactory;
}
}
2.建立訊息生產者
public class MessageProducer {
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = ChannelUtils.getChannelInstance("RGP訂單系統訊息生產者");
channel.exchangeDeclare("roberto.order", BuiltinExchangeType.DIRECT, true, false, false,new HashMap<>());
AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder().deliveryMode(2).contentType("UTF-8").build();
channel.basicPublish("roberto.order", "add", false, basicProperties, "訂單資訊".getBytes());
}
}
3.建立消費者 在進行訊息消費時我們手動丟擲了一個異常
public class MessageConsumer {
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = ChannelUtils.getChannelInstance("RGP訂單系統訊息消費者");
AMQP.Queue.DeclareOk declareOk = channel.queueDeclare("roberto.order.add", true, false, false, new HashMap<>());
channel.exchangeDeclare("roberto.order", BuiltinExchangeType.DIRECT, true, false, false, new HashMap<>());
channel.queueBind(declareOk.getQueue(), "roberto.order", "add", new HashMap<>());
channel.basicConsume(declareOk.getQueue(), true, "RGP訂單系統ADD處理邏輯消費者", new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(consumerTag);
System.out.println(envelope.toString());
System.out.println(properties.toString());
System.out.println("訊息內容:" + new String(body));
throw new RuntimeException("新增訂單訊息消費出現異常");
}
});
}
}
4.依次啟動訊息消費者和生產者 控制檯輸出如下
RGP訂單系統ADD處理邏輯消費者
Envelope(deliveryTag=1, redeliver=false, exchange=roberto.order, routingKey=add)
#contentHeader<basic>(content-type=UTF-8, content-encoding=null, headers=null, delivery-mode=2, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)
訊息內容:訂單資訊
----------訊息消費異常處理----------
訊息消費異常日誌記錄:新增訂單訊息消費出現異常
說明當消費訊息出現異常時,會進入我們自定義異常處理的邏輯。需要注意的是預設RabbitMQ Java Client在發生異常時會將Channel/Connection關閉,進而使程式未能按照預期的方向執行,所以我們在軟體設計的時候應當考慮周全
Spring AMQP配置方式
Spring AMQP在監聽器丟擲一個異常的時候它會將該異常包裝成ListenerExecutionFailedException,通常這個訊息會被拒絕並且重新放入佇列中,如果設定DefaultRequeueRejected屬性為false將把這個訊息直接丟棄。需要注意的是如果丟擲的異常是 ARADRE 或其他被RabbitMQ認為是致命錯誤的異常,即便DefaultRequeueRejected的值為true該訊息也不會重新加入佇列,而是會被直接丟棄。當丟擲異常為以下幾種異常時訊息將不會重新入佇列
1.org.springframework.amqp.support.converter.MessageConversionException
2.org.springframework.messaging.converter.MessageConversionException
3.org.springframework.messaging.handler.invocation.MethodArgumentResolutionException
4.java.lang.NoSuchMethodException
5.java.lang.ClassCastException
關於Spring AMQP異常處理可參見部落格:Spring AMQP異常處理