【轉】Spring AMQP 原始碼分析 05
### 準備
## 目標
瞭解 Spring AMQP Message Listener 如何處理異常
## 前置知識
《Spring AMQP 原始碼分析 04 - MessageListener》
## 相關資源
原始碼版本:Spring AMQP 1.7.3.RELEASE
## 測試程式碼
gordon.study.rabbitmq.springamqp.AsyncConsumerWithErrorHandler.java
### 分析
## 訊息消費異常處理流程
根據上一遍文章的分析,示例程式碼 AsyncConsumerWithErrorHandler 第26行開始消費訊息,方法執行在 AsyncMessageProcessingConsumer 執行緒例項中,呼叫棧如下:
由 AbstractMessageListenerContainer 的 doInvokeListener 方法直接發起對 onMessage 方法的呼叫,程式碼如下:
對於 onMessage 訊息處理過程中丟擲的異常, wrapToListenerExecutionFailedExceptionIfNeeded 方法將所有非 ListenerExecutionFailedException 異常包裝為 ListenerExecutionFailedException 異常(通過 instanceof 判斷,因此 ListenerExecutionFailedException 子類不會被包裝),並重新丟擲。
呼叫棧一直退出,直到 SimpleMessageListenerContainer(AbstractMessageListenerContainer).executeListener(Channel, Message) line: 729
handleListenerException 方法最終呼叫 invokeErrorHandler 方法,通過屬性 ErrorHandler errorHandler 的 handleError 方法正式處理異常。
預設的 errorHandle 就是 ConditionalRejectingErrorHandler,其 handleError 邏輯很簡單:如果丟擲的異常,其原因鏈中不包含 AmqpRejectAndDontRequeueException,同時 ConditionalRejectingErrorHandler 內部屬性 FatalExceptionStrategy exceptionStrategy 的 isFatal 方法返回 true(可以看成是無法恢復的嚴重異常),則將異常包裝為 AmqpRejectAndDontRequeueException 重新丟擲。
這是判斷原因鏈中是否包含 AmqpRejectAndDontRequeueException 的程式碼:
預設情況下, exceptionStrategy 是 DefaultExceptionStrategy 的例項:
只有 ListenerExecutionFailedException 異常及其子類才可能是 fatal,但是對於本次呼叫棧,wrapToListenerExecutionFailedExceptionIfNeeded 方法保證了丟擲的異常是 ListenerExecutionFailedException。
isCauseFatal 方法定義了一些嚴重異常,很顯然,這些異常都是些無論重試多少次都會出錯的異常,因此應該被包裝為 AmqpRejectAndDontRequeueException 異常。
DefaultExceptionStrategy 預留了 isUserCauseFatal 方法給使用者擴充套件。
異常繼續往外拋,到 SimpleMessageListenerContainer.doReceiveAndExecute(BlockingQueueConsumer) line: 1260,會呼叫 BlockingQueueConsumer 的 rollbackOnExceptionIfNecessary 方法。該方法先判斷是否要向 RabbitMQ 確認訊息,如果需要確認(意味著要呼叫 channel 的 basicReject 方法),再根據異常的原因鏈中是否存在 AmqpRejectAndDontRequeueException 異常決定如何設定 basicReject 方法的 requeue 引數。
## 示例程式碼分析
從流程分析中可知,onMessage 方法如果丟擲異常,一般情況下會導致訊息被 reject,同時重新入隊。這個預設設定比較安全。
如果我們不想要訊息重新入隊呢?最簡單的方法就是丟擲 AmqpRejectAndDontRequeueException,就像示例程式碼中被註釋掉的第33行程式碼那樣。但是這導致業務消費邏輯與框架實現繫結過深,因此,我們採用過載 DefaultExceptionStrategy 的 isUserCauseFatal 方法來決定不同的業務異常要不要讓 reject 的訊息重新入隊,正如示例程式碼第42行所示。當異常型別是 UserDefineException 時,訊息被 reject 同時不會重入佇列。