1. 程式人生 > >手把手帶你瞭解訊息中介軟體(3)——RocketMQ

手把手帶你瞭解訊息中介軟體(3)——RocketMQ

一、RocketMQ簡介

  RocketMQ作為一款純java、分散式、佇列模型的開源訊息中介軟體,支援事務訊息、順序訊息、批量訊息、定時訊息、訊息回溯等。

二、RocketMQ架構


  如圖所示為RocketMQ基本的部署結構,主要分為NameServer叢集、Broker叢集、Producer叢集和Consumer叢集四個部分。

  Broker在啟動的時候會去向NameServer註冊並且定時傳送心跳,Producer在啟動的時候會到NameServer上去拉取Topic所屬的Broker具體地址,然後向具體的Broker傳送訊息

1、NameServer

  NameServer的作用是Broker的註冊中心。

  每個NameServer節點互相之間是獨立的,沒有任何資訊互動,也就不存在任何的選主或者主從切換之類的問題,因此NameServer是很輕量級的。單個NameServer節點中儲存了活躍的Broker列表(包括master和slave),這裡活躍的定義是與NameServer保持有心跳。

2、Topic、Tag、Queue、GroupName

  Topic 與 Tag 都是業務上用來歸類的標識,區分在於 Topic 是一級分類,而 Tag 可以理解為是二級分類

1) Topic(話題)

  Topic是生產者在傳送訊息和消費者在拉取訊息的類別。Topic與生產者和消費者之間的關係非常鬆散。一個生產者可以傳送不同型別Topic的訊息。消費者組可以訂閱一個或多個主題,只要該組的例項保持其訂閱一致即可。

  我們可以理解為Topic是第一級訊息型別,比如一個電商系統的訊息可以分為:交易訊息、物流訊息等,一條訊息必須有一個Topic。

2) Tag(標籤)

  意思就是子主題,為使用者提供了額外的靈活性。有了標籤,方便RocketMQ提供的查詢功能。

  可以理解為第二級訊息型別,交易建立訊息,交易完成訊息..... 一條訊息可以沒有Tag

3) Queue(佇列)

  一個topic下,可以設定多個queue(訊息佇列),預設4個佇列。當我們傳送訊息時,需要要指定該訊息的topic。

  RocketMQ會輪詢該topic下的所有佇列,將訊息傳送出去。

  在 RocketMQ 中,所有訊息佇列都是持久化,長度無限的資料結構,所謂長度無限是指佇列中的每個儲存單元都是定長,訪問其中的儲存單元使用 Offset 來訪問,offset 為 java long 型別,64 位,理論上在 100年內不會溢位,所以認為是長度無限。

  也可以認為 Message Queue 是一個長度無限的陣列,Offset 就是下標。

4) groupName(組名稱)

  RocketMQ中也有組的概念。代表具有相同角色的生產者組合或消費者組合,稱為生產者組或消費者組。

  作用是在叢集HA的情況下,一個生產者down之後,本地事務回滾後,可以繼續聯絡該組下的另外一個生產者例項,不至於導致業務走不下去。在消費者組中,可以實現訊息消費的負載均衡和訊息容錯目標。

  有了GroupName,在叢集下,動態擴充套件容量很方便。只需要在新加的機器中,配置相同的GroupName。啟動後,就立即能加入到所在的群組中,參與訊息生產或消費。

3、Broker-存放訊息

  Broker是具體提供業務的伺服器,單個Broker節點與所有的NameServer節點保持長連線及心跳,定時(每隔30s)註冊Topic資訊到所有Name Server。Name Server定時(每隔10s)掃描所有存活broker的連線,如果Name Server超過2分鐘沒有收到心跳,則Name Server斷開與Broker的連線。底層的通訊和連線都是基於Netty實現的。

  負載均衡:Broker上存Topic資訊,Topic由多個佇列組成,佇列會平均分散在多個Broker上,會自動輪詢當前所有可傳送的broker ,儘量平均分佈到所有佇列中,最終效果就是所有訊息都平均落在每個Broker上。

  高可用:Broker中分master和slave兩種角色,每個master可以對應多個slave,但一個slave只能對應一個master,master和slave通過指定相同的Brokername組成,其中不同的BrokerId==0 是master,非0是slave。

  高可靠併發讀寫服務:master和slave之間的同步方式分為同步雙寫和非同步複製,非同步複製方式master和slave之間雖然會存在少量的延遲,但效能較同步雙寫方式要高出10%左右。

