1. 程式人生 > >使用Java編寫ActiveMQ的隊列模式和主題模式

使用Java編寫ActiveMQ的隊列模式和主題模式

ActiveMQ 消息中間件 分布式 主題模式 隊列模式

隊列模式的消息演示

本小節簡單演示一下如何使用JMS接口規範連接ActiveMQ,首先創建一個Maven工程,在pom.xml文件中,添加activemq的依賴:

<dependencies>
    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-all</artifactId>
        <version>5.9.0</version>
    </dependency>
</dependencies>

創建一個 AppProducer 類,用於演示下如何使用JMS接口規範使用ActiveMQ的隊列模式。代碼如下:

package org.zero01.jms.queue;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * @program: jms-test
 * @description: 消息生產者
 * @author: 01
 * @create: 2018-05-26 16:44
 **/
public class AppProducer {

    // activemq服務器的url地址,默認通信端口為61616
    private static final String URL = "tcp://192.168.190.129:61616";
    // 隊列的名稱
    private static final String QUEUE_NAME = "queue-test";

    public static void main(String[] args) throws JMSException {
        // 1.創建連接工廠對象(ConnectionFactory)
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);

        // 2.創建連接對象(Connection)
        Connection connection = connectionFactory.createConnection();

        // 3.啟動連接
        connection.start();

        // 4.創建session會話,第一參數表示啟用事務處理,第二個參數表示啟動哪種應答模式,這裏啟用的是自動應答
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // 5.創建目的地(destination)
        Destination destination = session.createQueue(QUEUE_NAME);

        // 6.創建生產者
        MessageProducer producer = session.createProducer(destination);

        // 循環發送消息
        for (int i = 0; i < 100; i++) {
            // 7.創建消息,這裏創建的是簡單的文本消息體
            TextMessage textMessage = session.createTextMessage("test" + i);
            // 8.使用消息生產者往目的地發送消息
            producer.send(destination, textMessage);

            System.out.println("消息發送成功:" + textMessage.getText());
        }

        // 9.關閉連接
        connection.close();
    }
}

編寫完代碼後,登錄ActiveMQ的管理頁面,點擊選項卡上的 “Queues” 進入到如下界面,可以看到現在這裏什麽數據都沒有:
技術分享圖片

我們運行上面編寫的代碼之後,刷新該頁面,可以看到現在就有數據了:
技術分享圖片

接著我們來編寫一個消費者,去消費隊列中的消息。代碼如下:

package org.zero01.jms.queue;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * @program: jms-test
 * @description: 消息消費者
 * @author: 01
 * @create: 2018-05-26 17:08
 **/
public class AppConsumer {

    // activemq服務器的url地址,默認通信端口為61616
    private static final String URL = "tcp://192.168.190.129:61616";
    // 隊列的名稱
    private static final String QUEUE_NAME = "queue-test";

    public static void main(String[] args) throws JMSException {
        // 1.創建連接工廠對象(ConnectionFactory)
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);

        // 2.創建連接對象(Connection)
        Connection connection = connectionFactory.createConnection();

        // 3.啟動連接
        connection.start();

        // 4.創建session會話,第一參數表示啟用事務處理,第二個參數表示啟動哪種應答模式,這裏啟用的是自動應答
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // 5.創建目的地(destination)
        Destination destination = session.createQueue(QUEUE_NAME);

        // 6.創建消費者
        MessageConsumer messageConsumer = session.createConsumer(destination);

        // 7.創建一個監聽器
        messageConsumer.setMessageListener((message) -> {
            TextMessage textMessage = (TextMessage) message;
            try {
                System.out.println("接收消息: " + textMessage.getText());
            } catch (JMSException e) {
                e.printStackTrace();
            }
        });
    }
}

編寫完代碼後,點擊選項卡上的 “Connections” 進入到如下界面,可以看到現在這裏一個連接都沒有:
技術分享圖片

我們運行上面編寫的代碼之後,刷新該頁面,可以看到現在就有一個消費者連接了:
技術分享圖片

消費者運行之後,是一個線程阻塞狀態的,也就是會與ActiveMQ服務器保持連接。現在我們再來啟動一個消費者,如下就有兩個消費者了:
技術分享圖片

