1. 程式人生 > >ActiveMQ集群搭建

ActiveMQ集群搭建

consumer local ica 生產者 data gin exceptio 超時 image

一、 Activemq主備搭建

  Shared Filesystem Master-Slave方式
  shared filesystem Master-Slave部署方式主要是通過共享存儲目錄來實現master和slave的熱備,所有的ActiveMQ應用都在不斷地獲取共享目錄的控制權,哪個應用搶到了控制權,它就成為master。
  多個共享存儲目錄的應用,誰先啟動,誰就可以最早取得共享目錄的控制權成為master,其他的應用就只能作為slave。

技術分享圖片

技術分享圖片

1 搭建配置步驟搭建 master-slave (一主 一備份)

準備mq的1節點 activemq-1
準備mq的2節點 activemq-2

  特點:
    只能本地不能分布式 和 集群。

針對每一個activemq的節點進行配置:

1.1 配置節點1:
首先創建共享目錄,並創建兩個節點
如圖:

技術分享圖片

  • 配置activemq-1,需要修改持久數據庫位置,修改:activemq-1/conf/activemq.xml

技術分享圖片

  • 配置activemq-1,需要修改activemq-1/conf/activemq.xml

  如圖:修改成61617

技術分享圖片

  • 配置activemq-1 ,需要修改activemq-1/conf/jetty.xml

技術分享圖片

2.2 配置節點2:

  • 配置activemq-2,需要修改 持久目錄文件,修改:activemq-2/conf/activemq.xml

技術分享圖片

  • 配置activemq-2,需要修改activemq-2/conf/activemq.xml

技術分享圖片

  如圖:修改成61618

  • 配置activemq-1 ,需要修改activemq-2/conf/jetty.xml

技術分享圖片

