1. 程式人生 > >架構設計:系統間通訊——ActiveMQ叢集方案(上)

架構設計:系統間通訊——ActiveMQ叢集方案(上)

1、綜述

通過之前的文章,我們討論了ActiveMQ的基本使用,包括單個ActiveMQ服務節點的效能特徵,關鍵調整引數;我們還介紹了單個ActiveMQ節點上三種不同的持久化儲存方案,並討論了這三種不同的持久化儲存方案的配置和效能特點。但是這還遠遠不夠,因為在生產環境中為了保證讓我們設計的訊息服務方案能夠持續工作,我們還需要為訊息中介軟體服務搭建叢集環境,從而在保證訊息中介軟體服務可靠性和處理效能。

2、ActiveMQ多節點方案

叢集方案主要為了解決系統架構中的兩個關鍵問題:高可用和高效能。ActiveMQ服務的高可用性是指,在ActiveMQ服務效能不變、資料不丟失的前提下,確保當系統災難出現時ActiveMQ能夠持續提供訊息服務,高可靠性方案最終目的是減少整個ActiveMQ停止服務的時間

ActiveMQ服務的高效能是指,在保證ActiveMQ服務持續穩定性、資料不丟失的前提下,確保ActiveMQ叢集能夠在單位時間內吞吐更高數量的訊息、確保ActiveMQ叢集處理單條訊息的時間更短、確保ActiveMQ叢集能夠容納更多的客戶端穩定連線。

下面我們分別介紹如何通過多個ActiveMQ服務節點叢集方式,分別提供熱備方案和高效能方案。最後我們討論如何將兩種方案結合在一起,最終形成在生成環境下使用的推薦方案。

2-2、ActiveMQ高效能方案

ActiveMQ的多節點叢集方案,主要有動態叢集和靜態叢集兩種方案。所謂動態叢集就是指,同時提供訊息服務的ActiveMQ節點數量、位置(IP和埠)是不確定的

,當某一個節點啟動後,會通過網路組播的方式向其他節點發送通知(同時接受其他節點的組播資訊)。當網路中其他節點收到組播通知後,就會向這個節點發起連線,最終將新的節點加入ActiveMQ叢集;所謂靜態叢集是指同時提供訊息服務的多個節點的位置(IP和埠)是確定的,每個節點不需要通過廣播的方式發現目標節點,只需要在啟動時按照給定的位置進行連線。

  • 靜態叢集方案

    這裡寫圖片描述

  • 動態叢集方案

這裡寫圖片描述

2-1-1、基於組播(multicast)的節點發現

在使用動態叢集配置時,當某個ActiveMQ服務節點啟動後並不知道整個網路中還存在哪些其他的服務節點。所以ActiveMQ叢集需要規定一種節點與節點間的發現機制,以保證能夠解決上述問題。ActiveMQ叢集中,使用“組播”原理進行其他節點的發現

組播(multicast)基於UDP協議,它是指在一個可連通的網路中,某一個數據報傳送源向一組資料報接收目標進行操作的過程。在這個過程中,資料報傳送者只需要向這個組播地址(一個D類IP)傳送一個數據報,那麼加入這個組播地址的所有接收者都可以收到這個資料報。組播實現了網路中單點到多點的高效資料傳送,能夠節約大量網路頻寬,降低網路負載。

這裡寫圖片描述

在IP協議中,規定的D類IP地址為組播地址。224.0.0.0~239.255.255.255這個範圍內的IP都是D類IP地址,其中有一些IP段是保留的有特殊含義的:

  • 224.0.0.0~224.0.0.255:這個D類IP地址段為保留地址,不建議您在開發過程中使用,因為可能產生衝突。例如224.0.0.5這個組播地址專供OSPF協議(是一種路由策略協議,用於找到最優路徑)使用的組播地址;224.0.0.18這個組播地址專供VRRP協議使用(VRRP協議是虛擬路由器冗餘協議)。

  • 224.0.1.0~224.0.1.255:這個D類IP地址為公用組播地址,用於在整個Internet網路上進行組播。除非您有頂級DNS的控制/改寫許可權,否則不建議在區域網內使用這個組播地址斷。

  • 239.0.0.0~239.255.255.255:這個D類IP地址段為推薦在區域網內使用的組播地址段。注意,如果要在區域網內使用組播功能,需要區域網中的交換機/路由器支援組播功能。幸運的是,目前市面上只要不是太過低端的交換機/路由器,都支援組播功能(組播功能所使用的主要協議為IGMP協議,關於IGMP協議的細節就不再進行深入了)。

