1. 程式人生 > >RocketMq在閹割訊息回查checkTransactionState後實現分散式事務

RocketMq在閹割訊息回查checkTransactionState後實現分散式事務

利用rocketMQ解決分散式事務

在rocketMQ中生產者有三種角色 NormalProducer(普通)、OrderProducer(順序)、TransactionProducer(事務)
根據名字大概可以看出各個代表著什麼作用,我們這裡用 TransactionProducer(事務)來解決問題。

先舉個列子來說明下我們解決方案的設計方式吧:最經典的莫過於銀行轉賬了,網上到處都有,時序圖如下
這裡寫圖片描述

下面貼一下測試程式碼:

(1) 執行業務邏輯的部分

/**
 * @Date: Created in  2018/2/12 15:55
    執行本地事務
 */
public class
TransactionExecuterimpl implements LocalTransactionExecuter{
@Override public LocalTransactionState executeLocalTransactionBranch(final Message message, final Object o) { try{ //DB操作 應該帶上事務 service -> dao //如果資料操作失敗 需要回滾 同事返回RocketMQ一個失敗訊息 意味著 消費者無法消費到這條失敗的訊息 //如果成功 就要返回一個rocketMQ成功的訊息,意味著消費者將讀取到這條訊息
//o就是attachment //測試程式碼 if(new Random().nextInt(3) == 2){ int a = 1 / 0; } System.out.println(new Date()+"===> 本地事務執行成功,傳送確認訊息"); }catch (Exception e){ System.out.println(new Date()+"===> 本地事務執行失敗!!!"); return
LocalTransactionState.ROLLBACK_MESSAGE; } return LocalTransactionState.COMMIT_MESSAGE; } }

(2) 處理事務回查的程式碼部分

/**
 * @Date: Created in  2018/2/12 15:48
 * 未決事務,伺服器端回查客戶端
 */
public class TransactionCheckListenerImpl implements TransactionCheckListener {
    @Override
    public LocalTransactionState checkLocalTransactionState(MessageExt messageExt) {

        System.out.println("伺服器端回查事務訊息: "+messageExt.toString());
        //由於RocketMQ遲遲沒有收到訊息的確認訊息,因此主動詢問這條prepare訊息,是否正常?
        //可以查詢資料庫看這條資料是否已經處理

        return LocalTransactionState.COMMIT_MESSAGE;
    }
}

(3) 啟動生產者

/**
 * @Date: Created in  2018/2/12 15:24
 * 測試本地事務
 */
public class TestTransactionProducer {
    public static void main(String[] args){
        //事務回查監聽器
        TransactionCheckListenerImpl checkListener = new TransactionCheckListenerImpl();
        //事務訊息生產者
        TransactionMQProducer producer = new TransactionMQProducer("transactionProducerGroup");
        //MQ伺服器地址
        producer.setNamesrvAddr("192.168.56.105:9876;192.168.106:9876");
        //註冊事務回查監聽
        producer.setTransactionCheckListener(checkListener);
        //本地事務執行器
        TransactionExecuterimpl executerimpl = null;
        try {
            //啟動生產者
            producer.start();
            executerimpl = new TransactionExecuterimpl();
            Message msg1 = new Message("TransactionTopic", "tag", "KEY1", "hello RocketMQ 1".getBytes());
            Message msg2 = new Message("TransactionTopic", "tag", "KEY2", "hello RocketMQ 2".getBytes());

            SendResult sendResult = producer.sendMessageInTransaction(msg1, executerimpl, null);
            System.out.println(new Date() + "msg1"+sendResult);

            sendResult = producer.sendMessageInTransaction(msg1, executerimpl, null);
            System.out.println(new Date() + "msg2"+sendResult);

        } catch (MQClientException e) {
            e.printStackTrace();
        }
        producer.shutdown();
    }
}

(4) 消費之消費訊息

/**
 * @Date: Created in  2018/2/11 15:37
 */
public class TestConsumer {
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("ConsumerGroup");
        consumer.setNamesrvAddr("192.168.56.105:9876;192.168.56.106:9876");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        //消費普通訊息
       // consumer.subscribe("TopicTest","*");
        //消費事務訊息
        consumer.subscribe("TransactionTopic","*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                for (MessageExt ext:msgs) {
                    try {
                        System.out.println(new Date() + new String(ext.getBody(),"UTF-8"));
                    } catch (UnsupportedEncodingException e) {
                        e.printStackTrace();
                    }
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.println("Consumer Start............");
    }
}

重點來了:3.2.6之前的版本這樣寫就可以了,但是之後的版本被關於事務回查這個藉口被閹割了,不會在進行事務回查操作。
那麼第五步向MQ傳送訊息如果失敗的話,會造成A銀行扣款成功而B銀行收款未成功的資料不一致的情況

解決辦法

這裡只需要考慮本地事務執行成功後的情況(因為本地事務失敗不管確認訊息傳送成功與失敗MQ叢集都不會再發送訊息到消費者):

  1. 本地事務成功後宕機,確認訊息沒有發出,分散式事務只執行一半。
  2. 確認訊息COMMIT_MESSGE發出,但因網路不可達RocketMQ叢集沒收到。
  3. 確認訊息COMMIT_MESSGE發出,RocketMQ叢集收到COMMIT_MESSGE訊息,但rocketmq取消了回查機制,
    生產者還是不知道COMMIT_MESSGE發出是否成功。

上面三種情況的本質是一樣的,就是生產者本地事務成功後,COMMIT_MESSGE訊息是否送達rocketmq叢集;所以可以看做同一種情況.

