1. 程式人生 > >ActiveMQ 叢集配置

ActiveMQ 叢集配置

構建高可用的ActiveMQ系統在生產環境中是非常重要的,單點的ActiveMQ作為企業應用無法滿足高可用和叢集的需求,所以ActiveMQ提供了master-slave、broker cluster等多種部署方式,但通過分析多種部署方式之後我認為需要將兩種部署方式相結合才能滿足我們公司分散式和高可用的需求,所以後面就重點將解如何將兩種部署方式相結合。 

自從activemq5.9.0開始,activemq的叢集實現方式取消了傳統的Pure Master Slave方式,增加了基於zookeeper+leveldb的實現方式,其他兩種方式:目錄共享和資料庫共享依然存在。 

1、Master-Slave部署方式

1)、Shared Filesystem Master-Slave方式 
2)、Shared Database Master-Slave方式 
3)、Replicated LevelDB Store方式 

第一種方案同樣支援N個AMQ例項組網,但由於他是基於kahadb儲存策略,亦可以部署在分散式檔案系統上,應用靈活、高效且安全。 

第二種方案與shared filesystem方式類似,只是共享的儲存介質由檔案系統改成了資料庫而已,支援N個AMQ例項組網,但他的效能會受限於資料庫;  

第三種方案是ActiveMQ5.9以後才新增的特性,使用ZooKeeper協調選擇一個node作為master。被選擇的master broker node開啟並接受客戶端連線。 

其他node轉入slave模式,連線master並同步他們的儲存狀態。slave不接受客戶端連線。所有的儲存操作都將被複制到連線至Master的slaves。 
如果master死了,得到了最新更新的slave被允許成為master。fialed node能夠重新加入到網路中並連線master進入slave mode。所有需要同步的disk的訊息操作都將等待儲存狀態被複制到其他法定節點的操作完成才能完成。所以,如果你配置了replicas=3,那麼法定大小是(3/2)+1=2. Master將會儲存並更新然後等待 (2-1)=1個slave儲存和更新完成,才彙報success。至於為什麼是2-1,熟悉Zookeeper的應該知道,有一個node要作為觀擦者存在。 

單一個新的master被選中,你需要至少保障一個法定node線上以能夠找到擁有最新狀態的node。這個node將會成為新的master。因此,推薦執行至少3個replica nodes,以防止一個node失敗了,服務中斷。 

Shared Filesystem Master-Slave方式

shared filesystem Master-Slave部署方式主要是通過共享儲存目錄來實現master和slave的熱備,所有的ActiveMQ應用都在不斷地獲取共享目錄的控制權,哪個應用搶到了控制權,它就成為master。 

多個共享儲存目錄的應用,誰先啟動,誰就可以最早取得共享目錄的控制權成為master,其他的應用就只能作為slave。 





Apache ActiveMQ單點基本配置的原配置內容: 

<persistenceAdapter> 
            <kahaDB directory="${activemq.data}/kahadb"/> 
</persistenceAdapter> 

SharedFile System Master Slave 修改為: 

<persistenceAdapter> 
             <kahaDB directory="D:\\ActiveMQ Cluster\\shareBrokerData" enableIndexWriteAsync="true"  enableJournalDiskSyncs="false"/> 
</persistenceAdapter> 

在D:\\ActiveMQ Cluster目錄先建立shareBrokerData資料夾。 

注意: 

1.前面提到如果在一臺裝置上部署多個AMQ,需要修改對應埠號,如AMQ對外的監聽埠61616和jetty的監聽埠8161等。 
2.如果多套AMQ部署在不同的裝置上,這裡的directory應該指向一個遠端的系統目錄(分散式檔案系統) 
3.客戶端通過failover方式進行連線,多個AMQ例項地址使用英文逗號隔開,當某個例項斷開時會自動重連,但如果所有例項都失效,failover預設情況下會無限期的等待下去,不會有任何提示。 

下面為在一臺裝置上部署兩個AMQ示例: 
ActiveMQ A 
1.activemq.xml修改監聽埠: 

<transportConnectors> 
            <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB --> 
<!-- add &amp;wireFormat.maxInactivityDuration=0 --> 
            <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireformat.maxFrameSize=104857600&amp;wireFormat.maxInactivityDuration=0" discoveryUri="multicast://default"/> 
            <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireformat.maxFrameSize=104857600&amp;wireFormat.maxInactivityDuration=0"/>

