1. 程式人生 > >RocketMq簡介及重要概念

RocketMq簡介及重要概念

目錄

RocketMq簡介

RocketMq重要概念

單機RocketMq搭建

RocketMq總體結構

客戶端定址方式


RocketMq簡介

rocketmq是一個訊息中介軟體,基於Kafka的設計思想,但不是kafka的拷貝,它具有高吞吐量,高可用性,適用於大規模分散式系統的特點

RocketMq重要概念

1.producer

訊息生產者

2.Consumer

訊息消費者

3.PushConsumer

broker推送訊息到consumer

4.PullConsumer

consumer主動從broker定時pull訊息

5.ProducerGroup

一類producer的集合名稱,這類producer通常傳送一類訊息,且傳送邏輯一致

6.ConsumerGroup

一類consumer的集合名稱,這類consumer通常消費一類訊息,且消費邏輯一致

7.broker

訊息中轉站

8.nameserve

無狀態的資料節點,記錄broker的路由資訊,以及topic,佇列等

9.廣播訊息

一個訊息被多個consumer消費,即使這些consumer屬於同一個組,也會被組內的每個consumer都消費一次

10.叢集訊息

一個consumergroup中的consumer例項平均分攤消費訊息,即不需要自己做訊息消費的負載均衡,只需要擴充套件機器即可

11.Topic

表示一個類別

12.tag

Topic下的一個子類訊息,更進一步細分訊息型別

單機RocketMq搭建

1.上傳rocketmq jar包

2.tar zxvf 解壓rocketmq

3.修改虛擬機器引數

/bin/runserver.sh /bin/runbroker.sh 中的虛擬機器引數,因為預設4g,可以改小點

JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn512m -XX:PermSize=128m -XX:MaxPermSize=320m"

4.建立log日誌

在conf下更改日誌列印路徑

5.啟動nameserver和broker

啟動nameserver

nohup sh bin/mqnamesrv &

啟動broker

nohup sh bin/mqbroker -n localhost:9876 &

檢視連線此nameserve的broker

sh mqadmin clusterList -n localhost:9876

結果

可以看到此nameserve只有一個broker註冊

測試訊息的傳送和接受

生產者程式碼:

public class FirstSyncProducer {
    public static  void main(String[] args) throws Exception{
        //Instantiate with a producer group name.
        Defau ltMQProducer producer = new
                DefaultMQProducer("please_rename_unique_group_name");
        // Specify name server addresses.
        producer.setNamesrvAddr("192.168.122.10:9876");
        producer.setVipChannelEnabled(false);
        //Launch the instance.
        producer.start();
        for (int i = 0; i < 10; i++) {
            //Create a message instance, specifying topic, tag and message body.
            Message msg = new Message("TopicTest" /* Topic */,
                    "TagA" /* Tag */,
                    ("Hello RocketMQ " +
                            i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
            );
            //Call send message to deliver message to one of brokers.
            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        }
        //Shut down once the producer instance is not longer in use.
        producer.shutdown();
    }
}

傳送結果:

​
SendResult [sendStatus=SEND_OK, msgId=0A011949442418B4AAC27EC7255F0000, offsetMsgId=C0A87A0A00002A9F0000000000000000, messageQueue=MessageQueue [topic=TopicTest, brokerName=bogon, queueId=2], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=0A011949442418B4AAC27EC725AB0001, offsetMsgId=C0A87A0A00002A9F00000000000000B2, messageQueue=MessageQueue [topic=TopicTest, brokerName=bogon, queueId=3], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=0A011949442418B4AAC27EC725B50002, offsetMsgId=C0A87A0A00002A9F0000000000000164, messageQueue=MessageQueue [topic=TopicTest, brokerName=bogon, queueId=0], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=0A011949442418B4AAC27EC725C20003, offsetMsgId=C0A87A0A00002A9F0000000000000216, messageQueue=MessageQueue [topic=TopicTest, brokerName=bogon, queueId=1], queueOffset=0]
SendResult [sendStatus=SEND_OK, msgId=0A011949442418B4AAC27EC725CE0004, offsetMsgId=C0A87A0A00002A9F00000000000002C8, messageQueue=MessageQueue [topic=TopicTest, brokerName=bogon, queueId=2], queueOffset=1]
SendResult [sendStatus=SEND_OK, msgId=0A011949442418B4AAC27EC725D60005, offsetMsgId=C0A87A0A00002A9F000000000000037A, messageQueue=MessageQueue [topic=TopicTest, brokerName=bogon, queueId=3], queueOffset=1]
SendResult [sendStatus=SEND_OK, msgId=0A011949442418B4AAC27EC725DE0006, offsetMsgId=C0A87A0A00002A9F000000000000042C, messageQueue=MessageQueue [topic=TopicTest, brokerName=bogon, queueId=0], queueOffset=1]
SendResult [sendStatus=SEND_OK, msgId=0A011949442418B4AAC27EC725E70007, offsetMsgId=C0A87A0A00002A9F00000000000004DE, messageQueue=MessageQueue [topic=TopicTest, brokerName=bogon, queueId=1], queueOffset=1]
SendResult [sendStatus=SEND_OK, msgId=0A011949442418B4AAC27EC725F50008, offsetMsgId=C0A87A0A00002A9F0000000000000590, messageQueue=MessageQueue [topic=TopicTest, brokerName=bogon, queueId=2], queueOffset=2]
SendResult [sendStatus=SEND_OK, msgId=0A011949442418B4AAC27EC725FF0009, offsetMsgId=C0A87A0A00002A9F0000000000000642, messageQueue=MessageQueue [topic=TopicTest, brokerName=bogon, queueId=3], queueOffset=2]

​

消費者程式碼:

public class FirstConsumer {

