1. 程式人生 > >分散式訊息佇列RocketMQ--事務訊息--解決分散式事務的最佳實踐

分散式訊息佇列RocketMQ--事務訊息--解決分散式事務的最佳實踐

說到分散式事務,就會談到那個經典的”賬號轉賬”問題:2個賬號,分佈處於2個不同的DB,或者說2個不同的子系統裡面,A要扣錢,B要加錢,如何保證原子性?

一般的思路都是通過訊息中介軟體來實現“最終一致性”:A系統扣錢,然後發條訊息給中介軟體,B系統接收此訊息,進行加錢。

但這裡面有個問題:A是先update DB,後傳送訊息呢? 還是先發送訊息,後update DB?

假設先update DB成功,傳送訊息網路失敗,重發又失敗,怎麼辦? 
假設先發送訊息成功,update DB失敗。訊息已經發出去了,又不能撤回,怎麼辦?

所以,這裡下個結論: 只要傳送訊息和update DB這2個操作不是原子的,無論誰先誰後,都是有問題的。

那這個問題怎麼解決呢??

錯誤的方案0

有人可能想到了,我可以把“傳送訊息”這個網路呼叫和update DB放在同1個事務裡面,如果傳送訊息失敗,update DB自動回滾。這樣不就保證2個操作的原子性了嗎?

這個方案看似正確,其實是錯誤的,原因有2:

(1)網路的2將軍問題:傳送訊息失敗,傳送方並不知道是訊息中介軟體真的沒有收到訊息呢?還是訊息已經收到了,只是返回response的時候失敗了?

如果是已經收到訊息了,而傳送端認為沒有收到,執行update db的回滾操作。則會導致A賬號的錢沒有扣,B賬號的錢卻加了。

(2)把網路呼叫放在DB事務裡面,可能會因為網路的延時,導致DB長事務。嚴重的,會block整個DB。這個風險很大。

基於以上分析,我們知道,這個方案其實是錯誤的!

方案1–業務方自己實現

假設訊息中介軟體沒有提供“事務訊息”功能,比如你用的是Kafka。那如何解決這個問題呢?

解決方案如下: 
(1)Producer端準備1張訊息表,把update DB和insert message這2個操作,放在一個DB事務裡面。

(2)準備一個後臺程式,源源不斷的把訊息表中的message傳送給訊息中介軟體。失敗了,不斷重試重傳。允許訊息重複,但訊息不會丟,順序也不會打亂。

(3)Consumer端準備一個判重表。處理過的訊息,記在判重表裡面。實現業務的冪等。但這裡又涉及一個原子性問題:如果保證訊息消費 + insert message到判重表這2個操作的原子性?

消費成功,但insert判重表失敗,怎麼辦?關於這個,在Kafka的原始碼分析系列,第1篇, exactly once問題的時候,有過討論。

通過上面3步,我們基本就解決了這裡update db和傳送網路訊息這2個操作的原子性問題。

但這個方案的一個缺點就是:需要設計DB訊息表,同時還需要一個後臺任務,不斷掃描本地訊息。導致訊息的處理和業務邏輯耦合額外增加業務方的負擔。

方案2 – RocketMQ 事務訊息

為了能解決該問題,同時又不和業務耦合,RocketMQ提出了“事務訊息”的概念。

具體來說,就是把訊息的傳送分成了2個階段:Prepare階段和確認階段。

具體來說,上面的2個步驟,被分解成3個步驟: 
(1) 傳送Prepared訊息 
(2) update DB 
(3) 根據update DB結果成功或失敗,Confirm或者取消Prepared訊息。

可能有人會問了,前2步執行成功了,最後1步失敗了怎麼辦?這裡就涉及到了RocketMQ的關鍵點:RocketMQ會定期(預設是1分鐘)掃描所有的Prepared訊息,詢問傳送方,到底是要確認這條訊息發出去?還是取消此條訊息?

具體程式碼實現如下:

也就是定義了一個checkListener,RocketMQ會回撥此Listener,從而實現上面所說的方案。

