1. 程式人生 > >activeMQ中的Virtual Topics詳解及使用

activeMQ中的Virtual Topics詳解及使用

一、好言

太遠容易生疏,太近容易情盡。

二、背景

最近接手專案,公司的MQ做了一層封裝,挺好用的,會有一片文章記載,然後在其中我們使用了<a href="http://activemq.apache.org/virtual-destinations.html">虛擬話題</a>的概念,這個我沒有使用過,之前一直都是使用單純的佇列或者topic,所以就查詢資料,自己配製寫測試案列看看實際效果。

三、直接先上測試程式碼

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import javax.jms.*;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Created by Mahone Wu on 2017/4/12.
 */
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration("/applicationContext-v.xml")
public class TopicTest {

    private Logger logger = LoggerFactory.getLogger(SpringJmsTopicTest.class);

    ActiveMQConnectionFactory factoryA;

    Session session;


    @Before
    public void init(){
        try{
            factoryA = getAMQConnectionFactory();
            ActiveMQConnection conn = (ActiveMQConnection) factoryA.createConnection();
            conn.start();
            session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
        }catch (Exception e){
            e.printStackTrace();
        }

    }

    @Test
    public void testNormalTopic(){
        try {
            ActiveMQTopic queue = new ActiveMQTopic(getNormalTopicName());
            MessageConsumer consumer1 = session.createConsumer(queue);
            MessageConsumer consumer2 = session.createConsumer(queue);
            final AtomicInteger count = new AtomicInteger(0);
            MessageListener listenerA = new MessageListener() {
                public void onMessage(javax.jms.Message message) {

                    try{
                        int index =count.incrementAndGet();
                        logger.info("index={}---------->receive from {},訊息={}",index,getNormalTopicName(),((TextMessage)message).getText());
                        Thread.sleep(10L);
                    }catch (Exception e){
                        e.printStackTrace();
                    }
                }
            };
            consumer1.setMessageListener(listenerA);
            consumer2.setMessageListener(listenerA);

            MessageProducer producer = session.createProducer(new ActiveMQTopic(getNormalTopicName()));
            int index = 0;
            while (index++ < 10) {
             //   logger.info("{},{}",index < 100,index + " message.");
                TextMessage message = session.createTextMessage(index
                        + " message.");
                producer.send(message);
                Thread.sleep(5L);
            }
        }catch (Exception e){
            e.printStackTrace();
        }

        try {
            System.in.read();
        }catch (Exception e){
            e.printStackTrace();
        }

    }



    @Test
    public void testNormalVirtualTopic(){
        try{
            Queue queue = new ActiveMQQueue(getVirtualTopicConsumerName());
            MessageConsumer consumer1 = session.createConsumer(queue);
            MessageConsumer consumer2 = session.createConsumer(queue);
            final AtomicInteger count = new AtomicInteger(0);

            MessageListener listenerA = new MessageListener() {
                public void onMessage(javax.jms.Message message) {
                    try {
                        int index = count.getAndIncrement();
                        logger.info("index={}---------->receive from {},訊息={}", index, getNormalTopicName(), ((TextMessage) message).getText());
                    }catch (Exception e){
                        e.printStackTrace();
                    }
                }
            };
            consumer1.setMessageListener(listenerA);
            consumer2.setMessageListener(listenerA);
            MessageProducer producer = session.createProducer(new ActiveMQTopic(getNormalVTopicName()));
            int index = 0;
            while (index++ < 10) {
                TextMessage message = session.createTextMessage(index
                        + " message.");
                producer.send(message);
            }
        }catch (Exception e){
            e.printStackTrace();
        }

    }

    @Test
    public void testVirtualTopic(){
        try {
            Queue queue = new ActiveMQQueue(getVirtualTopicConsumerNameA());
            MessageConsumer consumer1 = session.createConsumer(queue);
            MessageConsumer consumer2 = session.createConsumer(queue);
            MessageConsumer consumer3 = session.createConsumer(new ActiveMQQueue(getVirtualTopicConsumerNameB()));

            final AtomicInteger countA = new AtomicInteger(0);
            MessageListener listenerA = new MessageListener() {
                public void onMessage(javax.jms.Message message) {
                    try {
                        int index = countA.getAndIncrement();
                        logger.info("A index={}---------->receive from {},訊息={}", index, getNormalTopicName(), ((TextMessage) message).getText());
                    }catch (Exception e){
                        logger.error(""+e);
                        e.printStackTrace();
                    }
                }
            };
            consumer1.setMessageListener(listenerA);
            consumer2.setMessageListener(listenerA);
            final AtomicInteger countB = new AtomicInteger(0);
            MessageListener listenerB = new MessageListener() {
                public void onMessage(javax.jms.Message message) {
                    try {
                        int index = countB.getAndIncrement();
                        logger.info("B index={}---------->receive from {},訊息={}", index, getNormalTopicName(), ((TextMessage) message).getText());
                    }catch (Exception e){
                        e.printStackTrace();
                        logger.error(""+e);
                    }
                }
            };
            consumer3.setMessageListener(listenerB);
            MessageProducer producer = session.createProducer(new ActiveMQTopic(getVirtualTopicName()));
            int index = 0;
            while (index++ < 10) {
                TextMessage message = session.createTextMessage(index
                        + " message.");
                producer.send(message);
            }
        }catch (Exception e){
            e.printStackTrace();
        }
    }