啟動了兩個消費者後,運行生產者的代碼。我們來看隊列模式的一個現象,如下:
技術分享圖片
技術分享圖片

控制臺打印信息如上,有沒有發現,消費者1所消費的消息是偶數的,而消費者2消費的消息則是奇數的。這就是隊列模式的一個現象,消費者們會均勻地、盡可能平均地消費隊列中的消息。


主題模式的消息演示

主題模式的代碼和隊列模式的代碼十分類似,只有創建目的地的方法不一樣。消息發布者代碼如下:

package org.zero01.jms.topic;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * @program: jms-test
 * @description: 消息發布者
 * @author: 01
 * @create: 2018-05-26 16:44
 **/
public class AppPublisher {

    // activemq服務器的url地址,默認通信端口為61616
    private static final String URL = "tcp://192.168.190.129:61616";
    // 隊列的名稱
    private static final String TOPIC_NAME = "topic-test";

    public static void main(String[] args) throws JMSException {
        // 1.創建連接工廠對象(ConnectionFactory)
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);

        // 2.創建連接對象(Connection)
        Connection connection = connectionFactory.createConnection();

        // 3.啟動連接
        connection.start();

        // 4.創建session會話,第一參數表示啟用事務處理,第二個參數表示啟動哪種應答模式,這裏啟用的是自動應答
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // 5.創建目的地(destination)
        Destination destination = session.createTopic(TOPIC_NAME);

        // 6.創建生產者
        MessageProducer producer = session.createProducer(destination);

        // 循環發送消息
        for (int i = 0; i < 100; i++) {
            // 7.創建消息,這裏創建的是簡單的文本消息體
            TextMessage textMessage = session.createTextMessage("test" + i);
            // 8.使用消息生產者往目的地發送消息
            producer.send(destination, textMessage);

            System.out.println("消息發送成功:" + textMessage.getText());
        }

        // 9.關閉連接
        connection.close();
    }
}

消息訂閱者代碼如下:

package org.zero01.jms.topic;

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

/**
 * @program: jms-test
 * @description: 消息訂閱者
 * @author: 01
 * @create: 2018-05-26 17:08
 **/
public class AppSubscriber {

    // activemq服務器的url地址,默認通信端口為61616
    private static final String URL = "tcp://192.168.190.129:61616";
    // 隊列的名稱
    private static final String TOPIC_NAME = "topic-test";

    public static void main(String[] args) throws JMSException {
        // 1.創建連接工廠對象(ConnectionFactory)
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);

        // 2.創建連接對象(Connection)
        Connection connection = connectionFactory.createConnection();

        // 3.啟動連接
        connection.start();

        // 4.創建session會話,第一參數表示啟用事務處理,第二個參數表示啟動哪種應答模式,這裏啟用的是自動應答
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // 5.創建目的地(destination)
        Destination destination = session.createTopic(TOPIC_NAME);

        // 6.創建消費者
        MessageConsumer messageConsumer = session.createConsumer(destination);

        // 7.創建一個監聽器
        messageConsumer.setMessageListener((message) -> {
            TextMessage textMessage = (TextMessage) message;
            try {
                System.out.println("接收消息: " + textMessage.getText());
            } catch (JMSException e) {
                e.printStackTrace();
            }
        });
    }
}

但主題模式與隊列模式的消費方式不太一樣,隊列模式是先啟動消息生產者去發送消息到隊列裏,然後消費者再去消費。而主題模式則是先啟動消息訂閱者去進行訂閱,然後再啟動消息發布者去發布消息,這樣消息訂閱者才能收到消息發布者所發布的消息。所以我們先啟動消息訂閱者,再啟動消息發布者。完成啟動後,這時到ActiveMQ的 “Topics” 頁面上查看信息如下:
技術分享圖片

除了以上這個區別外,我們來啟動兩個訂閱者,然後再啟動發布者,看看訂閱者們接收到的消息是怎麽樣的:
技術分享圖片
技術分享圖片

控制臺打印信息如上,可以看到兩個訂閱者都各自接收到了同一份消息。也就是說,如果有兩個訂閱者,那麽消息就會有兩份,有多個訂閱者則有多份。

使用Java編寫ActiveMQ的隊列模式和主題模式