1. 程式人生 > >利用Rocketmq4.2版來實現分散式事務

利用Rocketmq4.2版來實現分散式事務

花了點時間學了RocketMQ,下面是本人的一點點心得,如果覺的寫的好就點個贊,但如果你要借鑑話,我還是勸你看下面參考資料裡的視訊(作者為阿里牛人),雖然他分享的視訊是為了推銷阿里雲的DRDS、ONS(RocketMQ阿里版),只是講了個大概,沒有細說,但是指明一個大的方向,讓人非常的受益。

借用阿里牛人視訊中的ppt:來說明單機事務拆分成分散式事務分解思想(具體自己看視訊)。




圖中左邊的事務3與事務5為遠端呼叫的網路事務。



上圖為訊息傳送者端通過訊息叢集完整的事務流程,ONS訊息叢集這裡用rockMQ叢集代替。

RocketMQ在 V3.1.5 開始,使用 資料庫 實現【事務狀態】的儲存。但未開源,因為rocketmq閹割了對生產者的LocalTransactionState狀態的回查機制,所以增加了生產端事務的複雜度。本來由RocketMQ中介軟體通過回查機制來讓生產者知道事務資訊傳送成功,現在要生產者自己來確認。(後面有詳情講解事務訊息同時成功或失敗的情況)

如下是來自官方文件的說明,可能是閹割了對生產者的LocalTransactionState狀態的回查機制的一部分原因:
RocketMQ 這種實現事務方式,沒有通過 KV 儲存做,而是通過 Offset 方式,存在一個顯著缺陷,即通過 Offset 更改資料,會令系統的髒頁過多,需要特別關注,

我們先看一下訊息傳送者端成功執行事務併發送確認訊息後通過RocketMQ的控制檯用key值檢視成功執行事務中傳遞的訊息:在RocketMQ3.2.6版本中是一條訊息,在RocketMQ4.2版本中發現有兩條訊息。雖然這條訊息是mq叢集在成功接收生產者提交發送的COMMIT_MESSGE訊息(通過oneway方式)後在訊息叢集本地產生,沒有增加生產者傳送訊息的網路延時時間,但這種實現方式也是有代價的,事務訊息增加一倍(雖然是在MQ叢集本機拷貝,但增加了叢集的IO壓力),官方這樣改應該是益大於弊吧。

下面是按key值查詢事務成功提交COMMIT_MESSGE訊息後的返回資訊:

QueryResult 
[indexLastUpdateTimestamp=1516002830440,
	messageList=[
1.		MessageExt [queueId=1, storeSize=291, queueOffset=0, sysFlag=4, bornTimestamp=1516002831147, bornHost=/192.168.88.1:6313, storeTimestamp=1516002830323, storeHost=/192.168.88.133:10911, msgId=C0A8588500002A9F0000000000000246, commitLogOffset
=582, bodyCRC=1229495611, reconsumeTimes=0, preparedTransactionOffset=0, toString()= Message [topic=pay, flag=0, properties= {KEYS=5cf5dd03-5811-4b7f-b97c-186598e6d08b, TRAN_MSG=true, UNIQ_KEY=C0A8080B2080085EDE7B4B824F2A0000, PGROUP=transaction-pay, TAGS=tag}, body=67]], 2. MessageExt [queueId=1, storeSize=291, queueOffset=0, sysFlag=8, bornTimestamp=1516002831147, bornHost=/192.168.88.1:6313, storeTimestamp=1516002830440, storeHost=/192.168.88.133:10911, msgId=C0A8588500002A9F0000000000000369, commitLogOffset=873, bodyCRC=1229495611, reconsumeTimes=0, preparedTransactionOffset=582, toString()= Message [topic=pay, flag=0, properties= {KEYS=5cf5dd03-5811-4b7f-b97c-186598e6d08b, TRAN_MSG=true, UNIQ_KEY=C0A8080B2080085EDE7B4B824F2A0000, PGROUP=transaction-pay, TAGS=tag}, body=67]] ] ]

共返回兩條訊息:兩條訊息中大部分資料是一樣的,但sysFlag、storeTimestamp、msgId、commitLogOffsetpreparedTransactionOffset欄位是不一樣的:其中第1條為prepared傳送的訊息,第2條只有在提交COMMIT_MESSGE訊息成功後產生。

注意sysFlag、preparedTransactionOffset欄位與prepared訊息的區別,當提交COMMIT_MESSGE訊息成功後,推測MQ叢集做了如下動作:1. 讀取prepared訊息,修改sysFlag、preparedTransactionOffset值,2. 在存入commitlog日誌檔案,設定consumerqueue序列;因為當作一條新的訊息處理,所以toreTimestamp、msgId、commitLogOffset欄位自然也就變了。所以按照發送的prepared訊息的返回結果顯示的msgId檢視sysFlag狀態只是prepared訊息的sysFlag狀態,RocketMQ4.2版本的話要用key值去查詢,才能檢視事務提交成功的訊息標誌sysFlag=8

下面是按key值查詢事務失敗提交ROLLBACK_MESSAGE訊息後的返回資訊:

QueryResult 
[indexLastUpdateTimestamp=1516002830440,
	messageList=[
1.		MessageExt [queueId=1, storeSize=291, queueOffset=0, sysFlag=4, bornTimestamp=1516002831147, bornHost=/192.168.88.1:6313, storeTimestamp=1516002830323, storeHost=/192.168.88.133:10911, msgId=C0A8588500002A9F0000000000000246, commitLogOffset=582, bodyCRC=1229495611, reconsumeTimes=0, preparedTransactionOffset=0, toString()=
			Message 
			[topic=pay, flag=0, properties=
				{KEYS=5cf5dd03-5811-4b7f-b97c-186598e6d08b, TRAN_MSG=true, UNIQ_KEY=C0A8080B2080085EDE7B4B824F2A0000, PGROUP=transaction-pay, TAGS=tag}, 
			body=67]]
	]
]

