Spring AMQP 源碼分析 05 - 異常處理
阿新 • • 發佈:2017-07-18
.html strong necessary hand invoke dex eas sam sta
gordon.study.rabbitmq.springamqp.AsyncConsumerWithErrorHandler.java
由 AbstractMessageListenerContainer 的 doInvokeListener 方法直接發起對 onMessage 方法的調用,代碼如下:
對於 onMessage 消息處理過程中拋出的異常, wrapToListenerExecutionFailedExceptionIfNeeded 方法將所有非 ListenerExecutionFailedException 異常包裝為 ListenerExecutionFailedException 異常(通過 instanceof 判斷,因此 ListenerExecutionFailedException 子類不會被包裝),並重新拋出。
調用棧一直退出,直到 SimpleMessageListenerContainer(AbstractMessageListenerContainer).executeListener(Channel, Message) line: 729
handleListenerException 方法最終調用 invokeErrorHandler 方法,通過屬性 ErrorHandler errorHandler 的 handleError 方法正式處理異常。
默認的 errorHandle 就是 ConditionalRejectingErrorHandler,其 handleError 邏輯很簡單:如果拋出的異常,其原因鏈中不包含 AmqpRejectAndDontRequeueException,同時 ConditionalRejectingErrorHandler 內部屬性 FatalExceptionStrategy exceptionStrategy 的 isFatal 方法返回 true(可以看成是無法恢復的嚴重異常),則將異常包裝為 AmqpRejectAndDontRequeueException 重新拋出。
這是判斷原因鏈中是否包含 AmqpRejectAndDontRequeueException 的代碼:
默認情況下, exceptionStrategy 是 DefaultExceptionStrategy 的實例:
只有 ListenerExecutionFailedException 異常及其子類才可能是 fatal,但是對於本次調用棧,wrapToListenerExecutionFailedExceptionIfNeeded 方法保證了拋出的異常是 ListenerExecutionFailedException。
isCauseFatal 方法定義了一些嚴重異常,很顯然,這些異常都是些無論重試多少次都會出錯的異常,因此應該被包裝為 AmqpRejectAndDontRequeueException 異常。
DefaultExceptionStrategy 預留了 isUserCauseFatal 方法給用戶擴展。
異常繼續往外拋,到 SimpleMessageListenerContainer.doReceiveAndExecute(BlockingQueueConsumer) line: 1260,會調用 BlockingQueueConsumer 的 rollbackOnExceptionIfNecessary 方法。該方法先判斷是否要向 RabbitMQ 確認消息,如果需要確認(意味著要調用 channel 的 basicReject 方法),再根據異常的原因鏈中是否存在 AmqpRejectAndDontRequeueException 異常決定如何設置 basicReject 方法的 requeue 參數。
### 準備
## 目標
了解 Spring AMQP Message Listener 如何處理異常## 前置知識
《Spring AMQP 源碼分析 04 - MessageListener》## 相關資源
Offical doc:<http://docs.spring.io/spring-amqp/docs/1.7.3.RELEASE/reference/html/_reference.html#exception-handling> Sample code:<https://github.com/gordonklg/study>,rabbitmq module 源碼版本:Spring AMQP 1.7.3.RELEASE## 測試代碼
### 分析
## 消息消費異常處理流程
根據上一遍文章的分析,示例代碼 AsyncConsumerWithErrorHandler 第26行開始消費消息,方法運行在 AsyncMessageProcessingConsumer 線程實例中,調用棧如下:由 AbstractMessageListenerContainer 的 doInvokeListener 方法直接發起對 onMessage 方法的調用,代碼如下:
異常繼續往外拋,到 SimpleMessageListenerContainer.doReceiveAndExecute(BlockingQueueConsumer) line: 1260,會調用 BlockingQueueConsumer 的 rollbackOnExceptionIfNecessary 方法。該方法先判斷是否要向 RabbitMQ 確認消息,如果需要確認(意味著要調用 channel 的 basicReject 方法),再根據異常的原因鏈中是否存在 AmqpRejectAndDontRequeueException 異常決定如何設置 basicReject 方法的 requeue 參數。
## 示例代碼分析
從流程分析中可知,onMessage 方法如果拋出異常,一般情況下會導致消息被 reject,同時重新入隊。這個默認設置比較安全。 如果我們不想要消息重新入隊呢?最簡單的方法就是拋出 AmqpRejectAndDontRequeueException,就像示例代碼中被註釋掉的第33行代碼那樣。但是這導致業務消費邏輯與框架實現綁定過深,因此,我們采用重載 DefaultExceptionStrategy 的 isUserCauseFatal 方法來決定不同的業務異常要不要讓 reject 的消息重新入隊,正如示例代碼第42行所示。當異常類型是 UserDefineException 時,消息被 reject 同時不會重入隊列。Spring AMQP 源碼分析 05 - 異常處理