1. 程式人生 > >分散式事務之可靠訊息

分散式事務之可靠訊息

什麼是可靠訊息?

為什麼我們需要它,為什麼我們要強調可靠?

生產方 訊息傳送出去了,如果生產方收到了訊息的正常反饋,那麼我們就可以知道訊息的確切的狀態。 如果訊息無響應 或者超時了呢? 有多個情況,

1 訊息未到達mq,傳送途中 就某些原因丟失了,

2 訊息送達mq,但是mq處理未完成就丟失(這裡又可以細分為:mq未記錄日誌,已記錄日誌但未落盤訊息,已落盤但未來得及響應請求,已落盤但未完成推送(僅僅針對推的情況))。

3 訊息送達mq,訊息也已經被mq 處理完畢,但是響應在 網路途中 丟失。

4 生產方對傳送的訊息設定超時時間。 雖然訊息送達mq,訊息也已經被mq處理,也返回來了,但是由於此時已經超時,生產方已經斷開了網路連線,從而丟棄了響應。

儘管我們可以儘量的確保MQ可靠,讓mq 可靠的持久化訊息,但是網路 是不可靠的, 幾乎沒有辦法確保 網路 可靠。。。 ( 網路可靠就這麼難嗎??)

如果知道是情況1、2,我們可以重新發送訊息即可,也就是重試。(當然,如果網路問題,或者mq掛掉了,重試也沒有,只有等待 這些問題回覆才重試才有意義,因此,我們可以設定一個 比較長的、“按照指數爆炸” 的  “重試間隔時間”)

如果知道是情況3,如果我們不需要訊息id,那麼我們可以認為 訊息傳送成功,業務也處理成功。不用重試了!

對於前面3個情況,生產方是無法判斷 訊息到底mq 是否已經處理好了, 這就顯得 “不可靠”了, 除了量子力學,沒人喜歡不確定性。 有可能1 、 2  也有可能是3,怎麼辦? 或許我們可以 通過查詢mq 的方式(也就是peek 一下,但是不消費)判斷 是否是3。 

 

所以,我們期望有一個可靠訊息,能夠避免任何問題,包括網路問題。 如果訊息不可靠,那麼我們就需要採取其他的措施,比如之前講的 本地訊息表。。。

 

分散式事務大致可以分為以下四種( 不知道是什麼樣的一個分類 準則):

  • 兩階段型
  • 補償型
  • 非同步確保型
  • 最大努力通知型

可靠訊息, 屬於 非同步確保型。 why?後面會說明。

 

可靠訊息 的實現

可靠訊息 可能有很多實現方式,但一般就是指事務型訊息。可靠訊息 一般也是基於MQ的。 前面說過了基於本地訊息表的分散式事務。基於本地訊息表的分散式事務 其實也可以認為是 基於MQ的分散式事務的 一種情況。

基於MQ的分散式事務:

生產方處理過程:

1 主動方應用先把訊息發給訊息中介軟體,訊息狀態標記為“待確認”;
2 訊息中介軟體收到訊息後,把訊息持久化到訊息儲存中,但並不向被動方應用投遞訊息;
3 訊息中介軟體返回訊息持久化結果(成功/失敗),主動方應用根據返回結果進行判斷如何進行業務操作處理:
      失敗:放棄業務操作處理,結束(必要時向上層返回失敗結果);
      成功:執行業務操作處理;
4 業務操作完成後,把業務操作結果(成功/失敗)傳送給訊息中介軟體;
5 訊息中介軟體收到業務操作結果後,根據業務結果進行處理;
      失敗:刪除訊息儲存中的訊息,結束;
      成功:更新訊息儲存中的訊息狀態為“待發送(可傳送)”,緊接著執行
訊息投遞;

6 前面的正向流程都成功後,向被動方應用投遞訊息;


