1. 程式人生 > >基於Dubbo&RocketMQ實現SOA與分散式事務

基於Dubbo&RocketMQ實現SOA與分散式事務




基於Dubbo&RocketMQ實現SOA與分散式事務

專案介紹

專案定位

  1. 首先本專案是基於前後端分離的架構,後端僅提供RESTful介面,前端使用的是Vue.js。
    本專案的單機版本見 Github
    同時有對應的前端專案,因為主力在後端,所以可能質量一般,見 Github
  2. 本篇文章介紹的是經過SOA服務化拆分後的版本,基本功能不變,按業務模組進行了拆分。
    之前我也寫過介紹有關SOA服務化拆分的專案示例的文章 SOA服務化拆分,但是沒有實現資料庫的拆分,這也會牽扯到一個分散式事務的問題,它們是分散式環境下不可避免的問題,前一篇文章回避了這一問題,而本篇文章將介紹資料庫的分庫和基於RocketMQ實現分散式事務的關鍵邏輯。
  3. 本專案的Github 傳送門 。歡迎各位的star和fork,這也是驅使我分享更多技術文章的動力。

功能

使用者模組

  • 獲取圖片驗證碼
  • 登入:解決重複登入問題
  • 註冊
  • 分頁查詢使用者資訊
  • 修改使用者資訊
  • 重置密碼

站內信模組

  • 一對一發送站內信
  • 管理員廣播
  • 讀取站內信(未讀和已讀)
  • 一對多傳送站內信
  • 分頁查詢站內信

郵件模組

  • 單獨傳送郵件
  • 群發郵件
  • Thymeleaf郵件模板

產品模組

  • 獲取所有產品類別
  • 分頁獲取某一類別的所有產品
  • 獲取某一產品的詳細資訊
  • 新增產品型別
  • 新增某一產品

訂單模組

  • 下單,購買某一產品
  • 瀏覽歷史訂單
  • 取消訂單

新聞模組

  • 讀取最新新聞
  • 新增新聞

支付模組

  • 使用者充值
  • 訂單付款

Cos雲端儲存(檔案上傳下載)

  1. 準備:獲取appId、secretId、secretKey以及在官網上設定CORS
  2. JS部分:進行檔案上傳/下載
  3. Java部分:搭建鑑權伺服器,提供token
    只需要JS引入cos的包,Java不需要

涉及技術

  • SpringBoot+多環境配置(dev,proc,test)
  • Dubbo
  • SpringMVC
  • Spring
  • MyBaits
  • MyBatis Generator
  • MyBatis PageHelper
  • Druid
  • Lombok
  • JWT
  • Spring Security
  • JavaMail
  • Thymeleaf
  • HttpClient
  • Spring Scheduler
  • Hibernate Validator
  • Redis Cluster
  • MySQL主從複製,讀寫分離,按業務分庫
  • Spring Async
  • Spring Cache
  • Swagger
  • Spring Test
  • Spring Actuator
  • Logback+Slf4j多環境日誌
  • i18n
  • Maven Multi-Module

部署

專案使用到了20個虛擬機器以及本地的應用伺服器(4個),如果全部部署到分散式環境中大概需要24臺機器。
vms

Redis(所有app共用,6臺)

101,102,107,108,109,110:
1)將每個節點下aof、rdb、nodes.conf本地備份檔案刪除;
放在root目錄下
rm -rf /root/dump.rdb
rm -rf /root/nodes.conf
/usr/local/bin/redis-server /opt/redis/redis.conf

2)101節點:
/usr/local/bin/redis-trib.rb create –replicas 1 192.168.1.101:6379 192.168.1.102:6379 192.168.1.107:6379 192.168.1.108:6379 192.168.1.109:6379 192.168.1.110:6379

MySQL(每個服務三臺)

開機啟動!不用寫了
106,111,112:
service mysqld start

123,124,125
service mysqld start

Zookepper(所有app共用,三臺,或一臺)

118,119,120:
zkServer.sh start
zkServer.sh status

Dubbo管控臺(所有app共用,一臺,可選)

121:
/usr/local/apache-tomcat-8.5.20/bin/startup.sh

RocketMQ(雙Master)

113,114:
啟動nameserver
cd /usr/local/alibaba-rocketmq/bin/
nohup sh mqnamesrv &

啟動rocketmq
113:
cd /usr/local/alibaba-rocketmq/bin
nohup sh mqbroker -c /usr/local/alibaba-rocketmq/conf/2m-noslave/broker-a.properties >/dev/null 2>&1 &
jps

