1. 程式人生 > >RocketMQ 4.7.1 環境搭建、叢集、MQ整合SpringBoot

RocketMQ 4.7.1 環境搭建、叢集、MQ整合SpringBoot

導讀

  之前學過ActiveMQ但是併發量不是很大點我直達,所以又學阿里開源的RocketMQ,據說佇列可以堆積億級別。下面是網上找的訊息佇列對比圖,僅供參考

部署

官網

點我直達

前置條件

  1. 推薦使用64位作業系統,建議使用Linux / Unix / Mac;
  2. 64位JDK 1.8+;
  3. Maven 3.2.x;
  4. Git;
  5. 適用於Broker伺服器的記憶體4G +可用磁碟

下載

地址:https://downloads.apache.org/rocketmq/4.7.1/rocketmq-all-4.7.1-source-release.zip 

百度雲盤:

連結: https://pan.baidu.com/s/1luq_MwxSn8k_bugrnQSJWg  密碼: varj

安裝依賴項

  1. jdk:點我直達
  2. maven:點我直達
  3. git安裝:yum install -y git
export JAVA_HOME=/opt/soft/jdk1.8.0_202
export PATH=$JAVA_HOME/bin:$PATH
export CLASPATH=.:$JAVA_home/lib/dt.jar:$JAVA_HOME/lib/tools.jar
export JAVA_HOME PATH CLASSPATH
export MAVEN_HOME=/opt/soft/apache-maven-3.6.3
export PATH=$PATH:$MAVEN_HOME/bin

mq上傳至linux

解壓

 

maven編譯

 

啟動NameServer

後臺啟動方式

nohup sh bin/mqnamesrv &

NameServer啟動時記憶體不足(問題解決)

找到runserver.sh 修改JAVA_OPT

vim /bin/runserver.sh配置

啟動Broker

nohup sh bin/mqbroker -n localhost:9876 &

語法:nohup sh bin/mqbroker -n NameServer服務ip地址

 

Broker記憶體不足(問題解決)

找到runbroker.sh 修改JAVA_OPT

vim /bin/runbroker.sh配置

服務都啟動成功

 

模擬消費

export NAMESRV_ADDR=localhost:9876

sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer

sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

開2個控制檯,連線通一臺linux

注意

  NameServer預設埠號:9876;broker預設埠號:10911

視覺化控制檯

官網地址

點我直達

百度雲盤

連結: https://pan.baidu.com/s/1mdEGkq-JBTy1wtNmFPkmDg  密碼: v6bq

解壓

安裝編譯

進入:/opt/soft/rocketmq-externals-master/rocketmq-console
編譯: mvn clean package -Dmaven.test.skip=true

修改appliccation.properties的rocketmq.config.namesrvAddr

編譯打包

啟動

  進入target目錄,啟動java -jar

守護程序啟動: nohup java -jar rocketmq-console-ng-2.0.0.jar &

SpringBoot整合RocketMQ(生產者)

建立SpringBoot專案

點我直達

專案結構

加入依賴

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.1.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.ybchen</groupId>
    <artifactId>ybchen-mq</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>ybchen-mq</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <!--注意: 這裡的版本,要和部署在伺服器上的版本號一致-->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.7.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-common</artifactId>
            <version>4.7.1</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

PayProducer.java

package com.ybchen.ybchenmq.jms;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.springframework.stereotype.Component;

/**
 * 訊息生產者
 */
@Component
public class PayProducer {
    /**
     * 生產者所屬的組
     */
    private String producerGroup = "pay_group";
    /**
     * MQ的地址,注意需開放埠號或者關閉防火牆
     */
    private String nameServerAddr = "192.168.199.100:9876";
    private DefaultMQProducer producer;

    public PayProducer() {
        producer = new DefaultMQProducer(producerGroup);
        //指定NameServer地址,多個地址以;隔開
        //如 producer.setNamesrvAddr("192.168.199.100:9876;192.168.199.101:9876;192.168.199.102:9876")
        producer.setNamesrvAddr(nameServerAddr);
        start();
    }

    /**
     * 獲取生產者
     * @return
     */
    public DefaultMQProducer getProducer() {
        return this.producer;
    }

    /**
     * 開啟,物件在使用之前必須要呼叫一次,只能初始化一次
     */
    public void start() {
        try {
            this.producer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }
    }

