1. 程式人生 > >MQ 入門(一)——MQ、JMS的瞭解與 activemq 基本操作

MQ 入門(一)——MQ、JMS的瞭解與 activemq 基本操作

一、MQ
1.1 關於訊息佇列 MQ
訊息佇列(MQ)是一種應用程式對應用程式的通訊方法。應用程式通過寫和檢索出入列隊的針對應用程式的資料(訊息)來通訊,而無需專用連線來連結它們。訊息傳遞指的是程式之間通過在訊息中傳送資料進行通訊,而不是通過直接呼叫彼此來通訊,直接呼叫通常是用於諸如遠端過程呼叫的技術。排隊指的是應用程式通過佇列來通訊。佇列的使用除去了接收和傳送應用程式同時執行的要求
1.2 MQ特點:
MQ的消費-生產者模型的一個典型的代表,一端往訊息佇列中不斷的寫入訊息,而另一端則可以讀取或者訂閱佇列中的訊息。MQ和JMS類似,但不同的是JMS是SUN JAVA訊息中介軟體服務的一個標準和API定義,而MQ則是遵循了AMQP協議的具體實現和產品。

1.3 使用場景:
在專案中,將一些無需即時返回且耗時的操作提取出來,進行了非同步處理,而這種非同步處理的方式大大的節省了伺服器的請求響應時間,從而提高了系統的吞吐量。

二、JMS
參考文章:https://baike.baidu.com/item/JMS/2836691?fr=aladdin
2.1 關於JMS
JMS即Java訊息服務(Java Message Service)應用程式介面,是一個Java平臺中關於面向訊息中介軟體(MOM)的API,用於在兩個應用程式之間,或分散式系統中傳送訊息,進行非同步通訊。Java訊息服務是一個與具體平臺無關的API,絕大多數MOM提供商都對JMS提供支援。

2.2. JMS體系結構

JMS由以下元素組成
JMS提供者 —— 連接面向訊息中介軟體的,JMS介面的一個實現。提供者可以是Java平臺的JMS實現,也可以是非Java平臺的面向訊息中介軟體的介面卡 (activemq,rabbitmq等)。
JMS客戶 —— 生產或消費基於訊息的Java的應用程式或物件(生產者和消費者)
JMS生產者 —— 建立併發送訊息的JMS客戶。(activemq 中體現為生成訊息物件,並將物件存入訊息佇列中。)
JMS消費者 —— 接收訊息的JMS客戶。(activemq 中體現為獲取訊息佇列中的訊息物件,進行相應的業務處理)
JMS訊息 —— 在JMS客戶之間傳遞的資料的物件
JMS佇列 —— 一個容納那些被髮送的等待閱讀的訊息的區域。與佇列名字所暗示的意思不同,訊息的接受順序並不一定要與訊息的傳送順序相同。
JMS主題 —— 一種支援傳送訊息給多個訂閱者的機制。

2.3 JMS物件型別
連線工廠(ConnectionFactory)利用連線工廠建立一個JMS連線。
JMS連線(Connection)表示JMS客戶端和伺服器端之間的一個活動的連線,是由客戶端通過呼叫連線工廠的方法建立的。
JMS會話(Session)表示JMS客戶與JMS伺服器之間的會話狀態。JMS會話建立在JMS連線上,表示客戶與伺服器之間的一個會話執行緒。
JMS目的(Destination),又稱為訊息佇列,是實際的訊息源。
生產者(Message Producer)和消費者(Message Consumer)物件由Session物件建立,用於傳送和接收訊息。

2.4 JMS訊息通常有兩種型別:
① 點對點(Point-to-Point)。在點對點的訊息系統中,訊息分發給一個單獨的使用者。點對點訊息往往與佇列(javax.jms.Queue)相關聯。【針對該訊息,只有一個消費者將獲得訊息
② 釋出/訂閱(Publish/Subscribe)。釋出/訂閱訊息系統支援一個事件驅動模型,訊息生產者和消費者都參與訊息的傳遞。生產者釋出事件,而使用者訂閱感興趣的事件,並使用事件。該型別訊息一般與特定的主題(javax.jms.Topic)關聯。【針對該訊息:多個消費者可以獲得訊息

3、JMS和MQ的關係:
JMS是一個用於提供訊息服務的技術規範,它制定了在整個訊息服務提供過程中的所有資料結構和互動流程。而MQ則是訊息佇列服務,是面向訊息中介軟體(MOM)的最終實現,是真正的服務提供者;MQ的實現可以基於JMS,也可以基於其他規範或標準。

二. ActiveMQ 瞭解與操作
對於MQ本次使用的activeMQ.
1. ActiveMQ的安裝(windows)

從官網下載安裝包, http://activemq.apache.org/download.html
直接解壓,進入相應的bin目錄下,選擇相應的版本,點選active.bat即可
如果想把其安裝成服務,點選InstallService.bat即可安裝成服務啟動。
圖片如下:
這裡寫圖片描述