114:
cd /usr/local/alibaba-rocketmq/bin
nohup sh mqbroker -c /usr/local/alibaba-rocketmq/conf/2m-noslave/broker-b.properties >/dev/null 2>&1 &
jps

113:
/usr/local/apache-tomcat-8.5.20/bin/startup.sh

RocketMQ(雙Master雙Slave)

113,114,116,117:
113
啟動nameserver
cd /usr/local/alibaba-rocketmq/bin/
nohup sh mqnamesrv &

啟動rocketmq
113:
cd /usr/local/alibaba-rocketmq/bin
nohup sh mqbroker -c /usr/local/alibaba-rocketmq/conf/2m-noslave/broker-a.properties >/dev/null 2>&1 &
jps

114:
cd /usr/local/alibaba-rocketmq/bin
nohup sh mqbroker -c /usr/local/alibaba-rocketmq/conf/2m-noslave/broker-b.properties >/dev/null 2>&1 &
jps

116:
cd /usr/local/alibaba-rocketmq/bin
nohup sh mqbroker -c /usr/local/alibaba-rocketmq/conf/2m-2s-async/broker-a-s.properties >/dev/null 2>&1 &
jps

117:
cd /usr/local/alibaba-rocketmq/bin
nohup sh mqbroker -c /usr/local/alibaba-rocketmq/conf/2m-2s-async/broker-b-s.properties >/dev/null 2>&1 &
jps

113:
/usr/local/apache-tomcat-8.5.20/bin/startup.sh

完整的虛擬機器安裝和軟體安裝可以參考本專案的Github中的《Linxu叢集搭建》。

SOA拆分

業務拆分

  1. 使用者子系統(擁有使用者庫 eshop_user):
    • 使用者模組:user+mail,涉及user,role,mail,mail_text,balance表
    • 產品模組:product,涉及product,category表
    • 新聞模組:news,涉及news表
    • 訊息模組: producer_transaction_message表
  2. 訂單子系統(擁有訂單庫eshop_order):
    • 訂單模組: order,涉及order表
    • 訊息模組: consumer_transaction_message表
  3. 郵件子系統(無資料庫)

拆分時的約定

  1. 公共的domain、enumeration都放在common模組下
  2. 一般情況下api模組放service介面和exception異常
    注意自己模組的異常放在自己模組的api模組下(Dubbo異常機制)
  3. i18n資原始檔放在common下即可,別的模組下不用放

專案啟動順序

email,order,user,web

注意事項

  1. 所有實體類都要實現serializable介面
  2. Dubbo異常處理機制:
    異常類和介面類在同一jar包裡,直接丟擲,否則被調方service中丟擲的異常,在呼叫方中會被包一層RuntimeException,無法獲得原來的異常。

資料庫拆分

首先是按照業務分庫:每個模組擁有每個模組對應的資料庫,比如訂單模組對應著訂單庫,裡面有訂單表。
本專案中的業務分庫實際操作如下:
1. 使用者模組對應使用者庫:
這裡寫圖片描述
2. 訂單模組對應訂單庫:
這裡寫圖片描述
3. 郵件模組不需要資料庫。
對每個資料庫都做了主從複製和讀寫分離,一主二從,保證資料庫的高可用。

其次是分表:如果表的體積過大,可以將單張表拆為多張表,每張表持有原表的一部分資料,在插入時可以根據一定的規則分散到多張表中。
以上可以用一張圖來描述,圖源網路,侵刪。
這裡寫圖片描述

按照業務分庫可能會帶來的問題

  • 跨庫Join
  • 分散式事務

分表可能會帶來的問題

  • 全域性分散式ID生成
  • 分片規則選取
  • 排序分頁等

解決方案

RDBMS -> NoSQL -> NewSQL
在RDBMS基礎上有一些分散式資料庫中介軟體,比如MyCat;另一種則是新型資料庫,稱為NewSQL,一般是相容某一種RDBMS以保證使用者使用無障礙(比如相容MySQL),比如TiDB。
這裡寫圖片描述

中介軟體的侷限性(來自網路)

效能

基於 MySQL 的方案它的天花板在哪裡,它的天花板特別明顯。有一個思路是能不能通過 MySQL 的 server 把 InnoDB 變成一個分散式資料庫,聽起來這個方案很完美,但是很快就會遇到天花板。因為 MySQL 生成的執行計劃是個單機的,它認為整個計劃的 cost 也是單機的,我讀取一行和讀取下一行之間的開銷是很小的,比如迭代 next row 可以立刻拿到下一行。實際上在一個分散式系統裡面,這是不一定的。