    /**
     * 關閉,一般在應用上下文,使用上下文監聽器,進行關閉
     */
    public void shutdown() {
        this.producer.shutdown();
    }
}

PayController.java

package com.ybchen.ybchenmq.controller;

import com.ybchen.ybchenmq.jms.PayProducer;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @ClassName:PayController
 * @Description:支付
 * @Author:chenyb
 * @Date:2020/10/18 2:47 下午
 * @Versiion:1.0
 */
@RestController
@RequestMapping("/api/v1")
public class PayController {
    @Autowired
    private PayProducer payProducer;

    private static final String TOPIC = "ybchen_pay_topic";

    /**
     * 支付回撥
     *
     * @param text
     * @return
     */
    @RequestMapping("pay_cb")
    public Object callback(String text) {
        /**
         * String topic:話題
         * String tags:二級分類
         * byte[] body:body訊息位元組陣列
         */
        Message message = new Message(TOPIC,"tag_a",("hello ybchen ==>"+text).getBytes());
        try {
            SendResult send = payProducer.getProducer().send(message);
            System.out.println("send------>"+send);
        } catch (MQClientException e) {
            e.printStackTrace();
        } catch (RemotingException e) {
            e.printStackTrace();
        } catch (MQBrokerException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return "ok";
    }
}

測試

常見錯誤

錯誤一

org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException:
sendDefaultImpl call timeout
原因:阿里雲存在多網絡卡,rocketmq會根據當前網絡卡選擇一個IP使用,當你的機器有多塊網絡卡時,很可能會有問題,比如,機器上有兩個ip,一個公網ip,一個私網ip,因此需要配置broker.conf指定當前公網的ip,然後重啟broker


修改配置:/opt/soft/rocketmq-all-4.7.1-source-release/distribution/target/rocketmq-4.7.1/rocketmq-4.7.1/conf/broker.conf
新增這個配置:brokerIP1=xxx.xxx.xxx.xxx

啟動命令:nohup sh bin/mqbroker -n localhost:9876 -c ./conf/broker.conf &

錯誤2

MQClientException: No route info of this topic, TopicTest1

原因:Broker 緊追自動建立Topic,且使用者沒有通過手工方式建立此Topic,或者broker和Nameserver網路不通

解決:
    通過sh bin/mqbroker -m 檢視配置
    autoCreateTopicEnable=true 則自動建立Topic

Centos 7 關閉防火牆:systemctl stop firewalld

錯誤3

控制檯檢視不了資料,提示連線10909錯誤

原因:Rocket預設開啟了VIP通道,VPI通道埠號為10911-2=10909

解決:阿里雲安全組新增一個埠:10909

錯誤4

  無法自動建立topic:客戶端版本要和服務端版本保持一致

伺服器上裝的是4.7.1

引入依賴項時
        <!--注意: 這裡的版本,要和部署在伺服器上的版本號一致-->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.7.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-common</artifactId>
            <version>4.7.1</version>
        </dependency>

檢索訊息傳送

SpringBoot整合RocketMQ(消費者)

建立SpringBoot專案

 

專案結構

加入依賴

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.1.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.ybchen</groupId>
    <artifactId>ybchen-mq</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>ybchen-mq</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <!--注意: 這裡的版本,要和部署在伺服器上的版本號一致-->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.7.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-common</artifactId>
            <version>4.7.1</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

PayConsumer.java

package com.ybchen.ybchenmqconsumer.jms;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.stereotype.Component;

import java.io.UnsupportedEncodingException;
import java.util.List;

/**
 * @ClassName:PayConsumer
 * @Description:消費者
 * @Author:chenyb
 * @Date:2020/10/18 4:13 下午
 * @Versiion:1.0
 */
@Component
public class PayConsumer {
    /**
     * 生產者所屬的組
     */
    private String producerGroup = "pay_consumer_group";
    /**
     * MQ的地址,注意需開放埠號或者關閉防火牆
     */
    private String nameServerAddr = "192.168.199.100:9876";
    /**
     * 訂閱主題
     */
    private String topic = "ybchen_pay_topic";
    private DefaultMQPushConsumer consumer;

