rocketmq-producer之傳送事物訊息
rocketmq支援普通訊息、順序訊息,此外,還支援事物訊息。實現方式是將一個大事務拆分成多個小事物非同步執行,事物訊息在其中起著橋樑作用。
rocketmq在傳送事物訊息時,會先發送一個prepared訊息,返回訊息所在地址。然後再執行本地事物,根據事物執行結果去更新prepared訊息狀態。訊息接收者只能消費訊息叢集中訊息狀態為已提交的訊息。
事物訊息demo:
TransactionMQProducer producer = new TransactionMQProducer("unique_group_name");
TransactionCheckListener transactionCheckListener = new TransactionCheckListenerImpl();
producer.setTransactionCheckListener(transactionCheckListener);
TransactionExecuterImpl tranExecuter = new TransactionExecuterImpl();
producer.start();
Message msg =new Message(......);
SendResult sendResult = producer.sendMessageInTransaction(msg, tranExecuter, null );
當確認訊息傳送失敗時,rocketmq會定期掃描訊息叢集中的事物訊息,如果發現了Prepared訊息,它會向訊息傳送者確認,如果此時事物執行成功了是回滾還是繼續傳送確認訊息?rocketmq會呼叫TransactionCheckListener 的實現類來做出相應的操作。
TransactionExecuterImpl –>本地事物的處理邏輯
傳送事物訊息原始碼(虛擬碼)
public TransactionSendResult sendMessageInTransaction(.....) {
// 1.傳送訊息
SendResult sendResult = this .send(msg);
LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW;
switch (sendResult.getSendStatus()) {
case SEND_OK: {
// 2.如果訊息傳送成功,處理與訊息關聯的本地事務單元
localTransactionState = tranExecuter.executeLocalTransactionBranch(msg, arg);
}
// 3.結束事務
this.endTransaction(sendResult, localTransactionState, localException);
}
本地事物狀態LocalTransactionState:
COMMIT_MESSAGE,
ROLLBACK_MESSAGE,
UNKNOW
public void endTransaction(SendResult sendResult, LocalTransactionState localTransactionState, ......){
final String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(sendResult.getMessageQueue().getBrokerName());
EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
switch (localTransactionState) {
case COMMIT_MESSAGE:
requestHeader.setCommitOrRollback(MessageSysFlag.TransactionCommitType);
break;
case ROLLBACK_MESSAGE:
requestHeader.setCommitOrRollback(MessageSysFlag.TransactionRollbackType);
break;
case UNKNOW:
requestHeader.setCommitOrRollback(MessageSysFlag.TransactionNotType);
break;
default:
break;
}
this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, requestHeader, ......);
}
endTransaction從sendResult中獲取訊息地址,將請求傳送至broker,根據localTransactionState更新事物訊息的最終狀態。
如果endTransaction方法執行失敗,導致資料沒有傳送到broker,broker會有回查執行緒定時(預設1分鐘)掃描每個儲存事務狀態的表格檔案,如果是已經提交或者回滾的訊息直接跳過,如果是TransactionNotType狀態則會向Producer發起CheckTransaction請求,Producer會呼叫DefaultMQProducerImpl.checkTransactionState()方法來處理broker的定時回撥請求,而checkTransactionState會呼叫我們的事務設定的決斷方法,最後呼叫endTransactionOneway讓broker來更新訊息的最終狀態。
傳送事物訊息時,groupname必須設定,回查時會根據group隨機選擇一臺producer。