專題目錄

RocketMQ詳解(一)原理概覽

RocketMQ詳解(二)安裝使用詳解

RocketMQ詳解(三)啟動執行原理

RocketMQ詳解(四)核心設計原理

RocketMQ詳解(五)總結提高

引子

本節比較輕鬆,做2個事:

  • 1.在本地安裝:RocketMQ NameServer名字服務+Broker代理服務+Dashboard看板。
  • 2.使用簡單樣例,實現訊息傳送、訊息消費。並在Dashboard上驗證。

一、安裝流程

目標:

  1. W10上本地安裝啟動RocketMQ
  2. RocketMQ DashBoard 看板(即老的console)

1.1 安裝環境

JDK8(jdk7+),驗證如下:

maven 3.8.2,驗證如下:

1.2 安裝RocketMQ

1.下載bin壓縮包並解壓縮

去官網下載安裝包:https://rocketmq.apache.org/dowloading/releases/

點選進入https://www.apache.org/dyn/closer.cgi?path=rocketmq/4.8.0/rocketmq-all-4.8.0-bin-release.zip,如下圖:

如上圖,點選連結下載rocketmq-all-4.8.0-bin-release.zip,解壓縮如下:

2.配置系統環境變數

變數名:ROCKETMQ_HOME
變數值:D:\rocketmq-all-4.8.0-bin-release

3.啟動nameserver

進入D:\rocketmq-all-4.8.0-bin-release\bin目錄下,執行:start mqnamesrv.cmd,如下圖:

如上圖,name server啟動成功。 注:不要關閉視窗。

4.啟動broker

進入D:\rocketmq-all-4.8.0-bin-release\bin目錄下,執行:start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true,如下圖:

如上圖,broker啟動成功,broker name=GZYZFA00071414(這個名字記住,後面驗證要用), 關聯的name server是127.0.0.1:9876。注:不要關閉視窗。

1.3 安裝RocketMQ DashBoard看板

1.下載zip包並解壓縮

注意:最新的官方後臺看板只有 git: RocketMQ DashBoard,(https://github.com/apache/rocketmq-externals這個專案已經沒有看板了):

rocketmq-dashboard-master.zip 下載完後解壓縮。如下圖:

2.修改nameserver地址

到D:\rocketmq-dashboard-master\src\main\resources下修改application.properties,修改nameserver地址。

3.maven編譯並啟動spring boot

進入D:\rocketmq-dashboard-master 目錄,執行  mvn spring-boot:run,幾分鐘後,熟悉的Spring Boot banner出來了:

4.訪問驗證

訪問http://localhost:8080,看板如下圖所示,此時還沒訊息。

檢視nameserver, 如下圖:

安裝完畢,下一節我們來看看怎麼使用。

二、簡單使用

我們使用官方簡單樣例來驗證:https://github.com/apache/rocketmq/blob/master/docs/cn/Example_Simple_cn.md。

2.1 訊息傳送(生產者)

 1     @Test
2 public void sandMessageTest() throws InterruptedException, RemotingException, MQClientException, MQBrokerException, UnsupportedEncodingException {
3
4 // 例項化訊息生產者Producer
5 DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
6 // 設定NameServer的地址
7 producer.setNamesrvAddr("localhost:9876");
8 // 啟動Producer例項
9 producer.start();
10 for (int i = 0; i < 10; i++) {
11 // 建立訊息,並指定Topic,Tag和訊息體
12 Message msg = new Message("TopicTest" , "TagA" ,""+i,
13 ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
14 );
15 // 傳送訊息到一個Broker
16 SendResult sendResult = producer.send(msg);
17 // 通過sendResult返回訊息是否成功送達
18 System.out.printf("%s%n", sendResult);
19 }
20 // 如果不再發送訊息,關閉Producer例項。
21 producer.shutdown();
22 }

如上圖,

1)先啟動producer

2)同步傳送10條訊息,topic='TopicTest', tag="TagA",keys=""+i, 訊息體=Hello RocketMQ +i。

3)關閉生產者。

執行日誌:

SendResult [sendStatus=SEND_OK, msgId=AC133C2C0E0018B4AAC2515ED2840000, offsetMsgId=AC133C2C00002A9F0000000000000000, messageQueue=MessageQueue [topic=TopicTest, brokerName=GZYZFA00071414, queueId=8], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=AC133C2C0E0018B4AAC2515ED2AB0001, offsetMsgId=AC133C2C00002A9F00000000000000D0, messageQueue=MessageQueue [topic=TopicTest, brokerName=GZYZFA00071414, queueId=9], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=AC133C2C0E0018B4AAC2515ED2AC0002, offsetMsgId=AC133C2C00002A9F00000000000001A0, messageQueue=MessageQueue [topic=TopicTest, brokerName=GZYZFA00071414, queueId=10], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=AC133C2C0E0018B4AAC2515ED2AE0003, offsetMsgId=AC133C2C00002A9F0000000000000270, messageQueue=MessageQueue [topic=TopicTest, brokerName=GZYZFA00071414, queueId=11], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=AC133C2C0E0018B4AAC2515ED2B00004, offsetMsgId=AC133C2C00002A9F0000000000000340, messageQueue=MessageQueue [topic=TopicTest, brokerName=GZYZFA00071414, queueId=12], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=AC133C2C0E0018B4AAC2515ED2B10005, offsetMsgId=AC133C2C00002A9F0000000000000410, messageQueue=MessageQueue [topic=TopicTest, brokerName=GZYZFA00071414, queueId=13], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=AC133C2C0E0018B4AAC2515ED2B30006, offsetMsgId=AC133C2C00002A9F00000000000004E0, messageQueue=MessageQueue [topic=TopicTest, brokerName=GZYZFA00071414, queueId=14], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=AC133C2C0E0018B4AAC2515ED2B40007, offsetMsgId=AC133C2C00002A9F00000000000005B0, messageQueue=MessageQueue [topic=TopicTest, brokerName=GZYZFA00071414, queueId=15], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=AC133C2C0E0018B4AAC2515ED2B60008, offsetMsgId=AC133C2C00002A9F0000000000000680, messageQueue=MessageQueue [topic=TopicTest, brokerName=GZYZFA00071414, queueId=0], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=AC133C2C0E0018B4AAC2515ED2B70009, offsetMsgId=AC133C2C00002A9F0000000000000750, messageQueue=MessageQueue [topic=TopicTest, brokerName=GZYZFA00071414, queueId=1], queueOffset=0]
19:12:48.835 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[172.19.60.44:10911] result: true

