1. 程式人生 > >RocketMQ(二)叢集配置

RocketMQ(二)叢集配置

這裡寫圖片描述

這裡寫圖片描述
RocketMQ 網路部署特點

  • Name Server是一個幾乎無狀態節點,可叢集部署,節點之間無任何資訊同步。

  • Broker部署相對複雜,Broker 分為Master與Slave,一個Master 可以對應多個Slave,但是一個Slave只能對應一個Master,Master與Slave 的對應關係通過指定相同的BrokerName,不同的BrokerId來定 義,BrokerId為0 表示Master,非0 表示Slave。Master也可以部署多個。每個 Broker與 Name Server叢集中的所有節點建立長連線,定時註冊 Topic資訊到所有 Name Server。

  • Producer與Name Server叢集中的其中一個節點(隨機選擇)建立長連線,定期從Name Server取Topic 路由資訊,並向提供Topic服務的Master 建立長連線,且定時向Master傳送心跳。Producer完全無 狀態,可叢集部署。

  • Consumer與Name Server叢集中的其中一個節點(隨機選擇)建立長連線,定期從Name Server 取Topic路由資訊,並向提供Topic服務的Master、Slave建立長連線,且定時向Master、Slave傳送心跳。Consumer既可以從Master訂閱訊息,也可以從Slave訂閱訊息,訂閱規則由Broker 配置決定。

這裡寫圖片描述

NameServer叢集 IP地址
NameServer-1 192.168.1.101
NameServer-2 192.168.1.102

分別啟動

nohup sh mqnamesrv &

tail -f -n 500 $ROCKETMQ_HOME/logs/rocketmqlogs/namesrv.log

這裡寫圖片描述

眾所周知,RocketMQ有多種叢集部署方式,它們的配置檔案也是分開的,如下:

這裡寫圖片描述

  • 2m-noslave: 多Master模式
  • 2m-2s-sync: 多Master多Slave模式,同步雙寫
  • 2m-2s-async
    :多Master多Slave模式,非同步複製

RocketMQ預設提供的配置檔案都是最基本的,很多配置都是預設值,如下:

這裡寫圖片描述

這些肯定不滿足我們的要求,我們可以自己手動來配置

#所屬叢集名字
brokerClusterName=rocketmq-cluster
#broker名字,注意此處不同的配置檔案填寫的不一樣
brokerName=broker-a|broker-b
#0 表示 Master,>0 表示 Slave
brokerId=0
#nameServer地址,分號分割
namesrvAddr=192.168.1.101:9876;192.168.1.102:9876
#在傳送訊息時,自動建立伺服器不存在的topic,預設建立的佇列數
defaultTopicQueueNums=4
#是否允許 Broker 自動建立Topic,建議線下開啟,線上關閉
autoCreateTopicEnable=true
#是否允許 Broker 自動建立訂閱組,建議線下開啟,線上關閉
autoCreateSubscriptionGroup=true
#Broker 對外服務的監聽埠
listenPort=10911
#刪除檔案時間點,預設凌晨 4點
deleteWhen=04
#檔案保留時間,預設 48 小時
fileReservedTime=120
#commitLog每個檔案的大小預設1G
mapedFileSizeCommitLog=1073741824
#ConsumeQueue每個檔案預設存30W條,根據業務情況調整
mapedFileSizeConsumeQueue=300000
#destroyMapedFileIntervalForcibly=120000
#redeleteHangedFileInterval=120000
#檢測物理檔案磁碟空間
diskMaxUsedSpaceRatio=88
#儲存路徑
storePathRootDir=/usr/local/alibaba-rocketmq/store
#commitLog 儲存路徑
storePathCommitLog=/usr/local/alibaba-rocketmq/store/commitlog
#消費佇列儲存路徑儲存路徑
storePathConsumeQueue=/usr/local/alibaba-rocketmq/store/consumequeue
#訊息索引儲存路徑
storePathIndex=/usr/local/alibaba-rocketmq/store/index
#checkpoint 檔案儲存路徑
storeCheckpoint=/usr/local/alibaba-rocketmq/store/checkpoint
#abort 檔案儲存路徑
abortFile=/usr/local/alibaba-rocketmq/store/abort
#限制的訊息大小
maxMessageSize=65536

