1. 程式人生 > >ActiveMQ之虛擬主題和映象佇列

ActiveMQ之虛擬主題和映象佇列

轉自:http://blog.csdn.net/zhu_tianwei/article/details/46303419, 略做補充

ActiveMQ支援的虛擬Destinations分為有兩種,分別是

1.虛擬主題(Virtual Topics)
2.組合 Destinations(CompositeDestinations)

這兩種虛擬Destinations可以看做對簡單的topic和queue用法的補充,基於它們可以實現一些簡單有用的EIP功能,虛擬主題類似於1對多的分支功能+消費端的cluster+failover,組合Destinations類似於簡單的destinations直接的路由功能。

虛擬主題(Virtual Topics)
ActiveMQ中,topic只有在持久訂閱(durablesubscription)下是持久化的。存在持久訂閱時,每個持久訂閱者,都相當於一個持久化的queue的客戶端,它會收取所有訊息。這種情況下存在兩個問題:
1.同一應用內consumer端負載均衡的問題:同一個應用上的一個持久訂閱不能使用多個consumer來共同承擔訊息處理功能。因為每個都會獲取所有訊息。queue模式可以解決這個問題,broker端又不能將訊息傳送到多個應用端。所以,既要釋出訂閱,又要讓消費者分組,這個功能jms規範本身是沒有的。
2.同一應用內consumer端failover的問題:由於只能使用單個的持久訂閱者,如果這個訂閱者出錯,則應用就無法處理訊息了,系統的健壯性不高。
為了解決這兩個問題,ActiveMQ中實現了虛擬Topic的功能。使用起來非常簡單。
對於訊息釋出者來說,就是一個正常的Topic,名稱以VirtualTopic.開頭。例如VirtualTopic.TEST。
對於訊息接收端來說,是個佇列,不同應用裡使用不同的字首作為佇列的名稱,即可表明自己的身份即可實現消費端應用分組。例如Consumer.A.VirtualTopic.TEST,說明它是名稱為A的消費端,同理Consumer.B.VirtualTopic.TEST說明是一個名稱為B的客戶端。可以在同一個應用裡使用多個consumer消費此queue,則可以實現上面兩個功能。又因為不同應用使用的queue名稱不同(字首不同),所以不同的應用中都可以接收到全部的訊息。每個客戶端相當於一個持久訂閱者,而且這個客戶端可以使用多個消費者共同來承擔消費任務。

生產者:

  1. package cn.slimsmart.activemq.demo.virtualtopic;  
  2. import javax.jms.Connection;  
  3. import javax.jms.DeliveryMode;  
  4. import javax.jms.JMSException;  
  5. import javax.jms.MessageProducer;  
  6. import javax.jms.Session;  
  7. import javax.jms.TextMessage;  
  8. import javax.jms.Topic;  
  9. import org.apache.activemq.ActiveMQConnectionFactory;  
  10. publicclass Producer {  
  11.     publicstaticvoid main(String[] args) throws JMSException {  
  12.         // 連線到ActiveMQ伺服器
  13.         ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://192.168.18.67:61616");  
  14.         Connection connection = factory.createConnection();  
  15.         connection.start();  
  16.         Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);  
  17.         // 建立主題
  18.         Topic topic = session.createTopic("VirtualTopic.TEST");  
  19.         MessageProducer producer = session.createProducer(topic);  
  20.         // NON_PERSISTENT 非持久化 PERSISTENT 持久化,傳送訊息時用使用持久模式
  21.         producer.setDeliveryMode(DeliveryMode.PERSISTENT);  
  22.         TextMessage message = session.createTextMessage();  
  23.         message.setText("topic 訊息。");  
  24.         message.setStringProperty("property""訊息Property");  
  25.         // 釋出主題訊息
  26.         producer.send(message);  
  27.         System.out.println("Sent message: " + message.getText());  
  28.         session.close();  
  29.         connection.close();  
  30.     }  
  31. }  