Topic、Broker、queue三者間的關係

4、Producer-生產訊息

1) 與nameserver的關係

  單個Producer和一臺NameServer節點(隨機選擇)保持長連線,定時查詢topic配置資訊,如果該NameServer掛掉,生產者會自動連線下一個NameServer,直到有可用連線為止,並能自動重連。與NameServer之間沒有心跳。

2) 與broker的關係

  單個Producer和與其關聯的所有broker保持長連線,並維持心跳。預設情況下訊息傳送採用輪詢方式,會均勻發到對應Topic的所有queue中。

5、Consumer-消費訊息

1) 與nameserver的關係

  單個Consumer和一臺NameServer保持長連線,定時查詢topic配置資訊,如果該NameServer掛掉,消費者會自動連線下一個NameServer,直到有可用連線為止,並能自動重連。與NameServer之間沒有心跳。

2) 與broker的關係

  單個Consumer和與其關聯的所有broker保持長連線,並維持心跳,失去心跳後,則關閉連線,並向該消費者分組的所有消費者發出通知,分組內消費者重新分配佇列繼續消費。

5.1 消費者型別
  • 1) pull consume
      Consumer 的一種,應用通常通過 Consumer 物件註冊一個 Listener 介面,一旦收到訊息,Consumer 物件立刻回撥 Listener 介面方法,類似於activemq的方式
  • 2) push consume
      Consumer 的一種,應用通常主動呼叫 Consumer 的拉訊息方法從 Broker 拉訊息,主動權由應用控制
5.2 消費模式
  • 1) 叢集模式

  在預設情況下,就是叢集消費,此時訊息發出去後將只有一個消費者能獲取訊息。

  • 2) 廣播模式

  廣播消費,一條訊息被多個Consumer消費。訊息會發給Consume Group中的所有消費者進行消費。

三、RocketMQ的特性

1、訊息順序

  訊息的順序指的是訊息消費時,能按照發送的順序來消費。

  RocketMQ是通過將“相同ID的訊息傳送到同一個佇列,而一個佇列的訊息只由一個消費者處理“來實現順序訊息

2、訊息重複

1) 訊息重複的原因

  訊息領域有一個對訊息投遞的QoS(服務質量)定義,分為:最多一次(At most once)、至少一次(At least once)、僅一次( Exactly once)。

  MQ產品都聲稱自己做到了At least once。既然是至少一次,就有可能發生訊息重複。

  有很多原因導致,比如:網路原因閃斷,ACK返回失敗等等故障,確認資訊沒有傳送到訊息佇列,導致訊息佇列不知道自己已經消費過該訊息了,再次將該訊息分發給其他的消費者

  不同的訊息佇列傳送的確認資訊形式不同:RocketMQ返回一個CONSUME_SUCCESS成功標誌,RabbitMQ是傳送一個ACK確認訊息

2) 訊息去重
  • 1) 去重原則:使用業務端邏輯保持冪等性

  冪等性:就是使用者對於同一操作發起的一次請求或者多次請求的結果是一致的,不會因為多次點選而產生了副作用,資料庫的結果都是唯一的,不可變的。

  • 2) 只要保持冪等性,不管來多少條重複訊息,最後處理的結果都一樣,需要業務端來實現。

  去重策略:保證每條訊息都有唯一編號(比如唯一流水號),且保證訊息處理成功與去重表的日誌同時出現。