訊息傳送一致性方案的正向流程是可行的,但異常流程怎麼處理呢?
訊息傳送到訊息中介軟體中能得到保障了,但訊息的準確消費(投遞)又如何保障呢?
有沒有支援這種傳送一致性流程的現成訊息中介軟體?
—— 其實是有的,RocketMQ, 另外我認為, 可以消費方自己去消費,而不是推訊息給 消費方,
會不會更好? 推的話 會有一些延遲,但是 這樣也降低了 MQ的壓力。
--------------------- 
作者:chenshiying007 
來源:CSDN 
原文:    https://blog.csdn.net/qq_27384769/article/details/79305402 
版權宣告:本文為博主原創文章,轉載請附上博文連結!

 

基於RocketMQ的分散式事務:

在RocketMQ中實現了分散式事務,實際上其實是對本地訊息表的一個封裝,將本地訊息表移動到了MQ內部。

下面簡單介紹一下MQ事務,如果想對其詳細瞭解可以參考: https://www.jianshu.com/p/453c6e7ff81c。 

 

基本流程如下: 第一階段Prepared訊息,會拿到訊息的地址。

第二階段執行本地事務。

第三階段通過第一階段拿到的地址去訪問訊息,並修改狀態。訊息接受者就能使用這個訊息。

如果確認訊息失敗,在RocketMq Broker中提供了定時掃描沒有更新狀態的訊息,如果有訊息沒有得到確認,會向訊息傳送者傳送訊息,來判斷是否提交,在rocketmq中是以listener的形式給傳送者,用來處理。 

 

如果消費超時,則需要一直重試,訊息接收端需要保證冪等。如果訊息消費失敗,這個就需要人工進行處理,因為這個概率較低,如果為了這種小概率時間而設計這個複雜的流程反而得不償失。

===========================================================================

上面的說明 摘抄於 ,我看了後還是有些懵。仔細看了https://blog.csdn.net/u010425776/article/details/79516298, 之後,我明白了一些。

訊息生產過程的可靠性保證

在系統A處理任務A前,首先向訊息中介軟體傳送一條訊息
訊息中介軟體收到後將該條訊息持久化,但並不投遞。此時下游系統B仍然不知道該條訊息的存在。
訊息中介軟體持久化成功後,便向系統A返回一個確認應答;
系統A收到確認應答後,則可以開始處理任務A;
任務A處理完成後,向訊息中介軟體傳送Commit請求。該請求傳送完成後,對系統A而言,該事務的處理過程就結束了,此時它可以處理別的任務了。 
但commit訊息可能會在傳輸途中丟失,從而訊息中介軟體並不會向系統B投遞這條訊息,從而系統就會出現不一致性。這個問題由訊息中介軟體的事務回查機制完成,下文會介紹。
訊息中介軟體收到Commit指令後,便向系統B投遞該訊息,從而觸發任務B的執行;
當任務B執行完成後,系統B向訊息中介軟體返回一個確認應答,告訴訊息中介軟體該訊息已經成功消費,此時,這個分散式事務完成。
--------------------- 
作者:凌瀾星空 
來源:CSDN 
原文:https://blog.csdn.net/u010425776/article/details/79516298 
版權宣告:本文為博主原創文章,轉載請附上博文連結!

 

 

上述過程中,如果任務A處理失敗,那麼需要進入回滾流程,如下圖所示:  

  • 若系統A在處理任務A時失敗,那麼就會向訊息中介軟體傳送Rollback請求。和傳送Commit請求一樣,系統A發完之後便可以認為回滾已經完成,它便可以去做其他的事情。
  • 訊息中介軟體收到回滾請求後,直接將該訊息丟棄,而不投遞給系統B,從而不會觸發系統B的任務B。

上面所介紹的Commit和Rollback都屬於理想情況,但在實際系統中,Commit和Rollback指令都有可能在傳輸途中丟失。那麼當出現這種情況的時候,訊息中介軟體是如何保證資料一致性呢?——答案就是超時詢問機制。

 