消費者:

  1. package cn.slimsmart.activemq.demo.virtualtopic;  
  2. import javax.jms.Connection;  
  3. import javax.jms.JMSException;  
  4. import javax.jms.Message;  
  5. import javax.jms.MessageConsumer;  
  6. import javax.jms.MessageListener;  
  7. import javax.jms.Queue;  
  8. import javax.jms.Session;  
  9. import javax.jms.TextMessage;  
  10. import org.apache.activemq.ActiveMQConnectionFactory;  
  11. publicclass Consumer {  
  12.     publicstaticvoid main(String[] args) throws JMSException, InterruptedException {  
  13.         // 連線到ActiveMQ伺服器
  14.         ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://192.168.18.67:61616");  
  15.         Connection connection = factory.createConnection();  
  16.         connection.start();  
  17.         Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);  
  18.         // 建立主題 
  19.         Queue topicA = session.createQueue("Consumer.A.VirtualTopic.TEST");  
  20.         Queue topicB = session.createQueue("Consumer.B.VirtualTopic.TEST");  
  21.         // 消費者A組建立訂閱
  22.         MessageConsumer consumerA1 = session.createConsumer(topicA);  
  23.         consumerA1.setMessageListener(new MessageListener() {  
  24.             // 訂閱接收方法
  25.             publicvoid onMessage(Message message) {  
  26.                 TextMessage tm = (TextMessage) message;  
  27.                 try {  
  28.                     System.out.println("Received message A1: " + tm.getText()+":"+tm.getStringProperty("property"));  
  29.                 } catch (JMSException e) {  
  30.                     e.printStackTrace();  
  31.                 }  
  32.             }  
  33.         });  
  34.         MessageConsumer consumerA2 = session.createConsumer(topicA);  
  35.         consumerA2.setMessageListener(new MessageListener() {  
  36.             // 訂閱接收方法
  37.             publicvoid onMessage(Message message) {  
  38.                 TextMessage tm = (TextMessage) message;  
  39.                 try {  
  40.                     System.out.println("Received message A2: " + tm.getText()+":"+tm.getStringProperty("property"));  
  41.                 } catch (JMSException e) {  
  42.                     e.printStackTrace();  
  43.                 }  
  44.             }  
  45.         });  
  46.         //消費者B組建立訂閱
  47.         MessageConsumer consumerB1 = session.createConsumer(topicB);  
  48.         consumerB1.setMessageListener(new MessageListener() {  
  49.             // 訂閱接收方法
  50.             publicvoid onMessage(Message message) {  
  51.                 TextMessage tm = (TextMessage) message;  
  52.                 try {  
  53.                     System.out.println("Received message B1: " + tm.getText()+":"+tm.getStringProperty("property"));  
  54.                 } catch (JMSException e) {  
  55.                     e.printStackTrace();  
  56.                 }  
  57.             }  
  58.         });  
  59.         MessageConsumer consumerB2 = session.createConsumer(topicB);  
  60.         consumerB2.setMessageListener(new MessageListener() {  
  61.             // 訂閱接收方法
  62.             publicvoid onMessage(Message message) {  
  63.                 TextMessage tm = (TextMessage) message;  
  64.                 try {  
  65.                     System.out.println("Received message B2: " + tm.getText()+":"+tm.getStringProperty("property"));  
  66.                 } catch (JMSException e) {  
  67.                     e.printStackTrace();  
  68.                 }  
  69.             }  
  70.         });  
  71.         session.close();  
  72.         connection.close();  
  73.     }  
  74. }  
使用同樣queue名稱的消費者會平分所有訊息。
從queue接收到的訊息,message.getJMSDestination().toString()為topic://VirtualTopic.TEST,即原始的destination。訊息的persistent屬性為true,即每個相當於一個持久訂閱。
A1和A2為一個應用,B1和B2為一個應用,2組應用內部做負載,和failover。

Virtual Topic這個功能特性在broker上有個總開關,useVirtualTopics屬性,預設為true,設定為false即可關閉此功能。當此功能開啟,並且使用了持久化的儲存時,broker啟動的時候會從持久化儲存裡拿到所有的destinations的名稱,如果名稱模式與Virtual Topics匹配,則把它們新增到系統的Virtual Topics列表中去。當然,沒有顯式定義的Virtual Topics,也可以直接使用的,系統會自動建立對應的實際topic。當有consumer訪問此VirtualTopics時,系統會自動建立持久化的queue,並在每次Topic收到訊息時,分發到具體的queue。
消費端使用的queue名稱字首的Consumer是可以修改的。示例如下: