1. 程式人生 > >【ACtiveMQ】四 資料持久化與叢集

【ACtiveMQ】四 資料持久化與叢集

ActiveMQ

一 容錯機制

Failover Protocol                        

 前面講述的都是Client配置連結到指定的broker上。但是,如果Broker的連結失敗怎麼辦呢?此 時,Client有兩個選項:要麼立刻死掉,要麼去連線到其它的broker上。 Failover協議實現了自動重新連結的邏輯。這裡有兩種方式提供了穩定的brokers列表對於 Client連結。第一種方式:提供一個static的可用的Brokers列表。第二種方式:提供一個dynamic 發 現的可用Brokers。

Failover Protocol 的配置方式

 failover:(uri1,...,uriN)?key=value 或者 failover:uri1,...,uriN

Failover Protocol 的預設配置

預設情況下,這種協議用於隨機的去選擇一個連結去連結,如果連結失敗了,那麼會連結到其他 的Broker上。預設的配置定義了延遲重新連結,意味著傳輸將會在10秒後自動的去重新連結可用的 broker。當然所有的重新連結引數都可以根據應用的需要而配置。

Failover Protocol 的使用示例,在客戶端程式裡面:

 ConnectionFactoryconnectionFactory= new

ActiveMQConnectionFactory("failover:(tcp://192.168.1.106:61679,tcp://192.168.1.106:61819) ?randomize=false");

         簡單的說:容錯機制就是客戶端可以在伺服器端,一個伺服器掛了之後,通過failover協議連線到另外的伺服器端。

Failover Protocol 可用的配置引數:

1:initialReconnectDelay:在第一次嘗試重連之前等待的時間長度(毫秒),預設10

2:maxReconnectDelay:最長重連的時間間隔(毫秒),預設30000

3:useExponentialBackOff:重連時間間隔是否以指數形式增長,預設true

4:backOffMultiplier:遞增倍數,預設2.0

5:maxReconnectAttempts: 預設-1|0,自版本5.6起:-1為預設值,代表不限重試次數;0代表從不重試 (只嘗試連線一次,並不重連),5.6以前的版本:0為預設值,代表不限重試次數所有版本:如果設定 為大於0的數,代表最大重試次數

6:startupMaxReconnectAttempts:初始化時的最大重連次數。一旦連線上,將使用maxReconnectAttempts 的配置,預設0

7:randomize:使用隨機連結,以達到負載均衡的目的,預設true

8:backup:提前初始化一個未使用連線,以便進行快速失敗轉移,預設false

9:timeout:設定傳送操作的超時時間(毫秒),預設-1

10:trackMessages:設定是否快取[故障發生時]尚未傳送完成的訊息,當broker一旦重新連線成功,便將 這些快取中的訊息重新整理到新連線的代理中,使得訊息可以在broker切換前後順利傳送,預設false

11:maxCacheSize:當trackMessages啟用時,快取的最大位元組,預設為128*1024bytes

12:updateURIsSupported:設定是否可以動態修改broker uri(自版本5.4起),預設true

二 叢集

2.1 Queue consumer 叢集

 ActiveMQ支援Consumer對訊息高可靠性的負載平衡消費,如果一個Consumer死掉, 該訊息會轉發到其它的Consumer消費的Queue上。如果一個Consumer獲得訊息比其它 Consumer快,那麼他將獲得更多的訊息。因此推薦ActiveMQ的Broker和Client使用

failover://transport的方式來配置連結。 如何實現消費者的負載均衡呢,這個就要更改networkConnector的屬性配置conduitSubscriptions :預設true,是否把同一個broker的多個consumer當做一個來處理,保證在一個消費者端的多個消費者不被當成一個,按比例分配。

2.2 Broker 叢集

