1. 程式人生 > >SpringBoot+ActiveMq實現訂閱模式(Topic)訊息佇列

SpringBoot+ActiveMq實現訂閱模式(Topic)訊息佇列

上文已經詳細介紹了點對點模式(Queue)下的訊息佇列,今天就來再介紹一下訊息佇列的另一種模式:訂閱模式。

一、訂閱模式的流程

生產者產生一條訊息message放入一個topic中,該topic已經三個消費者訂閱了,那麼被放入topic中的這條訊息,就會同時被這三個消費者取走(當然他們必須都處於線上狀態),並進行“消費”。其實就類似現實生活中的手機接收推送。

二、訂閱模式的應用場景

釋出訂閱模式下,當釋出者訊息量很大時,顯然單個訂閱者的處理能力是不足的。實際上現實場景中是多個訂閱者節點組成一個訂閱組負載均衡消費topic訊息即分組訂閱,這樣訂閱者很容易實現消費能力線性擴充套件。可以看成是一個topic下有多個Queue,每個Queue是點對點的方式,Queue之間是釋出訂閱方式。

三、具體實現

ActiveMq的配置以及pom匯入的jar包可以參考上文

1、建立生產者:

/**
 *
 * @author yuyan
 * @create 2018-08-28 16:09
 **/
@Service
public class Topic_Producer {

    public void sendMessage(String msg){
        try {
            //建立連線工廠
            ActiveMQConnectionFactory connFactory = new ActiveMQConnectionFactory(
                    ActiveMQConnection.DEFAULT_USER,
                    ActiveMQConnection.DEFAULT_PASSWORD,
                    "tcp://localhost:61616");
            connFactory.setMaxThreadPoolSize(1);

            //連線到JMS提供者
            Connection conn = connFactory.createConnection();
//            conn.setClientID("producer1");
            conn.start();

            //事務性會話,自動確認訊息
            Session session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
            //訊息的目的地
            Destination destination = session.createTopic("topic1");
            //訊息生產者
            MessageProducer producer = session.createProducer(destination);
//            producer.setDeliveryMode(DeliveryMode.PERSISTENT); //持久化


//           //文字訊息
//          TextMessage textMessage = session.createTextMessage("這是文字訊息");
//          producer.send(textMessage);

            //鍵值對訊息
            MapMessage mapMessage = session.createMapMessage();
            mapMessage.setString("reqDesc", msg);
            producer.send(mapMessage);
//
//            //流訊息
//            StreamMessage streamMessage = session.createStreamMessage();
//            streamMessage.writeString("這是流訊息");
//            producer.send(streamMessage);
//
//            //位元組訊息
//            String s = "BytesMessage位元組訊息";
//            BytesMessage bytesMessage = session.createBytesMessage();
//            bytesMessage.writeBytes(s.getBytes());
//            producer.send(bytesMessage);
//
//            //物件訊息
//            User user = new User("obj_info", "物件訊息"); //User物件必須實現Serializable介面
//            ObjectMessage objectMessage = session.createObjectMessage();
//            objectMessage.setObject(user);
//            producer.send(objectMessage);


            session.commit(); //提交會話,該條訊息會進入"queue"佇列,生產者也完成了歷史使命
            producer.close();
            session.close();
            conn.close();
            //在事務性會話中,只有commit之後,訊息才會真正到達目的地

        }catch (Exception e){
            e.printStackTrace();

        }

    }

}

2、建立消費者

package com.springjms.queue_message;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;

import javax.jms.*;
import java.util.Date;

/**
 *
 * @author
 * @create 2018-09-06 9:55
 **/
@Component
public class Topic_Consumer implements ApplicationRunner{

    @Override
    public void run(ApplicationArguments args) throws Exception {
        init();
    }

    public  void init() throws JMSException {
        ConnectionFactory factory = new ActiveMQConnectionFactory(
                ActiveMQConnectionFactory.DEFAULT_USER,
                ActiveMQConnectionFactory.DEFAULT_PASSWORD,
                "tcp://localhost:61616"
        );

        Connection conn = factory.createConnection();
//        conn.setClientID("consumer1");
        conn.start();

        Session session = conn.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
        //與生產者的訊息目的地相同
        Destination dest = session.createTopic("topic1");

        MessageConsumer messConsumer = session.createConsumer(dest);

        messConsumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                try {
                    MapMessage m = (MapMessage)message;
                    System.out.println("consumer1接收到"+m.getString("reqDesc")+"的請求並開始處理,時間是"+new Date());
                    System.out.println("這裡會停頓5s,模擬系統處理請求,時間是"+new Date());
                    Thread.sleep(5000);
                    System.out.println("consumer1接收到"+m.getString("reqDesc")+"的請求並處理完畢,時間是"+new Date());
                }catch (Exception e){
                    e.printStackTrace();
                }
            }
        });

    }

}


現在的生產者和消費者都處於同一個專案中,且是一對一的關係,如果想要驗證一個生產者對應多個消費者的情況,可以再新建一個專案,並且建立一個消費者,只要保證topic相同即可。

3、介面測試

 @RequestMapping(value = "/SendMessageByTopic", method = RequestMethod.GET)
    @ResponseBody
    public void sendTopic(String msg) {
        try {
            System.out.println(msg+"開始發出一次請求,時間是"+new Date());
            topic_producer.sendMessage(msg);
            System.out.println(msg+"請求傳送完成,時間是"+new Date());


        }catch (Exception e){
            e.printStackTrace();
        }
    }

測試結果:

可以看到兩個消費者consumer1、consumer2同時收到了來自topic的請求,並且同時完成了處理;

觀察http://localhost:8161/admin/topics.jsp

未發請求時,topic1中有兩個消費者,入佇列與出佇列的訊息數都是0:

發出請求後,topic1中有了一條訊息,入佇列數為1,出佇列數為0:

請求處理完畢後,topic1中的出佇列數為2,入佇列數為1,證明這條訊息分別被兩個消費者消費了:

這樣,訊息佇列的兩種模式就已經介紹完了,文章中介紹的方式都是基於ActiveMq這種傳統的訊息佇列,其實還有諸如rabbitMq、kafka、rocketMq等訊息佇列,它們的原理和實現方式都不盡相同,以後有時間,還是需要再研究一下!