#flushCommitLogLeastPages=4
#flushConsumeQueueLeastPages=2
#flushCommitLogThoroughInterval=10000
#flushConsumeQueueThoroughInterval=60000
#Broker 的角色
#- ASYNC_MASTER 非同步複製Master
#- SYNC_MASTER 同步雙寫Master
#- SLAVE
brokerRole=ASYNC_MASTER

#刷盤方式
#- ASYNC_FLUSH 非同步刷盤
#- SYNC_FLUSH 同步刷盤
flushDiskType=ASYNC_FLUSH
#checkTransactionMessageEnable=false
#發訊息執行緒池數量
#sendMessageThreadPoolNums=128
#拉訊息執行緒池數量
#pullMessageThreadPoolNums=128

這裡寫圖片描述

Broker叢集部署方式主要有以下幾種:(Slave不可寫,但可讀)

這裡寫圖片描述

這種方式風險較大,一旦Broker重啟或者宕機時,會導致整個服務不可用,不建議線上環境使用。

這裡寫圖片描述

一個叢集無 Slave,全是 Master,例如 2 個 Master 或者 3 個 Master。

brokerName brokerId brokerRole IP地址
broker-a 0 ASYNC_MASTER 192.168.1.101
broker-b 0 ASYNC_MASTER 192.168.1.103

優點:配置簡單,單個Master 宕機或重啟維護對應用無影響,在磁碟配置為 RAID10 時,即使機器宕機不可恢復情況下,由於RAID10 磁碟非常可靠,訊息也不會丟(非同步刷盤丟失少量訊息,同步刷盤一條不丟)。效能最高。

缺點:單臺機器宕機期間,這臺機器上未被消費的訊息在機器恢復之前不可訂閱,訊息實時性會受到受到影響。

  • 在192.168.1.101,啟動第一個 Master(-n 192.168.1.101:9876 可省略)
nohup sh mqbroker -c $ROCKETMQ_HOME/conf/2m-noslave/broker-a.properties >/dev/null 2>&1 &

tail -f -n 500 $ROCKETMQ_HOME/logs/rocketmqlogs/broker.log
  • 在192.168.1.103,啟動第二個 Master(-n 192.168.1.101:9876 可省略)
nohup sh mqbroker -c $ROCKETMQ_HOME/conf/2m-noslave/broker-b.properties >/dev/null 2>&1 &

tail -f -n 500 $ROCKETMQ_HOME/logs/rocketmqlogs/broker.log

這裡寫圖片描述

每個 Master 配置一個 Slave,有多對Master-Slave,HA 採用非同步複製方式,主備有短暫訊息延遲,毫秒級。

brokerName brokerId brokerRole IP地址
broker-a 0 ASYNC_MASTER 192.168.1.101
broker-a 1 SLAVE 192.168.1.102
broker-b 0 ASYNC_MASTER 192.168.1.103
broker-b 1 SLAVE 192.168.1.104

優點:即使磁碟損壞,訊息丟失的非常少,且訊息實時性不會受影響,因為 Master 宕機後,消費者仍然可以從 Slave 消費,此過程對應用透明。不需要人工干預。效能同多 Master 模式幾乎一樣。

缺點:Master宕機,磁碟損壞情況,會丟失少量訊息。

  • 在192.168.1.101,啟動第一個 Master(-n 192.168.1.101:9876 可省略)
nohup sh mqbroker -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-a.properties >$ROCKETMQ_HOME/log/mq.log >/dev/null 2>&1 &

tail -f -n 500 $ROCKETMQ_HOME/logs/rocketmqlogs/broker.log
  • 在192.168.1.102,啟動第一個 Slave(-n 192.168.1.101:9876 可省略)
