1. 程式人生 > >RocketMQ入門到入土(二)事務訊息&順序訊息

RocketMQ入門到入土(二)事務訊息&順序訊息

接上一篇:RocketMQ入門到入土(一)新手也能看懂的原理和實戰!

一、事務訊息的由來

1、案例

引用官方的購物案例:

小明購買一個100元的東西,賬戶扣款100元的同時需要保證在下游的積分系統給小明這個賬號增加100積分。賬號系統和積分系統是兩個獨立是系統,一個要減少100元,一個要增加100積分。如下圖:

 

2、問題

  • 賬號服務扣款成功了,通知積分系統也成功了,但是積分增加的時候失敗了,資料不一致了。

  • 賬號服務扣款成功了,但是通知積分系統失敗了,所以積分不會增加,資料不一致了。

3、方案

RocketMQ針對第一個問題解決方案是:如果消費失敗了,是會自動重試的,如果重試幾次後還是消費失敗,那麼這種情況就需要人工解決了,比如放到死信佇列裡然後手動查原因進行處理等。

RocketMQ針對第二個問題解決方案是:如果你扣款成功了,但是往mq寫訊息的時候失敗了,那麼RocketMQ會進行回滾訊息的操作,這時候我們也能回滾我們扣款的操作。

二、事務訊息的原理

1、原理圖解

 

2、詳細過程

1.Producer傳送半訊息(Half Message)到broker。

我真想吐槽一句為啥叫半訊息,難以理解,其實這就是prepare message,預傳送訊息。

  • Half Message傳送成功後開始執行本地事務。

  • 如果本地事務執行成功的話則返回commit,如果執行失敗則返回rollback。(這個是在事務訊息的回撥方法裡由開發者自己決定commit or rollback)

Producer傳送上一步的commit還是rollback到broker,這裡有兩種情況:

1.如果broker收到了commit/rollback訊息 :

  • 如果收到了commit,則broker認為整個事務是沒問題的,執行成功的。那麼會下發訊息給Consumer端消費。

  • 如果收到了rollback,則broker認為本地事務執行失敗了,broker將會刪除Half Message,不下發給Consumer端。

2.如果broker未收到訊息(如果執行本地事務突然宕機了,相當本地事務執行結果返回unknow,則和broker未收到確認訊息的情況一樣處理。):

  • broker會定時回查本地事務的執行結果:如果回查結果是本地事務已經執行則返回commit,若未執行,則返回rollback。

  • Producer端回查的結果傳送給Broker。Broker接收到的如果是commit,則broker視為整個事務執行成功,如果是rollback,則broker視為本地事務執行失敗,broker刪除Half Message,不下發給consumer。如果broker未接收到回查的結果(或者查到的是unknow),則broker會定時進行重複回查,以確保查到最終的事務結果。重複回查的時間間隔和次數都可配。

三、事務訊息實現流程

1、實現流程

簡單來看就是:事務訊息是個監聽器,有回撥函式,回撥函式裡我們進行業務邏輯的操作,比如給賬戶-100元,然後發訊息到積分的mq裡,這時候如果賬戶-100成功了,且傳送到mq成功了,則設定訊息狀態為commit,這時候broker會將這個半訊息傳送到真正的topic中。一開始傳送他是存到半訊息佇列裡的,並沒存在真實topic的佇列裡。只有確認commit後才會轉移。

2、補救方案

如果事務因為中斷,或是其他的網路原因,導致無法立即響應的,RocketMQ當做UNKNOW處理,RocketMQ事務訊息還提供了一個補救方案:定時查詢事務訊息的事務狀態。這也是一個回撥函式,這裡面可以做補償,補償邏輯開發者自己寫,成功的話自己返回commit就完事了。

四、事務訊息程式碼例項

1、程式碼

package com.chentongwei.mq.rocketmq;

import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.Date;

/**
 * Description:
 *
 * @author TongWei.Chen 2020-06-21 11:32:58
 */