系統A除了實現正常的業務流程外,還需提供一個事務詢問的介面,供訊息中介軟體呼叫。當訊息中介軟體收到一條事務型訊息後便開始計時,如果到了超時時間也沒收到系統A發來的Commit或Rollback指令的話,就會主動呼叫系統A提供的事務詢問介面詢問該系統目前的狀態。該介面會返回三種結果:

    • 提交 
      若獲得的狀態是“提交”,則將該訊息投遞給系統B。
    • 回滾 
      若獲得的狀態是“回滾”,則直接將條訊息丟棄。
    • 處理中 
      若獲得的狀態是“處理中”,則繼續等待
訊息中介軟體的超時詢問機制能夠防止上游系統因在傳輸過程中丟失Commit/Rollback指令而導致的系統不一致情況,而且能降低上游系統的阻塞時間,
上游系統只要發出Commit/Rollback指令後便可以處理其他任務,無需等待確認應答。而Commit/Rollback指令丟失的情況通過超時詢問機制來彌補,
這樣大大降低上游系統的阻塞時間,提升系統的併發度。
--------------------- 作者:凌瀾星空 來源:CSDN 原文:https://blog.csdn.net/u010425776/article/details/79516298 版權宣告:本文為博主原創文章,轉載請附上博文連結!

系統A傳送訊息的操作應該是同步的,因為我們需要獲取訊息的地址,否則後面就無法進行訊息更新和確認或取消了。 但是呢,這一步驟,如前所述,也是可能出現問題的,也就是無法區分前述情況1、2、3。 但是呢,這個也不要緊的, 因為 訊息必須要確認後, 後面的系統才會進行消費。 如果出現情況3,那麼我們 儘可以的把 這個待確認的訊息丟棄。 而系統A 因為無法收到mq 的反饋, 不會進行下一步, 也可以保證整個系統的 一致性。

 

下面來說一說訊息投遞(消費)過程的可靠性保證。

當上遊系統執行完任務並向訊息中介軟體提交了Commit指令後,便可以處理其他任務了,此時它可以認為事務已經完成,接下來訊息中介軟體一定會保證訊息被下游系統成功消費掉!那麼這是怎麼做到的呢?這由訊息中介軟體的投遞流程來保證。

訊息中介軟體向下遊系統投遞完訊息後便進入阻塞等待狀態,下游系統便立即進行任務的處理,任務處理完成後便向訊息中介軟體返回應答。訊息中介軟體收到確認應答後便認為該事務處理完畢!

如果訊息在投遞過程中丟失,

 

或訊息的確認應答在返回途中丟失,

 

那麼訊息中介軟體在等待確認應答超時之後就會重新投遞,直到下游消費者返回消費成功響應為止。當然,一般訊息中介軟體可以設定訊息重試的次數和時間間隔,比如:當第一次投遞失敗後,每隔五分鐘重試一次,一共重試3次。如果重試3次之後仍然投遞失敗,那麼這條訊息就需要人工干預。

 


有的同學可能要問:訊息投遞失敗後為什麼不回滾訊息,而是不斷嘗試重新投遞?

這就涉及到整套分散式事務系統的實現成本問題。
我們知道,當系統A將向訊息中介軟體傳送Commit指令後,它便去做別的事情了。如果此時訊息投遞失敗,需要回滾的話,就需要讓系統A事先提供回滾介面,這無疑增加了額外的開發成本,業務系統的複雜度也將提高。對於一個業務系統的設計目標是,在保證效能的前提下,最大限度地降低系統複雜度,從而能夠降低系統的運維成本。

————  如果不斷重試, 還是失敗了, 那麼就需要想想其他方法了,比如發通知然後人工介入啊等等。。

 

 