</transportConnectors> 

2.jetty.xml修改監聽埠: 

<property name="connectors"> 
            <list> 
                <bean id="Connector" class="org.eclipse.jetty.server.nio.SelectChannelConnector"> 
                    <property name="port" value="8166" /> 
                </bean> 
                <!-- 
                    Enable this connector if you wish to use https with web console 
                --> 
                <!-- 
                <bean id="SecureConnector" class="org.eclipse.jetty.server.ssl.SslSelectChannelConnector"> 
                    <property name="port" value="8162" /> 
                    <property name="keystore" value="file:${activemq.conf}/broker.ks" /> 
                    <property name="password" value="password" /> 
                </bean> 
                --> 
            </list> 
</property> 

ActiveMQ B 
1.activemq.xml修改監聽埠: 

<transportConnectors> 
            <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB --> 
<!-- add &amp;wireFormat.maxInactivityDuration=0 --> 
            <transportConnector name="openwire" uri="tcp://0.0.0.0:61617?maximumConnections=1000&amp;wireformat.maxFrameSize=104857600&amp;wireFormat.maxInactivityDuration=0" discoveryUri="multicast://default"/> 
            <transportConnector name="amqp" uri="amqp://0.0.0.0:5673?maximumConnections=1000&amp;wireformat.maxFrameSize=104857600&amp;wireFormat.maxInactivityDuration=0"/>

</transportConnectors> 

2.jetty.xml修改監聽埠: 

<property name="connectors"> 
            <list> 
                <bean id="Connector" class="org.eclipse.jetty.server.nio.SelectChannelConnector"> 
                    <property name="port" value="8167" /> 
                </bean> 
                <!-- 
                    Enable this connector if you wish to use https with web console 
                --> 
                <!-- 
                <bean id="SecureConnector" class="org.eclipse.jetty.server.ssl.SslSelectChannelConnector"> 
                    <property name="port" value="8162" /> 
                    <property name="keystore" value="file:${activemq.conf}/broker.ks" /> 
                    <property name="password" value="password" /> 
                </bean> 
                --> 
            </list> 
</property> 


Java測試程式程式碼: 
1.Producer: 

import javax.jms.Connection; 
import javax.jms.DeliveryMode; 
import javax.jms.Destination; 
import javax.jms.JMSException; 
import javax.jms.MessageProducer; 
import javax.jms.Session; 
import javax.jms.TextMessage; 

import org.apache.activemq.ActiveMQConnectionFactory; 

public class ProducerTool { 

    private String subject = "TOOL.DEFAULT";    

    private Destination destination = null;    

    private Connection connection = null;    

    private Session session = null;    

    private MessageProducer producer = null;    

    // 初始化 
    private void initialize() throws JMSException, Exception {    
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("failover:(tcp://172.16.30.11:61616?wireFormat.maxInactivityDuration=0,tcp://172.16.30.11:61617?wireFormat.maxInactivityDuration=0)");    
        connection = connectionFactory.createConnection();    
        session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);   
        destination = session.createQueue(subject);    
        producer = session.createProducer(destination);    
        producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); 

    }    

    // 傳送訊息    
    public void produceMessage(String message) throws JMSException, Exception {    
        initialize();    
        TextMessage msg = session.createTextMessage(message);    
        connection.start();    
        System.out.println("Producer:->Sending message: " + message);    
        producer.send(msg);    
        System.out.println("Producer:->Message sent complete!");    
    }    

    // 關閉連線     
    public void close() throws JMSException {    
        System.out.println("Producer:->Closing connection");    
        if (producer != null)    
            producer.close();    
        if (session != null)    
            session.close();    
        if (connection != null)    
            connection.close();    
   }    
}  


import javax.jms.Connection; 
import javax.jms.Destination; 
import javax.jms.JMSException; 
import javax.jms.Message; 
import javax.jms.MessageConsumer; 
import javax.jms.MessageListener; 
import javax.jms.Session; 
import javax.jms.TextMessage; 

import org.apache.activemq.ActiveMQConnectionFactory; 

public class ConsumerTool implements MessageListener {      

    private String subject = "TOOL.DEFAULT";    

    private Destination destination = null;    

    private Connection connection = null;    

    private Session session = null;    

