1. 程式人生 > >ActiveMQ的幾種訊息持久化機制

ActiveMQ的幾種訊息持久化機制

為了避免意外宕機以後丟失資訊,需要做到重啟後可以恢復訊息佇列,訊息系統一般都會採用持久化機制。

ActiveMQ的訊息持久化機制有JDBC,AMQ,KahaDB和LevelDB四種方式,無論使用哪種持久化方式,訊息的儲存邏輯都是一致的。

就是在傳送者將訊息傳送出去後,訊息中心首先將訊息儲存到本地資料檔案、記憶體資料庫或者遠端資料庫等,然後試圖將訊息傳送給接收者,傳送成功則將訊息從儲存中刪除,失敗則繼續嘗試。

訊息中心啟動以後首先要檢查指定的儲存位置,如果有未傳送成功的訊息,則需要把訊息傳送出去。

1. JDBC持久化方式

使用JDBC持久化方式,資料庫會建立3個表:activemq_msgs,activemq_acks和activemq_lock。 activemq_msgs用於儲存訊息,Queue和Topic都儲存在這個表中。

(1)配置方式

配置持久化的方式,都是修改安裝目錄下conf/acticvemq.xml檔案,

首先定義一個mysql-ds的MySQL資料來源,然後在persistenceAdapter節點中配置jdbcPersistenceAdapter並且引用剛才定義的資料來源。

<persistenceAdapter> 
    <jdbcPersistenceAdapter dataSource="#mysql-ds" createTablesOnStartup="false" /> 
</persistenceAdapter>

dataSource指定持久化資料庫的bean,createTablesOnStartup是否在啟動的時候建立資料表,預設值是true,這樣每次啟動都會去建立資料表了,一般是第一次啟動的時候設定為true,之後改成false。 使用MySQL配置JDBC持久化:

<beans>
    <broker brokerName="test-broker" persistent="true" xmlns="http://activemq.apache.org/schema/core">
        <persistenceAdapter>
            <jdbcPersistenceAdapter dataSource="#mysql-ds" createTablesOnStartup="false"/>
        </persistenceAdapter>
    </broker>
    <bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
        <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
        <property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/>
        <property name="username" value="activemq"/>
        <property name="password" value="activemq"/>
        <property name="maxActive" value="200"/>
        <property name="poolPreparedStatements" value="true"/>
    </bean>
</beans>

(2)資料庫表資訊 activemq_msgs用於儲存訊息,Queue和Topic都儲存在這個表中: ID:自增的資料庫主鍵 CONTAINER:訊息的Destination MSGID_PROD:訊息傳送者客戶端的主鍵 MSG_SEQ:是傳送訊息的順序,MSGID_PROD+MSG_SEQ可以組成JMS的MessageID EXPIRATION:訊息的過期時間,儲存的是從1970-01-01到現在的毫秒數 MSG:訊息本體的Java序列化物件的二進位制資料 PRIORITY:優先順序,從0-9,數值越大優先順序越高

activemq_acks用於儲存訂閱關係。如果是持久化Topic,訂閱者和伺服器的訂閱關係在這個表儲存: 主要的資料庫欄位如下: CONTAINER:訊息的Destination SUB_DEST:如果是使用Static叢集,這個欄位會有叢集其他系統的資訊 CLIENT_ID:每個訂閱者都必須有一個唯一的客戶端ID用以區分 SUB_NAME:訂閱者名稱 SELECTOR:選擇器,可以選擇只消費滿足條件的訊息。條件可以用自定義屬性實現,可支援多屬性AND和OR操作 LAST_ACKED_ID:記錄消費過的訊息的ID。

表activemq_lock在叢集環境中才有用,只有一個Broker可以獲得訊息,稱為Master Broker, 其他的只能作為備份等待Master Broker不可用,才可能成為下一個Master Broker。 這個表用於記錄哪個Broker是當前的Master Broker。

