1. 程式人生 > >ActiveMQ(二)——訊息處理機制

ActiveMQ(二)——訊息處理機制

一、前言 上文中,小編提到安裝ActiveMQ,但是對於ActiveMQ中訊息是用什麼樣的形式儲存的?下面小編就向大家介紹一下。

二、訊息型別 對於訊息的傳遞有兩種型別:

1.點對點的,即一個生產者和一個消費者一一對應; 在這裡插入圖片描述 2.釋出/訂閱模式,即一個生產者產生訊息並進行傳送後,可以由多個消費者進行接收。

在這裡插入圖片描述 JMS定義了五種不同的訊息正文格式,以及呼叫的訊息型別,允許你傳送並接收以一些不同形式的資料,提供現有訊息格式的一些級別的相容性。

StreamMessage – Java原始值的資料流 MapMessage–一套名稱-值對 TextMessage–一個字串物件 ObjectMessage–一個序列化的 Java物件 BytesMessage–一個位元組的資料流 三、點對點Queue

3.1 Producer 生產者:生產訊息,傳送端。 把jar包新增到工程中。使用5.11.2版本的jar包。

第一步:建立ConnectionFactory物件,需要指定服務端ip及埠號。 
第二步:使用ConnectionFactory物件建立一個Connection物件。 
第三步:開啟連線,呼叫Connection物件的start方法。 
第四步:使用Connection物件建立一個Session物件。 
第五步:使用Session物件建立一個Destination物件(topic、queue),此處建立一個Queue物件。 
第六步:使用Session物件建立一個Producer物件。 
第七步:建立一個Message物件,建立一個TextMessage物件。 
第八步:使用Producer物件傳送訊息。 
第九步:關閉資源。
@Test
    public void testQueueProducer() throws Exception {
        // 第一步:建立ConnectionFactory物件,需要指定服務端ip及埠號。
        //brokerURL伺服器的ip及埠號
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.168:61616");
        // 第二步:使用ConnectionFactory物件建立一個Connection物件。
        Connection connection = connectionFactory.createConnection();
        // 第三步:開啟連線,呼叫Connection物件的start方法。
        connection.start();
        // 第四步:使用Connection物件建立一個Session物件。
        //第一個引數:是否開啟事務。true:開啟事務,第二個引數忽略。
        //第二個引數:當第一個引數為false時,才有意義。訊息的應答模式。1、自動應答2、手動應答。一般是自動應答。
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 第五步:使用Session物件建立一個Destination物件(topic、queue),此處建立一個Queue物件。
        //引數:佇列的名稱。
        Queue queue = session.createQueue("test-queue");
        // 第六步:使用Session物件建立一個Producer物件。
        MessageProducer producer = session.createProducer(queue);
        // 第七步:建立一個Message物件,建立一個TextMessage物件。
        /*TextMessage message = new ActiveMQTextMessage();
        message.setText("hello activeMq,this is my first test.");*/
        TextMessage textMessage = session.createTextMessage("hello activeMq,this is my first test.");
        // 第八步:使用Producer物件傳送訊息。
        producer.send(textMessage);
        // 第九步:關閉資源。
        producer.close();
        session.close();
        connection.close();
    }

3.2 12.1.2. Consumer 消費者:接收訊息。