    private ActiveMQConnectionFactory getAMQConnectionFactory(){
        return new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
    }

    private static String getNormalTopicName(){
        return "normal.TEST";
    }

    private static String getNormalVTopicName(){
        return "VirtualTopic.NORMAL";
    }

    private static String getVirtualTopicName(){
        return "VirtualTopic.TEST";
    }

    private static String getVirtualTopicConsumerName(){
        return "Consumer.normal.VirtualTopic.NORMAL";
    }

    private static String getVirtualTopicConsumerNameA(){
        return "Consumer.A.VirtualTopic.TEST";
    }

    private static String getVirtualTopicConsumerNameB(){
        return "Consumer.B.VirtualTopic.TEST";
    }
}

applicationContext-v.xml:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns:p="http://www.springframework.org/schema/p" xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
    xsi:schemaLocation="
            http://www.springframework.org/schema/beans
            http://www.springframework.org/schema/beans/spring-beans.xsd
            http://www.springframework.org/schema/context
            http://www.springframework.org/schema/context/spring-context.xsd">
</beans>

testNormalTopic列印結果:

testNormalTopic.png

結論:從這個裡面我們可以看出,consumer1,consumer2都監聽了normal.TEST,所以結果是列印了20條資料

testNormalVirtualTopic列印結果:

testNormalVirtualTopic.png

結論:虛擬的方式,consumer1,consumer2,都訂閱了VirtualTopic.NORMAL,所以按照配製消費端Consumer.normal.VirtualTopic.NORMAL的方式,結果只打印了10條資料,所以此時consumer1,consumer2只有一個接收成功。

testVirtualTopic列印結果

testVirtualTopic.png

結論:這個跟上面這個類似,對比更佳明顯的說明了VirtualTopic中,A,B是兩個不同的應用,所以列印了20條,如果是同一個應用,則只會又一個接收成功。

Consumer.A.VirtualTopic.Test 1、Consumer ,VirtualTopic 為系統配置,勿做修改。** 2、A為消費方的系統名稱。 3、Test 為根據業務定義的訊息地址 <a href="http://activemq.apache.org/virtual-destinations.html">官方文件</a>有做相關說明。

四、虛擬主題用處

摘自網上:

  1. 同一應用內consumer端負載均衡的問題:同一個應用上的一個持久訂閱不能使用多個consumer來共同承擔訊息處理功能。因為每個都會獲取所有訊息。queue模式可以解決這個問題,broker端又不能將訊息傳送到多個應用端。所以,既要釋出訂閱,又要讓消費者分組,這個功能jms規範本身是沒有的。
  1. 同一應用內consumer端failover的問題:由於只能使用單個的持久訂閱者,如果這個訂閱者出錯,則應用就無法處理訊息了,系統的健壯性不高。 對於上述的表述個人覺得理解起來好糾結,因為這裡又涉及到持久化問題,對於持久訂閱的意義可以看這篇<a href="http://blog.csdn.net/wenlixing110/article/details/53032324">文章</a>

所以我個人覺得這裡解決的就是對於如果單純的使用topic方式,那麼如果消費端部署的是叢集方式,那麼每一個都訂閱了,在傳送訊息的時候,叢集中的每一個訂閱者都有可能收到,那麼這不是我們想要的效果;可能上面說的有這麼一個方面還有一個就是涉及到持久定於的健壯性問題。 所以virtualtopic我們可以理解為是queue和topic的結合,即是,使用topic的一對多的廣播功能,又需要在叢集的時候,只有一個收到,也就是佇列的一對一的特效。

作者:吳世浩 連結:https://www.jianshu.com/p/8b255d8e885c 來源:簡書 簡書著作權歸作者所有,任何形式的轉載都請聯絡作者獲得授權並註明出處。