另外,你把資料都拿回來計算這個太慢了,很多時候我們需要把我們的 expression 或者計算過程等等運算推下去,向上返回一個最終的計算結果,這個一定要用分散式的 plan,前面控制執行計劃的節點,它必須要理解下面是分散式的東西,才能生成最好的 plan,這樣才能實現最高的執行效率。

比如說你做一個 sum,你是一條條拿回來加,還是讓一堆機器一起算,最後給我一個結果。 例如我有 100 億條資料分佈在 10 臺機器上,並行在這 10臺機器我可能只拿到 10 個結果,如果把所有的資料每一條都拿回來,這就太慢了,完全喪失了分散式的價值。聊到 MySQL 想實現分散式,另外一個實現分散式的方案就是 Proxy。但是 Proxy 本身的天花板在那裡,就是它不支援分散式的 transaction,它不支援跨節點的 join,它無法理解複雜的 plan,一個複雜的 plan 打到 Proxy 上面,Proxy 就傻了,我到底應該往哪一個節點上轉發呢,如果我涉及到 subquery sql 怎麼辦?所以這個天花板是瞬間會到,在傳統模型下面的修改,很快會達不到我們的要求。

高可用(運維)

另外一個很重要的是,MySQL 支援的複製方式是半同步或者是非同步,但是半同步可以降級成非同步,也就是說任何時候資料出了問題你不敢切換,因為有可能是非同步複製,有一部分資料還沒有同步過來,這時候切換資料就不一致了。前一陣子出現過某公司突然不能支付了這種事件,今年有很多這種類似的 case,所以微博上大家都在說“說好的異地多活呢?”……
為什麼傳統的方案在這上面解決起來特別的困難,天花板馬上到了,基本上不可能解決這個問題。另外是多資料中心的複製和資料中心的容災,MySQL 在這上面是做不好的。

SQL支援

基於中介軟體來進行分庫, 確實對 SQL 有閹割的情況,並不是所有sql都能夠支援。主要原因是資料被拆分了。而資料一旦被拆分到多個節點,則: 1.複雜的join查詢2. 同時更新多個數據庫節點的sql語句這兩類SQL的支援難度,就比較高。這也是目前市面上所有中介軟體都無法滿足的兩點。複雜的join查詢之所以難以支援,是因為要跨節點join;同時更新多個節點的sql難以支援,是因為很難解決多個節點的併發一致性問題。但是除了這兩點之外,其他的sql型別,一款中介軟體是能夠努力做到的。

與本專案的結合點

本專案沒有考慮分表,資料量實在太小,而且實現非常繁瑣。另外分表並不是SOA專案的必須實現的,但分散式事務是SOA專案必須要解決的,資料庫不拆分,僅拆分業務,是無法實現服務的彈性擴充套件的。

基於本專案的示例和學習性質,所以希望選取一種較為簡單的解決方案。當然最希望是Dubbo本身能支援分散式事務,但很遺憾,Dubbo目前不支援。

其次是考慮使用MyCat中介軟體,但它對分散式事務支援不夠好,目前僅支援弱XA(下一個主題即分散式事務,稍後介紹),而它對分表可以遮蔽內部細節的優勢又沒有得到體現,所以沒有被採用。

之後又考慮使用TiDB,據說是分散式資料庫方面的最新成果,但資料太少,而且擔心無法實現MySQL的平滑過渡,可能有坑需要填等問題,最後也沒有采用。但其本身是非常優秀的技術,其官網上對其介紹如下:

TiDB 是新一代開源分散式 NewSQL 資料庫,模型受 Google Spanner / F1 論文的啟發, 實現了自動的水平伸縮,強一致性的分散式事務,基於 Raft 演算法的多副本複製等重要 NewSQL 特性。 TiDB 結合了 RDBMS 和 NoSQL 的優點,部署簡單,線上彈性擴容和非同步表結構變更不影響業務, 真正的異地多活及自動故障恢復保障資料安全,同時相容 MySQL 協議,使遷移使用成本降到極低。

另外還有一些閉源的解決方案,比如阿里雲、騰訊雲中都有分散式資料庫的解決方案,也沒有考慮(還是優先開源)。
最終是沒有采用資料庫中介軟體或者NewSQL,而是採用了原生MySQL,加上手工實現(common模組)的讀寫分離。

分散式事務(重點)

介紹