大部情況下是使用一系列的Broker和Client連結到一起。如果一個Broker死掉了, Client可以自動連結到其它Broker上。實現以上行為需要用failover協議作為Client。 如果啟動了多個Broker,Client可以使用static discover或者 Dynamic discovery 容易的從一個broker到另一個broker直接連結。 這樣當一個broker上沒有Consumer的話,那麼它的訊息不會被消費的,然而該 broker會通過儲存和轉發的策略來把該訊息發到其它broker上。 特別注意:ActiveMQ預設的兩個broker,static連結後是單方向的,broker-A可以 訪問消費broker-B的訊息,如果要支援雙向通訊,需要在netWorkConnector配置的時候, 設定duplex=true,並且配置上訊息迴流機制就可以了。

2.2.1 方式一 JDBC

利用資料庫作為資料來源,採用Master/Slave模式,其中在啟動的時候Master首先獲 得獨有鎖,其它Slaves Broker則等待獲取獨有鎖。 推薦客戶端使用Failover來連結Brokers。 具體如下圖所示:

如果Master失敗,則它釋放獨有鎖,其他Slaver則獲取獨有鎖,其它Slaver立即獲得獨有鎖後此時它將變成Master,並且啟動所有的傳輸連結。同時,Client將停止連結之前的Master和將會輪詢連結到其他可以利用的Broker即新Master。如上中圖所示

Master重啟 任何時候去啟動新的Broker,即作為新的Slave來加入叢集,如上右圖所示

JDBC Master Slave的配置 使用<jdbcPersistenceAdapter/>來配置訊息的持久化,自動就會使用JDBC Master Slave的方式。

2.2.2 方式二 橋接

2.2.2.1建立ACtiveMQ例項

步驟如下:

1:把整個conf資料夾複製一份,比如叫做conf2                      

2:修改裡面的activemq.xml檔案

(1)裡面的brokerName 不能跟原來的重複

(2)資料存放的檔名稱不能重複,比如: <kahaDBdirectory="${activemq.data}/kahadb_2"/>

(3)所有涉及的transportConnectors 的埠,都要跟前面的不一樣

3:修改jetty.xml,主要就是修改埠,比如: <property name=“port” value=“8181”/> 埠必須和前面的不一樣

4:到bin下面,複製一個activemq,比如叫做activemq2:

(1)修改程式的id,不能和前面的重複 ACTIVEMQ_PIDFILE="$ACTIVEMQ_DATA/activemq2-`hostname`.pid"

(2)修改配置檔案路徑 ACTIVEMQ_CONF="$ACTIVEMQ_BASE/conf2"

(3)修改埠,裡面有個tcp的61616的埠,要改成不一樣的,最好跟activemq.xml裡面的tcp的埠一致

(4)然後就可以執行了,如果執行沒有許可權的話,就授權:chmod751 activemq2

(5) 如果(3),(4)兩步沒有也沒必要補足。

2.2.2.2 ACtiveMQ broker網路

ActiveMQ的networkConnector是什麼 在某些場景下,需要多個ActiveMQ的Broker做叢集,那麼就涉及到Broker到Broker的通訊,這個被稱為ActiveMQ的networkConnector。         ActiveMQ的networkConnector預設是單向的,一個Broker在一端傳送訊息,另一Broker在另一端接收訊息。這就是所謂的“橋接”。 ActiveMQ也支援雙向連結,建立一個雙向的通道對於兩個Broker,不僅傳送訊息而且也能從相同的通道來接收訊息,通常作為duplex connector來對映,如下

discovery

一般情況下,discovery是被用來發現遠端的服務,客戶端通常想去發現所有可利用 的brokers;另一層意思,它是基於現有的網路Broker去發現其他可用的Brokers。 有兩種配置Client到Broker的連結方式,

一種方式:Client通過Statically配置的方式去連線Broker,

一種方式:Client通過discovery agents來dynamically的發現 Brokers

Static networks

 Static networkConnector是用於建立一個靜態的配置對於網路中的多個Broker。這 種協議用於複合url,一個複合url包括多個url地址。格式如下: static:(uri1,uri2,uri3,...)?key=value 1:

2.2.3 <broker>中配置

<networkConnectors>

<networkConnector name="localnetwork" uri="static://(tcp://localhost:61616,tcp:// localhost:61617)"/>

</networkConnectors>

2.2.4 演示結果