四、RocketMQ的應用場景

1、削峰填谷

  比如如秒殺等大型活動時會帶來較高的流量脈衝,如果沒做相應的保護,將導致系統超負荷甚至崩潰。如果因限制太過導致請求大量失敗而影響使用者體驗,可以利用MQ 超高效能的訊息處理能力來解決。

2、非同步解耦

  通過上、下游業務系統的鬆耦合設計,比如:交易系統的下游子系統(如積分等)出現不可用甚至宕機,都不會影響到核心交易系統的正常運轉。

3、順序訊息

  FIFO原理類似,MQ提供的順序訊息即保證訊息的先進先出,可以應用於交易系統中的訂單建立、支付、退款等流程。

4、分散式事務訊息

  比如阿里的交易系統、支付紅包等場景需要確保資料的最終一致性,需要引入 MQ 的分散式事務,既實現了系統之間的解耦,又可以保證最終的資料一致性。

五、RocketMQ叢集部署方式

1、單Mater模式

  優點:配置簡單,方便部署

  缺點:風險較大,一旦Broker重啟或者宕機,會導致整個服務不可用

2、多Master模式

  一個叢集無 Slave,全是 Master,例如 2 個 Master 或者 3 個 Master

  優點:配置簡單,單個Master宕機重啟對應用沒有影響。訊息不會丟失

  缺點:單臺機器宕機期間,這臺機器上沒有被消費的訊息在恢復之前不可訂閱,訊息實時性會受到影響。

3、多Master多Slave模式(非同步)

  每個Master配置一個Slave,採用非同步複製方式,主備有短暫訊息延遲

  優點:因為Master 宕機後,消費者仍然可以從 Slave消費,此過程對應用透明。不需要人工干預。效能同多 Master 模式幾乎一樣。

  缺點:Master宕機後,會丟失少量資訊

4、多Master多Slave模式(同步)

  每個Master配置一個Slave,採用同步雙寫方式,只有主和備都寫成功,才返回成功

  優點:資料與服務都無單點, Master宕機情況下,訊息無延遲,服務可用性與資料可用性都非常高

  缺點:效能比非同步複製模式略低,大約低 10%左右,傳送單個訊息的 RT會略高。目前主宕機後,備機不能自動切換為主機,後續會支援自動切換功能

六、RocketMQ的訊息型別

訊息傳送步驟:

訊息消費步驟:


  建立一個maven工程,匯入依賴

    <dependencies>
        <!--rocket-->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.4.0</version>
        </dependency>
        <dependency>
       <!--順序訊息中,模擬了一個訊息集合,加入了lombok-->
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.16.22</version>
        </dependency>
    </dependencies>

1、普通訊息


點選檢視生產者程式碼

/**
 * 普通訊息生產者
 */
public class Producer {

    public static void main(String[] args) throws Exception {
//        建立一個訊息傳送入口物件,主要用於訊息傳送,指定生產者組
        DefaultMQProducer producer = new DefaultMQProducer("producerGroup");
//        設定NameServe地址,如果是叢集環境,用分號隔開
        producer.setNamesrvAddr("127.0.0.1:9876");
//        啟動並建立訊息傳送元件
        producer.start();
//      topic的名字
        String topic = "rocketDemo1";
//      標籤名
        String taget = "tag";
//      要傳送的資料
        String body = "hello,RocketMq";
        Message message = new Message(topic,taget,body.getBytes());
        // 傳送訊息
        SendResult result = producer.send(message);
        System.out.println(result);
//        關閉訊息傳送物件
        producer.shutdown();
    }
}


點選檢視消費者程式碼

/**
 * 普通訊息消費者
 */
public class Consumer {

