1. 程式人生 > >Spring AMQP 源碼分析 05 - 異常處理

Spring AMQP 源碼分析 05 - 異常處理

.html strong necessary hand invoke dex eas sam sta

### 準備

## 目標

了解 Spring AMQP Message Listener 如何處理異常

## 前置知識

《Spring AMQP 源碼分析 04 - MessageListener

## 相關資源

Offical doc:<http://docs.spring.io/spring-amqp/docs/1.7.3.RELEASE/reference/html/_reference.html#exception-handling> Sample code:<https://github.com/gordonklg/study>,rabbitmq module 源碼版本:Spring AMQP 1.7.3.RELEASE

## 測試代碼

gordon.study.rabbitmq.springamqp.AsyncConsumerWithErrorHandler.java 技術分享

### 分析

## 消息消費異常處理流程

根據上一遍文章的分析,示例代碼 AsyncConsumerWithErrorHandler 第26行開始消費消息,方法運行在 AsyncMessageProcessingConsumer 線程實例中,調用棧如下: 技術分享
AbstractMessageListenerContainer doInvokeListener 方法直接發起對 onMessage 方法的調用,代碼如下: 技術分享
對於 onMessage 消息處理過程中拋出的異常, wrapToListenerExecutionFailedExceptionIfNeeded 方法將所有非 ListenerExecutionFailedException 異常包裝為 ListenerExecutionFailedException 異常(通過 instanceof 判斷,因此 ListenerExecutionFailedException 子類不會被包裝),並重新拋出。 調用棧一直退出,直到 SimpleMessageListenerContainer(AbstractMessageListenerContainer).executeListener(Channel, Message) line: 729
技術分享 handleListenerException 方法最終調用 invokeErrorHandler 方法,通過屬性 ErrorHandler errorHandlerhandleError 方法正式處理異常。 技術分享 默認的 errorHandle 就是 ConditionalRejectingErrorHandler,其 handleError 邏輯很簡單:如果拋出的異常,其原因鏈中不包含 AmqpRejectAndDontRequeueException,同時 ConditionalRejectingErrorHandler 內部屬性 FatalExceptionStrategy exceptionStrategy 的 isFatal 方法返回 true(可以看成是無法恢復的嚴重異常),則將異常包裝為 AmqpRejectAndDontRequeueException 重新拋出。 技術分享 這是判斷原因鏈中是否包含 AmqpRejectAndDontRequeueException 的代碼: 技術分享 默認情況下, exceptionStrategy DefaultExceptionStrategy 的實例 技術分享 只有 ListenerExecutionFailedException 異常及其子類才可能是 fatal,但是對於本次調用棧,wrapToListenerExecutionFailedExceptionIfNeeded 方法保證了拋出的異常是 ListenerExecutionFailedException isCauseFatal 方法定義了一些嚴重異常,很顯然,這些異常都是些無論重試多少次都會出錯的異常,因此應該被包裝為 AmqpRejectAndDontRequeueException 異常。 DefaultExceptionStrategy 預留了 isUserCauseFatal 方法給用戶擴展。
異常繼續往外拋,到 SimpleMessageListenerContainer.doReceiveAndExecute(BlockingQueueConsumer) line: 1260,會調用 BlockingQueueConsumer rollbackOnExceptionIfNecessary 方法。該方法先判斷是否要向 RabbitMQ 確認消息,如果需要確認(意味著要調用 channel 的 basicReject 方法),再根據異常的原因鏈中是否存在 AmqpRejectAndDontRequeueException 異常決定如何設置 basicReject 方法的 requeue 參數。

## 示例代碼分析

從流程分析中可知,onMessage 方法如果拋出異常,一般情況下會導致消息被 reject,同時重新入隊。這個默認設置比較安全。 如果我們不想要消息重新入隊呢?最簡單的方法就是拋出 AmqpRejectAndDontRequeueException,就像示例代碼中被註釋掉的第33行代碼那樣。但是這導致業務消費邏輯與框架實現綁定過深,因此,我們采用重載 DefaultExceptionStrategy isUserCauseFatal 方法來決定不同的業務異常要不要讓 reject 的消息重新入隊,正如示例代碼第42行所示。當異常類型是 UserDefineException 時,消息被 reject 同時不會重入隊列。

Spring AMQP 源碼分析 05 - 異常處理