public class ProducerTransaction2 {
    public static void main(String[] args) throws Exception {
        TransactionMQProducer producer = new TransactionMQProducer("my-transaction-producer");
        producer.setNamesrvAddr("124.57.180.156:9876");

        // 回撥
        producer.setTransactionListener(new TransactionListener() {
            @Override
            public LocalTransactionState executeLocalTransaction(Message message, Object arg) {
                LocalTransactionState state = null;
                //msg-4返回COMMIT_MESSAGE
                if(message.getKeys().equals("msg-1")){
                    state = LocalTransactionState.COMMIT_MESSAGE;
                }
                //msg-5返回ROLLBACK_MESSAGE
                else if(message.getKeys().equals("msg-2")){
                    state = LocalTransactionState.ROLLBACK_MESSAGE;
                }else{
                    //這裡返回unknown的目的是模擬執行本地事務突然宕機的情況(或者本地執行成功傳送確認訊息失敗的場景)
                    state = LocalTransactionState.UNKNOW;
                }
                System.out.println(message.getKeys() + ",state:" + state);
                return state;
            }

            /**
             * 事務訊息的回查方法
             */
            @Override
            public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
                if (null != messageExt.getKeys()) {
                    switch (messageExt.getKeys()) {
                        case "msg-3":
                            System.out.println("msg-3 unknow");
                            return LocalTransactionState.UNKNOW;
                        case "msg-4":
                            System.out.println("msg-4 COMMIT_MESSAGE");
                            return LocalTransactionState.COMMIT_MESSAGE;
                        case "msg-5":
                            //查詢到本地事務執行失敗,需要回滾訊息。
                            System.out.println("msg-5 ROLLBACK_MESSAGE");
                            return LocalTransactionState.ROLLBACK_MESSAGE;
                    }
                }
                return LocalTransactionState.COMMIT_MESSAGE;
            }
        });

        producer.start();