    public static void main(String[] args) throws Exception {
//        建立一個消費管理物件,並建立消費者組名字
        DefaultMQPushConsumer consumerGroup = new DefaultMQPushConsumer("ConsumerGroup");
//        設定NameServer地址,如果是叢集環境,用逗號分隔
        consumerGroup.setNamesrvAddr("127.0.0.1:9876");
//        設定要讀取的訊息主題和標籤
        consumerGroup.subscribe("rocketDemo1", "*");
//      設定回撥函式,處理訊息
        //注意:MessageListenerConcurrently     -- 並行消費監聽
        consumerGroup.registerMessageListener(new MessageListenerConcurrently() {
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                try {
                    //讀取訊息記錄
                    for (MessageExt messageExt : msgs) {
                        //獲取訊息主題
                        String topic = messageExt.getTopic();
                        //獲取訊息標籤
                        String tags = messageExt.getTags();
                        //獲取訊息體內容
                        String body = new String(messageExt.getBody(), "UTF-8");
                        System.out.println("topic:" + topic + ",tags:" + tags + ",body:" + body);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
                //返回消費成功
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
//      執行訊息消費物件
        consumerGroup.start();
    }
}

2、順序訊息

  訊息有序指的是可以按照訊息的傳送順序來消費。RocketMQ是通過將“相同ID的訊息傳送到同一個佇列,而一個佇列的訊息只由一個消費者處理“來實現順序訊息 。
如何保證順序

  • 1) 訊息被髮送時保持順序:傳送時保持順序意味著對於有順序要求的訊息,使用者應該在同一個執行緒中採用同步的方式傳送。
  • 2) 訊息被儲存時保持和傳送的順序一致:儲存保持和傳送的順序一致則要求在同一執行緒中被髮送出來的訊息A和B,儲存時在空間上A一定在B之前。
  • 3) 訊息被消費時保持和儲存的順序一致:消費保持和儲存一致則要求訊息A、B到達Consumer之後必須按照先A後B的順序被處理。


點選檢視模擬訊息程式碼

/**
 * 模擬訊息
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Order {

    private Long orderId;
    private String desc;

    public static List<Order> buildOrders(){
        List<Order> list = new ArrayList<Order>();
        Order order1001a = new Order(1001L,"建立");
        Order order1004a = new Order(1004L,"建立");
        Order order1006a = new Order(1006L,"建立");
        Order order1009a = new Order(1009L,"建立");
        list.add(order1001a);
        list.add(order1004a);
        list.add(order1006a);
        list.add(order1009a);
        Order order1001b = new Order(1001L,"付款");
        Order order1004b = new Order(1004L,"付款");
        Order order1006b = new Order(1006L,"付款");
        Order order1009b = new Order(1009L,"付款");
        list.add(order1001b);
        list.add(order1004b);
        list.add(order1006b);
        list.add(order1009b);
        Order order1001c = new Order(1001L,"完成");
        Order order1006c = new Order(1006L,"完成");
        list.add(order1001c);
        list.add(order1006c);
        return list;
    }
}


點選檢視生產者程式碼

/**
 * Producer端確保訊息順序唯一要做的事情就是將訊息路由到特定的佇列,
 * 在RocketMQ中,通過MessageQueueSelector來實現分割槽的選擇
 */
public class ProducerOrder {
        //nameserver地址
        private static String namesrvaddress="127.0.0.1:9876;";

        public static void main(String[] args) throws Exception {
            //建立DefaultMQProducer
            DefaultMQProducer producer = new DefaultMQProducer("order_producer_name");
            //設定namesrv地址
            producer.setNamesrvAddr(namesrvaddress);
            //啟動Producer
            producer.start();
            List<Order> orderList = Order.buildOrders();
            for (Order order : orderList) {
                String body = order.toString();
                //建立訊息
                Message message = new Message("orderTopic","order",body.getBytes());
                //傳送訊息
                SendResult sendResult = producer.send(
                        message,
                        new MessageQueueSelector() {
                            /**
                             *
                             * @param mqs topic中的佇列集合
                             * @param msg 訊息物件
                             * @param arg 業務引數
                             * @return
                             */
                            public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                                //引數是訂單id號
                                Long orderId = (Long) arg;
                                //確定選擇的佇列的索引
                                long index = orderId % mqs.size();
                                return mqs.get((int) index);
                            }
                        },
                        order.getOrderId());
                System.out.println("傳送結果="+sendResult);
            }
            //關閉Producer
            producer.shutdown();
        }
    }