Broker2

可以從上面的兩個看出已經搭建起來了一個偽叢集了。

Static networkConnector的基本原理示意圖:

上圖中,兩個Brokers是通過一個static的協議來網路連結的。一個 Consumer連結到brokerB的一個地址上,當 Producer在brokerA上以相同的地址 傳送訊息時,此時它將被轉移到brokerB上。也就是,BrokerA會轉發訊息到 BrokerB上。

現在生產者向61616生成資料

執行傳送者程式碼

public class PersisiTopicSender {

     public static void main(String[] args) throws JMSException {

          ActiveMQConnectionFactory conFactory = new ActiveMQConnectionFactory("tcp://192.168.232.128:61616");

          Connection createConnection = conFactory.createConnection();

          Session createSession = createConnection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);

          Topic createTopic = createSession.createTopic("persisitent");

          MessageProducer createProducer = createSession.createProducer(createTopic);

          createProducer.setDeliveryMode(DeliveryMode.PERSISTENT);

          createConnection.start();

          for(int i=0;i<3;i++){

               TextMessage createTextMessage = createSession.createTextMessage("message"+i);

               createProducer.send(createTextMessage);

          }

          createSession.commit();

          createSession.close();

          createConnection.close();

      }

}

現在我到61617這個broker例項上去消費

此時在61616上已經可以看到相同的佇列,並且執行一次消費者程式碼註冊了一次了。

此時把消費者程式碼停止了,再執行一次就可以消費訊息了

public class PersisiTopicReceiver {

     public static void main(String[] args) throws JMSException {

         ActiveMQConnectionFactory activeMQConnectionFactory=new ActiveMQConnectionFactory("tcp://192.168.232.128:61617");

         Connection createConnection = activeMQConnectionFactory.createConnection();

         createConnection.setClientID("訂閱者B_ID");

         createConnection.start();

         Session createSession = createConnection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);

         Topic createTopic = createSession.createTopic("persisitent");

         TopicSubscriber createDurableSubscriber = createSession.createDurableSubscriber(createTopic, "T1");

         TextMessage message = (TextMessage)createDurableSubscriber.receive();

         while(message!=null){

              System.out.println(message.getText());

              message = (TextMessage)createDurableSubscriber.receive();

         }

         createSession.commit();

         createSession.close();

         createConnection.close();

     }

}

如上已經消費到訊息了。

2.2.5 靜態網路屬性

networkConnector配置的可用屬性:

1:name:預設是bridge

2:dynamicOnly:預設是false,如果為true, 持久訂閱被啟用時才建立對應的網路持久訂閱。預設是啟動時啟用

3:decreaseNetworkConsumerPriority:預設是false。設定消費者優先權,如果為true,網路的消費者優先順序降低 為-

5。如果為false,則預設跟本地消費者一樣為0 4:networkTTL :預設是1 ,網路中用於訊息和訂閱消費的broker數量

5:messageTTL :預設是1 ,網路中用於訊息的broker數量

6:consumerTTL:預設是1 ,網路中用於消費的broker數量

7:conduitSubscriptions :預設true,是否把同一個broker的多個consumer當做一個來處理

8:dynamicallyIncludedDestinations :預設為空,要包括的動態訊息地址,類似於excludedDestinations,如: <dynamicallyIncludedDestinations> <queue physicalName="include.test.foo"/> <topic physicalName="include.test.bar"/> </dynamicallyIncludedDestinations>

9:staticallyIncludedDestinations :預設為空,要包括的靜態訊息地址。類似於excludedDestinations,如: <staticallyIncludedDestinations> <queue physicalName="always.include.queue"/> </staticallyIncludedDestinations>