@Transactional
void bussinessMethod(){
    // 遠端Service,修改的是遠端資料庫
    AService.updateA();
    // 本地Service,修改的是本地資料庫
    BService.updateB();
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

以上便是一個分散式事務的基本示例。以往我們使用本地事務,一般是使用Spring的@Transactional宣告式事務。比如執行兩個業務操作,都是修改了資料庫的,我們要求兩個資料庫DML增改刪操作要麼全部執行,要麼一個都不執行。
比如第一個業務操作執行成功後,而第二個業務操作執行失敗,丟擲異常時,第一個業務操作會被回滾,這時候就可以確保資料是一致性的。
但是在分散式環境下,第一個業務操作執行成功後,而第二個業務操作執行失敗,丟擲異常,而此時是無法回滾第一個業務操作對資料庫的修改的,因為不同同一個資料庫,資料庫的Connection就同,無法放到同一個事務中管理。此時就會出現資料不一致的問題。

常見解決方案

首先要說明的是事務分為柔性事務即不去追求實時的一致性,而是最終一致性和嚴格事務。實現嚴格的一致性的典型解決方案是2PC,在Java中為XA實現,但其因為效率問題無法在網際網路應用中被青睞。而柔性事務則經常被使用。

2PC(2-Phase-Commit)

它分成兩個階段,先由一方進行提議(propose)並收集其他節點的反饋(vote),再根據反饋決定提交(commit)或中止(abort)事務。我們將提議的節點稱為協調者(coordinator),其他參與決議節點稱為參與者(participants, 或cohorts)。
這裡寫圖片描述
兩階段提交中的第二階段, 協調者需要等待所有參與者發出yes請求, 或者一個參與者發出no請求後, 才能執行提交或者中斷操作. 這會造成長時間同時鎖住多個資源, 造成效能瓶頸, 如果參與者有一個耗時長的操作, 效能損耗會更明顯.
實現複雜, 不利於系統的擴充套件, 不推薦。

TCC

TCC, 是基於補償型事務的AP系統的一種實現, 具有最終一致性.
下面以客戶購買商品時的付款操作為例進行講解:

Try:
完成所有的業務檢查(一致性),預留必須業務資源(準隔離性);
體現在本例中, 就是確認客戶賬戶餘額足夠支付(一致性), 鎖住客戶賬戶, 商戶賬戶(準隔離性).
Confirm:
使用Try階段預留的業務資源執行業務(業務操作必須是冪等的), 如果執行出現異常, 要進行重試.
在這裡就是執行客戶賬戶扣款, 商戶賬戶入賬操作.
Cancle:
釋放Try階段預留的業務資源, 在這裡就是釋放客戶賬戶和商戶賬戶的鎖;
如果任一子業務在Confirm階段有操作無法執行成功, 會造成對業務活動管理器的響應超時, 此時要對其他業務執行補償性事務. 如果補償操作執行也出現異常, 必須進行重試, 若實在無法執行成功, 則事務管理器必須能夠感知到失敗的操作, 進行log(用於事後人工進行補償性事務操作或者交由中介軟體接管在之後進行補償性事務操作).
這裡寫圖片描述

TCC能夠對分散式事務中的各個資源進行分別鎖定, 分別提交與釋放, 例如, 假設有AB兩個操作, 假設A操作耗時短, 那麼A就能較快的完成自身的try-confirm-cancel流程, 釋放資源. 無需等待B操作. 如果事後出現問題, 追加執行補償性事務即可.
TCC是繫結在各個子業務上的(除了cancel中的全域性回滾操作), 也就是各服務之間可以在一定程度上”非同步並行”執行。

非同步確保型/可靠訊息最終一致(基於訊息中介軟體,要求MQ支援事務訊息->目前阿里閉源版MQ支援)

這裡寫圖片描述

執行步驟如下:

  1. MQ傳送方傳送遠端事務訊息到MQ Server;
  2. MQ Server給予響應, 表明事務訊息已成功到達MQ Server.
  3. MQ傳送方Commit本地事務.
    若本地事務Commit成功, 則通知MQ Server允許對應事務訊息被消費; 若本地事務失敗, 則通知MQ Server對應事務訊息應被丟棄.
    若MQ傳送方超時未對MQ Server作出本地事務執行狀態的反饋, 那麼需要MQ Server向MQ傳送方主動回查事務狀態, 以決定事務訊息是否能被消費.
  4. 當得知本地事務執行成功時, MQ Server允許MQ訂閱方消費本條事務訊息.
    需要額外說明的一點, 就是事務訊息投遞到MQ訂閱方後, 並不一定能夠成功執行. 需要MQ訂閱方主動給予消費反饋(ack)

如果MQ訂閱方執行遠端事務成功, 則給予消費成功的ack, 那麼MQ Server可以安全將事務訊息移除;
如果執行失敗, MQ Server需要對訊息重新投遞, 直至消費成功.

這裡寫圖片描述
本專案就是採用這種方法實現的,而且是基於閹割版的RocketMQ為(未實現訊息回查)實現了外圍的訊息回查。

最大努力通知型(基於訊息中介軟體,定期校對)

這裡寫圖片描述
這是分散式事務中要求最低的一種, 也可以通過訊息中介軟體實現, 與前面非同步確保型操作不同的一點是, 在訊息由MQ Server投遞到消費者之後, 允許在達到最大重試次數之後正常結束事務.

1.業務活動的主動方,在完成業務處理之後,向業務活動的被動方傳送訊息,允許訊息丟失。
2.主動方可以設定時間階梯型通知規則,在通知失敗後按規則重複通知,直到通知N次後不再通知。
3.主動方提供校對查詢介面給被動方按需校對查詢,用於恢復丟失的業務訊息。
4.業務活動的被動方如果正常接收了資料,就正常返回響應,並結束事務。
5.如果被動方沒有正常接收,根據定時策略,向業務活動主動方查詢,恢復丟失的業務訊息。

相比於可靠訊息最終一致方案,最大努力通知方案設計上比較簡單,主要是由兩部分構成。

1.實時訊息服務(MQ):接收主動方傳送的MQ訊息。
2.通知服務子系統:監聽MQ訊息,當收到訊息後,向被動方傳送通知(一般是URL方式),同時生成通知記錄。如果沒有接收到被動方的返回訊息,就根據通知記錄進行重複通知。

最大努力通知方案實現方式比較簡單,本質上就是通過定期校對,適用於資料一致性時間要求不太高的場合,其實不把它看作是分散式事務方案,只認為是一種跨平臺的資料處理方案也是可以的。

與本專案的結合點(RocketMQ)

RocketMQ與其他訊息中介軟體的一個區別是支援事務訊息。這個事務訊息其實在上面的可靠訊息最終一致中已經介紹過了,下面就RocketMQ而言說明一下其原理。
關於RocketMQ的完整介紹可以參考本專案的Github中的《RocketMQ筆記》。

RocketMQ事務訊息原理

一個分散式事務被拆為一個本地事務和一個訊息傳送。
而訊息傳送的前提是本地事務執行成功,本地事務提交後,訊息才會傳送出去;否則會取消該訊息的傳送。
流程如下:
1. Producer向Broker傳送Prepared訊息,可能會發送失敗
2. 執行本地事務
3. 如果本地事務執行成功,則傳送Confirm訊息;如果失敗,那麼回滾本地事務,取消傳送Confirm訊息,Broker會刪除Prepared訊息。
4. Producer傳送Confirm訊息時可能會發送失敗,此時訊息的狀態仍為Prepared。
5. Broker接收到Confirm訊息時,會將該訊息推送給Consumer。
6. 設定Scheduler去向Producer輪詢Prepared訊息的當前狀態,稱為訊息回查。訊息回查主要目的是檢測Confirm訊息傳送失敗的情況。
7. Consumer接收到訊息,執行本地事務。
8. 本地事務執行成功時,會返回給Broker一個ACK,執行失敗時,Broker會定期重新發送給Consumer該訊息,超過重試次數時可以選擇不再重試。

RocketMQ第一階段傳送Prepared訊息時,會拿到訊息的地址,第二階段執行本地事務,第三階段通過第一階段拿到的地址去訪問訊息,並修改訊息的狀態。
為解決確認訊息傳送失敗的問題(訊息回查),RocketMQ會定期掃描訊息叢集中的事務訊息,如果發現了Prepared訊息,它會向訊息傳送端(生產者)確認,Bob的錢到底是減了還是沒減呢?如果減了是回滾還是繼續傳送確認訊息呢?RocketMQ會根據傳送端設定的策略來決定是回滾還是繼續傳送確認訊息。這樣就保證了訊息傳送與本地事務同時成功或同時失敗。

問題

  1. prepared訊息可能會發送失敗,此時會丟擲異常。解決方案一般是返回錯誤結果,使用者進行重試。當然也可以設定Producer對傳送失敗的訊息進行重試。
  2. 本地事務執行失敗,返回錯誤結果,使用者進行重試。
  3. Confirm訊息傳送失敗,此時Producer一般是無法感知到的。此時需要Broker遍歷Prepared訊息,進行訊息回查。而開源RocketMQ閹割了這部分的內容,這也是一會我將要介紹的外圍解決事務回查問題的內容。如果可以檢測到Confirm訊息傳送失敗的情況,那麼一般解決方案是重新發送Confirm訊息。
  4. 重試傳送Confirm訊息,仍失敗,且超過重試次數時要進行記錄,可以考慮人工處理:繼續重發或者回滾本地事務(需要自行實現)。
  5. Consumer消費失敗。消費失敗時RocketMQ會自動進行重試,我們可以自己去設定一個重試次數,超過重試次數時進行記錄,可以考慮人工處理:繼續重試或者回滾本地事務。
  6. Consumer訊息重複。RocketMQ不能保證訊息重複,而對於當前業務而言是不能重複的,這個解決方案在後面也會介紹。

分散式事務設計

場景

在業務中有一處需要使用者為訂單付款,該業務會修改使用者庫的balance(使用者餘額表),扣減使用者的餘額,然後會修改訂單庫的order(訂單表)和enterprise(企業餘額表),將訂單狀態設定為已被支付,並增加企業的餘額。這裡就同時修改多個數據庫,涉及到了分散式事務的問題。我最終是使用了RocketMQ的事務訊息,並從外圍解決了訊息回查的問題。

他人思路

在設計我的解決方案前嘗試搜尋了一下別人的實現 傳送門。他的解決方案是在producer和consumer方設定了兩個scheduler,感覺是有些複雜的。我是在其基礎上進行了簡化,並解決了一些其他問題,使得整個解決方案比較完整和邏輯自洽。

我的設計

A和B是兩個Service,A執行本地事務,B執行遠端事務。A會呼叫B的遠端服務,完成整個業務。就本專案而言,A就是使用者模組的AccountService,B就是訂單模組的OrderService。A和B都有一張表,儲存著訊息資料。從MQ的視角看來,A是訊息的Producer,B是訊息的Consumer。

A(本地事務執行方,MQProducer)

1) db
producer_msg(msgId,body,message_status,create_time,update_time,send_times,topic) msgId這裡為orderId