下面我們使用java語言,編寫一個區域網內的組播發送和接受過程。以便讓各位讀者對基於組播的節點發現操作有一個直觀的理解。雖然ActiveMQ中關於節點發現的過程,要比以下的示例複雜得多,但是基本原理是不會改變的。

  • 組播資料報傳送者:
package multicast;

import java.net.DatagramPacket;
import java.net.InetAddress;
import java.net.MulticastSocket;
import java.util.Date;

public class SendMulticast {
    public static void main(String[] args) throws Throwable {
        // 組播地址
        InetAddress group = InetAddress.getByName("239.0.0.5");
        // 組播埠,同時也是UDP 資料報的傳送埠
        int port = 19999;
        MulticastSocket mss = null;  

        // 建立一個用於傳送/接收的MulticastSocket組播套接字物件
        mss = new MulticastSocket(port);
        // 建立要傳送的組播資訊和UDP資料報
        // 攜帶的資料內容,就是這個activeMQ服務節點用來提供Network Connectors的TCP/IP地址和埠等資訊
        String message = "我是一個活動的activeMQ服務節點(節點編號:yyyyyyy),我的可用tcp資訊為:XXXXXXXXXX : ";  
        byte[] buffer2 = message.getBytes(); 
        DatagramPacket dp = new DatagramPacket(buffer2, buffer2.length, group, port);
        // 使用組播套接字joinGroup(),將其加入到一個組播
        mss.joinGroup(group);

        // 開始按照一定的週期向加入到224.0.0.5組播地址的其他ActiveMQ服務節點進行廣播
        Thread thread = Thread.currentThread();
        while (!thread.isInterrupted()) {
            // 使用組播套接字的send()方法,將組播資料包物件放入其中,傳送組播資料包
            mss.send(dp);
            System.out.println(new Date() + "發起組播:" + message);
            synchronized (SendMulticast.class) {
                SendMulticast.class.wait(5000);
            }
        }

        mss.close();
    }
}
  • 組播資料報接收者:
package multicast;

import java.net.DatagramPacket;
import java.net.InetAddress;
import java.net.MulticastSocket;

/**
 * 測試接收組播資訊
 * @author yinwenjie
 */
public class AcceptMulticast {
    public static void main(String[] args) throws Throwable {
        // 建立組播套接字,並加入分組
        MulticastSocket multicastSocket = new MulticastSocket(19999);
        // 注意,組播地址和埠必須和傳送者的一直,才能加入正確的組
        InetAddress ad = InetAddress.getByName("239.0.0.5");
        multicastSocket.joinGroup(ad);

        // 準備接收可能的組播訊號
        byte[] datas = new byte[2048];
        DatagramPacket data = new DatagramPacket(datas, 2048 ,ad , 19999);
        Thread thread = Thread.currentThread();

        // 開始接收組播資訊,並打印出來
        System.out.println(".....開始接收組播資訊.....");
        while(!thread.isInterrupted()) {
            multicastSocket.receive(data);
            int leng = data.getLength();
            System.out.println(new String(data.getData() , 0 , leng , "UTF-8"));
        }

        multicastSocket.close();
    }
}

另外,我們之前講過的DUBBO框架中,也有基於“組播”的發現/註冊管理。具體可參考DUBBO框架中的com.alibaba.dubbo.registry.multicast.MulticastRegistry類和其引用類(以下為MulticastRegistry類中,建立組播套接字和接受組播資料報的關鍵程式碼段):

