1. 程式人生 > >ActiveMQ學習筆記(8)----ActiveMQ的訊息儲存持久化

ActiveMQ學習筆記(8)----ActiveMQ的訊息儲存持久化

1. 概述

  ActiveMQ不僅支援persistent和non-persistent兩種方式,還支援訊息的恢復(recovery)方式。

2. PTP

  Queue的儲存是很簡單的,其實就是FIFO的Queue

    

2. PUB/SUB

  對於持久化訂閱主題,每一個消費者都將獲得一個訊息的複製。

    

3. 有效的訊息儲存

  ActiveMQ 提供了一個外掛式的訊息儲存,類似於訊息的多點傳播,主要實現瞭如下幾種:

  1. AMQ訊息儲存-基於檔案的儲存方式,是以前預設的訊息儲存。

  2. KahaDB訊息儲存-提供了容量的提升和恢復能力,是現在的預設儲存方式。

  3. JDBC 訊息儲存-訊息基於JDBC儲存的

  4. Memory 訊息儲存-基於記憶體的訊息儲存

4. KahaDB Message Store概述

  KahaDB是目前預設的儲存方式,可用於任何場景,提高了效能和恢復能力。訊息儲存使用一個事務日誌和僅僅用一個索引檔案來儲存它所有的地址。

  KahaDB是一個專門針對訊息持久化的解決方案,它對典型的訊息使用模式進行了優化。在KahaDB中,資料被追加到data logs中,當不再需要log檔案中的資料的時候,log檔案會被丟棄。

5. KahaDB的基本配置例子

  

<persistenceAdapter
>     <kahaDB directory="${activemq.data}/kahadb"/>   </persistenceAdapter>

 

  可用的屬性有:

    1. directory:KahaDB存放的路徑,預設值是activemq-data

    2. indexWriteBatchSize: 批量寫入磁碟索引page數量,預設值1000

    3. indexCacheSize: 記憶體中快取索引page的數量,預設值10000

    4. enableIndexWriteAsync: 是否支援非同步索引,預設false

    5. journalMaxFileLength: 設定每個訊息data log的大小,預設是23MB

    6. enableJournalDiskSyncs: 設定是否保證每個沒有事務的內容,被同步寫入磁碟,JMS持久化的時候需要,預設為true

    7. cleanupInterval: 在檢查到不再使用的訊息後,在具體刪除訊息前的時間,預設30000

    8. checkpointInterval: checkpoint的間隔時間,預設5000

    9. ignoreMissingJournalfiles: 是否忽略丟失的訊息日誌檔案,預設false

    10. checkForGorruptJournalFiles:在啟動的時候,將會驗證訊息檔案是否損壞,預設false

    11. checksumJournalFiles: 是否為每個訊息日誌檔案提供checksun,預設false

    12. archiveDataLogs: 是否移動檔案到特定的路徑,而不是刪除它們,預設false

    13. directoryArchive: 定義訊息已經被消費過後,移動data log到路徑,預設null

    14. databaseLockedWaitDelay: 獲取資料庫鎖的等待時間(userd by shared master/slave),預設10000

    15 maxAsyncJobs: 設定最大的可以儲存的非同步訊息佇列,預設值10000,可以和concurrent MessageProducers設定成一樣的值。

    16. concurrentStoreAndDispatchTransactions:是否分發訊息到客戶端,同時事務儲存訊息,預設為true.

    17. concurrentStoreAndDispatchTopics:是否分發Topic訊息到客戶端,同時進行儲存,預設為true.

    18. concurrentStoreAndDispatchQueues:是否分發Queue訊息到客戶端,同時進行儲存,預設為true.

6. 在Java中內嵌使用Broker,使用KahaDB的例子。

package com.wangx.activemq;

import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.store.kahadb.KahaDBStore;

import java.io.File;

public class MyBroker {
    public static void main(String[] args) throws Exception {
        BrokerService brokerService = new BrokerService();
        File dataFileDir = new File("target/amq-in/kahadb");
        KahaDBStore kahaDBStore = new KahaDBStore();
        kahaDBStore.setDirectory(dataFileDir);
        //use a bigger journal file
        kahaDBStore.setJournalMaxFileLength(1024*100);
        kahaDBStore.setIndexWriteBatchSize(100);
        kahaDBStore.setEnableIndexWriteAsync(true);
        brokerService.setPersistenceAdapter(kahaDBStore);
        brokerService.addConnector("tcp://localhost:61616");
        brokerService.start();
    }
}