    private MessageConsumer consumer = null;    

    // 初始化    
    private void initialize() throws JMSException, Exception {    
       ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("failover:(tcp://172.16.30.11:61616,tcp://172.16.30.11:61617)"); 
        connection = connectionFactory.createConnection();    
        session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);   
        destination = session.createQueue(subject);    
        consumer = session.createConsumer(destination);    

    }    

    // 消費訊息       
    public void consumeMessage() throws JMSException, Exception {    
        initialize();    
        connection.start();    

        System.out.println("Consumer:->Begin listening...");    
        // 
        consumer.setMessageListener(this);    
        // Message message = consumer.receive();    
    }    

    // 關閉連線   
    public void close() throws JMSException {    
        System.out.println("Consumer:->Closing connection");    
        if (consumer != null)    
            consumer.close();    
        if (session != null)    
            session.close();    
        if (connection != null)    
            connection.close();    
    }    

    // 訊息處理函式  
    public void onMessage(Message message) {    
        try {    
            if (message instanceof TextMessage) {    
                TextMessage txtMsg = (TextMessage) message;    
                String msg = txtMsg.getText();    
                System.out.println("Consumer:->Received: " + msg);    
            } else {    
                System.out.println("Consumer:->Received: " + message);    
            }    
        } catch (JMSException e) {    
            // TODO Auto-generated catch block    
            e.printStackTrace();    
        }    
    }    
}  


2.Consumer: 

import javax.jms.Connection; 
import javax.jms.Destination; 
import javax.jms.JMSException; 
import javax.jms.Message; 
import javax.jms.MessageConsumer; 
import javax.jms.MessageListener; 
import javax.jms.Session; 
import javax.jms.TextMessage; 

import org.apache.activemq.ActiveMQConnectionFactory; 

public class ConsumerTool implements MessageListener {      

    private String subject = "TOOL.DEFAULT";    

    private Destination destination = null;    

    private Connection connection = null;    

    private Session session = null;    

    private MessageConsumer consumer = null;    

    // 初始化    
    private void initialize() throws JMSException, Exception {    
       ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("failover:(tcp://172.16.30.11:61616,tcp://172.16.30.11:61617)"); 
        connection = connectionFactory.createConnection();    
        session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);   
        destination = session.createQueue(subject);    
        consumer = session.createConsumer(destination);    

    }    

    // 消費訊息       
    public void consumeMessage() throws JMSException, Exception {    
        initialize();    
        connection.start();    

        System.out.println("Consumer:->Begin listening...");    
        // 
        consumer.setMessageListener(this);    
        // Message message = consumer.receive();    
    }    

    // 關閉連線   

相關推薦