......
mutilcastAddress = InetAddress.getByName(url.getHost());
mutilcastPort = url.getPort() <= 0 ? DEFAULT_MULTICAST_PORT : url.getPort();
mutilcastSocket = new MulticastSocket(mutilcastPort);
mutilcastSocket.setLoopbackMode(false);
mutilcastSocket.joinGroup(mutilcastAddress);
Thread thread = new Thread(new Runnable() {
    public void run() {
        byte[] buf = new byte[2048];
        DatagramPacket recv = new DatagramPacket(buf, buf.length);
        while (! mutilcastSocket.isClosed()) {
            try {
                mutilcastSocket.receive(recv);
                String msg = new String(recv.getData()).trim();
                int i = msg.indexOf('\n');
                if (i > 0) {
                    msg = msg.substring(0, i).trim();
                }
                MulticastRegistry.this.receive(msg, (InetSocketAddress) recv.getSocketAddress());
                Arrays.fill(buf, (byte)0);
            } catch (Throwable e) {
                if (! mutilcastSocket.isClosed()) {
                    logger.error(e.getMessage(), e);
                }
            }
        }
    }
}, "DubboMulticastRegistryReceiver");
thread.setDaemon(true);
thread.start();
......

2-1-2、橋接Network Bridges

為了實現ActiveMQ叢集的橫向擴充套件要求和高穩定性要求,ActiveMQ叢集提供了Network Bridges功能。通過Network Bridges功能,技術人員可以將多個ActiveMQ服務節點連線起來。並讓它們通過配置好的策略作為一個整體對外提供服務。

這樣的服務策略主要包括兩種:主/從模式和負載均衡模式。對於第一種策略我們會在後文進行討論。本節我們要重點討論的是基於Network Bridges的負載均衡模式。

這裡寫圖片描述

2-1-3、動態Network Connectors

既然已經講述了ActiveMQ中的動態節點發現原理和ActiveMQ Network Bridges的概念,那麼關於ActiveMQ怎樣配置叢集的方式就是非常簡單的問題了。我們先來討論如何進行基於組播發現的ActiveMQ負載均衡模式的配置——動態網路連線Network Connectors;再來討論基於固定地址的負載均衡模式配置——靜態網路連線Network Connectors。

要配置基於組播發現的ActiveMQ負載均衡模式,其過程非常簡單。開發人員只需要在每一個ActiveMQ服務節點的主配置檔案中(activemq.xml),新增/更改 以下配置資訊即可:

......
<transportConnectors>
    <!-- 在transportConnector中增加discoveryUri屬性,表示這個transportConnector是要通過組播告知其它節點的:使用這個transportConnector位置連線我 -->
    <transportConnector name="auto" uri="auto+nio://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600&amp;org.apache.activemq.transport.nio.SelectorManager.corePoolSize=20&amp;org.apache.activemq.transport.nio.SelectorManager.maximumPoolSize=50&amp;consumer.prefetchSize=5" discoveryUri="multicast://239.0.0.5" />
</transportConnectors>

......

<!-- 關鍵的networkConnector標籤, uri屬性標示為組播發現-->
<networkConnectors>
    <networkConnector uri="multicast://239.0.0.5" duplex="false"/>
</networkConnectors>

......

2-1-3-1:networkConnector標籤

如果使用ActiveMQ的組播發現功能,請在networkConnector標籤的uri屬性中新增如下格式的資訊:

multicast://[組播地址][:埠]

例如,您可以按照如下方式使用ActiveMQ預設的組播地址來發現網路種其他ActiveMQ服務節點:

#ActiveMQ叢集預設的組播地址(239.255.2.3):
multicast://default

也可以按照如下方式,指定一個組播地址——這在高安全級別的網路中很有用,因為可能其他的組播地址已經被管理員禁用。注意組播地址只能是D類IP地址段:

#使用組播地址239.0.0.5
multicast://239.0.0.5

以下是通過抓包軟體獲得的的組播UDP報文:

這裡寫圖片描述