2. AMQ方式

效能高於JDBC,寫入訊息時,會將訊息寫入日誌檔案,由於是順序追加寫,效能很高。為了提升效能,建立訊息主鍵索引,並且提供快取機制,進一步提升效能。每個日誌檔案的大小都是有限制的(預設32m,可自行配置)。 當超過這個大小,系統會重新建立一個檔案。當所有的訊息都消費完成,系統會刪除這個檔案或者歸檔(取決於配置)。 主要的缺點是AMQ Message會為每一個Destination建立一個索引,如果使用了大量的Queue,索引檔案的大小會佔用很多磁碟空間。 而且由於索引巨大,一旦Broker崩潰,重建索引的速度會非常慢。

配置片段如下:

<persistenceAdapter>
     <amqPersistenceAdapter directory="${activemq.data}/activemq-data" maxFileLength="32mb"/>
</persistenceAdapter>

雖然AMQ效能略高於下面的Kaha DB方式,但是由於其重建索引時間過長,而且索引檔案佔用磁碟空間過大,所以已經不推薦使用。

3. KahaDB方式

KahaDB是從ActiveMQ 5.4開始預設的持久化外掛,也是我們專案現在使用的持久化方式。

KahaDb恢復時間遠遠小於其前身AMQ並且使用更少的資料檔案,所以可以完全代替AMQ。 kahaDB的持久化機制同樣是基於日誌檔案,索引和快取。

配置方式:

<persistenceAdapter>
    <kahaDB directory="${activemq.data}/activemq-data" journalMaxFileLength="16mb"/>
</persistenceAdapter>

directory : 指定持久化訊息的儲存目錄 journalMaxFileLength : 指定儲存訊息的日誌檔案大小,具體根據你的實際應用配置   (1)KahaDB主要特性 1、日誌形式儲存訊息; 2、訊息索引以B-Tree結構儲存,可以快速更新; 3、完全支援JMS事務; 4、支援多種恢復機制;

(2)KahaDB的結構

訊息儲存在基於檔案的資料日誌中。如果訊息傳送成功,變標記為可刪除的。系統會週期性的清除或者歸檔日誌檔案。 訊息檔案的位置索引儲存在記憶體中,這樣能快速定位到。定期將記憶體中的訊息索引儲存到metadata store中,避免大量訊息未傳送時,訊息索引佔用過多記憶體空間。

Data logs: Data logs用於儲存訊息日誌,訊息的全部內容都在Data logs中。 同AMQ一樣,一個Data logs檔案大小超過規定的最大值,會新建一個檔案。同樣是檔案尾部追加,寫入效能很快。 每個訊息在Data logs中有計數引用,所以當一個檔案裡所有的訊息都不需要了,系統會自動刪除檔案或放入歸檔資料夾。

Metadata cache : 快取用於存放線上消費者的訊息。如果消費者已經快速的消費完成,那麼這些訊息就不需要再寫入磁碟了。 Btree索引會根據MessageID建立索引,用於快速的查詢訊息。這個索引同樣維護持久化訂閱者與Destination的關係,以及每個消費者消費訊息的指標。

Metadata store 在db.data檔案中儲存訊息日誌中訊息的元資料,也是以B-Tree結構儲存的,定時從Metadata cache更新資料。Metadata store中也會備份一些在訊息日誌中存在的資訊,這樣可以讓Broker例項快速啟動。 即便metadata store檔案被破壞或者誤刪除了。broker可以讀取Data logs恢復過來,只是速度會相對較慢些。

4.LevelDB方式

從ActiveMQ 5.6版本之後,又推出了LevelDB的持久化引擎。 目前預設的持久化方式仍然是KahaDB,不過LevelDB持久化效能高於KahaDB,可能是以後的趨勢。 在ActiveMQ 5.9版本提供了基於LevelDB和Zookeeper的資料複製方式,用於Master-slave方式的首選資料複製方案。