不知大家是否發現,上游系統A向訊息中介軟體提交Commit/Rollback訊息採用的是非同步方式,也就是當上遊系統提交完訊息後便可以去做別的事情,接下來提交、回滾就完全交給訊息中介軟體來完成,並且完全信任訊息中介軟體,認為它一定能正確地完成事務的提交或回滾。然而,訊息中介軟體向下遊系統投遞訊息的過程是同步的。也就是訊息中介軟體將訊息投遞給下游系統後,它會阻塞等待,等下游系統成功處理完任務返回確認應答後才取消阻塞等待。為什麼這兩者在設計上是不一致的呢?

首先,上游系統和訊息中介軟體之間採用非同步通訊是為了提高系統併發度。業務系統直接和使用者打交道,使用者體驗尤為重要,因此這種非同步通訊方式能夠極大程度地降低使用者等待時間。此外,非同步通訊相對於同步通訊而言,沒有了長時間的阻塞等待,因此係統的併發性也大大增加。但非同步通訊可能會引起Commit/Rollback指令丟失的問題,這就由訊息中介軟體的超時詢問機制來彌補。

那麼,訊息中介軟體和下游系統之間為什麼要採用同步通訊呢?

非同步能提升系統性能,但隨之會增加系統複雜度;而同步雖然降低系統併發度,但實現成本較低。因此,在對併發度要求不是很高的情況下,或者伺服器資源較為充裕的情況下,我們可以選擇同步來降低系統的複雜度。
我們知道,訊息中介軟體是一個獨立於業務系統的第三方中介軟體,它不和任何業務系統產生直接的耦合,它也不和使用者產生直接的關聯,它一般部署在獨立的伺服器叢集上,具有良好的可擴充套件性,所以不必太過於擔心它的效能,如果處理速度無法滿足我們的要求,可以增加機器來解決。而且,即使訊息中介軟體處理速度有一定的延遲那也是可以接受的,因為前面所介紹的BASE理論就告訴我們了,我們追求的是最終一致性,而非實時一致性,因此訊息中介軟體產生的時延導致事務短暫的不一致是可以接受的。
---------------------

 

為什麼可靠訊息屬於 非同步確保? 我們可以看到 系統A傳送commit、rollback 都是 非同步傳送的, 也就是直接傳送,但不獲取任何反饋結果。 也大概就是為什麼稱作 “非同步確保” 的原因吧!

 

示例

