1. 程式人生 > >activemq實戰之訊息持久化

activemq實戰之訊息持久化

對於activemq訊息的持久化我們在第二節的時候就簡單介紹過,今天我們詳細的來分析一下activemq的持久化過程以及持久化外掛。在生產環境中為確保訊息的可靠性,我們肯定的面臨持久化訊息的問題,今天就一起來攻克他吧。

1. 持久化方式介紹

前面我們也簡單提到了activemq提供的外掛式的訊息儲存,在這裡再提一下,主要有以下幾種方式:

  1. AMQ訊息儲存-基於檔案的儲存方式,是activemq開始的版本預設的訊息儲存方式;
  2. KahaDB訊息儲存-提供了容量的提升和恢復能力,是現在的預設儲存方式;
  3. JDBC訊息儲存-訊息基於JDBC儲存的;
  4. Memory訊息儲存-基於記憶體的訊息儲存,由於記憶體不屬於持久化範疇,而且如果使用記憶體佇列,可以考慮使用更合適的產品,如ZeroMQ。所以記憶體儲存不在討論範圍內。

上面幾種訊息儲存方式對於訊息儲存的邏輯來說並沒有什麼區別,只是在效能以及儲存方式上來說有所不同。但是對於訊息傳送的方式來說,p2p和Pub/Sub兩種型別的訊息他們的持久化方式卻是不同的:

對於點對點的訊息一旦消費者完成消費這條訊息將從broker上刪除;對於釋出訂閱型別的訊息,即使所有的訂閱者都完成了消費,Broker也不一定會馬上刪除無用訊息,而是保留推送歷史,之後會非同步清除無用訊息。而每個訂閱者消費到了哪條訊息的offset會記錄在Broker,以免下次重複消費。因為訊息是順序消費,先進先出,所以只需要記錄上次訊息消費到哪裡就可以了。

因為AMQ現在已經被不再使用被KahaDB所替代,所以我們就講KahaDB,JDBC訊息儲存在許多對可靠性要求高而對效能要求低一些的大公司還是經常使用的,下面我們就這兩種持久化方式的使用做一節專題。

2. Kahadb

說到Kahadb之前我們還是得提到他的前身AMQ,AMQ是一種檔案儲存形式,他具有寫入速度快和容易恢復的特點,訊息儲存在一個個的檔案裡,檔案預設大小為32M,超過這個大小的訊息將會存入下一個檔案。當一個檔案中的訊息已經全部消費,那麼這個檔案將被標誌我可刪除,在下一個清除階段這個檔案將被刪除。

如果需要使用持久化,則需要在前文中的配置檔案applicationContext-ActiveMQ.xml中增加如下配置:


<persistenceAdapter>  
   <kahaDB directory="activemq-data"journalMaxFileLength="32mb"/>  
</persistenceAdapter>  
    
  • 1
  • 2
  • 3

KahaDB的屬性件下表格:

屬性名稱 屬性值 描述
directory activemq-data 訊息檔案和日誌的儲存目錄
indexWriteBatchSize 1000 一批索引的大小,當要更新的索引量到達這個值時,更新到訊息檔案中
indexCacheSize 1000 記憶體中,索引的頁大小
enableIndexWriteAsync false 索引是否非同步寫到訊息檔案中
journalMaxFileLength 32mb 一個訊息檔案的大小
enableJournalDiskSyncs true 是否講非事務的訊息同步寫入到磁碟
cleanupInterval 30000 清除操作週期,單位ms
checkpointInterval 5000 索引寫入到訊息檔案的週期,單位ms
ignoreMissingJournalfiles false 忽略丟失的訊息檔案,false,當丟失了訊息檔案,啟動異常
checkForCorruptJournalFiles false 檢查訊息檔案是否損壞,true,檢查發現損壞會嘗試修復
checksumJournalFiles false 產生一個checksum,以便能夠檢測journal檔案是否損壞。
5.4版本之後有效的屬性:
archiveDataLogs false 當為true時,歸檔的訊息檔案被移到directoryArchive,而不是直接刪除
directoryArchive null 儲存被歸檔的訊息檔案目錄
databaseLockedWaitDelay 10000 在使用負載時,等待獲得檔案鎖的延遲時間,單位ms
maxAsyncJobs 10000 同個生產者產生等待寫入的非同步訊息最大量
concurrentStoreAndDispatchTopics false 當寫入訊息的時候,是否轉發主題訊息
concurrentStoreAndDispatchQueues true 當寫入訊息的時候,是否轉發佇列訊息
5.6版本之後有效的屬性:
archiveCorruptedIndex false 是否歸檔錯誤的索引

由於在ActiveMQ V5.4+的版本中,KahaDB是預設的持久化儲存方案。所以即使你不配置任何的KahaDB引數資訊,ActiveMQ也會啟動KahaDB。這種情況下,KahaDB檔案所在位置是你的ActiveMQ安裝路徑下的/data/broker.Name/KahaDB&#x5B50;&#x76EE;&#x5F55;&#x3002;&#x5176;&#x4E2D;” role=”presentation” style=”position: relative;”>broker.Name/KahaDBbroker.Name/KahaDB子目錄。其中{broker.Name}代表這個ActiveMQ服務節點的名稱。下面我把剛啟動服務併發送了訊息之後的activemq安裝目錄開啟給大家看看:

這裡寫圖片描述

正式的生產環境還是建議在主配置檔案中明確設定KahaDB的工作引數:

<broker xmlns="http://activemq.apache.org/schema/core" brokerName="broker" persistent="true" useShutdownHook="false">
  ...
  <persistenceAdapter>
    <kahaDB directory="activemq-data"
            journalMaxFileLength="32mb"
            concurrentStoreAndDispatchQueues="false"
            concurrentStoreAndDispatchTopics="false"
            />
  </persistenceAdapter>
</broker>
    
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

3. 關係型資料庫儲存方案

從ActiveMQ 4+版本開始,ActiveMQ就支援使用關係型資料庫進行持久化儲存——通過JDBC實現的資料庫連線。可以使用的關係型資料庫囊括了目前市面的主流資料庫。

使用JDBC的方式持久化我們就得修改之前的配置檔案:

將其中的這段配置:

<persistenceAdapter>
<kahaDB directory="${activemq.base}/data/kahadb"/>
</persistenceAdapter>
    
  • 1
  • 2
  • 3

修改為下面這段內容:

<persistenceAdapter>
       <jdbcPersistenceAdapter  dataSource="# mysql-ds "/>
</persistenceAdapter>
    
  • 1
  • 2
  • 3

在結點之後,增加資料來源的配置,如下:

<!-- MySql DataSource Sample Setup -->  
<bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">  
      <property name="driverClassName" value="com.mysql.jdbc.Driver"/>  
      <property name="url" value="jdbc:mysql://localhost:3306/activemqdb?relaxAutoCommit=true&amp;useUnicode=true&amp;characterEncoding=utf-8"/>  
      <property name="username" value="root"/>  
      <property name="password" value="root"/>  
      <property name="poolPreparedStatements" value="true"/>  
</bean>  

<!-- Oracle DataSource Sample Setup -->        
<bean id="oracle-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">  
      <property name="driverClassName" value="oracle.jdbc.driver.OracleDriver"/>  
      <property name="url" value="jdbc:oracle:thin:@localhost:1521:activemqdb"/>  
      <property name="username" value="root"/>  
      <property name="password" value="root"/>  
      <property name="poolPreparedStatements" value="true"/>  
</bean> 

<!-- Oracle DataSource Sample Setup --> 
<bean id="db2-ds" class="org.apache.commons.dbcp.BasicDataSource"  destroy-method="close">  
      <property name="driverClassName" value="com.ibm.db2.jcc.DB2Driver"/>  
      <property name="url" value="jdbc:db2://hndb02.bf.ctc.com:50002/activemq"/>  
      <property name="username" value="root"/>  
      <property name="password" value="root"/>  
      <property name="maxActive" value="200"/>  
      <property name="poolPreparedStatements" value="true"/>  
</bean>  
    
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27

還是在上一篇的例項工程中,我們改變一下applicationContext-ActiveMQ.xml的配置如下:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:amq="http://activemq.apache.org/schema/core"
       xmlns:context="http://www.springframework.org/schema/context"
       xmlns:mvc="http://www.springframework.org/schema/mvc"
       xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-4.1.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-4.1.xsd
http://www.springframework.org/schema/mvc
http://www.springframework.org/schema/mvc/spring-mvc-4.1.xsd
http://activemq.apache.org/schema/core
http://activemq.apache.org/schema/core/activemq-core-5.12.1.xsd">

    <context:component-scan base-package="cn.edu.hust.activemq" />
    <mvc:annotation-driven />

    <amq:connectionFactory id="amqConnectionFactory"
                           brokerURL="tcp://127.0.0.1:61616"
                           userName="admin"
                           password="admin" />

    <!-- 配置JMS連線工廠 -->
    <bean id="connectionFactory"
          class="org.springframework.jms.connection.CachingConnectionFactory">
        <constructor-arg ref="amqConnectionFactory" />
        <property name="sessionCacheSize" value="100" />
    </bean>

    <!-- 定義訊息佇列(Queue) -->
    <bean id="demoQueueDestination" class="org.apache.activemq.command.ActiveMQQueue">
        <!-- 設定訊息佇列的名字 -->
        <constructor-arg>
            <value>first-queue</value>
        </constructor-arg>
    </bean>

    <!-- 配置JMS模板(Queue),Spring提供的JMS工具類,它傳送、接收訊息。 -->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="defaultDestination" ref="demoQueueDestination" />
        <property name="receiveTimeout" value="10000" />
        <!-- true是topic,false是queue,預設是false,此處顯示寫出false -->
        <property name="pubSubDomain" value="false" />
    </bean>

    <!-- 配置訊息佇列監聽者(Queue) -->
    <bean id="queueMessageListener" class="cn.edu.hust.activemq.filter.QueueMessageListener" />

    <!-- 顯示注入訊息監聽容器(Queue),配置連線工廠,監聽的目標是demoQueueDestination,監聽器是上面定義的監聽器 -->
    <bean id="queueListenerContainer"
          class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="destination" ref="demoQueueDestination" />
        <property name="messageListener" ref="queueMessageListener" />
    </bean>


    <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}" persistent="true">
        <!--<persistenceAdapter>
             <kahaDB directory="${activemq.data}/kahadb"/>
         </persistenceAdapter> -->
        <persistenceAdapter>
            <jdbcPersistenceAdapter dataDirectory="${activemq.data}" dataSource="#mysql-ds">
            </jdbcPersistenceAdapter>
        </persistenceAdapter>
    </broker>
    <bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
        <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
        <property name="url" value="jdbc:mysql://127.0.0.1/activemq?relaxAutoCommit=true"/>
        <property name="username" value="root"/>
        <property name="password" value="123456"/>
        <property name="maxActive" value="200"/>
        <property name="poolPreparedStatements" value="true"/>
    </bean>
</beans>
    
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78

此時,重新啟動MQ,就會發現db資料庫中多了三張表:activemq_acks,activemq_lock,activemq_msgs,OK,說明activemq已經持久化成功啦!

這裡寫圖片描述

  1. activemq_acks:用於儲存訂閱關係。如果是持久化Topic,訂閱者和伺服器的訂閱關係在這個表儲存,主要資料庫欄位如下:

    • container:訊息的destination
    • sub_dest:如果是使用static叢集,這個欄位會有叢集其他系統的資訊
    • client_id:每個訂閱者都必須有一個唯一的客戶端id用以區分
    • sub_name:訂閱者名稱
    • selector:選擇器,可以選擇只消費滿足條件的訊息。條件可以用自定義屬性實現,可支援多屬性and和or操作
    • last_acked_id:記錄消費過的訊息的id
  2. activemq_lock:在叢集環境中才有用,只有一個Broker可以獲得訊息,稱為Master Broker,其他的只能作為備份等待Master Broker不可用,才可能成為下一個Master Broker。這個表用於記錄哪個Broker是當前的Master Broker。

  3. activemq_msgs:用於儲存訊息,Queue和Topic都儲存在這個表中。主要的資料庫欄位如下:

    • id:自增的資料庫主鍵
    • container:訊息的destination
    • msgid_prod:訊息傳送者客戶端的主鍵
    • msg_seq:是傳送訊息的順序,msgid_prod+msg_seq可以組成jms的messageid
    • expiration:訊息的過期時間,儲存的是從1970-01-01到現在的毫秒數
    • msg:訊息本體的java序列化物件的二進位制資料
    • priority:優先順序,從0-9,數值越大優先順序越高
    • activemq_acks用於儲存訂閱關係。如果是持久化topic,訂閱者和伺服器的訂閱關係在這個表儲存。
        <link rel="stylesheet" href="https://csdnimg.cn/release/phoenix/template/css/markdown_views-ea0013b516.css">
            </div>

對於activemq訊息的持久化我們在第二節的時候就簡單介紹過,今天我們詳細的來分析一下activemq的持久化過程以及持久化外掛。在生產環境中為確保訊息的可靠性,我們肯定的面臨持久化訊息的問題,今天就一起來攻克他吧。

1. 持久化方式介紹

前面我們也簡單提到了activemq提供的外掛式的訊息儲存,在這裡再提一下,主要有以下幾種方式:

  1. AMQ訊息儲存-基於檔案的儲存方式,是activemq開始的版本預設的訊息儲存方式;
  2. KahaDB訊息儲存-提供了容量的提升和恢復能力,是現在的預設儲存方式;
  3. JDBC訊息儲存-訊息基於JDBC儲存的;
  4. Memory訊息儲存-基於記憶體的訊息儲存,由於記憶體不屬於持久化範疇,而且如果使用記憶體佇列,可以考慮使用更合適的產品,如ZeroMQ。所以記憶體儲存不在討論範圍內。

上面幾種訊息儲存方式對於訊息儲存的邏輯來說並沒有什麼區別,只是在效能以及儲存方式上來說有所不同。但是對於訊息傳送的方式來說,p2p和Pub/Sub兩種型別的訊息他們的持久化方式卻是不同的:

對於點對點的訊息一旦消費者完成消費這條訊息將從broker上刪除;對於釋出訂閱型別的訊息,即使所有的訂閱者都完成了消費,Broker也不一定會馬上刪除無用訊息,而是保留推送歷史,之後會非同步清除無用訊息。而每個訂閱者消費到了哪條訊息的offset會記錄在Broker,以免下次重複消費。因為訊息是順序消費,先進先出,所以只需要記錄上次訊息消費到哪裡就可以了。

因為AMQ現在已經被不再使用被KahaDB所替代,所以我們就講KahaDB,JDBC訊息儲存在許多對可靠性要求高而對效能要求低一些的大公司還是經常使用的,下面我們就這兩種持久化方式的使用做一節專題。

2. Kahadb

說到Kahadb之前我們還是得提到他的前身AMQ,AMQ是一種檔案儲存形式,他具有寫入速度快和容易恢復的特點,訊息儲存在一個個的檔案裡,檔案預設大小為32M,超過這個大小的訊息將會存入下一個檔案。當一個檔案中