RocketMQ原始碼分析之RocketMQ事務訊息實現原理中篇----事
上節已經梳理了RocketMQ傳送事務訊息的流程(基於二階段提交),本節將繼續深入學習事務狀態訊息回查,我們知道,第一次提交到訊息伺服器時訊息的主題被替換為RMQ_SYS_TRANS_HALF_TOPIC,本地事務執行完後如果返回本地事務狀態為UN_KNOW時,第二次提交到伺服器時將不會做任何操作,也就是說此時訊息還存在與RMQ_SYS_TRANS_HALF_TOPIC主題中,並不能被訊息消費者消費,那這些訊息最終如何被提交或回滾呢?
原來RocketMQ使用TransactionalMessageCheckService執行緒定時去檢測
RMQ_SYS_TRANS_HALF_TOPIC主題中的訊息,回查訊息的事務狀態。TransactionalMessageCheckService的檢測頻率預設1分鐘,可通過在broker.conf檔案中設定transactionCheckInterval的值來改變預設值,單位為毫秒。
接下來將深入分析該執行緒的實現原理,從而解開事務訊息回查機制。
程式碼@1:從broker配置檔案中獲取transactionTimeOut引數值。
程式碼@2:從broker配置檔案中獲取transactionCheckMax引數值,表示事務的最大檢測次數,如果超過檢測次數,訊息會預設為丟棄,即回滾訊息。
接下來重點分析TransactionalMessageService#check的實現邏輯:
step1:根據主題名稱,獲取該主題下所有的訊息佇列。
Step2:迴圈遍歷訊息佇列,從單個訊息消費佇列去獲取訊息。
Step3:獲取對應的操作佇列,其主題為:RMQ_SYS_TRANS_OP_HALF_TOPIC,然後獲取操作佇列的消費進度、待操作的消費佇列的消費進度,如果任意一小於0,忽略該訊息佇列,繼續處理下一個佇列。
Step4:呼叫fillOpRemoveMap主題填充removeMap、doneOpOffset資料結構,這裡主要的目的是避免重複呼叫事務回查介面,這裡說一下RMQ_SYS_TRANS_HALF_TOPIC、RMQ_SYS_TRANS_OP_HALF_TOPIC這兩個主題的作用。
RMQ_SYS_TRANS_HALF_TOPIC:prepare訊息的主題,事務訊息首先先進入到該主題。
RMQ_SYS_TRANS_OP_HALF_TOPIC:當訊息伺服器收到事務訊息的提交或回滾請求後,會將訊息儲存在該主題下。
本段程式碼比較長,卻是事務狀態回查的重點實現。
程式碼@1:先解釋幾個區域性變數的含義。
- getMessageNullCount :獲取空訊息的次數
- newOffset :當前處理RMQ_SYS_TRANS_HALF_TOPIC#queueId的最新進度
- i:當前處理訊息的佇列偏移量,其主題依然為RMQ_SYS_TRANS_HALF_TOPIC。
程式碼@2:這段程式碼應該不陌生,這是RocketMQ處理任務的一個通用處理邏輯,就是一個任務處理,可以限制每次最多處理的時間,RocketMQ為待檢測主題RMQ_SYS_TRANS_HALF_TOPIC的每個佇列,做事務狀態回查,一次最多不超過60S,目前該值不可配置。
程式碼@3:如果removeMap中包含當前處理的訊息,則繼續下一條,removeMap中的值是通過Step3中填充的,具體實現邏輯是從RMQ_SYS_TRANS_OP_HALF_TOPIC主題中拉取32條,如果拉取的訊息佇列偏移量大於等於RMQ_SYS_TRANS_HALF_TOPIC#queueId當前的處理進度時,會新增到removeMap中,表示已處理過。
程式碼@4:根據訊息佇列偏移量i從消費佇列中獲取訊息。
程式碼@5:如果訊息為空,則根據允許重複次數進行操作,預設重試一次,目前不可配置。其具體實現為:
- 如果超過重試次數,直接跳出,結束該訊息佇列的事務狀態回查。
- 如果是由於沒有新的訊息而返回為空(拉取狀態為:PullStatus.NO_NEW_MSG),則結束該訊息佇列的事務狀態回查。
- 1.其他原因,則將偏移量i設定為: getResult.getPullResult().getNextBeginOffset(),重新拉取。
程式碼@6:判斷該訊息是否需要discard(吞沒,丟棄,不處理)、或skip(跳過),其依據如下:
- needDiscard 依據:如果該訊息回查的次數超過允許的最大回查次數,則該訊息將被丟棄,即事務訊息提交失敗,不能被消費者消費,其做法,主要是每回查一次,在訊息屬性TRANSACTION_CHECK_TIMES中增1,預設最大回查次數為5次。
- needSkip依據:如果事務訊息超過檔案的過期時間,預設72小時(具體請檢視RocketMQ過期檔案相關內容),則跳過該訊息。
程式碼@7:處理事務超時相關概念,先解釋幾個區域性變數:、
- valueOfCurrentMinusBorn :該訊息已儲存的時間,等於系統當前時間減去訊息儲存的時間戳。
- checkImmunityTime :立即檢測事務訊息的時間。
- transactionTimeout:事務訊息的超時時間,其設計的意義是,應用程式在傳送事務訊息後,事務不會馬上提交,該時間就是假設事務訊息傳送成功後,應用程式事務提交的時間,在這段時間內,RocketMQ任務事務未提交,故不應該在這個時間段嚮應用程式傳送回查請求。
- 程式碼@8:如果訊息指定了事務訊息過期時間屬性(PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS),如果當前時間已超過該值。
程式碼@9:如果當前時間還未過(應用程式事務結束時間),則跳出本次回查處理的,等下一次再試。
程式碼@10:判斷是否需要傳送事務回查訊息,具體邏輯:
- 如果從操作佇列(RMQ_SYS_TRANS_OP_HALF_TOPIC)中沒有已處理訊息並且已經超過(應用程式事務結束時間),引數transactionTimeOut值。
- 如果操作佇列不為空,並且最後一天條訊息的儲存時間已經超過transactionTimeOut值。
程式碼@11:如果需要傳送事務狀態回查訊息,則先將訊息再次傳送到RMQ_SYS_TRANS_HALF_TOPIC主題中,傳送成功則返回true,否則返回false,這裡還有一個實現關鍵點:
如果傳送成功,會將該訊息的queueOffset、commitLogOffset設定為重新存入的偏移量,為什麼需要這樣呢,答案在listener.resolveHalfMsg(msgExt)中。
傳送具體的事務回查機制,這裡用一個執行緒池來非同步傳送回查訊息,為了回查進度儲存的簡化,這裡只要傳送了回查訊息,當前回查進度會向前推動,如果回查失敗,上一步驟新增的訊息將可以再次傳送回查訊息,那如果回查訊息傳送成功,那會不會下一次又重複傳送回查訊息呢?這個可以根據OP佇列中的訊息來判斷是否重複,如果回查訊息傳送成功並且訊息伺服器完成提交或回滾操作,這條訊息會發送到OP佇列中,然後fillOpRemoveMap根據處理進度獲取一批已處理的訊息,來與訊息判斷是否重複,由於fillopRemoveMap一次只拉32條訊息,那又如何保證一定能拉取到與當前訊息的處理記錄呢?其實就是通過程式碼@10來實現的,如果此批訊息最後一條未超過事務延遲訊息,則繼續拉取更多訊息進行判斷(@12)和(@14),op佇列也會隨著回查進度的推進而推進。
程式碼@12:如果無法判斷是否傳送回查訊息,則載入更多的已處理訊息進行刷選。
程式碼@13:儲存(Prepare)訊息佇列的回查進度。
程式碼@14:儲存處理佇列(op)的進度。
上述講解了TransactionalMessageCheckService回查定時執行緒的傳送回查訊息的整體流程與實現細節,接下來重點分析一下上述步驟@11,通過非同步方式傳送訊息回查的實現過程。
程式碼@1:首先構建回查事務狀態請求訊息,請求核心引數包括:訊息offsetId、訊息ID(索引)、訊息事務ID、事務訊息佇列中的偏移量(RMQ_SYS_TRANS_HALF_TOPIC)。
程式碼@2:恢復原訊息的主題、佇列,並設定storeSize為0。
程式碼@3:獲取生產者組名稱。
程式碼@4:根據生產者組獲取任意一個生產者,通過與其連線傳送事務回查訊息,回查訊息的請求者為【Broker伺服器】,接收者為(client,具體為訊息生產者)。
其處理類為:org.apache.rocketmq.client.impl.ClientRemotingProcessor#processRequest,其詳細邏輯實現方法為:
程式碼@1:最終呼叫生產者的checkTransactionState方法。
上述程式碼雖多,其實實現思路非常清晰,先使用一個匿名類( Runnable )構建一個執行任務,然後提交到checkExecutor執行緒池中執行,這與我第一篇文章的猜測是吻合的,那重點分析一下該任務的允許邏輯,對應在run方法中。
程式碼@1:獲取訊息傳送者的TransactionListener。
程式碼@2:執行TransactionListener#checkLocalTransaction,檢測本地事務狀態,也就是應用程式需要實現TransactionListener#checkLocalTransaction,告知RocketMQ該事務的事務狀態,然後返回COMMIT_MESSAGE、ROLLBACK_MESSAGE、UNKNOW中的一個,然後向Broker傳送END_TRANSACTION命令即可,
程式碼@3:傳送END_TRANSACTION到Broker,其具體實現,已經在 https://blog.csdn.net/prestigeding/article/details/81263833 中詳細講解過,在此不重複分析。
到這裡,事務訊息狀態回查流程就講解完畢,接下來以一張流程圖結束本篇的講解。
下一篇,將重點分析Broker在收到事務狀態為COMMIT_MESSAGE、ROLLBACK_MESSAGE時如何提交、回滾事務。
作者:丁威