10:excludedDestinations :預設為空,指定排除的地址,示例如下: <networkConnectors> <networkConnectoruri="static://(tcp://localhost:61617)" name="bridge" dynamicOnly="false" conduitSubscriptions="true" decreaseNetworkConsumerPriority="false"> <excludedDestinations> <queue physicalName="exclude.test.foo"/> <topic physicalName="exclude.test.bar"/> </excludedDestinations> <dynamicallyIncludedDestinations> <queue physicalName="include.test.foo"/> <topic physicalName="include.test.bar"/> </dynamicallyIncludedDestinations> <staticallyIncludedDestinations> <queue physicalName="always.include.queue"/> <topic physicalName="always.include.topic"/> </staticallyIncludedDestinations> </networkConnector> </networkConnectors>

11duplex :預設false,設定是否能雙向通訊

12:prefetchSize :預設是1000,持有的未確認的最大訊息數量,必須大於0,因 為網路消費者不能自己輪詢訊息

13:suppressDuplicateQueueSubscriptions:預設false,如果為true, 重複的訂 閱關係一產生即被阻止

14:bridgeTempDestinations :預設true,是否廣播advisory messages來建立臨 時destination 15:alwaysSyncSend :預設false,如果為true,非持久化訊息也將使用 request/reply方式代替oneway方式傳送到遠端broker。

16:staticBridge :預設false,如果為true,只有 staticallyIncludedDestinations中配置的destination可以被處理。

如上屬性所示,11就是讓borker1broker2可以互相通訊的屬性配置,實際上之前的演示訊息是隻能從61616流轉至61617的,如果要讓訊息可以迴流行程一個網,那麼我們要在networkConnector上配置duplex這樣訊息佇列才會形成一個網狀的叢集。

2.2.6 訊息迴流

配置完duplex以後,我們會認為他就成為了一個叢集了,實際上不是這樣的,duplex只是決定了,生產者不管是在61616,還是61617我們都可以在網路連線的另一方通過消費者消費到訊息。但是當訊息從61616流轉帶61617以後,沒有被消費完的這部分訊息是不會因為消費者在61616消費而回流的。這就是我們所說的訊息丟失。這是一個嚴重的問題。

<destinationPolicy>

<policyMap>

<policyEntries>

<policyEntry queue=">"

enableAudit="false">

<networkBridgeFilterFactory>

<conditionalNetworkBridgeFilterFactory replayWhenNoConsumers="true"/> </networkBridgeFilterFactory>              

         </policyEntry>

        </policyEntries>

 </policyMap>

</destinationPolicy>

此時就要求我們在activemq.xml中進行配置,能夠讓訊息進行迴流,真正的行程一個叢集。

加上上面這一段,儘量兩臺機都要進行配置。

2.2.6.1 傳送者

public class MsgSendder {

    public static void main(String[] args) throws Exception {

        ActiveMQConnectionFactory ConnectionFactoryconnectionFactory = new ActiveMQConnectionFactory("tcp://192.168.232.128:61617");

        Connection connection = ConnectionFactoryconnectionFactory.createConnection();

        connection.start();

        Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);

        Destination destination = session.createQueue("persisitent");

        MessageProducer producer = session.createProducer(destination);

            for (int i = 0; i < 20; i++) {

                TextMessage message = session.createTextMessage("message--" + i);

                    producer.send(message);

            }

            session.commit();

            session.close();

            connection.close();

        }

}

2.2.6.2 61616 消費者

public class PersisiTopicReceiver {

    public static void main(String[] args) throws JMSException, InterruptedException {

        ActiveMQConnectionFactory activeMQConnectionFactory=new ActiveMQConnectionFactory("tcp://192.168.232.128:61616");

        Connection createConnection = activeMQConnectionFactory.createConnection();

        createConnection.setClientID("B_ID");

        createConnection.start();

        final Session createSession = createConnection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);

        for(int i=0;i<30;i++){

            Queue destination = createSession.createQueue("persisitent");

            MessageConsumer consumer = createSession.createConsumer(destination);

            consumer.setMessageListener(new MessageListener() {

               

                public void onMessage(Message message) {

                    TextMessage msg = (TextMessage)message;

                    System.out.println(msg);

                    try {

                        Thread.sleep(2000);

                        createSession.commit();

                    } catch (InterruptedException e) {

                        // TODO Auto-generated catch block

                        e.printStackTrace();

                    } catch (JMSException e) {

                        // TODO Auto-generated catch block

                        e.printStackTrace();

                    }

                   

                }

            });

        }