如上圖所示,接收到10條傳送結果日誌,msgId訊息ID,topic=TopicTest, brokerName=GZYZFA00071414,queueId=均勻分佈在0-15一共預設的16個佇列中的10個。

開啟dashboard檢視訊息如下圖:

點選訊息詳情按鈕,檢視一個訊息:

如上圖所示,topic、tag、key、訊息體都是正確的。checked!

2.2 消費訊息(消費者)

官方樣例,一個簡單的併發消費,如下:

 1     @Test
2 public void consumerMessageTest() throws MQClientException, InterruptedException {
3 // 例項化消費者
4 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
5
6 // 設定NameServer的地址
7 consumer.setNamesrvAddr("localhost:9876");
8
9 // 訂閱一個或者多個Topic,以及Tag來過濾需要消費的訊息
10 consumer.subscribe("TopicTest", "*");
11 // 註冊回撥實現類來處理從broker拉取回來的訊息
12 consumer.registerMessageListener(new MessageListenerConcurrently() {
13 @Override
14 public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
15 System.out.printf("%s Receive New Messages: %s ,body= %s %n ", Thread.currentThread().getName(), msgs,new String(msgs.get(0).getBody()));
16 // 標記該訊息已經被成功消費
17 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
18 }
19 });
20 // 啟動消費者例項
21 consumer.start();
22 System.out.printf("Consumer Started.%n");
23 // 史前巨坑,這裡官方樣例不帶這個,加上才能來得及消費。
24 Thread.sleep(10000L);
25 }

如上圖,

1)構造了一個消費者例項,並設定了nameserver、訂閱條件(topic+tag表示式)。

2)  註冊了一個併發訊息監聽器,定義如何消費訊息。

3)啟動消費者,執行消費。這裡最後一行Thread.sleep(10000L);要記得加上,不加還沒消費執行緒就被關閉了...

執行後,消費日誌如下:

Consumer Started.
ConsumeMessageThread_1 Receive New Messages: [MessageExt [queueId=8, storeSize=208, queueOffset=0, sysFlag=0, bornTimestamp=1631790768772, bornHost=/172.19.60.44:59863, storeTimestamp=1631790768795, storeHost=/172.19.60.44:10911, msgId=AC133C2C00002A9F0000000000000000, commitLogOffset=0, bodyCRC=613185359, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1, KEYS=0, CONSUME_START_TIME=1631839990074, UNIQ_KEY=AC133C2C0E0018B4AAC2515ED2840000, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 48], transactionId='null'}]]
ConsumeMessageThread_3 Receive New Messages: [MessageExt [queueId=14, storeSize=208, queueOffset=0, sysFlag=0, bornTimestamp=1631790768819, bornHost=/172.19.60.44:59863, storeTimestamp=1631790768819, storeHost=/172.19.60.44:10911, msgId=AC133C2C00002A9F00000000000004E0, commitLogOffset=1248, bodyCRC=1307562618, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1, KEYS=6, CONSUME_START_TIME=1631839990074, UNIQ_KEY=AC133C2C0E0018B4AAC2515ED2B30006, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 54], transactionId='null'}]]
ConsumeMessageThread_5 Receive New Messages: [MessageExt [queueId=1, storeSize=208, queueOffset=0, sysFlag=0, bornTimestamp=1631790768823, bornHost=/172.19.60.44:59863, storeTimestamp=1631790768824, storeHost=/172.19.60.44:10911, msgId=AC133C2C00002A9F0000000000000750, commitLogOffset=1872, bodyCRC=1565577195, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=1, KEYS=9, CONSUME_START_TIME=1631839990074, UNIQ_KEY=AC133C2C0E0018B4AAC2515ED2B70009, CLUSTER=DefaultCluster, WAIT=true, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111,
...省略其它

消費完後,檢視dashboard:  topic->consumer manager可以看到消費延遲為0了,即全部消費掉了。如下圖:

三、總結

本節我們實現了本地從0到1安裝RocketMQ+Dashboard、並簡單實現了消費傳送和消費,並在Dashboard上驗證消費成功。