下面是按key值查詢事務成功提交COMMIT_MESSGE訊息後的返回資訊:

QueryResult 
[indexLastUpdateTimestamp=1516002830440,
    messageList=[
        MessageExt [queueId=1, storeSize=291, queueOffset=0, sysFlag=4, bornTimestamp=1516002831147, bornHost=/192.168.88.1:6313, storeTimestamp=1516002830323, storeHost=/192.168.88.133:10911, msgId=C0A8588500002A9F0000000000000246, commitLogOffset=582, bodyCRC=1229495611, reconsumeTimes=0, preparedTransactionOffset=0, toString()=
            Message 
            [topic=pay, flag=0, properties=
                {KEYS=5cf5dd03-5811-4b7f-b97c-186598e6d08b, TRAN_MSG=true, UNIQ_KEY=C0A8080B2080085EDE7B4B824F2A0000, PGROUP=transaction-pay, TAGS=tag}, 
            body=67]], 

        MessageExt [queueId=1, storeSize=291, queueOffset=0, sysFlag=8, bornTimestamp=1516002831147, bornHost=/192.168.88.1:6313, storeTimestamp=1516002830440, storeHost=/192.168.88.133:10911, msgId=C0A8588500002A9F0000000000000369, commitLogOffset=873, bodyCRC=1229495611, reconsumeTimes=0, preparedTransactionOffset=582, toString()=
            Message 
            [topic=pay, flag=0, properties=
                {KEYS=5cf5dd03-5811-4b7f-b97c-186598e6d08b, TRAN_MSG=true, UNIQ_KEY=C0A8080B2080085EDE7B4B824F2A0000, PGROUP=transaction-pay, TAGS=tag},
            body=67]]
    ]
]

共返回兩條訊息:兩條訊息中大部分資料是一樣的,但sysFlagstoreTimestampmsgIdcommitLogOffsetpreparedTransactionOffset欄位是不一樣的:其中第1條為prepared傳送的訊息,第2條只有在提交COMMIT_MESSGE訊息成功後產生。

注意sysFlagpreparedTransactionOffset欄位與prepared訊息的區別,當提交COMMIT_MESSGE訊息成功後,推測MQ叢集做了如下動作:1. 讀取prepared訊息,修改sysFlagpreparedTransactionOffset值,2. 在存入commitlog日誌檔案,設定consumerqueue序列;因為當作一條新的訊息處理,所以toreTimestampmsgIdcommitLogOffset欄位自然也就變了。所以按照發送的prepared訊息的返回結果顯示的msgId檢視sysFlag狀態只是prepared訊息的sysFlag狀態,RocketMQ4.2版本的話要用key值去查詢,才能檢視事務提交成功的訊息標誌sysFlag=8
下面是按key值查詢事務失敗提交ROLLBACK_MESSAGE訊息後的返回資訊:

QueryResult 
[indexLastUpdateTimestamp=1516002830440,
    messageList=[
        MessageExt [queueId=1, storeSize=291, queueOffset=0, sysFlag=4, bornTimestamp=1516002831147, bornHost=/192.168.88.1:6313, storeTimestamp=1516002830323, storeHost=/192.168.88.133:10911, msgId=C0A8588500002A9F0000000000000246, commitLogOffset=582, bodyCRC=1229495611, reconsumeTimes=0, preparedTransactionOffset=0, toString()=
            Message 
            [topic=pay, flag=0, properties=
                {KEYS=5cf5dd03-5811-4b7f-b97c-186598e6d08b, TRAN_MSG=true, UNIQ_KEY=C0A8080B2080085EDE7B4B824F2A0000, PGROUP=transaction-pay, TAGS=tag}, 
            body=67]]
    ]
]

由此我們可以根據syyFlag判斷我們在提交事務以後, 訊息傳送是否成功

下面是邏輯分析圖

這裡寫圖片描述

一、在執行本地事務commit前向回查表插入訊息的KEY值。

二、在生產者叢集上設定一個定時任務(根據自身分散式事務流程執行的時間設定)。

  1. 從回查表獲取CONFIRM為0的記錄列表,從記錄列表中獲取COUNT為3的記錄,當count列達到指定閥值(假定是3)時:
    此時記錄的COUNT為3,如果CONFIRM還是為0,那麼說明對此事務的回查次數為3,但RocketMQ叢集還未收到COMMIT_MESSAGE訊息,說明發送COMMIT_MESSAGE訊息失敗,但本地事務已經執行成功,那麼必須要重發與此條記錄中KEY值相對應的Perpared訊息的確認訊息。根據KEY值向MQ叢集查詢訊息,根據獲取的訊息重新用同步的方式傳送此條訊息到MQ叢集,並更新此記錄的CONFIRM為1,COUNT+1
  2. 根據第1步獲取的記錄列表,取出CONFIRM為0且COUNT小於3的記錄,根據KEY值向MQ叢集查詢訊息。
  3. 根據第2步獲取的訊息判斷是否是sysFlag為8的訊息;如果是,更新回查表對應KEY記錄的CONFIRM為1,COUNT為count+1,如果不是,更新回查表對應KEY記錄的COUNT為count+1。

這裡是用COUNT來避免剛剛傳送訊息就開始判斷sysFlag的情況,相當於預留了一點rabbitmq的訊息傳送延遲時間,這個也可以使用CREATE_TIME來代替,比如判斷記錄列表中CREATE_TIME在當前時間前3s以前,未成功傳送的訊息

這隻能算其中一種,還有一些手動從生產者和消費者兩端進行資料庫查詢的方法,有興趣的自己可以去了解