啟動後,activeMQ會佔用兩個埠,一個是負責接收發送訊息的tcp埠:61616,一個是基於web負責使用者介面化管理的埠:8161。這兩個埠可以在conf下面的xml中找到。
啟動完成後,管理介面訪問地址為:http://localhost:8161/admin/ 預設的使用者名稱 :admin 密碼為:admin 具體可以在conf資料夾下的users.properties中檢視
圖片如下:
這裡寫圖片描述

2、使用java 操作activemq

2.1 生產者:產生訊息物件,放入佇列中。具體程式碼如下:

/**
 * 生產者
 * @author onionflower
 *
 */
public class JMSProducer {


    private static final int SENDNUM = 10;

    public static void main(String[] args) {
        //連線工廠
        ConnectionFactory connectionFactory;
        //連線
        Connection connection  = null;
        //會話,接受或者傳送訊息的執行緒
        Session session;
        //訊息的目的地
        Destination destination;
        //訊息生產者
        MessageProducer messageProducer;
        //例項化工廠
        connectionFactory = new ActiveMQConnectionFactory(ConnectionConstants.BROKENURL);
        try {
            //獲取連線
            connection = connectionFactory.createConnection(ConnectionConstants.USERNAME, ConnectionConstants.PASSWORD);
            //啟動連線
            connection.start();
            //建立session
            //引數1:true:當記錄被消費後,中間站中的資料不會被刪除。false:當記錄被消費後,中間站中的資料會被刪除  【引數1為true,第二個引數無效】
            //引數2:Session.AUTO_ACKNOWLEDGE為自動確認,客戶端傳送和接收訊息不需要做額外的工作。異常也會確認訊息,應該是在執行之前確認的
            //Session.CLIENT_ACKNOWLEDGE為客戶端確認。客戶端接收到訊息後,必須呼叫javax.jms.Message的acknowledge方法。jms伺服器才會刪除訊息。可以在失敗的
            //時候不確認訊息,不確認的話不會移出佇列,一直存在,下次啟動繼續接受。接收訊息的連線不斷開,其他的消費者也不會接受(正常情況下佇列模式不存在其他消費者)
            //DUPS_OK_ACKNOWLEDGE允許副本的確認模式。一旦接收方應用程式的方法呼叫從處理訊息處返回,會話物件就會確認訊息的接收;而且允許重複確認。在需要考慮資源使用時,這種模式非常有效。
            session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
            //建立一個訊息佇列
            destination = session.createQueue("firstDemo");
            //建立訊息生成這
            messageProducer = session.createProducer(destination);
            //傳送訊息
            sendMessage(session, messageProducer);

            session.commit();
        } catch (JMSException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

    public static void sendMessage(Session session,MessageProducer messageProducer) throws JMSException{
        for(int i=0;i<SENDNUM ;i++){
            TextMessage message = session.createTextMessage("ACTIVEMQ 生產者 生產訊息,這是第"+i+"次生產");
            //發出訊息
            messageProducer.send(message);
        }
    }
}

2.2 消費者:從佇列中獲取訊息物件具體程式碼如下:

/**
 * 訊息消費者
 * @author onionflower
 *
 */
public class JMSConSumer {

    public static void main(String[] args) {
        ConnectionFactory connectionFactory ;

        Connection connection;

        Session session;

        Destination destination;
        //訊息消費者
        MessageConsumer messageConsumer;

        connectionFactory = new ActiveMQConnectionFactory(ConnectionConstants.BROKENURL);

        try {
            connection = connectionFactory.createConnection(ConnectionConstants.USERNAME, ConnectionConstants.PASSWORD);

            connection.start();

            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

            destination = session.createQueue("firstDemo");

            messageConsumer = session.createConsumer(destination);

            while (true) {
                TextMessage textMessage = (TextMessage) messageConsumer.receive(10000);
                if(textMessage != null){
                    System.out.println("ACTIVEMQ 消費者收到的訊息:"+textMessage.getText());
                }else{
                    break;
                }
            }
        } catch (JMSException e) {
            e.printStackTrace();
        }

    }
}

常量輔助類:

/**
 * 常量使用類
 * @author onionflower
 *
 */
public class ConnectionConstants {

    //預設使用者名稱
    public static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    //預設密碼
    public static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    //預設連線url地址
    public static final String BROKENURL = ActiveMQConnection.DEFAULT_BROKER_URL;
}

執行結果如下所示:
1、啟動生產著和消費者,啟動之後,會在管理介面可以看到訊息物件的生產和消費情況,如下:
這裡寫圖片描述

控制檯執行情況如下:
這裡寫圖片描述

2、執行程式之後,我們可以返現,生成者和消費者程式啟動之後, 程式一直處於監聽狀態。