1. 程式人生 > >ActiveMQ學習筆記(14)----Destination高階特性(二)

ActiveMQ學習筆記(14)----Destination高階特性(二)

1. Visual Destinations

  1.1 概述

  虛擬Destination用來建立邏輯Destinations,客戶端可以通過它來產生和消費訊息,它會把訊息對映到物理Destinations。ActiveMQ支援兩種方式:

  1. 虛擬主題(Virtual Topics)

  2. 組合Destinations (Composite Destinations)

  1.2 為何使用虛擬主題

  ActiveMQ中,topic只有在持久訂閱下才會持久化,持久訂閱時,每個訂閱者,都相當於一個queue的客戶端,它會收取所有訊息,這種情況下存在兩個問題:

  1. 同一個應用內的consumer端負載均衡的問題,也就是同一個應用上的一個持久訂閱者不能使用多個consumer來共同承擔訊息處理功能,因為每個consumer都會獲取所有訊息。

  2. 同一應用內的consumer端的failover問題:由於只能使用單個的持久訂閱者,如果這個訂閱者出錯,則應用就無法處理訊息了,系統的健壯性不高。

  為了解決這兩個問題,ActiveMQ中實現了虛擬的Topic的功能。

  1.3 如何使用虛擬主題

  1. 對於訊息釋出者來說,就是一個正常的Topic,名稱以VirtualTopic開頭。例如

  VirtualTopic.Orders.程式碼示例如下:

Topic destination = session.createTopic("VirtualTopic.Orders");

  2. 對於訊息接收端來說,是一個佇列,不同應用裡面應使用不同的字首作為佇列的名稱,即可表明自己的身份,即可實現消費端應用的分組。

  例如Consumer.A.VritualTopic.Orders,說明它是名稱A的消費端,同理Consumer.B.VritualTopic.Orders表示時一個名稱為B的客戶端,可以在同一個應用裡面使用多個consumer消費此queue,則可以實現上面兩個功能。

  又因為不同應用使用的queue名稱不同(字首不同),所以不同的應用中都可以接收到全部的訊息,每個客戶端相當於一個持久訂閱者,而且這個客戶端可以使用多個消費者來共同承擔消費任務,程式碼示例如下:

 Destination destiantion = session.createQueue("Consumer.A.VirtualTopic.Orders“);

  完整得測試程式碼如下:

  訊息傳送者

package com.wangx.activemq.topic;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class TopicSender {

    public static void main(String[] args) throws JMSException {
        ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        Connection connection = factory.createConnection();
        Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);

        //建立虛擬主題,加字首VirtualTopic
        Topic topic = session.createTopic("VirtualTopic.myTopic");
        MessageProducer producer = session.createProducer(topic);
        producer.setDeliveryMode(DeliveryMode.PERSISTENT);
        connection.start();

        for (int i = 0; i < 30; i++) {
            TextMessage textMessage = session.createTextMessage("topic訊息===" + i);
            producer.send(textMessage);
        }
        session.commit();
        connection.close();

    }
}

  A客戶端程式碼(只有一個消費者)

package com.wangx.activemq.topic;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;
public class QR1 {

    public static void main(String[] args) throws JMSException {
        ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        Connection connection = factory.createConnection();
        connection.start();
        final Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
        //建立佇列
        Destination destination = session.createQueue("Consumer.A.VirtualTopic.myTopic");
        MessageConsumer consumer = session.createConsumer(destination);

        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                TextMessage textMessage = (TextMessage) message;
                try {
                    System.out.println("Consumer.A.接收到得訊息:" + textMessage.getText());
                    session.commit();
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });

    }
}

  B客戶端程式碼,有兩個消費者

package com.wangx.activemq.topic;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class QR2 {
    public static void main(String[] args) throws JMSException {
        ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        Connection connection = factory.createConnection();
        connection.start();
        final Session session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
        //建立佇列
        Destination destination = session.createQueue("Consumer.B.VirtualTopic.myTopic");
        final MessageConsumer consumer = session.createConsumer(destination);
        final MessageConsumer messageConsumer = session.createConsumer(destination);
        //模擬多個consumer消費一個queue
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    consumer.setMessageListener(new MessageListener() {
                        @Override
                        public void onMessage(Message message) {
                            TextMessage textMessage = (TextMessage) message;
                            try {
                                System.out.println("Consumer.B-->consumer接收到訊息:" + textMessage.getText());
                                session.commit();
                            } catch (JMSException e) {
                                e.printStackTrace();
                            }
                        }
                    });
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }).start();
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    messageConsumer.setMessageListener(new MessageListener() {
                        @Override
                        public void onMessage(Message message) {
                            TextMessage textMessage = (TextMessage) message;
                            try {
                                System.out.println("Consumer.B-->messageConsumer接收到訊息:" + textMessage.getText());
                                session.commit();
                            } catch (JMSException e) {
                                e.printStackTrace();
                            }
                        }
                    });
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }
}

  在接收訊息之前,應該先執行一下consumer客戶端,將消費者註冊到Broker中。

  3. 預設虛擬主題得字首是:VirtualTopic.>

  自定義消費虛擬地址預設格式:Consumer.*.VirtualTopic.>

  自定義消費虛擬地址可以改,比如下面的配置就把它修改了。

  xml配置如下

<broker xmlns="http:/activemq.apache.org/schema/core">

    <destinationInterceptors>
      <virtualDestinationInterceptor>
        <virtualDestinations>
          <virtualTopic name=">" prefix="VirtualTopicConsumers.*." selectorAware="false"/>
        </virtualDestinations>
      </virtualDestinationInterceptor>
    </destinationInterceptors>
  </broker>

  配置之後將consumer端的相應字首修改即可。

2. Mirrored Queues

  2.1 概述

   ActiveMQ中每個queue中的訊息只能被一個consumer消費,然而,有時候你可能希望能夠監視生產者和消費者之間的訊息流。你可以通過Virtual Destinations來建立一個virtual queue 來把訊息傳送到多個queues中。但是,為系統中沒給queue都進行如此配置可能會很麻煩。

  2.1使用

  ActiveMQ支援Mirrored Queues。Broker會把傳送到某個queue的所有訊息都轉發到一個名稱類似的topic,因此監控程式只需要訂閱這個mirrored queue topic。為了啟用Mirrored Queues,首先將Broker Service的useMirroredQueues屬性設定成true,然後可以通過destinationInterceptors設定其他屬性,如:mirror topic的字首,預設是:“VirtualTopic.Mirror.".

    比如修改後的配置如下

<destinationInterceptors>
            <mirroredQueue copyMessage="true" postfix=".qmirror" prefix=""/>
        </destinationInterceptors>

  在我的配置中並沒有配置配置字首,但是一定需要配置copyMessage="true",檢視控制檯如下

  存在預設字首的topic,此時只需要用一個消費者去消費該topic中的訊息就可以了。

3. Per Destination Policies

  ActiveMQ支援多種不同的策略,來單獨配置每一個Destination,它的屬性很多,可以參考官方文件:

  http://activemq.apache.org/per-destination-policies.html

  官方文件最後還給出了一個demo做參考。