1. 程式人生 > >ActiveMQ訊息佇列主從配置

ActiveMQ訊息佇列主從配置

介紹

目前ActiveMQ提供了三種主從配置方案,分別是共享檔案系統(Shared File System Master Slave)、資料庫(JDBC Master Slave)和LevelDB(Replicated LevelDB Store)。需要注意的是LevelDB儲存已被棄用,官方不再支援或建議使用,推薦使用KahaDB來代替。
第一種方式需要直接共享檔案系統,在實際操作上比較少見,而第三種方案已經被棄用,所以下面主要對基於JDBC的主從配置方式展開介紹。
關於第二個方案的效能官方有一段描述:共享資料庫不會太快,因為它不能使用高效能日誌,但即便如此這也是目前最可靠、可行的一種方案。

方案

下面我具體介紹一下詳細的方案

介紹

  1. 在mq程序啟動時一個主(某一臺mq程序)抓取資料庫中表的獨佔鎖,另一臺mq程序會由於爭搶不到表鎖而阻塞,這樣其它的程序都變成了從。(只允許有一個主,可以有多個從)
    這裡寫圖片描述
    從上面的示意圖上可以看到,三個Broker啟動了但只有一個主,其它兩個都是從。此時客戶端1、2只能連線主。
  2. 如果主的服務斷開或者服務中斷(比如程序中斷、機器宕機等),它(原來的主)所佔用的資料庫表鎖就會被釋放,此時其他從開始競爭鎖,誰先搶到誰就是新的主。
    這裡寫圖片描述
    上面的示意圖很好的說明了當主服務中斷時,其它從服務競爭主的情況。
  3. 如果原來的主重新啟動,但是已經有了新主,此時會怎麼辦呢?
    這裡寫圖片描述

    從上面的示意圖不難看出,原來的主加回來之後變成了從,原因很簡單,因為它爭取不到鎖(鎖給新主了)。

客戶端連線

通過上面的主從配置部署之後,儲存服務只有一個是可用的,也就是說當主服務工作時,其它從服務都不會工作,如果你連線從服務會返回連線失敗的錯誤。那麼問題來了,客戶端不確定哪個服務可用,哪個不可用,在連線時要“寫死”連線嗎?

目前有三種方案可以供大家來解決這個問題:
第一種:在客戶端連線mq服務的地址上來解決,我們可以使用含故障轉移的連線地址,例如:

failover:(tcp://192.168.1.100:61616,tcp://192.168.1.101:61616,tcp://192.168.1.102:61616)

第二種:在專案程式碼上實現故障轉移功能,用配置的方式維護一個服務列表,每次連線時從列表上逐一嘗試,當前連線超時後就順序嘗試下一個,如果某個服務成功就儲存下來,防止每次都走這個重試流程。
第三種:使用負載均衡軟體(例如Keeplived、Nginx、LVS和HAProxy等),這樣程式連線的地址是唯一的,故障轉移由上述軟體來完成即可。

安裝

下面講述在CentOS上安裝、部署的方法

  • 從官網下載二進位制包
cd /usr/local/src
wget http://mirrors.cnnic.cn/apache//activemq/5.14.3/apache-activemq-5.14.3-bin.tar.gz
  • 解壓到任意目錄
tar -zxvf apache-activemq-5.14.3-bin.tar.gz
  • 啟動mq
cd apache-activemq-5.14.3
./bin/activemq start

此時安裝就完成了,僅需要解壓出來放到某個位置,沒有實際的安裝過程。(比如編譯、安裝等操作)

將這個過程在另外兩臺機器上操作一次,如果你是在一臺機器上部署,那就把解壓出來的檔案再複製出來兩個即可。(記得要修改配置檔案中埠)

你可以在瀏覽器上訪問mq的服務來判斷是否都安裝成功,訪問地址 http://192.168.1.103:8161 你應該可以看到和我相同的頁面。
這裡寫圖片描述

配置

  • 修改web控制檯憑證,重啟服務後可以使用新的密碼來登入。
vim conf/jetty-realm.properties
#將裡面的內容清空,重新新增一行
admin: Admin!123, admin

vim conf/jetty.xml
#修改第30行變成如下
<property name="roles" value="admin" />

修改服務的憑證

關閉不需要的協議

vim conf/activemq.xml
#定位到120行找到 '''<transportConnectors>''' 節點註釋不需要開放的協議,僅保留下面兩種協議即可。
<transportConnectors>
    <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
    <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
</transportConnectors>

為了保證安全,上面這兩個埠可以配置為僅內網訪問。(具體以實際專案情況來決定)

配置主從佇列

  • 修改配置檔案
vim conf/activemq.xml

#在第40行修改 <borker> 節點的 brokerName 屬性,新加入 persistent 屬性。
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="127.0.0.1" persistent="true" dataDirectory="${activemq.data}">

#在第81行 <persistenceAdapter> 節點下面配置如下程式碼
<persistenceAdapter>
    <jdbcPersistenceAdapter dataDirectory="${activemq.base}/activemq-data" dataSource="#mysql-ds"/>
</persistenceAdapter>

#在第125行 <broker> 節點外面新增如下內容
<!-- MySql DataSource Master Slave -->
<bean id="mysql-ds" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close">
    <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
    <property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/>
    <property name="username" value="activemq"/>
    <property name="password" value="activemq"/>
    <property name="poolPreparedStatements" value="true"/>
</bean>
  • 安裝java yum -y install java-1.8.0-openjdk

  • 下載 DBCP 連線池包放到 lib 裡面

  • 下載 java-mysql-jdbc 驅動包放到 [activemq_install_dir]/lib 裡面

  • 建立資料庫結構(表結構通常會在mq程序啟動時自動建立,如果沒有自動建立那你可以手動執行)

CREATE DATABASE `activemq`;

USE `activemq`;

DROP TABLE IF EXISTS `activemq_acks`;

CREATE TABLE `activemq_acks` (
  `LAST_ACKED_ID` int(11) DEFAULT NULL,
  `CONTAINER` varchar(250) DEFAULT NULL,
  `PRIORITY` bigint(20) NOT NULL DEFAULT '5',
  `XID` varchar(250) DEFAULT NULL,
  `CLIENT_ID` varchar(250) DEFAULT NULL,
  `SUB_DEST` varchar(250) DEFAULT NULL,
  `SUB_NAME` varchar(250) DEFAULT NULL,
  `SELECTOR` varchar(250) DEFAULT NULL,
  KEY `ACTIVEMQ_ACKS_XIDX` (`XID`)
) ENGINE=MyISAM DEFAULT CHARSET=utf8;

DROP TABLE IF EXISTS `activemq_lock`;

CREATE TABLE `activemq_lock` (
  `ID` bigint(20) NOT NULL,
  `TIME` bigint(20) DEFAULT NULL,
  `BROKER_NAME` varchar(250) DEFAULT NULL,
  PRIMARY KEY (`ID`)
) ENGINE=MyISAM DEFAULT CHARSET=utf8;

DROP TABLE IF EXISTS `activemq_msgs`;

CREATE TABLE `activemq_msgs` (
  `ID` bigint(20) NOT NULL,
  `CONTAINER` varchar(250) DEFAULT NULL,
  `MSGID_PROD` varchar(250) DEFAULT NULL,
  `MSGID_SEQ` bigint(20) DEFAULT NULL,
  `EXPIRATION` bigint(20) DEFAULT NULL,
  `MSG` longblob,
  `PRIORITY` bigint(20) DEFAULT NULL,
  `XID` varchar(250) DEFAULT NULL,
  PRIMARY KEY (`ID`),
  KEY `ACTIVEMQ_MSGS_MIDX` (`MSGID_PROD`,`MSGID_SEQ`),
  KEY `ACTIVEMQ_MSGS_CIDX` (`CONTAINER`),
  KEY `ACTIVEMQ_MSGS_EIDX` (`EXPIRATION`),
  KEY `ACTIVEMQ_MSGS_PIDX` (`PRIORITY`),
  KEY `ACTIVEMQ_MSGS_XIDX` (`XID`)
) ENGINE=MyISAM DEFAULT CHARSET=utf8;
  • 至此本教程就全部結束了