點選檢視消費者程式碼

/**
* 消費者端實現MessageListenerOrderly介口監聽訊息來實現順序訊息
*/
public class ConsumerOrder {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroup");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        //從第一個開始消費
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

        consumer.subscribe("orderTopic","*");
        //MessageListenerOrderly 順序消費
        consumer.registerMessageListener(new MessageListenerOrderly() {
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println("當前執行緒:"+Thread.currentThread().getName()+",接收訊息:"+new String(msg.getBody()));
                }
                return ConsumeOrderlyStatus.SUCCESS;
            }
        });
        consumer.start();
        System.out.printf("Consumer started.%n");
    }
}

3、延遲訊息

  RocketMQ 支援定時(延遲)訊息,但是不支援任意時間精度,僅支援特定的 level,例如定時 5s, 10s, 1m 等。其中,level=0 級表示不延時,level=1 表示 1 級延時,level=2 表示 2 級延時,以此類推。

  延遲訊息可以在生產者中直接設定,也可以在rocketmq的配置檔案broker.conf中配置:messageDelayLevel=1s|5s|1m|2m|1h|2h......


點選檢視生產者程式碼

/**
 * 延遲訊息 生產者
 */
public class ProducerDelay {

    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("delay_producer");
        //設定nameserver
        producer.setNamesrvAddr("127.0.0.1:9876");
        //生產者開啟
        producer.start();
        //建立訊息物件
        Message message = new Message("delayTopic","delay","hello world".getBytes());
        //設定延遲時間級別
        message.setDelayTimeLevel(2);
        //傳送訊息
        SendResult sendResult = producer.send(message);
        System.out.println(sendResult);
        //生產者關閉
        producer.shutdown();
    }
}


點選檢視消費者程式碼

/**
 * 延遲訊息 消費者
 */
public class ConsumerDelay {

    public static void main(String[] args) throws Exception {
        //建立消費者物件
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("delay_consumer");
        //設定nameserver
        consumer.setNamesrvAddr("127.0.0.1:9876");
        //設定主題和tag
        consumer.subscribe("delayTopic","*");
        //註冊訊息監聽
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println("訊息ID:"+msg.getMsgId()+"傳送時間:"+new Date(msg.getStoreTimestamp())+",延遲時間:"+(System.currentTimeMillis()-msg.getStoreTimestamp()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //開啟消費者
        consumer.start();
        System.out.println("消費者啟動");
    }
}

4、批量傳送訊息


點選檢視生產者程式碼

/**
 * 批量 生產者
 */
public class ProducerBatch {

    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("delay_producer");
        //設定nameserver
        producer.setNamesrvAddr("127.0.0.1:9876");
        //生產者開啟
        producer.start();
        //建立訊息物件  集合
        String topic = "batchTopic";
        String tag = "batch";
        List<Message> messageList = new ArrayList<Message>();
        Message message1 = new Message(topic,tag,"hello world1".getBytes());
        Message message2 = new Message(topic,tag,"hello world2".getBytes());
        Message message3 = new Message(topic,tag,"hello world3".getBytes());
        messageList.add(message1);
        messageList.add(message2);
        messageList.add(message3);
        //傳送訊息
        SendResult sendResult = producer.send(messageList);
        System.out.println(sendResult);
        //生產者關閉
        producer.shutdown();
    }
}


點選檢視消費者程式碼

/**
 * 批量消費者
 */
public class ConsumerBatch {
    public static void main(String[] args) throws Exception {
        //建立消費者物件
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("delay_consumer");
        //設定nameserver
        consumer.setNamesrvAddr("127.0.0.1:9876");
        //設定主題和tag
        consumer.subscribe("batchTopic","*");
        //註冊訊息監聽
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println("訊息ID:"+msg.getMsgId());
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //開啟消費者
        consumer.start();
        System.out.println("消費者啟動");
    }
}

5、廣播訊息