2) mq
作為producer時,註冊Topic account:當執行本地事務時同時插入producer_msg,預設status都是未被消費。如果本地事務執行失敗,那麼直接回滾,不插入。當訊息傳送失敗時,我們已經在producer_msg插入了記錄,可以進行回查。

3) scheduler
A需要同步B的資料庫,使得兩個資料庫資料一致,不同的即為確認資訊傳送失敗的。
訊息狀態有未被消費、已被消費、消費失敗、超過消費失敗的重試次數、超過確認訊息傳送失敗的重試次數和已被回滾。
A和B資料庫同步維護所有訊息,只是A資料庫儲存內容更多,比如會儲存訊息的body。
如果訊息已經是超過重試次數或已被消費,那麼A不會再去考慮它。
A的Scheduler會遍歷A資料庫,找出未被消費和消費失敗的id且建立時間距離當前時間超過1min,傳送給B。
B會遍歷這些id

for(id in ids){
    如果 id 不存在,說明確認訊息傳送失敗,
    如果 id 存在,則將該id對應的status一併返回,map.put(id,status)
} 
  • 1
  • 2
  • 3
  • 4

A 接收到map後,keySet取得所有id,拿傳送過去的id減去這些id(差集),就是確認訊息傳送失敗的訊息,進行重新發送;遍歷map,將本地資料庫同步為B資料庫。