nohup sh mqbroker -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-a-s.properties >$ROCKETMQ_HOME/log/mq.log >/dev/null 2>&1 &

tail -f -n 500 $ROCKETMQ_HOME/logs/rocketmqlogs/broker.log
  • 在192.168.1.103,啟動第二個 Master(-n 192.168.1.101:9876 可省略)
nohup sh mqbroker -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-b.properties >$ROCKETMQ_HOME/log/mq.log >/dev/null 2>&1 &

tail -f -n 500 $ROCKETMQ_HOME/logs/rocketmqlogs/broker.log
  • 在機器 192.168.1.104,啟動第二個 Slave(-n 192.168.1.101:9876 可省略)
nohup sh mqbroker -c $ROCKETMQ_HOME/conf/2m-2s-async/broker-b-s.properties >$ROCKETMQ_HOME/log/mq.log >/dev/null 2>&1 &

這裡寫圖片描述

每個 Master 配置一個 Slave,有多對Master-Slave,HA 採用同步雙寫方式,主備都寫成功,嚮應用才返回成功。

brokerName brokerId brokerRole IP地址
broker-a 0 SYNC_MASTER 192.168.1.101
broker-a 1 SLAVE 192.168.1.102
broker-b 0 SYNC_MASTER 192.168.1.103
broker-b 1 SLAVE 192.168.1.104

優點:資料與服務都無單點,Master宕機情況下,訊息無延遲,服務可用性與資料可用性都非常高

缺點:效能比非同步複製模式略低,大約低10%左右,傳送單個訊息的 RT 會略高。目前主宕機後,備機不能自動切換為主機,後續會支援自動切換功能。

  • 在192.168.1.101,啟動第一個 Master(-n 192.168.1.101:9876 可省略)
nohup sh mqbroker -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-a.properties >$ROCKETMQ_HOME/log/mq.log >/dev/null 2>&1 &

tail -f -n 500 $ROCKETMQ_HOME/logs/rocketmqlogs/broker.log
  • 在192.168.1.102,啟動第一個 Slave(-n 192.168.1.101:9876 可省略)
nohup sh mqbroker -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-a-s.properties >$ROCKETMQ_HOME/log/mq.log >/dev/null 2>&1 &

tail -f -n 500 $ROCKETMQ_HOME/logs/rocketmqlogs/broker.log
  • 在192.168.1.103,啟動第二個 Master(-n 192.168.1.101:9876 可省略)
nohup sh mqbroker -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-b.properties >$ROCKETMQ_HOME/log/mq.log >/dev/null 2>&1 &

tail -f -n 500 $ROCKETMQ_HOME/logs/rocketmqlogs/broker.log
  • 在192.168.1.104,啟動第二個 Slave(-n 192.168.1.101:9876 可省略)
nohup sh mqbroker -c $ROCKETMQ_HOME/conf/2m-2s-sync/broker-b-s.properties >$ROCKETMQ_HOME/log/mq.log >/dev/null 2>&1 &

tail -f -n 500 $ROCKETMQ_HOME/logs/rocketmqlogs/broker.log

以上 Broker 與 Slave 配對是通過指定相同的brokerName 引數來配對,Master 的 BrokerId必須是 0
Slave的BrokerId必須是大於 0 的數。另外一個 Master 下面可以掛載多個 Slave,同一 Master下的多個 Slave 通過指定不同的 BrokerId 來區分。

這裡寫圖片描述

這裡寫圖片描述

這裡寫圖片描述

這裡寫圖片描述

package com.somnus.rocketmq;

import java.util.concurrent.TimeUnit;

import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.LocalTransactionExecuter;
import com.alibaba.rocketmq.client.producer.LocalTransactionState;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.client.producer.TransactionCheckListener;
import com.alibaba.rocketmq.client.producer.TransactionMQProducer;
import com.alibaba.rocketmq.common.message.Message;
import com.alibaba.rocketmq.common.message.MessageExt;