第一步:建立一個ConnectionFactory物件。 
第二步:從ConnectionFactory物件中獲得一個Connection物件。 
第三步:開啟連線。呼叫Connection物件的start方法。 
第四步:使用Connection物件建立一個Session物件。 
第五步:使用Session物件建立一個Destination物件。和傳送端保持一致queue,並且佇列的名稱一致。 
第六步:使用Session物件建立一個Consumer物件。 
第七步:接收訊息。 
第八步:列印訊息。 
第九步:關閉資源
@Test
    public void testQueueConsumer() throws Exception {
        // 第一步:建立一個ConnectionFactory物件。
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.168:61616");
        // 第二步:從ConnectionFactory物件中獲得一個Connection物件。
        Connection connection = connectionFactory.createConnection();
        // 第三步:開啟連線。呼叫Connection物件的start方法。
        connection.start();
        // 第四步:使用Connection物件建立一個Session物件。
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 第五步:使用Session物件建立一個Destination物件。和傳送端保持一致queue,並且佇列的名稱一致。
        Queue queue = session.createQueue("test-queue");
        // 第六步:使用Session物件建立一個Consumer物件。
        MessageConsumer consumer = session.createConsumer(queue);
        // 第七步:接收訊息。
        consumer.setMessageListener(new MessageListener() {

            @Override
            public void onMessage(Message message) {
                try {
                    TextMessage textMessage = (TextMessage) message;
                    String text = null;
                    //取訊息的內容
                    text = textMessage.getText();
                    // 第八步:列印訊息。
                    System.out.println(text);
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
        //等待鍵盤輸入
        System.in.read();
        // 第九步:關閉資源
        consumer.close();
        session.close();
        connection.close();
    }

四、釋出訂閱Topic 4.1 Producer 使用步驟:

第一步:建立ConnectionFactory物件,需要指定服務端ip及埠號。 
第二步:使用ConnectionFactory物件建立一個Connection物件。 
第三步:開啟連線,呼叫Connection物件的start方法。 
第四步:使用Connection物件建立一個Session物件。 
第五步:使用Session物件建立一個Destination物件(topic、queue),此處建立一個Topic物件。 
第六步:使用Session物件建立一個Producer物件。 
第七步:建立一個Message物件,建立一個TextMessage物件。 
第八步:使用Producer物件傳送訊息。 
第九步:關閉資源。
@Test
    public void testTopicProducer() throws Exception {
        // 第一步:建立ConnectionFactory物件,需要指定服務端ip及埠號。
        // brokerURL伺服器的ip及埠號
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.168:61616");
        // 第二步:使用ConnectionFactory物件建立一個Connection物件。
        Connection connection = connectionFactory.createConnection();
        // 第三步:開啟連線,呼叫Connection物件的start方法。
        connection.start();
        // 第四步:使用Connection物件建立一個Session物件。
        // 第一個引數:是否開啟事務。true:開啟事務,第二個引數忽略。
        // 第二個引數:當第一個引數為false時,才有意義。訊息的應答模式。1、自動應答2、手動應答。一般是自動應答。
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 第五步:使用Session物件建立一個Destination物件(topic、queue),此處建立一個topic物件。
        // 引數:話題的名稱。
        Topic topic = session.createTopic("test-topic");
        // 第六步:使用Session物件建立一個Producer物件。
        MessageProducer producer = session.createProducer(topic);
        // 第七步:建立一個Message物件,建立一個TextMessage物件。
        /*
         * TextMessage message = new ActiveMQTextMessage(); message.setText(
         * "hello activeMq,this is my first test.");
         */
        TextMessage textMessage = session.createTextMessage("hello activeMq,this is my topic test");
        // 第八步:使用Producer物件傳送訊息。
        producer.send(textMessage);
        // 第九步:關閉資源。
        producer.close();
        session.close();
        connection.close();
    }

4.2 Consumer 消費者:接收訊息。

第一步:建立一個ConnectionFactory物件。 
第二步:從ConnectionFactory物件中獲得一個Connection物件。 
第三步:開啟連線。呼叫Connection物件的start方法。 
第四步:使用Connection物件建立一個Session物件。 
第五步:使用Session物件建立一個Destination物件。和傳送端保持一致topic,並且話題的名稱一致。 
第六步:使用Session物件建立一個Consumer物件。 
第七步:接收訊息。 
第八步:列印訊息。 
第九步:關閉資源
@Test
    public void testTopicConsumer() throws Exception {
        // 第一步:建立一個ConnectionFactory物件。
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.168:61616");
        // 第二步:從ConnectionFactory物件中獲得一個Connection物件。
        Connection connection = connectionFactory.createConnection();
        // 第三步:開啟連線。呼叫Connection物件的start方法。
        connection.start();
        // 第四步:使用Connection物件建立一個Session物件。
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // 第五步:使用Session物件建立一個Destination物件。和傳送端保持一致topic,並且話題的名稱一致。
        Topic topic = session.createTopic("test-topic");
        // 第六步:使用Session物件建立一個Consumer物件。
        MessageConsumer consumer = session.createConsumer(topic);
        // 第七步:接收訊息。
        consumer.setMessageListener(new MessageListener() {

            @Override
            public void onMessage(Message message) {
                try {
                    TextMessage textMessage = (TextMessage) message;
                    String text = null;
                    // 取訊息的內容
                    text = textMessage.getText();
                    // 第八步:列印訊息。
                    System.out.println(text);
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
        System.out.println("topic的消費端03。。。。。");
        // 等待鍵盤輸入
        System.in.read();
        // 第九步:關閉資源
        consumer.close();
        session.close();
        connection.close();
    }

五、小結 通過分析和用不與spring結合的測試方法來更加深刻的瞭解mq在具體的使用過程中的步驟。

都是引入相關jar包後,建立連線工廠,工廠建立相關的連線物件,連線物件開啟,建立相關的會話,會話發起目的地並且建立生產者或者消費者,然後對訊息進行處理,最後關閉連線。

下一篇部落格小編會向大家介紹如何使用spring管理ActiveMQ。