        createSession.close();

        createConnection.close();

    }

}

2.2.6.3 61617 消費者

public class PersisiTopicReceiver2 {

    public static void main(String[] args) throws JMSException, InterruptedException {

        ActiveMQConnectionFactory activeMQConnectionFactory=new ActiveMQConnectionFactory("tcp://192.168.232.128:61617");

        Connection createConnection = activeMQConnectionFactory.createConnection();

        createConnection.setClientID("B_ID2");

        createConnection.start();

        final Session createSession = createConnection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);

        for(int i=0;i<30;i++){

            Queue destination = createSession.createQueue("persisitent");

            MessageConsumer consumer = createSession.createConsumer(destination);

            consumer.setMessageListener(new MessageListener() {

                public void onMessage(Message message) {

                    TextMessage msg = (TextMessage)message;

                    System.out.println(msg);

                    try {

                        createSession.commit();

                        Thread.sleep(2000);

                    } catch (InterruptedException e) {

                        // TODO Auto-generated catch block

                        e.printStackTrace();

                    } catch (JMSException e) {

                        // TODO Auto-generated catch block

                        e.printStackTrace();

                    }

                }

            });

        }

       

        createSession.close();

        createConnection.close();

    }

}

如上所示的配置,加上測試程式碼,就可以實現訊息的互相通訊與訊息的迴流了,這就形成了一個訊息佇列的偽叢集了。

三 ActiveMQ資料持久化

3.1 ActiveMQ資料持久化方式

ActiveMQ提供了一個外掛式的訊息儲存,類似於訊息的多點傳播,主要實現瞭如下幾種: 1:AMQ訊息儲存-基於檔案的儲存方式,是以前的預設訊息儲存

2:KahaDB訊息儲存-提供了容量的提升和恢復能力,是現在的預設儲存方式

3:JDBC訊息儲存-訊息基於JDBC儲存的

4:Memory 訊息儲存-基於記憶體的訊息儲存

3.2 AMQ持久化

AMQ Message Store概述 AMQ Message Store是ActiveMQ5.0預設的持久化儲存,它是一個基於檔案、事務儲存設計為快速訊息儲存的一個結構,該結構是以流的形式來進行訊息互動的。 這種方式中,Messages被儲存到data logs中,同時被reference store進行索 引以提高存取速度。Date logs由一些單獨的data log檔案組成,預設的檔案大小是 32M,如果某個訊息的大小超過了data log檔案的大小,那麼可以修改配置以增加 data log檔案的大小。如果某個data log檔案中所有的訊息都被成功消費了,那麼這個data log檔案將會被標記,以便在下一輪的清理中被刪除或者歸檔。

AMQ Message Store配置示例

<broker brokerName="broker" persistent="true" useShutdownHook="false">

<persistenceAdapter>

<amqPersistenceAdapterdirectory="${activemq.base}/data"maxFileLength="32mb"/> </persistenceAdapter>

 </broker>

3.3 KahaDB 持久化

         KahaDB Message Store概述 KahaDB是目前預設的儲存方式,可用於任何場景,提高了效能和恢復能力。訊息儲存使用一個事務日誌和僅僅用一個索引檔案來儲存它所有的地址。 KahaDB是一個專門針對訊息持久化的解決方案,它對典型的訊息使用模式進行了優化。在Kaha 中,資料被追加到data logs中。當不再需要log檔案中的資料的時候,log檔案會被丟棄。

KahaDB基本配置例子

 <persistenceAdapter>

<kahaDB directory="${activemq.data}/kahadb"/>

</persistenceAdapter>

可用的屬性有:

1:director:KahaDB存放的路徑,預設值activemq-data

2:indexWriteBatchSize: 批量寫入磁碟的索引page數量,預設值1000

3:indexCacheSize:記憶體中快取索引page的數量,預設值10000

4:enableIndexWriteAsync:是否非同步寫出索引,預設false