2 測試master-slave
2.1 生產者

 1 public class ProduceQueue {??
 2     @Test?
 3     public void sendMessage() throws Exception{?
 4         //1.創建一個連接工廠 connectionfactory? 參數:就是要連接的服務器的地址?
5 ConnectionFactory factory = new ActiveMQConnectionFactory("failover:(tcp://192.168.25.130:61617,tcp://192.168.25.130:61618)");? 6 //2.通過工廠獲取連接對象 創建連接? 7 Connection connection = factory.createConnection();? 8 //3.開啟連接? 9 connection.start();? 10 //4.創建一個session對象 提供發送消息等方法? 11 // 第一個參數:表示是否開啟分布式事務(JTA) 一般是false 不開啟。? 12 // 第二個參數:就是設置消息的應答模式 13 // 如果 第一個參數為false時,第二個參數設置才有意義。用的是自動應答? 14 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 15 //5.創建目的地 (destination) queue? 參數:目的地的名稱? 16 Queue queue = session.createQueue("queue-test-cluster");? 17 //6.創建個生產者? 18 MessageProducer producer = session.createProducer(queue);? 19 //7.構建消息的內容? 20 TextMessage textMessage = session.createTextMessage("queue測試發送的消息"); 21 // 8.發送消息? 22 producer.send(textMessage);? 23 //9.關閉資源? 24 producer.close();? 25 session.close();? 26 connection.close();? 27 } 28 ?}

2.2 消費者

 1 public class ConsumerQueue {??
 2     @Test?
 3     public void consumer() throws Exception{?
 4         //1.創建連接的工廠?
 5         ConnectionFactory factory = new ActiveMQConnectionFactory("failover:(tcp://192.168.25.130:61617,tcp://192.168.25.130:61618)");?
 6         //2.創建連接?
 7         Connection connection = factory.createConnection();?
 8         //3.開啟連接?
 9         connection.start();?
10         //4.創建session?
11         // 第一個參數:表示是否開啟分布式事務(JTA)  一般是false 不開啟。?
12         // 第二個參數:就是設置消息的應答模式   如果 第一個參數為false時,第二個參數設置才有意義。用的是自動應答?
13         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);?
14         // 5.創建接收消息的一個目的地?
15         Queue queue = session.createQueue("queue-test-cluster");?
16         // 6.創建消費者?
17         MessageConsumer consumer = session.createConsumer(queue);?
18         // 7.接收消息 打印?
19         // 第一種?
20         /*while(true){?
21             Message message = consumer.receive(1000000);//設置接收消息的超時時間?
22             //沒有接收到消息就跳出循環?
23             if(message==null){?
24                 break;?
25             }?
26             if(message instanceof TextMessage){?
27                 TextMessage message2 = (TextMessage) message;?
28                 System.out.println("接收的消息為"+message2.getText());?
29                 }?
30          }*/?
31         //第二種??
32         // 設置一個監聽器?
33         // System.out.println("start");?
34         // 這裏其實開辟了一個新的線程?
35         consumer.setMessageListener(new MessageListener() {??
36             //當有消息的時候會執行以下的邏輯?
37             @Override?
38             public void onMessage(Message message) {?
39                 if(message instanceof TextMessage){?
40                     TextMessage message2 = (TextMessage) message;?
41                     try {?
42                         System.out.println("接收的消息為"+message2.getText());?
43                     } catch (JMSException e) {?
44                         e.printStackTrace();?
45                     }?
46                 }?
47             }?
48         });?
49         //System.out.println("end");?
50         Thread.sleep(199999);?
51         // 8.關閉資源?
52         consumer.close();?
53         session.close();?
54         connection.close();?
55     }??
56 }

先啟動的成為主節點,平常主節點工作,slave不工作但是一直做監聽。當主節點掛掉,slave接手工作。

二、 基於zookeeper的activemq集群搭建(推薦)

2.1 基於可復制的 LevelDB
  LevelDB 是 Google 開發的一套用於持久化數據的高性能類庫。 LevelDB 並不是一種服務,用戶需要自行實現 Server。 是單進程的服務,能夠處理十億級別規模 Key-Value 型數據,占用內存小。
  http://activemq.apache.org/replicated-leveldb-store.html

技術分享圖片

  高可用的原理:使用 ZooKeeper(集群)註冊所有的 ActiveMQ Broker。只有其中的一個 Broker 可以提供服務,被視為 Master,其他的 Broker 處於待機狀態,被視為 Slave。如果 Master 因故障而不能提供服務,ZooKeeper 會從 Slave 中選舉出一個 Broker 充當 Master。Slave 連接 Master 並同步他們的存儲狀態, Slave 不接受客戶端連接。所有的存儲操作都將被復制到連接至 Master 的 Slaves。 如果 Master 宕了,得到了最新更新的 Slave 會成為 Master。 故障節點在恢復後會重新加入到集群中並連接 Master 進入 Slave 模式。所有需要同步的 disk 的消息操作都將等待存儲狀態被復制到其他法定節點的操作完成才能完成。所以,如果你配置了 replicas=3,那麽法定大小是(3/2)+1=2。 Master 將會存儲並更新然後等待 (2-1)=1 個Slave 存儲和更新完成,才匯報 success。 至於為什麽是 2-1,熟悉 Zookeeper 的應該知道,有一個 node要作為觀擦者存在。當一個新的 Master 被選中,你需要至少保障一個法定 node 在線以能夠找到擁有最新狀態的 node。這個 node 可以成為新的 Master。因此,推薦運行至少 3 個 replica nodes,以防止一個 node失敗了,服務中斷。(原理與 ZooKeeper 集群的高可用實現方式類似)。

2.2 集群單機環境規劃
  定義環境:3個activemq節點(node01 node02 node03)

Ip 集群通信端口 節點消息連接端口 Jetty後臺運行端口
192.168.25.130 63631 51515 8361
192.168.25.130 63632 51516 8362
192.168.25.130 63633 51517 8363

2.3 配置zookeeper集群

見相關教程;

2.4 節點1的配置

  在activemq.xml中配置:

<broker xmlns="http://activemq.apache.org/schema/core" brokerName="brokerCluster" dataDirectory="${activemq.data}">

  brokerName指定一個名字:任意即可。但是整個集群的所有的配置項名稱都應該是一致的。

  broker標簽下的配置:

<persistenceAdapter> 
  <!-- kahaDB directory="${activemq.data}/kahadb"/ --> 
  <replicatedLevelDB 
  directory="${activemq.data}/leveldb" 
  replicas="3" 
  bind="tcp://0.0.0.0:63631" 
  zkAddress="192.168.25.130:2181,192.168.25.130:2182,192.168.25.130:2183" 
  hostname="localhost" 
  zkPath="/activemq2/leveldb-stores"/> 
</persistenceAdapter>

  jetty.xml:配置項:

<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
  <!-- the default port number for the web console -->
  <property name="host" value="0.0.0.0"/>
  <property name="port" value="8361"/>
</bean>

2.5 節點2的配置
  參考節點1搭建即可,註意:端口不要一樣。

  搭建後的截圖:?

技術分享圖片

  node01 -node03 是activemq的3個節點。

ActiveMQ集群搭建