  rocketmq預設採用的是叢集消費,我們想要使用廣播消費,只需在消費者中加入consumer.setMessageModel(MessageModel.BROADCASTING)這段配置,MessageModel.CLUSTERING為叢集模式,是預設的;


點選檢視生產者程式碼

/**
 * 生產者
 */
public class ProducerBroadcast {

    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("delay_producer");
        //設定nameserver
        producer.setNamesrvAddr("127.0.0.1:9876");
        //生產者開啟
        producer.start();
        //建立訊息物件  集合
        String topic = "broadcastTopic";
        String tag = "broad";
        List<Message> messageList = new ArrayList<Message>();
        Message message1 = new Message(topic,tag,"hello world1".getBytes());
        Message message2 = new Message(topic,tag,"hello world2".getBytes());
        Message message3 = new Message(topic,tag,"hello world3".getBytes());
        messageList.add(message1);
        messageList.add(message2);
        messageList.add(message3);
        //傳送訊息
        SendResult sendResult = producer.send(messageList);
        System.out.println(sendResult);
        //生產者關閉
        producer.shutdown();
    }
}


點選檢視消費者1程式碼

/**
 * 消費者1
 */
public class ConsumerBroadcast1 {
    
    public static void main(String[] args) throws Exception {
        //建立消費者物件
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("delay_consumer");
        //設定nameserver
        consumer.setNamesrvAddr("127.0.0.1:9876");
        //設定主題和tag
        consumer.subscribe("broadcastTopic","*");
        //設定訊息模式 為 廣播模式
        consumer.setMessageModel(MessageModel.BROADCASTING);
        //註冊訊息監聽
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println("消費者1:訊息ID:"+msg.getMsgId()+",內容"+new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //開啟消費者
        consumer.start();
        System.out.println("消費者1啟動");
    }
}


點選檢視消費者2程式碼

/**
 * 消費者2
 */
public class ConsumerBroadcast2 {

    public static void main(String[] args) throws Exception {
        //建立消費者物件
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("delay_consumer");
        //設定nameserver
        consumer.setNamesrvAddr("127.0.0.1:9876");
        //設定主題和tag
        consumer.subscribe("broadcastTopic","*");
        //設定訊息模式 為 廣播模式
        consumer.setMessageModel(MessageModel.BROADCASTING);
        //註冊訊息監聽
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println("消費者2:訊息ID:"+msg.getMsgId()+",內容"+new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //開啟消費者
        consumer.start();
        System.out.println("消費者2啟動");
    }
}

七、SpringBoot整合RocketMQ

  建立一個maven工程,匯入依賴

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <version>2.2.1.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.0.2</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <version>2.2.1.RELEASE</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.16.22</version>
        </dependency>
    </dependencies>


點選檢視模擬訊息程式碼

/**
 * 模擬訊息
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class Order {

    private Long orderId;
    private String desc;

    public static List<Order> buildOrders(){
        List<Order> list = new ArrayList<Order>();
        Order order1001a = new Order(1001L,"1001建立");
        Order order1004a = new Order(1004L,"1004建立");
        Order order1006a = new Order(1006L,"1006建立");
        Order order1009a = new Order(1009L,"1009建立");
        list.add(order1001a);
        list.add(order1004a);
        list.add(order1006a);
        list.add(order1009a);
        Order order1001b = new Order(1001L,"1001付款");
        Order order1004b = new Order(1004L,"1004付款");
        Order order1006b = new Order(1006L,"1006付款");
        Order order1009b = new Order(1009L,"1009付款");
        list.add(order1001b);
        list.add(order1004b);
        list.add(order1006b);
        list.add(order1009b);
        Order order1001c = new Order(1001L,"1001完成");
        Order order1006c = new Order(1006L,"1006完成");
        list.add(order1001c);
        list.add(order1006c);
        return list;
    }
}


點選檢視訊息生產者程式碼

@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
public class RocketMQTest {
    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    /**
     * 普通訊息生產者
     */
    @Test
    public void testSend(){
        rocketMQTemplate.convertAndSend("testTopic","這是測試訊息!");
    }
    