這個方法可能會出現訊息重複,因為A剛傳送訊息,B該沒有處理,A的Scheduler就去查詢了,當然訊息都沒有被消費,因為A會重發剛才的訊息,但是B有做訊息去重,所以不會影響。

B(遠端事務執行方,MQConsumer)

1) db
consumer_msg(msgId,create_time. message_status,topic) msgId這裡是orderId

2) mq
作為consumer,註冊Topic account:
當接收到訊息後,查詢是否被執行過,如果沒有被消費過(id未找到)或者消費失敗了(這裡解決了訊息重複消費的問題),則執行遠端事務後插入/更新consumer_ msg(status為已被消費),已被消費則跳過。
遠端事務執行失敗時,插入/更新consumer_ msg(status為消費失敗)
超過重試消費次數的訊息也更新consumer_ msg,status為超過消費的重試次數。
B這裡就維護它所接收的訊息的狀態。

訊息表

在producer這一方設計了producer_transaction_message表。
這裡寫圖片描述
- msgId是訊息唯一id,可以採用業務上的id來實現,比如訂單id。
- body是訊息體,比如訂單物件的序列化結果。
- message_status是訊息狀態
- update_time是最後更新記錄時間
- create_time是訊息建立時間
- send_times是確認訊息重複傳送次數
- topic是訊息主題,這裡均為account

在consumer這一方設計了consumer_transaction_message表。
這裡寫圖片描述
看得出來是producer的表的部分列,其含義也是相同的。

分散式事務實現程式碼

Producer方

MQProducerConfig(配置MQProducer)