從上圖中我們可以獲得幾個關鍵資訊:

  • 192.168.61.138和192.168.61.139這兩個IP地址分別按照一定的週期(1秒一次),向組播地址239.0.0.5傳送UDP資料報。以便讓在這個組播地址的其它服務節點能夠感知自己的存在

  • 另外,以上UDP資料報文使用的埠是6155。您也可以更改這個埠資訊通過類似如下的方式:

#使用組播地址239.0.0.5:19999
multicast://239.0.0.5:19999
  • 每個UDP資料報中,包含的主要資訊包括本節點ActiveMQ的版本資訊,以及連線到自己所需要使用的host名字、協議名和埠資訊。類似如下:
default.ActiveMQ-4.ailve%localhost%auto+nio://activemq:61616

2-1-3-2:transportConnector標籤的關聯設定

任何一個ActiveMQ服務節點A,要連線到另外的ActiveMQ服務節點,都需要使用當前節點A已經公佈的transportConnector連線埠,例如以下配置中,能夠供其它服務節點進行連線的就只有兩個transportConnector連線中的任意一個:

......
<transportConnectors>
    <!-- 其它ActiveMQ服務節點,只能使用以下三個連線協議和埠進行連線 -->
    <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
    <transportConnector name="tcp" uri="tcp://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
    <transportConnector name="nio" uri="nio://0.0.0.0:61618?maximumConnections=1000" />
    <transportConnector name="auto" uri="auto://0.0.0.0:61617?maximumConnections=1000" />   
</transportConnectors>
......

那麼要將哪一個連線方式通過UDP資料報向其他ActiveMQ節點進行公佈,就需要在transportConnector標籤上使用discoveryUri屬性進行標識,如下所示:

......
<transportConnectors>
    ......
    <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
    <transportConnector name="auto" uri="auto+nio://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600" discoveryUri="multicast://239.0.0.5" />
</transportConnectors>

......
<networkConnectors>
    <networkConnector uri="multicast://239.0.0.5"/>
</networkConnectors>
......

2-1-3-3:其他注意事項

  • 關於防火牆:請記得關閉您Linux伺服器上對需要公佈的IP和埠的限制;

  • 關於hosts路由資訊:由於基於組播的動態發現機制,能夠找到的是目標ActiveMQ服務節點的機器名,而不是直接找到的IP。所以請設定當前服務節點的hosts檔案,以便當前ActiveMQ節點能夠通過hosts檔案中的IP路由關係,得到機器名與IP的對映:

    
    # hosts檔案
    
    ......
    192.168.61.139          activemq1
    192.168.61.138          activemq2
    ......
  • 關於哪些協議能夠被用於進行Network Bridges連線:根據筆者以往的使用經驗,只有tcp頭的uri格式(openwire協議)能夠被用於Network Bridges連線;當然您可以使用auto頭,因為其相容openwire協議;另外,您還可以指定為附加nio頭。

2-1-4、靜態Network Connectors

相比於基於組播發現方式的動態Network Connectors而言,雖然靜態Network Connectors沒有那樣靈活的橫向擴充套件性,但是卻可以適用於網路環境受嚴格管理的情況。例如:管理員關閉了交換機/路由器的組播功能、埠受到嚴格管控等等。

配置靜態Network Connectors的ActiveMQ叢集的方式也很簡單,只需要更改networkConnectors標籤中的配置即可,而無需關聯改動transportConnectors標籤。但是配置靜態Network Connectors的ActiveMQ叢集時,需要注意非常關鍵的細節:每一個節點都要配置其他所有節點的連線位置

為了演示配置過程,我們假設ActiveMQ叢集由兩個節點構成,分別是activemq1:192.168.61.138 和 activemq2:192.168.61.139。那麼配置情況如下所示:

  • 192.168.61.138:需要配置activemq2的位置資訊以便進行連線:
......
<transportConnectors>
    <transportConnector name="auto" uri="auto+nio://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600&amp;consumer.prefetchSize=5"/>
</transportConnectors>
......

