1. 程式人生 > >(五) RabbitMQ實戰教程(面向Java開發人員)之RabbitMQ異常處理

(五) RabbitMQ實戰教程(面向Java開發人員)之RabbitMQ異常處理

RabbitMQ異常處理

使用JAVA客戶端整合RabbitMQ進行的許多操作都會丟擲異常,我們可以自定義異常處理器進行處理,比如我們希望在RabbitMQ消費訊息失敗時記錄一條日誌,又或者在訊息消費失敗時傳送一則通知等操作

RabbitMQ Java Client

1.建立連線工具類 並設定異常處理器

public class ChannelUtils {
    public static Channel getChannelInstance(String connectionDescription) {
        try {
            ConnectionFactory connectionFactory = getConnectionFactory();
            Connection connection = connectionFactory.newConnection(connectionDescription);
            return
connection.createChannel(); } catch (Exception e) { throw new RuntimeException("獲取Channel連線失敗"); } } private static ConnectionFactory getConnectionFactory() { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost("192.168.56.128"
); connectionFactory.setPort(5672); connectionFactory.setVirtualHost("/"); connectionFactory.setUsername("roberto"); connectionFactory.setPassword("roberto"); connectionFactory.setAutomaticRecoveryEnabled(true); connectionFactory.setNetworkRecoveryInterval(10000
); Map<String, Object> connectionFactoryPropertiesMap = new HashMap(); connectionFactoryPropertiesMap.put("principal", "RobertoHuang"); connectionFactoryPropertiesMap.put("description", "RGP訂單系統V2.0"); connectionFactoryPropertiesMap.put("emailAddress", "[email protected]"); connectionFactory.setClientProperties(connectionFactoryPropertiesMap); // 設定自定義異常處理器 connectionFactory.setExceptionHandler(new DefaultExceptionHandler() { @Override public void handleConsumerException(Channel channel, Throwable exception, Consumer consumer, String consumerTag, String methodName) { System.out.println("----------訊息消費異常處理----------"); System.out.println("訊息消費異常日誌記錄:" + exception.getMessage()); super.handleConsumerException(channel, exception, consumer, consumerTag, methodName); } }); return connectionFactory; } }

2.建立訊息生產者

public class MessageProducer {
    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = ChannelUtils.getChannelInstance("RGP訂單系統訊息生產者");

        channel.exchangeDeclare("roberto.order", BuiltinExchangeType.DIRECT, true, false, false,new HashMap<>());

        AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder().deliveryMode(2).contentType("UTF-8").build();
        channel.basicPublish("roberto.order", "add", false, basicProperties, "訂單資訊".getBytes());
    }
}

3.建立消費者 在進行訊息消費時我們手動丟擲了一個異常

public class MessageConsumer {
    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = ChannelUtils.getChannelInstance("RGP訂單系統訊息消費者");

        AMQP.Queue.DeclareOk declareOk = channel.queueDeclare("roberto.order.add", true, false, false, new HashMap<>());

        channel.exchangeDeclare("roberto.order", BuiltinExchangeType.DIRECT, true, false, false, new HashMap<>());

        channel.queueBind(declareOk.getQueue(), "roberto.order", "add", new HashMap<>());

        channel.basicConsume(declareOk.getQueue(), true, "RGP訂單系統ADD處理邏輯消費者", new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println(consumerTag);
                System.out.println(envelope.toString());
                System.out.println(properties.toString());
                System.out.println("訊息內容:" + new String(body));
                throw new RuntimeException("新增訂單訊息消費出現異常");
            }
        });
    }
}

4.依次啟動訊息消費者和生產者 控制檯輸出如下

RGP訂單系統ADD處理邏輯消費者
Envelope(deliveryTag=1, redeliver=false, exchange=roberto.order, routingKey=add)
#contentHeader<basic>(content-type=UTF-8, content-encoding=null, headers=null, delivery-mode=2, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)
訊息內容:訂單資訊
----------訊息消費異常處理----------
訊息消費異常日誌記錄:新增訂單訊息消費出現異常

說明當消費訊息出現異常時,會進入我們自定義異常處理的邏輯。需要注意的是預設RabbitMQ Java Client在發生異常時會將Channel/Connection關閉,進而使程式未能按照預期的方向執行,所以我們在軟體設計的時候應當考慮周全

Spring AMQP配置方式

Spring AMQP在監聽器丟擲一個異常的時候它會將該異常包裝成ListenerExecutionFailedException,通常這個訊息會被拒絕並且重新放入佇列中,如果設定DefaultRequeueRejected屬性為false將把這個訊息直接丟棄。需要注意的是如果丟擲的異常是 ARADRE 或其他被RabbitMQ認為是致命錯誤的異常,即便DefaultRequeueRejected的值為true該訊息也不會重新加入佇列,而是會被直接丟棄。當丟擲異常為以下幾種異常時訊息將不會重新入佇列

1.org.springframework.amqp.support.converter.MessageConversionException
2.org.springframework.messaging.converter.MessageConversionException
3.org.springframework.messaging.handler.invocation.MethodArgumentResolutionException
4.java.lang.NoSuchMethodException
5.java.lang.ClassCastException

關於Spring AMQP異常處理可參見部落格:Spring AMQP異常處理