@Configuration
@Slf4j
@Getter
public classMQProducerConfig {
    @Value("${spring.rocketmq.group-name}")
    private String groupName;
    @Value("${spring.rocketmq.namesrv-addr}")
    private String namesrvAddr;
    @Value("${spring.rocketmq.topic}")
    private String topic;
    @Value("${spring.rocketmq.confirm-message-faiure-retry-times}")  
    private Integer retryTimes;
    public static final Integer CHECK_GAP = 1; 

    @Bean
    public MQProducer mqProducer() throws MQClientException {
        TransactionMQProducer producer = new TransactionMQProducer(groupName);
        producer.setNamesrvAddr(namesrvAddr); 
        producer.setTransactionCheckListener(new TransactionCheckListener() {
            @Override
            public LocalTransactionState checkLocalTransactionState(MessageExt msg) {
                // doNothing
                return LocalTransactionState.COMMIT_MESSAGE;
            }
        });
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            public void run() {
                producer.shutdown();
            }
        }));
        producer.start();
        log.info("producer started!");
        return producer;
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36

AccountLocalTransactionExecutor(執行本地事務)

@Component
@Slf4j
public classAccountLocalTransactionExecutorimplementsLocalTransactionExecuter {
    @Autowired
    private PayService payService;
    @Autowired
    private ProducerTransactionMessageService messageService;

    @Override
    public LocalTransactionState executeLocalTransactionBranch(Message msg, Object arg) {
        try {
            String paymentPassword = (String) arg;
            OrderDO order = ProtoStuffUtil.deserialize(msg.getBody(), OrderDO.class);
            if (order.getOrderStatus() != OrderStatus.UNPAID) {
                log.info("{} 訂單狀態不為unpaid", order.getId());
                throw new OrderStateIllegalException(order.getOrderStatus().toString());
            }
            // 本地事務,減少使用者賬戶餘額
            // 丟擲異常時會進行回滾,下面構造訊息儲存到資料庫也不會被執行
            payService.decreaseAccount(order.getUser().getId(), order.getTotalPrice(), paymentPassword);
            // 儲存訊息至資料庫
            ProducerTransactionMessageDO messageDO = ProducerTransactionMessageDO.builder()
                    .id(order.getId())
                    .body(msg.getBody())
                    .createTime(LocalDateTime.now())
                    .updateTime(LocalDateTime.now())
                    .messageStatus(MessageStatus.UNCONSUMED)
                    .topic(msg.getTopic())
                    .sendTimes(0)
                    .build();
            messageService.save(messageDO);
            // 成功通知MQ訊息變更 該訊息變為:<確認傳送>
            return LocalTransactionState.COMMIT_MESSAGE;
        } catch (Exception e) {
            e.printStackTrace();
            log.info("本地事務執行失敗,直接回滾!");
            // 失敗則不通知MQ 該訊息一直處於:<暫緩傳送>
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41

AccountServiceImpl(Producer支付業務入口)

@Service
@Slf4j
public classAccountServiceImplimplementsAccountService {
    @Autowired
    private MQProducerConfig config;
    @Autowired
    private MQProducer producer;
    @Autowired
    private AccountLocalTransactionExecutor executor;
    @Autowired
    private ProducerTransactionMessageService messageService;
    @Autowired
    private PayService payService;


    @Override
    public void commit(OrderDO order, String paymentPassword) {
        Message message = new Message();
        message.setTopic(config.getTopic());
        message.setBody(ProtoStuffUtil.serialize(order));
        TransactionSendResult result = null;
        try {
            result = this.producer.sendMessageInTransaction(message, executor, paymentPassword);
            log.info("事務訊息傳送結果:{}", result);
            log.info("TransactionState:{} ", result.getLocalTransactionState());
            // 因為無法獲得executor中丟擲的異常,只能模糊地返回訂單支付失敗資訊。
            // TODO 想辦法從executor中找到原生異常
        } catch (Exception e) {
            log.info("AccountService丟擲異常...");
            e.printStackTrace();
        }
        if (result.getLocalTransactionState() == LocalTransactionState.ROLLBACK_MESSAGE) {
            throw new OrderPaymentException(order.getId());
        }
    }

    @Transactional
    @Override
    public void rollback(ProducerTransactionMessageDO message) {
        OrderDO order = ProtoStuffUtil.deserialize(message.getBody(), OrderDO.class);
        message.setMessageStatus(MessageStatus.ROLLBACK);
        message.setUpdateTime(LocalDateTime.now());
        messageService.update(message);
        payService.increaseAccount(order.getUser().getId(), order.getTotalPrice());
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47

TransactionCheckScheduler(訊息回查)

@Component
public classTransactionCheckScheduler {
    @Autowired
    private ProducerTransactionMessageService messageService;

    /**
     * 每分鐘執行一次事務回查
     */
    @Scheduled(fixedRate = 60 * 1000)
    public void checkTransactionMessage(){
        messageService.check();
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
ProducerTransactionMessageServiceImpl(Producer訊息服務提供者)
@Slf4j
public classProducerTransactionMessageServiceImplimplementsProducerTransactionMessageService {
    @Autowired
    private MQProducer producer;
    @Autowired
    private MQProducerConfig config;
    @Autowired
    private ProductTransactionMessageDOMapper mapper;
    @Autowired
    private ConsumerTransactionMessageService consumerTransactionMessageService;

    @Transactional
    @Override
    public void save(ProducerTransactionMessageDO message) {
        mapper.insert(message);
    }

    @Transactional
    @Override
    public void check() {
        List<Long> all = mapper.findMessageIdsByStatusCreatedAfter(Arrays.asList(MessageStatus.UNCONSUMED, MessageStatus.CONSUME_FAILED), MQProducerConfig.CHECK_GAP);
        Map<Long, MessageStatus> statusMap = consumerTransactionMessageService.findConsumerMessageStatuses(all);
        for (Map.Entry<Long, MessageStatus> entry : statusMap.entrySet()) {
            mapper.updateByPrimaryKeySelective(ProducerTransactionMessageDO.builder().id(entry.getKey()).messageStatus(entry.getValue()).updateTime(LocalDateTime.now()).build());
        }
        all.removeAll(statusMap.keySet());
        // 此時all為確認訊息傳送失敗的
        this.reSend(mapper.selectBatchByPrimaryKeys(all));
    }

    @Transactional
    @Override
    public void reSend(List<ProducerTransactionMessageDO> messages) {
        for (ProducerTransactionMessageDO messageDO : messages) {
            if (messageDO.getSendTimes() == config.getRetryTimes()) {
                messageDO.setUpdateTime(LocalDateTime.now());
                messageDO.setMessageStatus(MessageStatus.OVER_CONFIRM_RETRY_TIME);
                mapper.updateByPrimaryKeySelective(messageDO);
                continue;
            }
            Message message = new Message();
            message.setTopic(config.getTopic());
            message.setBody(messageDO.getBody());
            try {
                SendResult result = producer.send(message);
                messageDO.setSendTimes(messageDO.getSendTimes() + 1);
                messageDO.setUpdateTime(LocalDateTime.now());
                mapper.updateByPrimaryKeySelective(messageDO);
                log.info("傳送重試訊息完畢,Message:{},result:{}", message, result);
            } catch (Exception e) {
                e.printStackTrace();
                log.info("傳送重試訊息時失敗! Message:{}", message);
            }
        }
    }

    @Transactional
    @Override
    public void delete(Long id) {
        mapper.deleteByPrimaryKey(id);
    }

    @Transactional(readOnly = true)
    @Override
    public List<ProducerTransactionMessageDO> findByIds(List<Long> ids) {
        return mapper.selectBatchByPrimaryKeys(ids);
    }

    @Transactional(readOnly = true)
    @Override
    public PageInfo<ProducerTransactionMessageDO> findByQueryDTO(MessageQueryConditionDTO dto) {
        return mapper.findByCondition(dto, dto.getPageNum(), dto.getPageSize()).toPageInfo();
    }

    @Override
    public void update(ProducerTransactionMessageDO message) {
        mapper.updateByPrimaryKeySelective(message);
    }

}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80

Consumer

MQConsumerConfig(配置MQConsumer)

@Configuration
@Slf4j
@Getter
public classMQConsumerConfig {
    private DefaultMQPushConsumer consumer;

    @Value("${spring.rocketmq.group-name}")
    private String groupName;
    @Value("${spring.rocketmq.namesrv-addr}")
    private String namesrvAddr;
    @Value("${spring.rocketmq.topic}")
    private String topic;
    @Autowired
    private AccountMessageListener accountMessageListener;
    @Value("${spring.rocketmq.consume-failure-retry-times}")
    private Integer retryTimes;

    @PostConstruct
    public void init() throws MQClientException {
        this.consumer = new DefaultMQPushConsumer(groupName);
        this.consumer.setNamesrvAddr(namesrvAddr);
        // 啟動後從佇列頭部開始消費
        this.consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        this.consumer.subscribe(topic, "*");
        this.consumer.registerMessageListener(accountMessageListener);
        this.consumer.start();
        log.info("consumer started!");
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29

AccountMessageListener(訊息接收方)

@Component
@Slf4j
public classAccountMessageListenerimplementsMessageListenerConcurrently {
    @Autowired
    private OrderSe