public TransactionSendResult sendMessageInTransaction(Message msg, LocalTransactionExecuter tranExecuter, Object arg) throws MQClientException {
        if (null == tranExecuter) {
            throw new MQClientException("tranExecutor is null", (Throwable)null);
        } else {
            Validators.checkMessage(msg, this.defaultMQProducer);
            SendResult sendResult = null;
            MessageAccessor.putProperty(msg, "TRAN_MSG", "true");
            MessageAccessor.putProperty(msg, "PGROUP", this.defaultMQProducer.getProducerGroup());

            try {
                //這裡執行第一次傳送訊息,也就是預傳送,並獲取sendResult,這裡包含msg的所有訊息
                sendResult = this.send(msg);
            } catch (Exception var10) {
                throw new MQClientException("send message Exception", var10);
            }

            LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
            Throwable localException = null;
            //根據預傳送訊息的狀態做不同的處理,這裡主要看SEND_OK
            switch(sendResult.getSendStatus()) {
            case SEND_OK:
                try {
                    if (sendResult.getTransactionId() != null) {
                        msg.putUserProperty("__transactionId__", sendResult.getTransactionId());
                    }

// 這裡做第二步,執行業務邏輯,即本地事物,
//具體的本地事物在LocalTransactionExecuter引數的實現類中,
//需要根據自己的業務邏輯去寫,下面的//tranExecuter.executeLocalTransactionBranch(msg, arg);會執行實
//現類中的executeLocalTransactionBranch業務。
                    localTransactionState = tranExecuter.executeLocalTransactionBranch(msg, arg);
                    if (null == localTransactionState) {
                        localTransactionState = LocalTransactionState.UNKNOW;
                    }

                    if (localTransactionState != LocalTransactionState.COMMIT_MESSAGE) {
                        this.log.info("executeLocalTransactionBranch return {}", localTransactionState);
                        this.log.info(msg.toString());
                    }
                } catch (Throwable var9) {
                    this.log.info("executeLocalTransactionBranch exception", var9);
                    this.log.info(msg.toString());
                    localException = var9;
                }
                break;
            case FLUSH_DISK_TIMEOUT:
            case FLUSH_SLAVE_TIMEOUT:
            case SLAVE_NOT_AVAILABLE:
                localTransactionState = LocalTransactionState.ROLLBACK_MESSAGE;
            }

            try {
// 這裡的方法,其中的localTransactionState是第二次執行業務邏輯的結果
//可以根據這個結果,知道本地事物執行的成功還是失敗。或者是異常localException,
//這樣可以根據第一次傳送訊息的結果sendResult,去修改mq中第一次傳送訊息的狀態,完成第三步操作。
                this.endTransaction(sendResult, localTransactionState, localException);
            } catch (Exception var8) {
                this.log.warn("local transaction execute " + localTransactionState + ", but end broker transaction failed", var8);
            }

            TransactionSendResult transactionSendResult = new TransactionSendResult();
            transactionSendResult.setSendStatus(sendResult.getSendStatus());
            transactionSendResult.setMessageQueue(sendResult.getMessageQueue());
            transactionSendResult.setMsgId(sendResult.getMsgId());
            transactionSendResult.setQueueOffset(sendResult.getQueueOffset());
            transactionSendResult.setTransactionId(sendResult.getTransactionId());
            transactionSendResult.setLocalTransactionState(localTransactionState);
            return transactionSendResult;
        }
    }

    
public void endTransaction(SendResult sendResult, LocalTransactionState localTransactionState, Throwable localException) throws RemotingException, MQBrokerException, InterruptedException, UnknownHostException {

// 獲取第一次傳送訊息的id
        MessageId id;
        if (sendResult.getOffsetMsgId() != null) {
            id = MessageDecoder.decodeMessageId(sendResult.getOffsetMsgId());
        } else {
            id = MessageDecoder.decodeMessageId(sendResult.getMsgId());
        }

    //獲取事物id
        String transactionId = sendResult.getTransactionId();
        String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());
        EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
        requestHeader.setTransactionId(transactionId);
        requestHeader.setCommitLogOffset(id.getOffset());

       //根據本地事物執行狀態localTransactionState,告知mq修改狀態

        switch(localTransactionState) {
        case COMMIT_MESSAGE:
            requestHeader.setCommitOrRollback(Integer.valueOf(8));
            break;
        case ROLLBACK_MESSAGE:
            requestHeader.setCommitOrRollback(Integer.valueOf(12));
            break;
        case UNKNOW:
            requestHeader.setCommitOrRollback(Integer.valueOf(0));
        }

        requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
        requestHeader.setTranStateTableOffset(sendResult.getQueueOffset());
        requestHeader.setMsgId(sendResult.getMsgId());
        String remark = localException != null ? "executeLocalTransactionBranch exception: " + localException.toString() : null;
//具體執行第三步完成整個事務。      
  this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, remark, (long)this.defaultMQProducer.getSendMsgTimeout());
}
 
 
作者:時之令
連結:https://www.jianshu.com/p/8c997d0917c6
來源:簡書
簡書著作權歸作者所有,任何形式的轉載都請聯絡作者獲得授權並註明出處。
View Code

從程式碼看, 這個處理有些複雜,或許我們需要把rocketmq 的文件和 api 仔細看看。

 

參考:

作者:凌瀾星空
來源:CSDN
原文:https://blog.csdn.net/u010425776/article/details/79516298
版權宣告:本文為博主原創文章,轉載請附上博文連結!