public class Producer {

    public static void main(String[] args) throws MQClientException, InterruptedException {
        /**
         * 一個應用建立一個Producer,由應用來維護此物件,可以設定為全域性物件或者單例<br>
         * 注意:ProducerGroupName需要由應用來保證唯一,一類Producer集合的名稱,這類Producer通常傳送一類訊息,
         * 且傳送邏輯一致<br>
         * ProducerGroup這個概念傳送普通的訊息時,作用不大,但是傳送分散式事務訊息時,比較關鍵,
         * 因為伺服器會回查這個Group下的任意一個Producer
         */
        final TransactionMQProducer producer = new TransactionMQProducer("ProducerGroupName");
        // nameserver服務
        producer.setNamesrvAddr("172.16.235.77:9876;172.16.235.78:9876");
        producer.setInstanceName("Producer");

        /**
         * Producer物件在使用之前必須要呼叫start初始化,初始化一次即可<br>
         * 注意:切記不可以在每次傳送訊息時,都呼叫start方法
         */
        producer.start();
        // 伺服器回撥Producer,檢查本地事務分支成功還是失敗
        producer.setTransactionCheckListener(new TransactionCheckListener() {

            public LocalTransactionState checkLocalTransactionState(
                    MessageExt msg) {
                System.out.println("checkLocalTransactionState --" + new String(msg.getBody()));
                return LocalTransactionState.COMMIT_MESSAGE;
            }
        });

        /**
         * 下面這段程式碼表明一個Producer物件可以傳送多個topic,多個tag的訊息。
         * 注意:send方法是同步呼叫,只要不拋異常就標識成功。但是傳送成功也可會有多種狀態,<br>
         * 例如訊息寫入Master成功,但是Slave不成功,這種情況訊息屬於成功,但是對於個別應用如果對訊息可靠性要求極高,<br>
         * 需要對這種情況做處理。另外,訊息可能會存在傳送失敗的情況,失敗重試由應用來處理。
         */

        for (int i = 0; i < 10; i++) {
            try {
                {
                    Message msg = new Message("TopicTest1", // topic
                            "TagA",                         // tag
                            "OrderID001",                   // key訊息關鍵詞,多個Key用KEY_SEPARATOR隔開(查詢訊息使用)
                            ("Hello MetaQA").getBytes());   // body
                    SendResult sendResult = producer.sendMessageInTransaction(
                            msg, new LocalTransactionExecuter(){
                                public LocalTransactionState executeLocalTransactionBranch(Message msg, Object arg) {
                                    System.out.println("executeLocalTransactionBranch--msg=" + new String(msg.getBody()));
                                    System.out.println("executeLocalTransactionBranch--arg=" + arg);
                                    return LocalTransactionState.COMMIT_MESSAGE;
                                }
                            },
                            "$$$");
                    System.out.println(sendResult);
                }

                {
                    Message msg = new Message("TopicTest2", // topic
                            "TagB",                         // tag
                            "OrderID0034",                  // key 訊息關鍵詞,多個Key用KEY_SEPARATOR隔開(查詢訊息使用)
                            ("Hello MetaQB").getBytes());   // body
                    SendResult sendResult = producer.sendMessageInTransaction(
                            msg, new LocalTransactionExecuter(){
                                public LocalTransactionState executeLocalTransactionBranch(Message msg, Object arg) {
                                    System.out.println("executeLocalTransactionBranch--msg=" + new String(msg.getBody()));
                                    System.out.println("executeLocalTransactionBranch--arg=" + arg);
                                    return LocalTransactionState.COMMIT_MESSAGE;
                                }
                            },
                            "$$$");
                    System.out.println(sendResult);
                }

                {
                    Message msg = new Message("TopicTest3", // topic
                            "TagC",                         // tag
                            "OrderID061",                   // key
                            ("Hello MetaQC").getBytes());   // body
                    SendResult sendResult = producer.sendMessageInTransaction(
                            msg, new LocalTransactionExecuter(){
                                public LocalTransactionState executeLocalTransactionBranch(Message msg, Object arg) {
                                    System.out.println("executeLocalTransactionBranch--msg=" + new String(msg.getBody()));
                                    System.out.println("executeLocalTransactionBranch--arg=" + arg);
                                    return LocalTransactionState.COMMIT_MESSAGE;
                                }
                            },
                            "$$$");
                    System.out.println(sendResult);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
            TimeUnit.MILLISECONDS.sleep(1000);
        }

        /**
         * 應用退出時,要呼叫shutdown來清理資源,關閉網路連線,從MetaQ伺服器上登出自己
         * 注意:我們建議應用在JBOSS、Tomcat等容器的退出鉤子裡呼叫shutdown方法
         */
        // producer.shutdown();
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            public void run() {
                producer.shutdown();
            }
        }));
        System.exit(0);
    } // 執行本地事務,由客戶端回撥

}

