1. 程式人生 > >【轉】Spring AMQP 原始碼分析 05

【轉】Spring AMQP 原始碼分析 05

### 準備

## 目標

瞭解 Spring AMQP Message Listener 如何處理異常

## 前置知識

《Spring AMQP 原始碼分析 04 - MessageListener》

## 相關資源

原始碼版本:Spring AMQP 1.7.3.RELEASE

## 測試程式碼

gordon.study.rabbitmq.springamqp.AsyncConsumerWithErrorHandler.java

### 分析

## 訊息消費異常處理流程

根據上一遍文章的分析,示例程式碼 AsyncConsumerWithErrorHandler 第26行開始消費訊息,方法執行在 AsyncMessageProcessingConsumer 執行緒例項中,呼叫棧如下:

由 AbstractMessageListenerContainer 的 doInvokeListener 方法直接發起對 onMessage 方法的呼叫,程式碼如下:

對於 onMessage 訊息處理過程中丟擲的異常, wrapToListenerExecutionFailedExceptionIfNeeded 方法將所有非 ListenerExecutionFailedException 異常包裝為 ListenerExecutionFailedException 異常(通過 instanceof 判斷,因此 ListenerExecutionFailedException 子類不會被包裝),並重新丟擲。

呼叫棧一直退出,直到 SimpleMessageListenerContainer(AbstractMessageListenerContainer).executeListener(Channel, Message) line: 729   

handleListenerException 方法最終呼叫 invokeErrorHandler 方法,通過屬性 ErrorHandler errorHandler 的 handleError 方法正式處理異常。

預設的 errorHandle 就是 ConditionalRejectingErrorHandler,其 handleError 邏輯很簡單:如果丟擲的異常,其原因鏈中不包含 AmqpRejectAndDontRequeueException,同時 ConditionalRejectingErrorHandler 內部屬性 FatalExceptionStrategy exceptionStrategy 的 isFatal 方法返回 true(可以看成是無法恢復的嚴重異常),則將異常包裝為 AmqpRejectAndDontRequeueException 重新丟擲。

這是判斷原因鏈中是否包含 AmqpRejectAndDontRequeueException 的程式碼:

 

預設情況下, exceptionStrategy 是 DefaultExceptionStrategy 的例項:

只有 ListenerExecutionFailedException 異常及其子類才可能是 fatal,但是對於本次呼叫棧,wrapToListenerExecutionFailedExceptionIfNeeded 方法保證了丟擲的異常是 ListenerExecutionFailedException。

isCauseFatal 方法定義了一些嚴重異常,很顯然,這些異常都是些無論重試多少次都會出錯的異常,因此應該被包裝為 AmqpRejectAndDontRequeueException  異常。

DefaultExceptionStrategy 預留了 isUserCauseFatal 方法給使用者擴充套件。

異常繼續往外拋,到 SimpleMessageListenerContainer.doReceiveAndExecute(BlockingQueueConsumer) line: 1260,會呼叫 BlockingQueueConsumer 的 rollbackOnExceptionIfNecessary 方法。該方法先判斷是否要向 RabbitMQ 確認訊息,如果需要確認(意味著要呼叫 channel 的 basicReject 方法),再根據異常的原因鏈中是否存在 AmqpRejectAndDontRequeueException 異常決定如何設定  basicReject 方法的 requeue 引數。

## 示例程式碼分析

從流程分析中可知,onMessage 方法如果丟擲異常,一般情況下會導致訊息被 reject,同時重新入隊。這個預設設定比較安全。

如果我們不想要訊息重新入隊呢?最簡單的方法就是丟擲 AmqpRejectAndDontRequeueException,就像示例程式碼中被註釋掉的第33行程式碼那樣。但是這導致業務消費邏輯與框架實現繫結過深,因此,我們採用過載 DefaultExceptionStrategy 的 isUserCauseFatal 方法來決定不同的業務異常要不要讓 reject 的訊息重新入隊,正如示例程式碼第42行所示。當異常型別是 UserDefineException 時,訊息被 reject 同時不會重入佇列。