1. 程式人生 > >消息中間件JMS

消息中間件JMS

發送消息 圖片 spa 出了 () 3.4 消費者 技術 pro

一.什麽是消息中間件  

   消息中間件利用高效可靠的消息傳遞機制進行平臺無關的數據交流,並基於數據通信來進行分布式系統的集成。通過提供消息傳遞和消息排隊模型,它可以在分布式環境下擴展進程間的通信。

  ActiveMQ 是Apache出品,最流行的,能力強勁的開源消息總線。ActiveMQ 是一個完全支持JMS1.1和J2EE 1.4規範的 JMS Provider實現。

  

  對於消息的傳遞有兩種類型:

    一種是點對點的,即一個生產者和一個消費者一一對應;

    技術分享圖片

    另一種是發布/ 訂閱模式,即一個生產者產生消息並進行發送後,可以由多個消費者進行接收。

    技術分享圖片

二.ActiveMQ
下載與安裝

  1.官方網站下載:http://activemq.apache.org/

  2.centos上安裝ActiveMQ

    1)將apache-activemq-5.12.0-bin.tar.gz 上傳至服務器

    2)解壓

      tar zxvf apache-activemq-5.12.0-bin.tar.gz

    3)為apache-activemq-5.12.0目錄賦權

      chmod 777 apache-activemq-5.12.0

    4)進入apache-activemq-5.12.0\bin目錄,賦與執行權限

     chmod 755 activemq

  3.啟動  

    ./activemq start

    啟動成功出現如下信息

    技術分享圖片

  4.打開瀏覽器輸入服務器地址

    http://192.168.56.102:8161

    即可訪問管理界面

    技術分享圖片

    點擊Manage ActiveMQ broker登錄,登錄名和密碼均為admin

  5.點對點消息列表Queues  

  技術分享圖片  

    Number Of Pending Messages :等待消費的消息 這個是當前未出隊列的數量。
    Number Of Consumers :消費者 這個是消費者端的消費者數量
    Messages Enqueued :進入隊列的消息 進入隊列的總數量,包括出隊列的。
    Messages Dequeued :出了隊列的消息 可以理解為是消費這消費掉的數量。

三.測試案例

  1.點對點模式

    1.1消息生產者

    1)創建Maven工程,引入依賴

<dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-client</artifactId>
    <version>5.13.4</version>
</dependency>

    2)創建類QueueProducer main方法代碼如下:

    //1.創建連接工廠
    ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.25.135:61616");
    //2.獲取連接
    Connection connection = connectionFactory.createConnection();
    //3.啟動連接
    connection.start();
    //4.獲取session  (參數1:是否啟動事務,參數2:消息確認模式)
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);        
    //5.創建隊列對象
    Queue queue = session.createQueue("test-queue");
    //6.創建消息生產者
    MessageProducer producer = session.createProducer(queue);
    //7.創建消息
    TextMessage textMessage = session.createTextMessage("歡迎來到ActiveMQ世界");
    //8.發送消息
    producer.send(textMessage);
    //9.關閉資源
    producer.close();
    session.close();
    connection.close();    

    上述代碼中第4步創建session 的兩個參數: 

    第2個參數 消息的確認模式

  • AUTO_ACKNOWLEDGE = 1 自動確認
  • CLIENT_ACKNOWLEDGE = 2 客戶端手動確認
  • DUPS_OK_ACKNOWLEDGE = 3 自動批量確認
  • SESSION_TRANSACTED = 0 事務提交並確認

    運行後通過ActiveMQ管理界面查詢

  技術分享圖片

    1.2 消息消費者

    創建類QueueConsumer ,main方法代碼如下:

    //1.創建連接工廠
    ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.56.102:61616");
    //2.獲取連接
    Connection connection = connectionFactory.createConnection();
    //3.啟動連接
    connection.start();
    //4.獲取session  (參數1:是否啟動事務,參數2:消息確認模式)
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    //5.創建隊列對象
    Queue queue = session.createQueue("test-queue");
    //6.創建消息消費
    MessageConsumer consumer = session.createConsumer(queue);
    
    //7.監聽消息
    consumer.setMessageListener(new MessageListener() {
        public void onMessage(Message message) {
            TextMessage textMessage=(TextMessage)message;
            try {
                System.out.println("接收到消息:"+textMessage.getText());
            } catch (JMSException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    });    
    //8.等待鍵盤輸入
    System.in.read();    
    //9.關閉資源
    consumer.close();
    session.close();
    connection.close();    

    執行後看到控制臺輸出:歡迎來到神奇的ActiveMq世界

    1.3 運行測試

    同時開啟2個以上的消費者,再次運行生產者,觀察每個消費者控制臺的輸出,會發現只有一個消費者會接收到消息。

  2.發布訂閱模式

    2.1消息生產者

    創建類TopicProducer ,main方法代碼如下:

  

//1.創建連接工廠
    ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.56.102:61616");
    //2.獲取連接
    Connection connection = connectionFactory.createConnection();
    //3.啟動連接
    connection.start();
    //4.獲取session  (參數1:是否啟動事務,參數2:消息確認模式)
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    //5.創建主題對象
    Topic topic = session.createTopic("test-topic");
    //6.創建消息生產者
    MessageProducer producer = session.createProducer(topic);
    //7.創建消息
    TextMessage textMessage = session.createTextMessage("歡迎來到神奇的ActiveMq世界");
    //8.發送消息
    producer.send(textMessage);
    //9.關閉資源
    producer.close();
    session.close();
    connection.close();

    2.2消息消費者

    創建類TopicConsumer ,main方法代碼如下:

    

    //1.創建連接工廠
    ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.56.102:61616");
    //2.獲取連接
    Connection connection = connectionFactory.createConnection();
    //3.啟動連接
    connection.start();
    //4.獲取session  (參數1:是否啟動事務,參數2:消息確認模式)
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    //5.創建主題對象
    //Queue queue = session.createQueue("test-queue");
    Topic topic = session.createTopic("test-topic");
    //6.創建消息消費
    MessageConsumer consumer = session.createConsumer(topic);
    
    //7.監聽消息
    consumer.setMessageListener(new MessageListener() {
        public void onMessage(Message message) {
            TextMessage textMessage=(TextMessage)message;
            try {
                System.out.println("接收到消息:"+textMessage.getText());
            } catch (JMSException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    });
    //8.等待鍵盤輸入
    System.in.read();
    //9.關閉資源
    consumer.close();
    session.close();
    connection.close();    

    2.3 運行測試

      同時開啟2個以上的消費者,再次運行生產者,觀察每個消費者控制臺的輸出,會發現每個消費者會接收到消息。

消息中間件JMS