1. 程式人生 > >《RocketMQ技術內幕:RocketMQ架構設計與實現原理》—1.1.2 Eclipse除錯RocketMQ原始碼

《RocketMQ技術內幕:RocketMQ架構設計與實現原理》—1.1.2 Eclipse除錯RocketMQ原始碼

1.1.2 Eclipse除錯RocketMQ原始碼

本節將展示在Eclipse中啟動NameServer、Broker,並執行訊息傳送與訊息消費示例程式。
1.啟動NameServer
Step1:展開namesrv模組,右鍵NamesrvStartup.java,移動到Debug As,選中Debug Configurations,彈出Debug Configurations對話方塊,如圖1-14所示。
Step2:選中Java Application條目並單擊右鍵,選擇New彈出Debug Configurations對話方塊,如圖1-15所示。
Step3:設定RocketMQ執行主目錄。選擇Environment選項卡,新增環境變數ROCKET_HOME。
Step4:在RocketMQ執行主目錄中建立conf、logs、store三個資料夾,如圖1-16所示。
image


image
image
Step5:從RocketMQ distribution部署目錄中將broker.conf、logback_broker.xml檔案複製到conf目錄中,logback_namesrv.xml檔案則只需修改日誌檔案的目錄,broker.conf檔案內容如下所示。
程式碼清單1-3 broker.conf檔案
brokerClusterName=DefaultCluster
brokerName=broker-a
brokerId=0

nameServer地址,分號分割

namesrvAddr=127.0.0.1:9876
deleteWhen=04
fileReservedTime=48
brokerRole=ASYNC_MASTER
flushDiskType=ASYNC_FLUSH

儲存路徑

storePathRootDir=D:\rocketmq\store

commitLog 儲存路徑

storePathCommitLog=D:\rocketmq\store\commitlog

消費佇列儲存路徑

storePathConsumeQueue=D:\rocketmq\store\consumequeue

訊息索引儲存路徑

storePathIndex=D:\rocketmq\store\index

checkpoint 檔案儲存路徑

storeCheckpoint=D:\rocketmq\store\checkpoint

abort 檔案儲存路徑

abortFile=D:\rocketmq\store\abort
Step6:在Eclipse Debug中執行NamesrvStartup,並輸出“The Name Server boot success. Serializetype=JSON”。
2.啟動Broker
Step1:展開broker模組,右鍵BrokerStartup.java,移動到Debug As,選中Debug Configurations,彈出如圖1-17所示的對話方塊,選擇arguments選項卡,配置-c屬性指定broker配置檔案路徑。

image
Step2:切換選項卡Environment,配置RocketMQ主目錄,如圖1-18所示。

image
Step3:以Debug模式執行BrokerStartup.java,檢視${ROCKET_HOME}/logs/broker.log檔案,未報錯則表示啟動成功。
程式碼清單1-4 broker啟動日誌截圖
2018-03-22 20:47:29 INFO main - register broker to name server 127.0.0.1:9876 OK
2018-03-22 20:47:29 INFO main - The broker[broker-a, 192.168.1.3:10911] boot success. serializeType=JSON and name server is 127.0.0.1:9876
2018-03-22 20:47:38 INFO BrokerControllerScheduledThread1 - dispatch behind commit log 0 bytes
2018-03-22 20:47:38 INFO BrokerControllerScheduledThread1 - Slave fall behind master: 0 bytes
2018-03-22 20:47:39 INFO BrokerControllerScheduledThread1 - register broker to name server 127.0.0.1:9876 OK
2018-03-22 20:48:09 INFO BrokerControllerScheduledThread1 - register broker to name server 127.0.0.1:9876 OK
2018-03-22 20:48:37 INFO BrokerControllerScheduledThread1 - dispatch behind commit log 0 bytes
2018-03-22 20:48:37 INFO BrokerControllerScheduledThread1 - Slave fall behind master: 0 bytes
2018-03-22 20:48:39 INFO BrokerControllerScheduledThread1 - register broker to name server 127.0.0.1:9876 OK
2018-03-22 20:49:09 INFO BrokerControllerScheduledThread1 - register broker to name server 127.0.0.1:9876 OK
3.使用RocketMQ提供的例項驗證訊息傳送與訊息消費
Step1:修改org.apache.rocketmq.example.quickstart.Producer示例程式,設定訊息生產者NameServer地址。
程式碼清單1-5 訊息傳送示例程式
public class Producer {

public static void main(String[] args) throws MQClientException, 
                InterruptedException {
    DefaultMQProducer producer = new 
                DefaultMQProducer("please_rename_unique_group_name");
    producer.setNamesrvAddr("127.0.0.1:9876");
    producer.start();
    for (int i = 0; i < 1; i++) {
        try {
            Message msg = new Message("TopicTest"/* Topic */,"TagA"/* Tag */,
                ("Hello RocketMQ " + i).getBytes
                    (RemotingHelper.DEFAULT_CHARSET)/* Message body */
                );
            SendResult sendResult = producer.send(msg);
            System.out.printf("%s%n", sendResult);
        } catch (Exception e) {
            e.printStackTrace();
            Thread.sleep(1000);
        }
    }
    producer.shutdown();
}

}
Step2:執行該示例程式,檢視執行結果,如果輸出程式碼清單1-6所示結果則表示訊息傳送成功。
程式碼清單1-6 訊息傳送結果
SendResult [sendStatus=SEND_OK, msgId=C0A8010325B46D06D69C70A211400000,
offsetMsgId=C0A8010300002A9F0000000000000000, messageQueue=MessageQueue
[topic=TopicTest, brokerName=broker-a, queueId=0], queueOffset=0]
Step3:修改org.apache.rocketmq.example.quickstart.Consumer示例程式,設定訊息消費者NameServer地址。
程式碼清單1-7 訊息消費示例程式
public class Consumer {

public static void main(String[] args) throws InterruptedException, 
        MQClientException {
    DefaultMQPushConsumer consumer = new 
        DefaultMQPushConsumer("please_rename_unique_group_name_4");
    consumer.setNamesrvAddr("127.0.0.1:9876");
    consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    consumer.subscribe("TopicTest", "*");
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {
            System.out.printf("%s Receive New Messages: %s %n", 
                Thread.currentThread().getName(), msgs);
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});
consumer.start();
System.out.printf("Consumer Started.%n");
}

}
Step4:執行訊息消費者程式,如果輸出如下所示則表示訊息消費成功。
程式碼清單1-8 訊息消費結果
Consumer Started.
ConsumeMessageThread_1 Receive New Messages: [MessageExt [queueId=0,
storeSize=178, queueOffset=0, sysFlag=0, bornTimestamp=1521723269443,
bornHost=/192.168.1.3:57034, storeTimestamp=1521723269510,
storeHost=/192.168.1.3:10911, msgId=C0A8010300002A9F0000000000000000,
commitLogOffset=0, bodyCRC=613185359, reconsumeTimes=0,
preparedTransactionOffset=0, toString()=Message [topic=TopicTest, flag=0,
properties={MIN_OFFSET=0, MAX_OFFSET=1, CONSUME_START_TIME=1521723841419,
UNIQ_KEY=C0A8010325B46D06D69C70A211400000, WAIT=true, TAGS=TagA}, body=16]]]
訊息傳送與訊息消費都成功,則說明RocketMQ除錯環境已經成功搭建了,可以直接Debug原始碼,探知RocketMQ的實現奧祕了。