// 也就是上文所說的,當RocketMQ發現`Prepared訊息`時,會根據這個Listener實現的策略來決斷事務
TransactionCheckListener transactionCheckListener = new TransactionCheckListenerImpl();
// 構造事務訊息的生產者
TransactionMQProducer producer = new TransactionMQProducer("groupName");
// 設定事務決斷處理類
producer.setTransactionCheckListener(transactionCheckListener);
// 本地事務的處理邏輯,相當於示例中檢查Bob賬戶並扣錢的邏輯
TransactionExecuterImpl tranExecuter = new TransactionExecuterImpl();
producer.start()
// 構造MSG,省略構造引數
Message msg = new Message(......);
// 傳送訊息
SendResult sendResult = producer.sendMessageInTransaction(msg, tranExecuter, null);
producer.shutdown();
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
public TransactionSendResult sendMessageInTransaction(.....)  {
    // 邏輯程式碼,非實際程式碼
    // 1.傳送訊息
    sendResult = this.send(msg);
    // sendResult.getSendStatus() == SEND_OK
    // 2.如果訊息傳送成功,處理與訊息關聯的本地事務單元
    LocalTransactionState localTransactionState = tranExecuter.executeLocalTransactionBranch(msg, arg);
    // 3.結束事務
    this.endTransaction(sendResult, localTransactionState, localException);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

總結:對比方案2和方案1,RocketMQ最大的改變,其實就是把“掃描訊息表”這個事情,不讓業務方做,而是訊息中介軟體幫著做了。

至於訊息表,其實還是沒有省掉。因為訊息中介軟體要詢問傳送方,事物是否執行成功,還是需要一個“變相的本地訊息表”,記錄事物執行狀態。

人工介入

可能有人又要說了,無論方案1,還是方案2,傳送端把訊息成功放入了佇列,但消費端消費失敗怎麼辦?

消費失敗了,重試,還一直失敗怎麼辦?是不是要自動回滾整個流程?

答案是人工介入。從工程實踐角度講,這種整個流程自動回滾的代價是非常巨大的,不但實現複雜,還會引入新的問題。比如自動回滾失敗,又怎麼處理?

對應這種極低概率的case,採取人工處理,會比實現一個高複雜的自動化回滾系統,更加可靠,也更加簡單。

相關推薦

分析redis訊息佇列和kafka來解決分散式事務場景

1、系統A(扣減托盤)【訊息生產者】 2、系統B(扣減押金)【訊息消費者】 業務描述: 兩套系統,A中扣減托盤,B中對應的要扣減押金;A中托盤歸還,B中押金返還 利用訊息佇列來解決分散式事務過程: 傳送方【生產者】:(不關心接收方狀態,只需要確定本地OK,訊息推送即可)

分散式訊息佇列RocketMQ&Kafka -- 訊息的“順序消費”-- 一個看似簡單的複雜問題

在說到訊息中介軟體的時候,我們通常都會談到一個特性:訊息的順序消費問題。這個問題看起來很簡單:Producer傳送訊息1, 2, 3。。。 Consumer按1, 2, 3。。。順序消費。 但實際情況卻是:無論RocketMQ,還是Kafka,預設都不保證訊息

分散式訊息佇列RocketMQ--事務訊息--解決分散式事務

說到分散式事務,就會談到那個經典的”賬號轉賬”問題:2個賬號,分佈處於2個不同的DB,或者說2個不同的子系統裡面,A要扣錢,B要加錢,如何保證原子性? 一般的思路都是通過訊息中介軟體來實現“最終一致性”:A系統扣錢,然後發條訊息給中介軟體,B系統接收此訊息,進行加錢。 但這裡面有個問題:A是先update D

分散式訊息佇列RocketMQ--事務訊息--解決分散式事務最佳實踐

說到分散式事務,就會談到那個經典的”賬號轉賬”問題:2個賬號,分佈處於2個不同的DB,或者說2個不同的子系統裡面,A要扣錢,B要加錢,如何保證原子性? 一般的思路都是通過訊息中介軟體來實現“最終一致性”:A系統扣錢,然後發條訊息給中介軟體,B系統接收此訊息,進行加錢。

分散式訊息佇列 RocketMQ原始碼解析:事務訊息

摘要: 原創出處 http://www.iocoder.cn/RocketMQ/message-

分散式訊息佇列RocketMQ與Kafka的18項差異之“撥亂反正”

我們知道,阿里的RocketMQ其實源自Kafka。同時網路上一直流傳著1篇阿里中介軟體團隊所寫的RocketMQ與Kafka的18項差異的文章,並且被廣泛轉發。比如: http://blog.csdn.net/damacheng/article/detail

分散式訊息佇列RocketMQ原始碼分析之3 -- Consumer負載均衡機制 -- Rebalance

同Kafka一樣,RocketMQ也需要探討一個問題:如何把一個topic的多個queue分攤給不同的consumer,也就是負載均衡問題。 有興趣朋友可以關注公眾號“架構之道與術”, 獲取最新文章。 或掃描如下二維碼: 在討論這個問題之前,我們先看一

分散式訊息佇列RocketMQ原始碼分析之2 -- Broker與NameServer心跳機制

我們知道,Kafka是通過ZK的臨時節點來監測Broker的死亡的。當一個Broker掛了之後,ZK上面對應的臨時節點被刪除,同時其他Broker收到通知。 那麼在RocketMQ中,對應的NameServer是如何判斷一個Broker的死亡呢? 有興趣朋友

分散式訊息佇列 RocketMQ 原始碼分析 —— Message 順序傳送與消費

本文主要基於 RocketMQ 4.0.x 正式版 1. 概述 建議前置閱讀內容: 當然對 Message 傳送與消費已經有一定了解的同學,可以選擇跳過。 RocketMQ 提供了兩種順序級別: 普通順序訊息 :Producer 將相關聯的訊息傳送到相同

關於阿里訊息佇列RocketMQ(安裝、使用和坑),你需要知道的事情

為什麼選擇RocketMQ Apache RocketMQ作為阿里開源的一款高效能、高吞吐量的分散式訊息中介軟體。因為阿里有海量的資料量,無數業務場景的應用,是RocketMQ搶盡風頭風頭,成為不可多得中介軟體專案,加上已經正式加入Apach俱樂部,作為頂級的開源專案! 一、關於

訊息佇列——RocketMQ

訊息佇列技術選型: Kafka 缺陷:叢集資料寫入有可能抖動非常嚴重,經常會有資料寫失敗。原因在於隨著業務增長,Topic的資料增多,叢集負載增大,效能下降;個別版本會出現問題,導致副本重新複製,複製的時候有大量的讀,導致磁碟IO過大,影響寫入。 RocketMQ(該部

訊息佇列常見問題和解決方案

說明:此文是筆者對中華石衫老師對訊息佇列講解的一篇總結包括筆者自己的一些理解 一、為什麼使用訊息佇列? 訊息佇列使用的場景和中介軟體有很多,但解決的核心問題主要是:非同步、解耦、消峰填谷。 二、訊息佇列的優缺點 非同步、解耦、消峰填谷這是訊息佇列最大的優點,除了

Amazon SQS 訊息佇列服務_訊息佇列mq解決方案

Amazon Simple Queue Service (SQS) 是一種完全託管的訊息佇列服務,可讓您分離和擴充套件微服務、分散式系統和無伺服器應用程式。SQS 消除了與管理和運營訊息型中介軟體相關的複雜性和開銷,並使開發人員能夠專注於重要工作。藉助 SQS,您可以在軟體元件之間傳送、儲

Windows訊息佇列、執行緒訊息佇列,視窗訊息的概念與關係

1.視窗 Windows程式是由一系列的視窗構成的,每個視窗都有自己的視窗過程,視窗過程就是一個擁有有固定 Signature 的 C函式,具體格式如下: LRESULT CALLBACK WindowProc(HWND hwnd, UINT uMsg, WPARAM wPa

訊息佇列及常見訊息佇列介紹

一、訊息佇列(MQ)概述 訊息佇列(Message Queue),是分散式系統中重要的元件,其通用的使用場景可以簡單地描述為: 當不需要立即獲得結果,但是併發量又需要進行控制的時候,差不多就是需要使用訊息佇列的時候。 訊息佇列主要解決了應用耦合、

mq 訊息佇列 以及常見訊息佇列的介紹

一、訊息佇列(MQ)概述 訊息佇列(Message Queue),是分散式系統中重要的元件,其通用的使用場景可以簡單地描述為: 當不需要立即獲得結果,但是併發量又需要進行控制的時候,差不多就是需要使用訊息佇列的時候。 訊息佇列主要解決了應用耦合、非同步處理、流量削鋒等問題。

訊息佇列之非同步訊息基本概念以及ActiveMQ整合Spring常用用法介紹

一 簡介 (1)非同步訊息: 所謂非同步訊息,跟RMI遠端呼叫、webservice呼叫是類似的,非同步訊息也是用於應用程式之間的通訊。但是它們之間的區別是: RMI、Hession/Burlap、webservice等遠端呼叫機制是同步的。也就是說,當客戶端呼叫遠端方法時,客戶端

Redis延時訊息佇列、非同步訊息佇列的實現

package list; import java.lang.reflect.Type; import java.util.Set; import java.util.UUID; import com.alibaba.fastjson.JSON; import com.a

無等待地從一個訊息佇列中取得訊息, OSQAccept()

如果試圖從訊息佇列中取出一條訊息,而此時訊息佇列又為空時,也可以不讓呼叫任務等待而直接返回呼叫函式。這個操作可以呼叫OSQAccept()函式來完成。程式清單 L6.25是該函式的原始碼。OSQAccept()函式首先檢視pevent指向的事件控制塊是否是由OSQCreate()函式建立的[L6.25(1)]

使用MongoDB實現訊息佇列的非同步訊息功能

一、訊息佇列概述 訊息佇列中介軟體是分散式系統中重要的元件,主要解決應用耦合,非同步訊息,流量削鋒等問題。實現高效能,高可用,可伸縮和最終一致性架構。是大型分散式系統不可缺少的中介軟體。 目前在生產環境,使用較多的訊息佇列有ActiveMQ,RabbitMQ