1. 程式人生 > >3,ActiveMQ-入門(基於JMS發布訂閱模型)

3,ActiveMQ-入門(基於JMS發布訂閱模型)

監聽 int @override 技術 image 可持久化 發布訂閱模型 reat creat

一、Pub/Sub-發布/訂閱消息傳遞模型

在發布/訂閱消息模型中,發布者發布一個消息,該消息通過topic傳遞給所有的客戶端。在這種模型中,發布者和訂閱者彼此不知道對方,是匿名的且可以動態發布和訂閱topic。topic主要用於保存和傳遞消息,且會一直保存消息直到消息被傳遞給客戶端。

技術分享圖片

發布訂閱模型就像訂閱報紙。我們可以選擇一份或者多份報紙,比如:北京日報、人民日報。這些報紙就相當於發布訂閱模型中的topic。如果有很多人訂閱了相同的報紙,那我們就在同一個topic中註冊,對於報紙發行方,它就和所有的訂閱者形成了一對多的關系。如下:

技術分享圖片

二,Pub/Sub特點

1,每個消息可以有多個消費者。

2,發布者和訂閱者之間有時間上的依賴性。針對某個主題(Topic)的訂閱者,它必須創建一個或多個訂閱者之後,才能消費發布者的消息,而且為了消費消息,訂閱者必須保持運行的狀態。

3,為了緩和這樣嚴格的時間相關性,JMS允許訂閱者創建一個可持久化的訂閱。這樣,即使訂閱者沒有被激活(運行),它也能接收到發布者的消息。

三,發布訂閱模型的實現

3.1,發布者

/**
 * 
 * @類名稱:ActiveMQpubsubProducter
 * @類描述:發布者-發布訂閱模型
 */
public class ActiveMQpubsubProducter {
    
//會話對象 private static Session session = null; public static void sendMessages(String name){ try { session = ActiveMQLinkUtil.initConnection(true, Session.AUTO_ACKNOWLEDGE); if(session != null){ //創建一個主題 Topic messageTopic = session.createTopic(name);
//創建消息發布者 MessageProducer messageProducer = session.createProducer(messageTopic); for (int i = 0; i < 5 ; i++) { TextMessage t = session.createTextMessage("ActiveMQ發布消息:" + i); messageProducer.send(t); } session.commit(); } } catch (Exception e) { e.printStackTrace(); } } public static void main(String[] args) { ActiveMQpubsubProducter.sendMessages("topicDemo"); } }

3.2,訂閱者

----------------------------訂閱者1----------------------------

/**
 * 
 * @類名稱:ActiveMQpubsubComsumer1
 * @類描述:訂閱者1-發布訂閱模型
 */
public class ActiveMQpubsubComsumer1 {
    //會話對象
    private static Session session = null;
    
    public static void getMessages(String name){
        try {
            session = ActiveMQLinkUtil.initConnection(false, Session.AUTO_ACKNOWLEDGE);
            if(session != null){
                Topic topic = session.createTopic(name);
                //創建消費者  
                MessageConsumer messageConsumer = session.createConsumer(topic);
                //註冊消息監聽  
                messageConsumer.setMessageListener(new MyListener1());
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
    public static void main(String[] args) {
        ActiveMQpubsubComsumer1.getMessages("topicDemo");
    }
}

----------------------------訂閱者2----------------------------

/**
 * 
 * @類名稱:ActiveMQpubsubComsumer2
 * @類描述:訂閱者2-發布訂閱模型
 */
public class ActiveMQpubsubComsumer2 {
    //會話對象
    private static Session session = null;
    
    public static void getMessages(String name){
        try {
            session = ActiveMQLinkUtil.initConnection(false, Session.AUTO_ACKNOWLEDGE);
            if(session != null){
                Topic topic = session.createTopic(name);
                //創建消費者  
                MessageConsumer messageConsumer = session.createConsumer(topic);
                //註冊消息監聽  
                messageConsumer.setMessageListener(new MyListener2());
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
    public static void main(String[] args) {
        ActiveMQpubsubComsumer2.getMessages("topicDemo");
    }
}

3.3,監聽類

----------------------------訂閱者1-監聽類----------------------------

/**
 * 
 * @類名稱:MyListener1
 * @類描述:訂閱者1-監聽類
 */
public class MyListener1 implements MessageListener {
    @Override
    public void onMessage(Message message) {
        try {
            System.out.println("訂閱者一,收到的消息:" + ((TextMessage)message).getText());
            //簽收
            message.acknowledge();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

----------------------------訂閱者2-監聽類----------------------------

/**
 * 
 * @類名稱:MyListener2
 * @類描述:訂閱者2-監聽類
 */
public class MyListener2 implements MessageListener {
    @Override
    public void onMessage(Message message) {
        try {
            System.out.println("訂閱者二,收到的消息:" + ((TextMessage)message).getText());
            //簽收
            message.acknowledge();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

四,測試

在點對點的要先啟動生產者,生產者要生產消息。而發布訂閱模型,要先啟動訂閱者,訂閱者先訂閱topic,再發布消息。

第一步:運行ActiveMQpubsubComsumer1,ActiveMQpubsubComsumer2

第二步:運行ActiveMQpubsubProducter

五,結果

ActiveMQpubsubComsumer1:

技術分享圖片

ActiveMQpubsubComsumer2:

技術分享圖片

ActiveMQpubsubProducter:

技術分享圖片

ActiveMQ控制臺截圖:

技術分享圖片

Number Of Consumers

在該隊列上還有多少消費者在等待接受消息。

Messages Dequeued

消費了多少條消息,記做C。

Messages Enqueued

生產了多少條消息,記做P。

發布者發布了5條數據,但是出隊的有10條,因為有兩個訂閱者。

六,總結

發布者向一個特定的消息主題發布消息,0或者多個訂閱者可能接收到來自特定消息主題的消息感興趣。其中發布者和訂閱者不知道對方的存在。

3,ActiveMQ-入門(基於JMS發布訂閱模型)