    public static void main(String[] args) throws InterruptedException, MQClientException {

        // Instantiate with specified consumer group name.
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");

        // Specify name server addresses.
        consumer.setNamesrvAddr("192.168.122.10:9876");
        consumer.setVipChannelEnabled(false);
        // Subscribe one more more topics to consume.
        consumer.subscribe("TopicTest", "*");
        // Register callback to execute on arrival of messages fetched from brokers.
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        //Launch the consumer instance.
        consumer.start();

        System.out.printf("Consumer Started.%n");
    }
}

結果:

ConsumeMessageThread_9 Receive New Messages: [MessageExt [queueId=2, storeSize=178, queueOffset=1, sysFlag=0, bornTimestamp=1540450180558, bornHost=/192.168.122.1:5393, storeTimestamp=1540450180559, storeHost=/192.168.122.10:10911, msgId=C0A87A0A00002A9F00000000000002C8, commitLogOffset=712, bodyCRC=601994070, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=3, CONSUME_START_TIME=1540450248729, UNIQ_KEY=0A011949442418B4AAC27EC725CE0004, WAIT=true, TAGS=TagA}, body=16]]] 
ConsumeMessageThread_8 Receive New Messages: [MessageExt [queueId=3, storeSize=178, queueOffset=2, sysFlag=0, bornTimestamp=1540450180607, bornHost=/192.168.122.1:5393, storeTimestamp=1540450180613, storeHost=/192.168.122.10:10911, msgId=C0A87A0A00002A9F0000000000000642, commitLogOffset=1602, bodyCRC=1565577195, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=3, CONSUME_START_TIME=1540450248729, UNIQ_KEY=0A011949442418B4AAC27EC725FF0009, WAIT=true, TAGS=TagA}, body=16]]] 
ConsumeMessageThread_5 Receive New Messages: [MessageExt [queueId=3, storeSize=178, queueOffset=1, sysFlag=0, bornTimestamp=1540450180566, bornHost=/192.168.122.1:5393, storeTimestamp=1540450180568, storeHost=/192.168.122.10:10911, msgId=C0A87A0A00002A9F000000000000037A, commitLogOffset=890, bodyCRC=1424393152, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=3, CONSUME_START_TIME=1540450248729, UNIQ_KEY=0A011949442418B4AAC27EC725D60005, WAIT=true, TAGS=TagA}, body=16]]] 
ConsumeMessageThread_6 Receive New Messages: [MessageExt [queueId=0, storeSize=178, queueOffset=1, sysFlag=0, bornTimestamp=1540450180574, bornHost=/192.168.122.1:5393, storeTimestamp=1540450180576, storeHost=/192.168.122.10:10911, msgId=C0A87A0A00002A9F000000000000042C, commitLogOffset=1068, bodyCRC=1307562618, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=2, CONSUME_START_TIME=1540450248729, UNIQ_KEY=0A011949442418B4AAC27EC725DE0006, WAIT=true, TAGS=TagA}, body=16]]] 
ConsumeMessageThread_4 Receive New Messages: [MessageExt [queueId=1, storeSize=178, queueOffset=1, sysFlag=0, bornTimestamp=1540450180583, bornHost=/192.168.122.1:5393, storeTimestamp=1540450180584, storeHost=/192.168.122.10:10911, msgId=C0A87A0A00002A9F00000000000004DE, commitLogOffset=1246, bodyCRC=988340972, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=2, CONSUME_START_TIME=1540450248729, UNIQ_KEY=0A011949442418B4AAC27EC725E70007, WAIT=true, TAGS=TagA}, body=16]]] 
ConsumeMessageThread_1 Receive New Messages: [MessageExt [queueId=1, storeSize=178, queueOffset=0, sysFlag=0, bornTimestamp=1540450180546, bornHost=/192.168.122.1:5393, storeTimestamp=1540450180547, storeHost=/192.168.122.10:10911, msgId=C0A87A0A00002A9F0000000000000216, commitLogOffset=534, bodyCRC=1032136437, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=2, CONSUME_START_TIME=1540450248729, UNIQ_KEY=0A011949442418B4AAC27EC725C20003, WAIT=true, TAGS=TagA}, body=16]]] 
ConsumeMessageThread_3 Receive New Messages: [MessageExt [queueId=0, storeSize=178, queueOffset=0, sysFlag=0, bornTimestamp=1540450180533, bornHost=/192.168.122.1:5393, storeTimestamp=1540450180538, storeHost=/192.168.122.10:10911, msgId=C0A87A0A00002A9F0000000000000164, commitLogOffset=356, bodyCRC=1250039395, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=2, CONSUME_START_TIME=1540450248729, UNIQ_KEY=0A011949442418B4AAC27EC725B50002, WAIT=true, TAGS=TagA}, body=16]]] 
ConsumeMessageThread_7 Receive New Messages: [MessageExt [queueId=2, storeSize=178, queueOffset=0, sysFlag=0, bornTimestamp=1540450180448, bornHost=/192.168.122.1:5393, storeTimestamp=1540450180486, storeHost=/192.168.122.10:10911, msgId=C0A87A0A00002A9F0000000000000000, commitLogOffset=0, bodyCRC=613185359, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=3, CONSUME_START_TIME=1540450248729, UNIQ_KEY=0A011949442418B4AAC27EC7255F0000, WAIT=true, TAGS=TagA}, body=16]]] 
ConsumeMessageThread_2 Receive New Messages: [MessageExt [queueId=3, storeSize=178, queueOffset=0, sysFlag=0, bornTimestamp=1540450180523, bornHost=/192.168.122.1:5393, storeTimestamp=1540450180522, storeHost=/192.168.122.10:10911, msgId=C0A87A0A00002A9F00000000000000B2, commitLogOffset=178, bodyCRC=1401636825, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=3, CONSUME_START_TIME=1540450248729, UNIQ_KEY=0A011949442418B4AAC27EC725AB0001, WAIT=true, TAGS=TagA}, body=16]]] 
ConsumeMessageThread_10 Receive New Messages: [MessageExt [queueId=2, storeSize=178, queueOffset=2, sysFlag=0, bornTimestamp=1540450180597, bornHost=/192.168.122.1:5393, storeTimestamp=1540450180600, storeHost=/192.168.122.10:10911, msgId=C0A87A0A00002A9F0000000000000590, commitLogOffset=1424, bodyCRC=710410109, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=3, CONSUME_START_TIME=1540450248730, UNIQ_KEY=0A011949442418B4AAC27EC725F50008, WAIT=true, TAGS=TagA}, body=16]]]

