1. 程式人生 > >rocketmq-producer之傳送事物訊息

rocketmq-producer之傳送事物訊息

rocketmq支援普通訊息、順序訊息,此外,還支援事物訊息。實現方式是將一個大事務拆分成多個小事物非同步執行,事物訊息在其中起著橋樑作用。

這裡寫圖片描述

rocketmq在傳送事物訊息時,會先發送一個prepared訊息,返回訊息所在地址。然後再執行本地事物,根據事物執行結果去更新prepared訊息狀態。訊息接收者只能消費訊息叢集中訊息狀態為已提交的訊息。

事物訊息demo:

TransactionMQProducer producer = new TransactionMQProducer("unique_group_name");
TransactionCheckListener transactionCheckListener =
new TransactionCheckListenerImpl(); producer.setTransactionCheckListener(transactionCheckListener); TransactionExecuterImpl tranExecuter = new TransactionExecuterImpl(); producer.start(); Message msg =new Message(......); SendResult sendResult = producer.sendMessageInTransaction(msg, tranExecuter, null
);

當確認訊息傳送失敗時,rocketmq會定期掃描訊息叢集中的事物訊息,如果發現了Prepared訊息,它會向訊息傳送者確認,如果此時事物執行成功了是回滾還是繼續傳送確認訊息?rocketmq會呼叫TransactionCheckListener 的實現類來做出相應的操作。

TransactionExecuterImpl –>本地事物的處理邏輯

傳送事物訊息原始碼(虛擬碼)

public TransactionSendResult sendMessageInTransaction(.....)  {
    // 1.傳送訊息
    SendResult sendResult = this
.send(msg); LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW; switch (sendResult.getSendStatus()) { case SEND_OK: { // 2.如果訊息傳送成功,處理與訊息關聯的本地事務單元 localTransactionState = tranExecuter.executeLocalTransactionBranch(msg, arg); } // 3.結束事務 this.endTransaction(sendResult, localTransactionState, localException); }

本地事物狀態LocalTransactionState:
COMMIT_MESSAGE,
ROLLBACK_MESSAGE,
UNKNOW

 public void endTransaction(SendResult sendResult, LocalTransactionState localTransactionState, ......){

       final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());
       EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();

       switch (localTransactionState) {
            case COMMIT_MESSAGE:
                requestHeader.setCommitOrRollback(MessageSysFlag.TransactionCommitType);
                break;
            case ROLLBACK_MESSAGE:
                requestHeader.setCommitOrRollback(MessageSysFlag.TransactionRollbackType);
                break;
            case UNKNOW:
                requestHeader.setCommitOrRollback(MessageSysFlag.TransactionNotType);
                break;
            default:
                break;
        }

        this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, ......);
    }

endTransaction從sendResult中獲取訊息地址,將請求傳送至broker,根據localTransactionState更新事物訊息的最終狀態。
如果endTransaction方法執行失敗,導致資料沒有傳送到broker,broker會有回查執行緒定時(預設1分鐘)掃描每個儲存事務狀態的表格檔案,如果是已經提交或者回滾的訊息直接跳過,如果是TransactionNotType狀態則會向Producer發起CheckTransaction請求,Producer會呼叫DefaultMQProducerImpl.checkTransactionState()方法來處理broker的定時回撥請求,而checkTransactionState會呼叫我們的事務設定的決斷方法,最後呼叫endTransactionOneway讓broker來更新訊息的最終狀態。

傳送事物訊息時,groupname必須設定,回查時會根據group隨機選擇一臺producer。