這裡寫圖片描述

package com.somnus.rocketmq;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import com.alibaba.rocketmq.client.consumer.DefaultMQPullConsumer;
import com.alibaba.rocketmq.client.consumer.PullResult;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.common.message.MessageExt;  
import com.alibaba.rocketmq.common.message.MessageQueue;

public class Consumer {

    // Java快取  
    private static final Map<MessageQueue, Long> offseTable = new HashMap<MessageQueue, Long>();  

    /** 
     * 主動拉取方式消費 
     *  
     * @throws MQClientException 
     */  
    public static void main(String[] args) throws MQClientException {  
        /** 
         * 一個應用建立一個Consumer,由應用來維護此物件,可以設定為全域性物件或者單例<br> 
         * 注意:ConsumerGroupName需要由應用來保證唯一 ,最好使用服務的包名區分同一服務,一類Consumer集合的名稱, 
         * 這類Consumer通常消費一類訊息,且消費邏輯一致 
         * PullConsumer:Consumer的一種,應用通常主動呼叫Consumer的拉取訊息方法從Broker拉訊息,主動權由應用控制 
         */  
        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("ConsumerGroupName");  
        // //nameserver服務  
        consumer.setNamesrvAddr("172.16.235.77:9876;172.16.235.78:9876");  
        consumer.setInstanceName("Consumber");  
        consumer.start();

        // 拉取訂閱主題的佇列,預設佇列大小是4  
        Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("TopicTest1");  
        for (MessageQueue mq : mqs) {  
            System.out.println("Consume from the queue: " + mq);  
            SINGLE_MQ: while (true) {  
                try {  
                    PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);  
                    List<MessageExt> list = pullResult.getMsgFoundList();  
                    if (list != null && list.size() < 100) {  
                        for (MessageExt msg : list) {  
                            System.out.println(new String(msg.getBody()));  
                        }  
                    }  
                    System.out.println(pullResult.getNextBeginOffset());  
                    putMessageQueueOffset(mq, pullResult.getNextBeginOffset());  
                    switch (pullResult.getPullStatus()) {
                        case FOUND:  
                            break;  
                        case NO_MATCHED_MSG:  
                            break;  
                        case NO_NEW_MSG:  
                            break SINGLE_MQ;  
                        case OFFSET_ILLEGAL:  
                            break;  
                        default:  
                            break;  
                    }  
                } catch (Exception e) {  
                    e.printStackTrace();  
                }  
            }  
        }  
        consumer.shutdown();  
    }  

    private static void putMessageQueueOffset(MessageQueue mq, long offset) {  
        offseTable.put(mq, offset);  
    }  

    private static long getMessageQueueOffset(MessageQueue mq) {  
        Long offset = offseTable.get(mq);  
        if (offset != null) {  
            System.out.println(offset);  
            return offset;  
        }  
        return 0;  
    }  
}