<!-- 請注意,一定需要192.168.61.139(activemq2)提供了這樣的連線協議和埠 -->
<networkConnectors>
    <networkConnector uri="static:(auto+nio://192.168.61.139:61616)"/>
</networkConnectors>
......
  • 192.168.61.139:需要配置activemq1的位置資訊以便進行連線:
......
<transportConnectors>
    <transportConnector name="auto" uri="auto+nio://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600&amp;consumer.prefetchSize=5"/>
</transportConnectors>

......
<!-- 請注意,一定需要192.168.61.138(activemq1)提供了這樣的連線協議和埠 -->
<networkConnectors>
   <networkConnector uri="static:(auto+nio://192.168.61.138:61616)"/>
</networkConnectors>
......

同理,如果您的ActiveMQ叢集規劃中有三個ActiveMQ服務節點,那麼任何一個節點都應該配置其它兩個服務節點的連線方式。在配置格式中使用“,”符號進行分割:

......
<networkConnectors>
    <networkConnector uri="static:(tcp://host1:61616,tcp://host2:61616,tcp://..)"/>
</networkConnectors>
......

以下是配置完成後可能的效果:

  • 192.168.61.138(activemq1):

這裡寫圖片描述

  • 192.168.61.139(activemq2):

這裡寫圖片描述

2-1-5、其他配置屬性

下表列舉了在networkConnector標籤中還可以使用的屬性以及其意義。請特別注意其中的duplex屬性。如果只從字面意義理解該屬性,則被稱為“雙工模式”;如果該屬性為true,當這個節點使用Network Bridge連線到其它目標節點後,將強制目標也建立Network Bridge進行反向連線。其目的在於讓訊息既能傳送到目標節點,又可以通過目標節點接受訊息,但實際上大多數情況下是沒有必要的,因為目標節點一般都會自行建立連線到本節點。所以,該duplex屬性的預設值為false。

屬性名稱 預設值 屬性意義
name bridge 名稱
dynamicOnly false 如果為true, 持久訂閱被啟用時才建立對應的網路持久訂閱。
decreaseNetworkConsumerPriority false 如果為true,網路的消費者優先順序降低為-5。如果為false,則預設跟本地消費者一樣為0.
excludedDestinations empty 不通過網路轉發的destination
dynamicallyIncludedDestinations empty 通過網路轉發的destinations,注意空列表代表所有的都轉發。
staticallyIncludedDestinations empty 匹配的都將通過網路轉發-即使沒有對應的消費者,如果為預設的“empty”,那麼說明所有都要被轉發
duplex false 已經進行詳細介紹的“雙工”屬性。
prefetchSize 1000 設定網路消費者的prefetch size引數。如果設定成0,那麼就像之前文章介紹過的那樣:消費者會自己輪詢訊息。顯然這是不被允許的。
suppressDuplicateQueueSubscriptions false 如果為true, 重複的訂閱關係一產生即被阻止(V5.3+ 的版本中可以使用)。
bridgeTempDestinations true 是否廣播advisory messages來建立臨時destination。
alwaysSyncSend false 如果為true,非持久化訊息也將使用request/reply方式代替oneway方式傳送到遠端broker(V5.6+ 的版本中可以使用)。
staticBridge false 如果為true,只有staticallyIncludedDestinations中配置的destination可以被處理(V5.6+ 的版本中可以使用)。

以下這些屬性,只能在靜態Network Connectors模式下使用

屬性名稱 預設值 屬性意義
initialReconnectDelay 1000 重連之前等待的時間(ms) (如果useExponentialBackOff為false)
useExponentialBackOff true 如果該屬性為true,那麼在每次重連失敗到下次重連之前,都會增大等待時間
maxReconnectDelay 30000 重連之前等待的最大時間(ms)
backOffMultiplier 2 增大等待時間的係數

請注意這些屬性,並不是networkConnector標籤的屬性,而是在uri屬性中進行設定的,例如:

uri="static:(tcp://host1:61616,tcp://host2:61616)?maxReconnectDelay=5000&useExponentialBackOff=false"