7. AMQ Message Store 概述

  AMQ Message Store是ActiveMQ5.0預設的持久化儲存,它是一個基於檔案,事務儲存設計為快速訊息儲存的一個結構,該結構是以流的形式來進行訊息互動的。

  這種方式中,Message被儲存到data logs中,同時被reference store進行索引以提高存取速度。Data logs 由一些單獨的data log檔案組成,預設的檔案大小是32M,如果某個訊息的大小超過了data log檔案的大小,那麼可以修改配置以增加data log檔案的大小。如果某個data log檔案中的所有訊息都被成功消費了,那麼這個data log檔案將會被標記,以便在下一輪的清理中被刪除或者歸檔。

8. AMQ Message Store 配置示例

 

 <broker brokerName="broker" persistent="true" useShutdownHook="false">

    <persistenceAdapter>
      <amqPersistenceAdapter directory="${activemq.base}/data" maxFileLength="32MB/>

    </persistenceAdapter>
  </broker>

9. 使用JDBC來持久化訊息

  ActiveMQ支援使用JDBC來持久化訊息,預定義的表如下:

  1. 訊息表,預設表名為ACTIVEMQ ESGS queue和topic都存在裡面,結構如下:

    

  2. ACTIVEMQ_ACKS表儲存持久化訂閱的資訊和最後一個持久訂閱接收的訊息ID,結構如下

  

  2. 鎖定表,預設表名為ACTIVE_LOCK,用來確保某一時刻,只能有一個ActiveMQ broker例項來訪問資料庫,結構如下:

  

10. 使用JDBC來持久化訊息的配置示例

 <beans>
        <broker brokerName="test-broker" persistent="true"
                xmlns="http://activemq.apache.org/schema/core">
            <persistenceAdapter>
                <jdbcPersistenceAdapter dataSource="#mysql-ds"/>
            </persistenceAdapter>
        </broker>
        <bean name="msql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
            <property name="driverClassName">
                <value>org.gjt.mm.mysql.Driver</value>
            </property>
            <property name="url">
                <value>jdbc:mysql:127.0.0.1:3306</value>
            </property>
            <property name="username">
                <value>wangx</value>
            </property>
            <property name="password">
                <value>wangx</value>
            </property>
        </bean>
    </beans>

11. JDBC Message Store with ActiveMQ Journal

  這種方式克服了JDBC Store的不足,使用快速的快取寫入技術,大大的提高了效能,配置如下:

 <beans>
        <broker brokerName="test-broker" persistent="true"
                xmlns="http://activemq.apache.org/schema/core">
            <persistenceFactory>
               <journalPersistenceAdapterFactory
                   journalLogFiles="4"
                   journalLogFileSize="32768"
                   useJournal="true"
                   useQuickJournal="true"
                   dataDirectory="activemq-data"/>
            </persistenceFactory>
        </broker>
    </beans>

12 JDBC Store 和JDBC Message Store with ActiveMQ Journal的區別

  1. Jdbc with journal 的效能優於jdbc,

  2. Jdbc 用於master/slave模式的資料庫共享

  3. Jdbc with journal不能用於master/slave模式

  5. 一般情況下,推薦使用Jdbc with journal

13. Memory Message Store

  記憶體訊息儲存主要是儲存所有的持久化訊息在記憶體中,這裡沒有動態的快取存在,所以你必須注意設定你的broker所在的JVM和記憶體限制。

  配置示例如下:

<beans>
        <broker brokerName="test-broker" persistent="true"
                xmlns="http://activemq.apache.org/schema/core">
            <transportConnectors>
                <transportConnector uri="tcp://localhost:61635"/>
            </transportConnectors>
        </broker>
    </beans>

  在java中內嵌使用Broker,使用Memory的例子

package com.wangx.activemq;

import org.apache.activemq.broker.BrokerService;


public class MyBroker {
    public static void main(String[] args) throws Exception {
        BrokerService brokerService = new BrokerService();
        brokerService.setPersistent(false);
        brokerService.addConnector("tcp://localhost:61616");
        brokerService.start();
    }
}