JMS學習十二(Spring+ActiveMQ叢集配置

ActiveMQ叢集         ActiveMQ具有強大和靈活的叢集功能,但在使用的過程中會發現很多的缺點,ActiveMQ的叢集方式主要由兩種:Master-Slave和Broker Cluster。 1、Master-Slave         Master-Sl

ActiveMQ 叢集配置

構建高可用的ActiveMQ系統在生產環境中是非常重要的,單點的ActiveMQ作為企業應用無法滿足高可用和叢集的需求,所以ActiveMQ提供了master-slave、broker cluster等多種部署方式,但通過分析多種部署方式之後我認為需要將兩種部署方式相結合才

ActiveMQ叢集配置及使用

為什麼要對訊息中介軟體叢集? 實現高可用,以排除單點故障引起的服務中斷 實現負載均衡,以提升效率為更多使用者提供服務 叢集方式: 客戶端叢集:讓多個消費者消費同一個佇列 Broker clusters:多個Broker之間同步訊息 Master Slave:實現高可用

基於zookeeper的activemq的主從叢集配置

專案,要用到訊息佇列,這裡採用activemq,相對使用簡單點。這裡重點是環境部署。0. 伺服器環境RedHat710.90.7.210.90.7.1010.90.2.1021. 下載安裝zookeeperzookeeper的安裝,採用一臺機器裝3個例項,偽叢集。其實,搭建真

ActiveMQ叢集配置

原文地址: http://activemq.apache.org/clustering.html (第一次翻譯,如有不足,歡迎指正) 叢集是一個很大的範圍總是意味著不同的事對不同的人, 此處我們將列出ActiveMQ的各種叢集 1. Queue consumer clu

訊息佇列系列之ActiveMQ(JMS、叢集配置

1、ActiveMQ的下載與啟動 到http://activemq.apache.org/activemq-5152-release.html下載ActiveMQ windows版本的啟動: 執行bin資料夾中的win32(32位系統)/win64(64位系統)下的: ac

ActiveMQ基本配置

activemq.xml jetty.xml 一、配置登錄監視控制臺1,啟用登錄驗證(authenticate屬性的值設置為true表示需要登錄驗證)e:\apache-activemq-5.14.5\conf\jetty.xml<bean id="securityConstraint" cla

activeMq密碼配置 - 6

password pan web config rain true name 正常 沒有 上一個章中沒有密碼設置怎麽就能接發消息了?? 首先activemq中常見的有兩種密碼:控制臺 和 broker 控制臺的密碼在conf/jetty.xml文件中: <bean

centos7下redis哨兵叢集配置

redis作為一個高效能記憶體資料庫,也常用於系統的快取資料庫,與memcache類似,再生產環境中,當然需要做高可用的結構,即主從複製,替換等功能,可以實現主資料庫掛掉,從庫自動補上,不影響正常使用。 redis的主從,哨兵配置也非常簡單,一主N從,N哨兵都可以。具體的配置方法下面記錄一下: 預設

SSO CAS叢集配置

                         

Apache +Jetty的負載均衡與叢集配置(下)

分別訪問http://192.168.55.229:9009/fgw/index.jsp和http://192.168.55.231:9009/fgw/index.jsp   重新整理會出現新的頁面:   (6)此時訪問http://192.16

Apache +Jetty的負載均衡與叢集配置(上)

                                         

多節點高可用Eureka叢集配置與部署

前言 上一節講的是動態擴容Eureka服務,說實話,一般情況這種操作並不多,一般多用在,由於大量服務節點部署後給Eureka造成壓力突然積增,而解決的辦法。這節講的是一次啟動或部署,直接就是叢集多節點的,多用於服務節點相對穩定的場景。還有筆者這裡有實際部署和應用的經驗分享給大家,就是,我目前25

Mariadb10 galera叢集配置

由於專案需要做冗餘備份,同時瞭解mariadb10自帶了galera叢集功能於是測試,環境Centos7+mariadb10.1 一、安裝mariadb 1、yum安裝很簡單 yum install -y mariadb-server,如何直接安裝將會預設安裝mariadb 5.5,如果需要安裝

zookeeper叢集配置詳細教程

  第一步:環境準備 環境 版本

springboot上傳下載檔案(3)--java api 操作HDFS叢集+叢集配置

 只有光頭才能變強! 前一篇文章講了nginx+ftp搭建獨立的檔案伺服器 但這個伺服器宕機了怎麼辦? 我們用hdfs分散式檔案系統來解決這個問題(同時也為hadoop系列開個頭) 目錄 1、Ubuntu14.04下配置Hadoop(2.8.5)叢集環境詳解(完全分

解決Centos6.4叢集配置ssh免密登陸仍然需要輸入密碼問題

遇到的問題展示:   Ssh免密登陸配置成功之後仍然需要輸入密碼,找了很多方法,最後已解決,現把解決辦法的詳細過程記錄下來,希望對你有幫助。。。       解決辦法:   一、準備工作   1、安裝ssh服務 執行rpm -q

redis cluster叢集配置步驟

1. 配置redis.conf: 建立7000-7005 六個資料夾,拷貝redis.conf到各個資料夾,然後修改配置如下(除了埠以外其他都可以一樣): port 7000 cluster-enabled yes cluster-config-file nodes7000.conf

Windows平臺下的Redis-Sentinel叢集配置

一、環境介紹 Microsoft Windows 10 企業版 Redis-x64-3.2.100 jedis 2.9.0 Sentinel是一個管理多個redis例項的工具,它可以實現對redis的監控、通知、自動故障轉移。sentinel不斷的檢測red

Kafka叢集配置---Windows版

Kafka叢集配置---Windows版   Kafka是一種高吞吐量的分散式釋出訂閱的訊息佇列系統,Kafka對訊息進行儲存時是通過tipic進行分組的。今天我們僅實現Kafka叢集的配置。 前言 最近研究kafka,發現網上很多關於kafka的介紹都是基於Linux作業