        //模擬傳送5條訊息
        for (int i = 1; i < 6; i++) {
            try {
                Message msg = new Message("transactionTopic", null, "msg-" + i, ("測試,這是事務訊息! " + i).getBytes());
                producer.sendMessageInTransaction(msg, null);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

 

2、結果

msg-1,state:COMMIT_MESSAGE
msg-2,state:ROLLBACK_MESSAGE
msg-3,state:UNKNOW
msg-4,state:UNKNOW
msg-5,state:UNKNOW

msg-3 unknow
msg-3 unknow
msg-5 ROLLBACK_MESSAGE
msg-4 COMMIT_MESSAGE

msg-3 unknow
msg-3 unknow
msg-3 unknow
msg-3 unknow

3、管控臺

4、結果分析

  • 只有msg-1和msg-4傳送成功了。msg-4在msg-1前面是因為msg-1先成功的,msg-4是回查才成功的。按時間倒序來的。

  • 先來輸出五個結果,對應五條訊息

msg-1,state:COMMIT_MESSAGE
msg-2,state:ROLLBACK_MESSAGE
msg-3,state:UNKNOW
msg-4,state:UNKNOW
msg-5,state:UNKNOW

  • 然後進入了回查,msg-3還是unknow,msg-5回滾了,msg-4提交了事務。所以這時候msg-4在管控臺裡能看到了。

  • 過了一段時間再次回查msg-3,發現還是unknow,所以一直回查。

回查的時間間隔和次數都是可配的,預設是回查15次還失敗的話就會把這個訊息丟掉了。

五、疑問

疑問:Spring事務、常規的分散式事務不行嗎?Rocketmq的事務是否多此一舉了呢?

MQ用於解耦,之前是分散式事務直接操作了賬號系統和積分系統。但是他兩就是強耦合的存在,如果中間插了個mq,賬號系統操作完發訊息到mq,這時候只要保證傳送成功就提交,傳送失敗則回滾,這步怎麼保證,就是靠事務了。而且用RocketMQ做分散式事務的也蠻多的。

六、順序訊息解釋

1、概述

RocketMQ的訊息是儲存到Topic的queue裡面的,queue本身是FIFO(First Int First Out)先進先出佇列。所以單個queue是可以保證有序性的。

但問題是1個topic有N個queue,作者這麼設計的好處也很明顯,天然支援叢集和負載均衡的特性,將海量資料均勻分配到各個queue上,你發了10條訊息到同一個topic上,這10條訊息會自動分散在topic下的所有queue中,所以消費的時候不一定是先消費哪個queue,後消費哪個queue,這就導致了無序消費。

2、圖解

 

3、再次分析

一個Producer傳送了m1、m2、m3、m4四條訊息到topic上,topic有四個佇列,由於自帶的負載均衡策略,四個佇列上分別儲存了一條訊息。queue1上儲存的m1,queue2上儲存的m2,queue3上儲存的m3,queue4上儲存的m4,Consumer消費的時候是多執行緒消費,所以他無法保證先消費哪個佇列或者哪個訊息,比如傳送的時候順序是m1,m2,m3,m4,但是消費的時候由於Consumer內部是多執行緒消費的,所以可能先消費了queue4佇列上的m4,然後才是m1,這就導致了無序。

七、順序訊息解決方案

1、方案一

很簡單,問題產生的關鍵在於多個佇列都有訊息,我消費的時候又不知道哪個佇列的訊息是最新的。那麼思路就有了,發訊息的時候你要想保證有序性的話,就都給我發到一個queue上,然後消費的時候因為只有那一個queue上有訊息且queue是FIFO,先進先出,所以正常消費就完了。

很完美。而且RocketMQ也給我們提供了這種發訊息的時候選擇queue的api(MessageQueueSelector)。直接上程式碼。

2、程式碼一

2.1、生產者

import java.util.List;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;

/**
 * 訊息傳送者
 */
public class Producer5 {
    public static void main(String[] args)throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("my-order-producer");
        producer.setNamesrvAddr("124.57.180.156:9876");     
        producer.start();
        for (int i = 0; i < 5; i++) {
            Message message = new Message("orderTopic", ("hello!" + i).getBytes());
            producer.send(
                    // 要發的那條訊息
                    message,
                    // queue 選擇器 ,向 topic中的哪個queue去寫訊息
                    new MessageQueueSelector() {
                        // 手動 選擇一個queue
                        @Override
                        public MessageQueue select(
                                // 當前topic 裡面包含的所有queue
                                List<MessageQueue> mqs, 
                                // 具體要發的那條訊息
                                Message msg,
                                // 對應到 send() 裡的 args,也就是2000前面的那個0
                                // 實際業務中可以把0換成實際業務系統的主鍵,比如訂單號啥的,然後這裡做hash進行選擇queue等。能做的事情很多,我這裡做演示就用第一個queue,所以不用arg。
                                Object arg) {
                            // 向固定的一個queue裡寫訊息,比如這裡就是向第一個queue裡寫訊息
                            MessageQueue queue = mqs.get(0);
                            // 選好的queue
                            return queue;
                        }
                    },
                    // 自定義引數:0
                    // 2000代表2000毫秒超時時間
                    0, 2000);
        }
    }
}

 

2.2、消費者

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.*;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

/**
 * Description:
 *
 * @author TongWei.Chen 2020-06-22 11:17:47
 */
public class ConsumerOrder {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my-consumer");
        consumer.setNamesrvAddr("124.57.180.156:9876");
        consumer.subscribe("orderTopic", "*");
        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println(new String(msg.getBody()) + " Thread:" + Thread.currentThread().getName() + " queueid:" + msg.getQueueId());
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });

        consumer.start();
        System.out.println("Consumer start...");
    }
}

 

2.3、輸出結果

Consumer start...
hello!0 Thread:ConsumeMessageThread_1 queueid:0
hello!1 Thread:ConsumeMessageThread_1 queueid:0
hello!2 Thread:ConsumeMessageThread_1 queueid:0
hello!3 Thread:ConsumeMessageThread_1 queueid:0
hello!4 Thread:ConsumeMessageThread_1 queueid:0

很完美,有序輸出!

3、情況二

比如你新需求:把未支付的訂單都放到queue1裡,已支付的訂單都放到queue2裡,支付異常的訂單都放到queue3裡,然後你消費的時候要保證每個queue是有序的,不能消費queue1一條直接跑到queue2去了,要逐個queue去消費。

這時候思路是發訊息的時候利用自定義引數arg,訊息體裡肯定包含支付狀態,判斷是未支付的則選擇queue1,以此類推。這樣就保證了每個queue裡只包含同等狀態的訊息。那麼消費者目前是多執行緒消費的,肯定亂序。三個queue隨機消費。解決方案更簡單,直接將消費端的執行緒數改為1個,這樣佇列是FIFO,他就逐個消費了。RocketMQ也為我們提供了這樣的api,如下兩句:

// 最大執行緒數1
consumer.setConsumeThreadMax(1);
// 最小執行緒數
consumer.setConsumeThreadMin(1);

&n