    public PayConsumer() throws MQClientException {
        consumer = new DefaultMQPushConsumer(producerGroup);
        //指定NameServer地址,多個地址以;隔開
        //如 producer.setNamesrvAddr("192.168.199.100:9876;192.168.199.101:9876;192.168.199.102:9876")
        consumer.setNamesrvAddr(nameServerAddr);
        //設定消費地點,從最後一個開始消費
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        //訂閱主題,監聽主題下的那些標籤
        consumer.subscribe(topic, "*");
        //註解一個監聽器
        //lambda方式
//        consumer.registerMessageListener((MessageListenerConcurrently) (msg, context) -> {
//            try {
//                Message message = msg.get(0);
//                System.out.printf("%s Receive New Messages: %s %n",
//                        Thread.currentThread().getName(), new String(msg.get(0).getBody()));
//                //主題
//                String topic = message.getTopic();
//                //訊息內容
//                String body = null;
//                body = new String(message.getBody(), "utf-8");
//                //二級分類
//                String tags = message.getTags();
//                //鍵
//                String keys = message.getKeys();
//                System.out.println("topic=" + topic + ", tags=" + tags + ", keys=" + keys + ", msg=" + body);
//                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
//            } catch (UnsupportedEncodingException e) {
//                e.printStackTrace();
//                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
//            }
//        });

        //一般方式
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                try {
                    Message message = list.get(0);
                    System.out.printf("%s Receive New Messages: %s %n",
                            Thread.currentThread().getName(), new String(list.get(0).getBody(),"utf-8"));
                    //主題
                    String topic = message.getTopic();
                    //訊息內容
                    String body = null;
                    body = new String(message.getBody(), "utf-8");
                    //二級分類
                    String tags = message.getTags();
                    //鍵
                    String keys = message.getKeys();
                    System.out.println("topic=" + topic + ", tags=" + tags + ", keys=" + keys + ", msg=" + body);
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                } catch (UnsupportedEncodingException e) {
                    e.printStackTrace();
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
            }
        });
        consumer.start();
        System.out.println("consumer start ..........");
    }
}

application.properties

server.port=8081

測試生產者消費者

MQ叢集架構模式分析

單節點

優點

  本地開發測試,配置簡單,同步刷盤訊息一條都不會丟

缺點

  不可靠,如果宕機,會導致服務不可用

主從(非同步、同步雙寫)

優點

  同步雙寫訊息不丟失,非同步複製存在少量丟失你,主節點宕機,從節點可以對外提供訊息的消費,但是不支援寫入

缺點

  主備有短暫訊息延遲,毫秒級,目前不支援自動切換,需要指令碼或者其他程式進行檢測然後停止broker,重啟讓從節點成為主節點

雙主

優點

  配置簡單,可以靠配置RAID磁碟陣列保證訊息可靠,非同步刷盤丟失少量訊息

缺點

  master宕機期間,未被消費的訊息在機器恢復之前不可訊息,實時性會受到影響

雙主雙從,多主多從模式(非同步複製)

優點

  磁碟損壞,訊息丟失的非常小,訊息實時性不會受影響,Master宕機後,消費者仍然可以從Slave消費

缺點

  主備有短暫訊息延遲,毫秒級,如果Master宕機,磁碟損壞情況,會丟失你少量訊息

雙主雙從,多主多從模式(同步雙寫)

優點

  同步雙寫方式,主備都寫成功,才嚮應用返回成功,服務可用性與資料可用性非常高

缺點

  效能比非同步複製模式略低,主宕機後,備機不能自動切換為主機

推薦

  1. 主從(非同步、同步雙寫)
  2. 雙主雙從,多主多從模式(非同步複製)
  3. 雙主雙從,多主多從模式(同步雙寫)

主從叢集搭建

準備工作

  準備2臺機器,ip地址分別為:192.168.199.100;192.168.199.101;

  環境:RocketMQ4.7.1+jdk8+Maven+Centos 7

啟動兩臺nameserver

  啟動兩個機器的nameserver

路徑:/opt/soft/rocketmq-all-4.7.1-source-release/distribution/target/rocketmq-4.7.1/rocketmq-4.7.1

啟動:nohup sh bin/mqnamesrc &

編輯並啟動roccketmq

主節點

進入:/opt/soft/rocketmq-all-4.7.1-source-release/distribution/target/rocketmq-4.7.1/rocketmq-4.7.1/conf/2m-2s-async


編輯並修改如下:vim broker-a.properties 
namesrvAddr=192.168.199.100:9876;192.168.199.101:9876
brokerClusterName=YbChenCluster
brokerName=broker-a
brokerId=0
deleteWhen=04
fileReservedTime=48
brokerRole=ASYNC_MASTER
flushDiskType=ASYNC_FLUSH


