RocketMQ 4.7.1 環境搭建、叢集、MQ整合SpringBoot
導讀
之前學過ActiveMQ但是併發量不是很大點我直達,所以又學阿里開源的RocketMQ,據說佇列可以堆積億級別。下面是網上找的訊息佇列對比圖,僅供參考
部署
官網
點我直達
前置條件
- 推薦使用64位作業系統,建議使用Linux / Unix / Mac;
- 64位JDK 1.8+;
- Maven 3.2.x;
- Git;
- 適用於Broker伺服器的記憶體4G +可用磁碟
下載
地址:https://downloads.apache.org/rocketmq/4.7.1/rocketmq-all-4.7.1-source-release.zip
百度雲盤:
連結: https://pan.baidu.com/s/1luq_MwxSn8k_bugrnQSJWg 密碼: varj
安裝依賴項
- jdk:點我直達
- maven:點我直達
- git安裝:yum install -y git
export JAVA_HOME=/opt/soft/jdk1.8.0_202 export PATH=$JAVA_HOME/bin:$PATH export CLASPATH=.:$JAVA_home/lib/dt.jar:$JAVA_HOME/lib/tools.jar export JAVA_HOME PATH CLASSPATH export MAVEN_HOME=/opt/soft/apache-maven-3.6.3 export PATH=$PATH:$MAVEN_HOME/bin
mq上傳至linux
解壓
maven編譯
啟動NameServer
後臺啟動方式
nohup sh bin/mqnamesrv &
NameServer啟動時記憶體不足(問題解決)
找到runserver.sh 修改JAVA_OPT vim /bin/runserver.sh配置
啟動Broker
nohup sh bin/mqbroker -n localhost:9876 & 語法:nohup sh bin/mqbroker -n NameServer服務ip地址
Broker記憶體不足(問題解決)
找到runbroker.sh 修改JAVA_OPT vim /bin/runbroker.sh配置
服務都啟動成功
模擬消費
export NAMESRV_ADDR=localhost:9876 sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
開2個控制檯,連線通一臺linux
注意
NameServer預設埠號:9876;broker預設埠號:10911
視覺化控制檯
官網地址
點我直達
百度雲盤
連結: https://pan.baidu.com/s/1mdEGkq-JBTy1wtNmFPkmDg 密碼: v6bq
解壓
安裝編譯
進入:/opt/soft/rocketmq-externals-master/rocketmq-console 編譯: mvn clean package -Dmaven.test.skip=true
修改appliccation.properties的rocketmq.config.namesrvAddr
編譯打包
啟動
進入target目錄,啟動java -jar
守護程序啟動: nohup java -jar rocketmq-console-ng-2.0.0.jar &
SpringBoot整合RocketMQ(生產者)
建立SpringBoot專案
點我直達
專案結構
加入依賴
pom.xml
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.3.1.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.ybchen</groupId> <artifactId>ybchen-mq</artifactId> <version>0.0.1-SNAPSHOT</version> <name>ybchen-mq</name> <description>Demo project for Spring Boot</description> <properties> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> <exclusions> <exclusion> <groupId>org.junit.vintage</groupId> <artifactId>junit-vintage-engine</artifactId> </exclusion> </exclusions> </dependency> <!--注意: 這裡的版本,要和部署在伺服器上的版本號一致--> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.7.1</version> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-common</artifactId> <version>4.7.1</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
PayProducer.java
package com.ybchen.ybchenmq.jms; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.springframework.stereotype.Component; /** * 訊息生產者 */ @Component public class PayProducer { /** * 生產者所屬的組 */ private String producerGroup = "pay_group"; /** * MQ的地址,注意需開放埠號或者關閉防火牆 */ private String nameServerAddr = "192.168.199.100:9876"; private DefaultMQProducer producer; public PayProducer() { producer = new DefaultMQProducer(producerGroup); //指定NameServer地址,多個地址以;隔開 //如 producer.setNamesrvAddr("192.168.199.100:9876;192.168.199.101:9876;192.168.199.102:9876") producer.setNamesrvAddr(nameServerAddr); start(); } /** * 獲取生產者 * @return */ public DefaultMQProducer getProducer() { return this.producer; } /** * 開啟,物件在使用之前必須要呼叫一次,只能初始化一次 */ public void start() { try { this.producer.start(); } catch (MQClientException e) { e.printStackTrace(); } } /** * 關閉,一般在應用上下文,使用上下文監聽器,進行關閉 */ public void shutdown() { this.producer.shutdown(); } }
PayController.java
package com.ybchen.ybchenmq.controller; import com.ybchen.ybchenmq.jms.PayProducer; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.exception.RemotingException; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; /** * @ClassName:PayController * @Description:支付 * @Author:chenyb * @Date:2020/10/18 2:47 下午 * @Versiion:1.0 */ @RestController @RequestMapping("/api/v1") public class PayController { @Autowired private PayProducer payProducer; private static final String TOPIC = "ybchen_pay_topic"; /** * 支付回撥 * * @param text * @return */ @RequestMapping("pay_cb") public Object callback(String text) { /** * String topic:話題 * String tags:二級分類 * byte[] body:body訊息位元組陣列 */ Message message = new Message(TOPIC,"tag_a",("hello ybchen ==>"+text).getBytes()); try { SendResult send = payProducer.getProducer().send(message); System.out.println("send------>"+send); } catch (MQClientException e) { e.printStackTrace(); } catch (RemotingException e) { e.printStackTrace(); } catch (MQBrokerException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } return "ok"; } }
測試
常見錯誤
錯誤一
org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException: sendDefaultImpl call timeout
原因:阿里雲存在多網絡卡,rocketmq會根據當前網絡卡選擇一個IP使用,當你的機器有多塊網絡卡時,很可能會有問題,比如,機器上有兩個ip,一個公網ip,一個私網ip,因此需要配置broker.conf指定當前公網的ip,然後重啟broker 修改配置:/opt/soft/rocketmq-all-4.7.1-source-release/distribution/target/rocketmq-4.7.1/rocketmq-4.7.1/conf/broker.conf 新增這個配置:brokerIP1=xxx.xxx.xxx.xxx 啟動命令:nohup sh bin/mqbroker -n localhost:9876 -c ./conf/broker.conf &
錯誤2
MQClientException: No route info of this topic, TopicTest1 原因:Broker 緊追自動建立Topic,且使用者沒有通過手工方式建立此Topic,或者broker和Nameserver網路不通 解決: 通過sh bin/mqbroker -m 檢視配置 autoCreateTopicEnable=true 則自動建立Topic Centos 7 關閉防火牆:systemctl stop firewalld
錯誤3
控制檯檢視不了資料,提示連線10909錯誤 原因:Rocket預設開啟了VIP通道,VPI通道埠號為10911-2=10909 解決:阿里雲安全組新增一個埠:10909
錯誤4
無法自動建立topic:客戶端版本要和服務端版本保持一致
伺服器上裝的是4.7.1 引入依賴項時 <!--注意: 這裡的版本,要和部署在伺服器上的版本號一致--> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.7.1</version> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-common</artifactId> <version>4.7.1</version> </dependency>
檢索訊息傳送
SpringBoot整合RocketMQ(消費者)
建立SpringBoot專案
專案結構
加入依賴
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.3.1.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.ybchen</groupId> <artifactId>ybchen-mq</artifactId> <version>0.0.1-SNAPSHOT</version> <name>ybchen-mq</name> <description>Demo project for Spring Boot</description> <properties> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> <exclusions> <exclusion> <groupId>org.junit.vintage</groupId> <artifactId>junit-vintage-engine</artifactId> </exclusion> </exclusions> </dependency> <!--注意: 這裡的版本,要和部署在伺服器上的版本號一致--> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.7.1</version> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-common</artifactId> <version>4.7.1</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
PayConsumer.java
package com.ybchen.ybchenmqconsumer.jms; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageExt; import org.springframework.stereotype.Component; import java.io.UnsupportedEncodingException; import java.util.List; /** * @ClassName:PayConsumer * @Description:消費者 * @Author:chenyb * @Date:2020/10/18 4:13 下午 * @Versiion:1.0 */ @Component public class PayConsumer { /** * 生產者所屬的組 */ private String producerGroup = "pay_consumer_group"; /** * MQ的地址,注意需開放埠號或者關閉防火牆 */ private String nameServerAddr = "192.168.199.100:9876"; /** * 訂閱主題 */ private String topic = "ybchen_pay_topic"; private DefaultMQPushConsumer consumer; public PayConsumer() throws MQClientException { consumer = new DefaultMQPushConsumer(producerGroup); //指定NameServer地址,多個地址以;隔開 //如 producer.setNamesrvAddr("192.168.199.100:9876;192.168.199.101:9876;192.168.199.102:9876") consumer.setNamesrvAddr(nameServerAddr); //設定消費地點,從最後一個開始消費 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET); //訂閱主題,監聽主題下的那些標籤 consumer.subscribe(topic, "*"); //註解一個監聽器 //lambda方式 // consumer.registerMessageListener((MessageListenerConcurrently) (msg, context) -> { // try { // Message message = msg.get(0); // System.out.printf("%s Receive New Messages: %s %n", // Thread.currentThread().getName(), new String(msg.get(0).getBody())); // //主題 // String topic = message.getTopic(); // //訊息內容 // String body = null; // body = new String(message.getBody(), "utf-8"); // //二級分類 // String tags = message.getTags(); // //鍵 // String keys = message.getKeys(); // System.out.println("topic=" + topic + ", tags=" + tags + ", keys=" + keys + ", msg=" + body); // return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; // } catch (UnsupportedEncodingException e) { // e.printStackTrace(); // return ConsumeConcurrentlyStatus.RECONSUME_LATER; // } // }); //一般方式 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { try { Message message = list.get(0); System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), new String(list.get(0).getBody(),"utf-8")); //主題 String topic = message.getTopic(); //訊息內容 String body = null; body = new String(message.getBody(), "utf-8"); //二級分類 String tags = message.getTags(); //鍵 String keys = message.getKeys(); System.out.println("topic=" + topic + ", tags=" + tags + ", keys=" + keys + ", msg=" + body); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } catch (UnsupportedEncodingException e) { e.printStackTrace(); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } } }); consumer.start(); System.out.println("consumer start .........."); } }
application.properties
server.port=8081
測試生產者消費者
MQ叢集架構模式分析
單節點
優點
本地開發測試,配置簡單,同步刷盤訊息一條都不會丟
缺點
不可靠,如果宕機,會導致服務不可用
主從(非同步、同步雙寫)
優點
同步雙寫訊息不丟失,非同步複製存在少量丟失你,主節點宕機,從節點可以對外提供訊息的消費,但是不支援寫入
缺點
主備有短暫訊息延遲,毫秒級,目前不支援自動切換,需要指令碼或者其他程式進行檢測然後停止broker,重啟讓從節點成為主節點
雙主
優點
配置簡單,可以靠配置RAID磁碟陣列保證訊息可靠,非同步刷盤丟失少量訊息
缺點
master宕機期間,未被消費的訊息在機器恢復之前不可訊息,實時性會受到影響
雙主雙從,多主多從模式(非同步複製)
優點
磁碟損壞,訊息丟失的非常小,訊息實時性不會受影響,Master宕機後,消費者仍然可以從Slave消費
缺點
主備有短暫訊息延遲,毫秒級,如果Master宕機,磁碟損壞情況,會丟失你少量訊息
雙主雙從,多主多從模式(同步雙寫)
優點
同步雙寫方式,主備都寫成功,才嚮應用返回成功,服務可用性與資料可用性非常高
缺點
效能比非同步複製模式略低,主宕機後,備機不能自動切換為主機
推薦
- 主從(非同步、同步雙寫)
- 雙主雙從,多主多從模式(非同步複製)
- 雙主雙從,多主多從模式(同步雙寫)
主從叢集搭建
準備工作
準備2臺機器,ip地址分別為:192.168.199.100;192.168.199.101;
環境:RocketMQ4.7.1+jdk8+Maven+Centos 7
啟動兩臺nameserver
啟動兩個機器的nameserver
路徑:/opt/soft/rocketmq-all-4.7.1-source-release/distribution/target/rocketmq-4.7.1/rocketmq-4.7.1 啟動:nohup sh bin/mqnamesrc &
編輯並啟動roccketmq
主節點 進入:/opt/soft/rocketmq-all-4.7.1-source-release/distribution/target/rocketmq-4.7.1/rocketmq-4.7.1/conf/2m-2s-async 編輯並修改如下:vim broker-a.properties namesrvAddr=192.168.199.100:9876;192.168.199.101:9876 brokerClusterName=YbChenCluster brokerName=broker-a brokerId=0 deleteWhen=04 fileReservedTime=48 brokerRole=ASYNC_MASTER flushDiskType=ASYNC_FLUSH 啟動:nohup sh bin/mqbroker -c conf/2m-2s-async/broker/broker-a.properties &
從節點 進入:/opt/soft/rocketmq-all-4.7.1-source-release/distribution/target/rocketmq-4.7.1/rocketmq-4.7.1/conf/2m-2s-async 編輯並修改如下:vim broker-a-s.properties namesrvAddr=192.168.199.100:9876;192.168.199.101:9876 brokerClusterName=YbChenCluster brokerName=broker-a brokerId=1 deleteWhen=04 fileReservedTime=48 brokerRole=SLAVE flushDiskType=ASYNC_FLUSH 啟動:nohup sh bin/mqbroker -c conf/2m-2s-async/broker/broker-a-s.properties &
注意事項
- namesrvAddr:相同
- brokerClusterName:相同
- brokerName:相同
- brokerId:不同,0是主節點
- deleteWhen:相同
- fileReservedTime:相同
- brokerRole:不同,分ASYNC_MASTER、SLAVE
- flushDiskType:相同
啟動broker
使用管控臺
使用192.168.199.100這臺伺服器,修改配置
192.168.199.100這臺伺服器 進入:/opt/soft/rocketmq-externals-master/rocketmq-console/src/main/resources 修改配置檔案:vim application.properties rocketmq.config.namesrvAddr=192.168.199.100:9876;192.168.199.101:9876 編譯 切換到:/opt/soft/rocketmq-externals-master/rocketmq-console 打包: mvn clean mvn install -Dmaven.test.skip=true 啟動 進入:/opt/soft/rocketmq-externals-master/rocketmq-console/target 守護程序方式啟動:nohup java -jar rocketmq-console-ng-2.0.0.jar &
叢集測試
故障演練
模擬主掛了,但是從還可以被消費,此時不能寫入,等主重啟後,可以繼續寫入(資料不會被重複消費),以下內容是連續的
總結
好了,到目前為止,主從已經搭建完成了。
Broker分為Master和Slave,一個Master可以對應多個Slave,但一個Slave只能對應一個Master,Master與Slave通過相同的Broker Name來匹配,不同的Broker id來定義時Master還是Slave
Broker向所有的NameServer節點建立長連線,定時註冊Topic和傳送元資料資訊
NameServer定時掃描(預設2分鐘)所有存活Broker的連線,如果超過時間沒響應,則斷開連線(心跳檢測),但是Consumer客戶端不能感知,Consumer定時(30秒)從NameServer獲取topic的最新資訊,所以broker不可用時,Consumer最多需要30秒才能發現
只有Master才能進行寫入操作,Slave不允許寫入只能同步,同步策略取決於Master配置
客戶端消費可以從Master和Slave消費,預設消費者都從Master消費,如果在Master掛了之後,客戶端從NameServer中感知Broker宕機,就會從Slave消費,感知非實時,存在一定的滯後性,Slave不能保證Master的100%都同步過來,會有少量的訊息丟失。一旦Master恢復,未同步過去的訊息會被最終消費掉。
如果Consumer例項的數量比Message Queue的總數量還多的話,多出來的Consumer例項將無法分到Queue,也就無法消費到訊息,也就無法起到分攤負載的作用,所以需要控制讓Queue的總數量大於Consumer的數量。
場景模擬
生產和消費重試及處理
生產者重試
- 訊息重試(保證資料的高可靠性),本身內部支援重試,預設次數是2
- 如果網路情況較差,或者跨叢集則建議多改幾次
生產者設定重試次數,並設定唯一的key(一般唯一識別符號)
消費者重試
- 原因:訊息處理異常,broker端到consumer端各種問題,如網路原因閃斷,消費處理失敗,ACK返回失敗等
- 注意
- 重試間隔時間配置,預設每條訊息最多重試16次
- 超過重試次數人工補償
- 消費端去重
- 一條訊息無論重試多少次,這些重試訊息的Message ID,key不會改變
- 消費重試只針對叢集消費方式生效;廣播方式不提供失敗重試特性,即消費失敗後,失敗訊息不再重試,繼續消費新的訊息
設定廣播方式
模擬訊息重發
非同步傳送訊息和回撥實戰
應用場景
比如12306付完錢