從上面看到,訊息是消費成功的,並且預設的topic是四個佇列

RocketMq總體結構

Broker

a.連線

broker與每個nameserve保持長連線

b.心跳

心跳間隔:broker每隔30秒向所有的nameserve傳送心跳,心跳包含topic資訊

心跳超時:nameserver間隔10秒鐘掃描所有還存活的broker連線,若某個broker2分鐘內沒有傳送資料,則斷開連線

c.可用性

rocketmq一般都是使用master/slave結構,slave定期從master讀取資料,一旦master掛掉,消費者定期從slave消費資料,但slave不能寫入資料

d.可靠性

傳送到broker的資料有同步刷盤和非同步刷盤兩種機制,同步刷盤是資料寫道磁碟後,返回確認值,非同步刷盤是先返回在把資料寫入磁碟

e.讀寫效能

利用linux的sendfile機制,將訊息內容直接輸出到sokect管道,避免系統呼叫

採用零拷貝技術,資料操作很快

零拷貝:主要是從檔案的讀取,寫入,傳輸方面考慮,減少了資料的複製,可以參考下篇文章

https://www.linuxjournal.com/article/6345

客戶端定址方式

1.程式碼指定

Producer.setNamesrvAddr(“192.168.122.1:9876”);

Consumer.setNamesrvAddr(“192.168.122.1:9876”);

2.Java啟動時指定引數

-Drocketmq.namesrv.addr=192.168.122.1:9876

3.環境變數指定NameServe地址

export NAMESRV_ADDR=192.168.122.1:987

4.http靜態伺服器定址

客戶端啟動後,會定時訪問一個http靜態伺服器:http://jmenv.tbsite.net:8080/rocketmq/msaddr,該服務響應nameserver地址

客戶端預設每隔2分鐘訪問一次這個HTTP伺服器,並更新本地的NameServer地址。URL已經在程式碼中寫死,可通過修改/etc/hosts檔案來改變要訪問的伺服器,例如在/etc/hosts增加如下配置:

10.232.22.67     jmenv.tbsite.net