    /**
     * 延遲訊息生產者
     */
    @Test
    public void testDelaySend(){
        SendResult sendResult = rocketMQTemplate.syncSend("testTopic",
                new GenericMessage("這是延遲測試訊息!"+new Date()),
                10000,
                4);
        log.info("sendResult=="+sendResult);
    }

    /**
     * 順序訊息 生產者
     */
    @Test
    public void testOrderlySend(){
        List<Order> orderList = Order.buildOrders();
        for (Order order : orderList) {
            //傳送訊息
            rocketMQTemplate.setMessageQueueSelector(new MessageQueueSelector() {

                public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                    //引數是訂單id號
                    Long orderId = Long.valueOf((String)arg);
                    //確定選擇的佇列的索引
                    long index = orderId % mqs.size();
                    log.info("mqs is ::" + mqs.get((int) index));
                    return mqs.get((int) index);
                }
            });
            SendResult sendOrderly = rocketMQTemplate.syncSendOrderly("testTopicOrderLy",
                    new GenericMessage<String>(order.toString()), order.getOrderId().toString());
            log.info("傳送結果="+sendOrderly+",orderid :"+order.getOrderId());
        }
    }
}


點選檢視普通|延遲消費者程式碼

/**
 * 普通、延遲訊息 消費者程式碼
 */
@Component
@RocketMQMessageListener(consumerGroup = "myConsumer", topic = "testTopic")
public class RocketConsumer implements RocketMQListener<String> {

    public void onMessage(String message) {
        System.out.println("接收到訊息:="+message);
    }
}


點選檢視順序消費者程式碼

/**
 * 順序訊息 ,消費者
 */
@Slf4j
@Component
@RocketMQMessageListener(consumerGroup = "myConsumerOrderly", topic = "testTopicOrderLy",consumeMode = ConsumeMode.ORDERLY)
public class RocketConsumerOrderly implements RocketMQListener<String> {
    
    public void onMessage(String message) {
       log.info("當前執行緒:"+Thread.currentThread().getName()+",接收到訊息:="+message);
    }
}

八、RocketMQ的安裝配置

1、配置系統環境變數;計算機/屬性/高階系統設定/環境變數/系統變數,新建系統變數ROCKETMQ_HOME=RocketMQ安裝路徑

2、進入RocketMQ安裝目錄的bin目錄下,右鍵用記事本開啟修改runserver.cmd檔案

3、修改runbroker.cmd檔案

4、cmd進入到MQ/bin目錄下啟動
1.啟動mqnamesrv.cmd
start mqnamesrv.cmd

成功的彈窗,此框勿關閉。

2.啟動mqbroker.cmd
start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true

成功的彈窗,此框勿關閉。

注意:假如彈出提示框提示‘錯誤: 找不到或無法載入主類 xxxxxx’。開啟runbroker.cmd,然後將‘%CLASSPATH%’加上英文雙引號。儲存並重新執行start語句。

5、下載RocketMQ的視覺化外掛
  • 1) 下載地址: https://github.com/apache/rocketmq-externals/releases

  • 2) 修改rocketmq-console\src\main\resources\application.properties,修改如下:

  • 3) cmd視窗執行:mvn clean package -Dmaven.test.skip=true

  • 4) jar包執行:java -jar rocketmq-console-ng-1.0.0.jar

  • 5) 測試輸入地址: http://127.0.0.1:8080/#/ops