5:journalMaxFileLength:設定每個訊息data log的大小,預設是32MB

6:enableJournalDiskSyncs:設定是否保證每個沒有事務的內容,被同步寫入磁碟,JMS持久化的時候需 要,預設為true

7:cleanupInterval:在檢查到不再使用的訊息後,在具體刪除訊息前的時間,預設30000

8:checkpointInterval:checkpoint的間隔時間,預設5000

9:ignoreMissingJournalfiles:是否忽略丟失的訊息日誌檔案,預設false

10:checkForCorruptJournalFiles:在啟動的時候,將會驗證訊息檔案是否損壞,預設false

11:checksumJournalFiles:是否為每個訊息日誌檔案提供checksum,預設false

12:archiveDataLogs:是否移動檔案到特定的路徑,而不是刪除它們,預設false

13:directoryArchive:定義訊息已經被消費過後,移動data log到的路徑,預設null

14:databaseLockedWaitDelay:獲得資料庫鎖的等待時間 (used by shared master/slave),預設 10000

15:maxAsyncJobs:設定最大的可以儲存的非同步訊息佇列,預設值10000,可以和concurrent MessageProducers 設定成一樣的值

16:concurrentStoreAndDispatchTransactions:是否分發訊息到客戶端,同時事務儲存訊息,預設 true

17:concurrentStoreAndDispatchTopics:是否分發Topic訊息到客戶端,同時進行儲存,預設true

18:concurrentStoreAndDispatchQueues:是否分發queue訊息到客戶端,同時進行儲存,預設true

3.4 JDBC持久化

3.4.1 建表

ACTIVEMQ_MSGS 訊息表

ACTIVEMQ_ACKS 確認表

ACTIVEMQ_LOCK 鎖表

3.4.2 配置

<beans>

<broker

brokerName="test-broker" persistent=true xmlns="http://activemq.apache.org/schema/core"> 
    <persistenceAdapter>

         <jdbcPersistenceAdapterdataSource=“#mysql-ds"/>

    </persistenceAdapter>

</broker>

   <bean name="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">

    <property name="driverClassName">

        <value>org.gjt.mm.mysql.Driver</value>

    </property>

    <property name="url">

        <value>jdbc:mysql://192.168.1.100:3306/test?useUnicode=true&amp;characterEncodi         ng=UTF-8</value>

    </property>

    <property name="username">
    
        <value>root</value>

</property>

<property name="password" value="cc"/>

</bean>

3.5 JDBC Message Store with ActiveMQ Journal

這種方式克服了JDBC Store的不足,使用快速的快取寫入技術,大大提高了效能。 配置示例如下

3.5.1 配置

<beans>

<broker brokerName="test-broker" xmlns="http://activemq.apache.org/schema/core">

 <persistenceFactory>

 <journalPersistenceAdapterFactory journalLogFiles="4" journalLogFileSize="32768" useJournal="true" useQuickJournal="true" dataSource="#derby-ds" dataDirectory="activemq-data" />

</persistenceFactory>

 </broker>

 </beans> 

3.5.2 JDBC Store和JDBCMessage Store with ActiveMQJournal的區別

1:Jdbcwith journal的效能優於jdbc

2:Jdbc用於master/slave模式的資料庫分享

3:Jdbcwith journal不能用於master/slave模式

4:一般情況下,推薦使用jdbcwith journal

3.6 Memory持久化

記憶體訊息儲存主要是儲存所有的持久化的訊息在記憶體中。這裡沒有動態的快取存在,所以 你必須注意設定你的broker所在的JVM和記憶體限制    

3.5.1 配置

<beans>

<broker brokerName="test-broker" persistent="false" xmlns="http://activemq.apache.org/schema/core">

 <transportConnectors>

<transportConnectoruri="tcp://localhost:61635"/>

</transportConnectors>

 </broker>

</beans>

3.5.2 內嵌使用Broker

public void createEmbeddedBroker() throws Exception {

BrokerServicebroker = new BrokerService();

broker.setPersistent(false);

broker.addConnector("tcp://localhost:61616");

broker.start();

 }