啟動:nohup sh bin/mqbroker -c conf/2m-2s-async/broker/broker-a.properties &

從節點

進入:/opt/soft/rocketmq-all-4.7.1-source-release/distribution/target/rocketmq-4.7.1/rocketmq-4.7.1/conf/2m-2s-async


編輯並修改如下:vim broker-a-s.properties 
namesrvAddr=192.168.199.100:9876;192.168.199.101:9876
brokerClusterName=YbChenCluster
brokerName=broker-a
brokerId=1
deleteWhen=04
fileReservedTime=48
brokerRole=SLAVE
flushDiskType=ASYNC_FLUSH


啟動:nohup sh bin/mqbroker -c conf/2m-2s-async/broker/broker-a-s.properties &

注意事項

  1. namesrvAddr:相同
  2. brokerClusterName:相同
  3. brokerName:相同
  4. brokerId:不同,0是主節點
  5. deleteWhen:相同
  6. fileReservedTime:相同
  7. brokerRole:不同,分ASYNC_MASTER、SLAVE
  8. flushDiskType:相同

啟動broker

使用管控臺

  使用192.168.199.100這臺伺服器,修改配置

192.168.199.100這臺伺服器

進入:/opt/soft/rocketmq-externals-master/rocketmq-console/src/main/resources


修改配置檔案:vim application.properties

rocketmq.config.namesrvAddr=192.168.199.100:9876;192.168.199.101:9876


編譯

切換到:/opt/soft/rocketmq-externals-master/rocketmq-console
打包:
mvn clean
mvn install -Dmaven.test.skip=true


啟動

進入:/opt/soft/rocketmq-externals-master/rocketmq-console/target
守護程序方式啟動:nohup java -jar rocketmq-console-ng-2.0.0.jar &

叢集測試

故障演練

  模擬主掛了,但是從還可以被消費,此時不能寫入,等主重啟後,可以繼續寫入(資料不會被重複消費),以下內容是連續的

總結

  好了,到目前為止,主從已經搭建完成了。

  Broker分為Master和Slave,一個Master可以對應多個Slave,但一個Slave只能對應一個Master,Master與Slave通過相同的Broker Name來匹配,不同的Broker id來定義時Master還是Slave

    Broker向所有的NameServer節點建立長連線,定時註冊Topic和傳送元資料資訊

    NameServer定時掃描(預設2分鐘)所有存活Broker的連線,如果超過時間沒響應,則斷開連線(心跳檢測),但是Consumer客戶端不能感知,Consumer定時(30秒)從NameServer獲取topic的最新資訊,所以broker不可用時,Consumer最多需要30秒才能發現

  只有Master才能進行寫入操作,Slave不允許寫入只能同步,同步策略取決於Master配置

  客戶端消費可以從Master和Slave消費,預設消費者都從Master消費,如果在Master掛了之後,客戶端從NameServer中感知Broker宕機,就會從Slave消費,感知非實時,存在一定的滯後性,Slave不能保證Master的100%都同步過來,會有少量的訊息丟失。一旦Master恢復,未同步過去的訊息會被最終消費掉。

  如果Consumer例項的數量比Message Queue的總數量還多的話,多出來的Consumer例項將無法分到Queue,也就無法消費到訊息,也就無法起到分攤負載的作用,所以需要控制讓Queue的總數量大於Consumer的數量。

場景模擬

生產和消費重試及處理

生產者重試

  • 訊息重試(保證資料的高可靠性),本身內部支援重試,預設次數是2
  • 如果網路情況較差,或者跨叢集則建議多改幾次

生產者設定重試次數,並設定唯一的key(一般唯一識別符號)

消費者重試

  • 原因:訊息處理異常,broker端到consumer端各種問題,如網路原因閃斷,消費處理失敗,ACK返回失敗等
  • 注意
    • 重試間隔時間配置,預設每條訊息最多重試16次
    • 超過重試次數人工補償
    • 消費端去重
    • 一條訊息無論重試多少次,這些重試訊息的Message ID,key不會改變
    • 消費重試只針對叢集消費方式生效;廣播方式不提供失敗重試特性,即消費失敗後,失敗訊息不再重試,繼續消費新的訊息

設定廣播方式

模擬訊息重發

非同步傳送訊息和回撥實戰

應用場景

  比如12306付完錢