如果傳送ROLLBACK_MESSAGE訊息,在控制檯只會查到一條prepared訊息,MQ叢集對prepared訊息不作任何處理。



上圖為訊息接收者端通過訊息叢集完整的事務流程。(後面有詳情講解訊息超時與重複的問題)

-------------------------------------------------------------------------------------------------------------

分散式事務流程看下面的流程圖:


分發布式事務通過訊息中介軟體解耦為相互獨立的(本地事務+非同步)而本地事務間訊息傳遞統一由訊息中介軟體負責

一、消費者叢集事務

1. 在執行本地事務時要注意:本地事務要儘量保證冪等性(如s*s = s,也就是事務不管執行多少次結果都一樣),如不能保證冪等性,要在業務上去對訊息消費的去重(在消費者叢集新增去重表,在事務開始前校驗此訊息是否重複,在事務提交前插入相關資料。去重表具體參見生產者叢集事務的回查表,可以省略count欄位)。因為RocketMQ不保證資訊不重複,雖然重複機率很小。

2. 對於消費者叢集執行本地事務失敗的情況,阿里提供給我們的解決方法是:人工解決。按照事務的流程,因為某種原因事務失敗,那麼需要回滾整個流程。如果訊息系統要實現這個回滾流程的話,系統複雜度將大大提升,且很容易出現Bug,估計出現Bug的概率會比消費失敗的概率大很多。這也是RocketMQ目前暫時沒有解決這個問題的原因,在設計實現訊息系統時,我們需要衡量是否值得花這麼大的代價來解決這樣一個出現概率非常小的問題,這也是大家在解決疑難問題時需要多多思考的地方。

3. 對於阿里來說,一個分散式事務可以能涉及的是幾百個子系統,對於他們來說處理分散式事務回滾代價太大;但對於的分散式事務只涉及幾個子系統,回滾不太複雜的情況下,我想是否可以對回滾分散式事務可以用一個反向的分散式事務解決,代價就是在消費端本地事務處理失敗,回滾本地事務後傳送一條分散式事務失敗訊息給生產者。而生產者需要額外為分散式事務設計相對應的回滾分散式事務的介面。視訊中的阿里牛人說阿里分散式事務中消費失敗的機率很小,在他印象中一兩年才出現一次,這樣的機率是否值得我們去做一個回滾分散式事務的設計呢?

-------------------------------------------------------------------------------------

二、生產者叢集的事務:

因為rocketmq閹割了對LocalTransactionState狀態的回查機制,所以生產者必須確認rocketMQ叢集是否收到LocalTransactionState狀態;

這裡只需要考慮本地事務執行成功後的情況(因為本地事務失敗不管確認訊息傳送成功與失敗MQ叢集都不會再發送訊息到消費者):

1. 本地事務成功後宕機,確認訊息沒有發出,分散式事務只執行一半。
2. 確認訊息COMMIT_MESSGE發出,但因網路不可達RocketMQ叢集沒收到。
3. 確認訊息COMMIT_MESSGE發出,RocketMQ叢集收到COMMIT_MESSGE訊息,但rocketmq取消了回查機制,
生產者還是不知道COMMIT_MESSGE發出是否成功

上面三種情況的本質是一樣的,就是生產者本地事務成功後,COMMIT_MESSGE訊息是否送達rocketmq叢集;所以可以看做同一種情況.

解決方案流程圖:


官方在rocketmq叢集上使用了資料庫來實現回查機制,那我也學官方用資料庫來實現回查機制,只是我把回查機制放在了生產者叢集上。

一、在執行本地事務commit前向回查表插入訊息的KEY值。

二、在生產者叢集上設定一個定時任務(根據自身分散式事務流程執行的時間設定)。

      1. 從回查表獲取CONFIRM為0的記錄列表,從記錄列表中獲取COUNT為3的記錄,當count列達到指定閥值(假定是3)時:
此時記錄的COUNT為3,如果CONFIRM還是為0,那麼說明對此事務的回查次數為3,但RocketMQ叢集還未收到COMMIT_MESSAGE訊息,說明發送COMMIT_MESSAGE訊息失敗,但本地事務已經執行成功,那麼必須要重發與此條記錄中KEY值相對應的Perpared訊息的確認訊息。根據KEY值向MQ叢集查詢訊息,根據獲取的訊息重新用同步的方式傳送此條訊息到MQ叢集,並更新此記錄的CONFIRM為1,COUNT+1
      2. 根據第1步獲取的記錄列表,取出CONFIRM為0且COUNT小於3的記錄,根據KEY值向MQ叢集查詢訊息。
      3. 根據第2步獲取的訊息判斷是否是sysFlag為8的訊息;如果,更新回查表對應KEY記錄的CONFIRM為1,COUNT為count+1,如果不是,更新回查表對應KEY記錄的COUNT為count+1。

參考資料:

http://v.youku.com/v_show/id_XODY4ODE3OTY0.html?from=s1.8-1-1.2          

強烈建議看他的視訊:總共有十個視訊,都是關於分散式事務與RocketMQ。

https://www.jianshu.com/p/453c6e7ff81c 根據上面視訊而寫的。

https://segmentfault.com/a/1190000009512510 事務原始碼