使用Java編寫ActiveMQ的隊列模式和主題模式
本小節簡單演示一下如何使用JMS接口規範連接ActiveMQ,首先創建一個Maven工程,在pom.xml文件中,添加activemq的依賴:
<dependencies> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.9.0</version> </dependency> </dependencies>
創建一個 AppProducer 類,用於演示下如何使用JMS接口規範使用ActiveMQ的隊列模式。代碼如下:
package org.zero01.jms.queue; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; /** * @program: jms-test * @description: 消息生產者 * @author: 01 * @create: 2018-05-26 16:44 **/ public class AppProducer { // activemq服務器的url地址,默認通信端口為61616 private static final String URL = "tcp://192.168.190.129:61616"; // 隊列的名稱 private static final String QUEUE_NAME = "queue-test"; public static void main(String[] args) throws JMSException { // 1.創建連接工廠對象(ConnectionFactory) ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL); // 2.創建連接對象(Connection) Connection connection = connectionFactory.createConnection(); // 3.啟動連接 connection.start(); // 4.創建session會話,第一參數表示啟用事務處理,第二個參數表示啟動哪種應答模式,這裏啟用的是自動應答 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 5.創建目的地(destination) Destination destination = session.createQueue(QUEUE_NAME); // 6.創建生產者 MessageProducer producer = session.createProducer(destination); // 循環發送消息 for (int i = 0; i < 100; i++) { // 7.創建消息,這裏創建的是簡單的文本消息體 TextMessage textMessage = session.createTextMessage("test" + i); // 8.使用消息生產者往目的地發送消息 producer.send(destination, textMessage); System.out.println("消息發送成功:" + textMessage.getText()); } // 9.關閉連接 connection.close(); } }
編寫完代碼後,登錄ActiveMQ的管理頁面,點擊選項卡上的 “Queues” 進入到如下界面,可以看到現在這裏什麽數據都沒有:
我們運行上面編寫的代碼之後,刷新該頁面,可以看到現在就有數據了:
接著我們來編寫一個消費者,去消費隊列中的消息。代碼如下:
package org.zero01.jms.queue; import org.apache.activemq.ActiveMQConnectionFactory; import javax.jms.*; /** * @program: jms-test * @description: 消息消費者 * @author: 01 * @create: 2018-05-26 17:08 **/ public class AppConsumer { // activemq服務器的url地址,默認通信端口為61616 private static final String URL = "tcp://192.168.190.129:61616"; // 隊列的名稱 private static final String QUEUE_NAME = "queue-test"; public static void main(String[] args) throws JMSException { // 1.創建連接工廠對象(ConnectionFactory) ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL); // 2.創建連接對象(Connection) Connection connection = connectionFactory.createConnection(); // 3.啟動連接 connection.start(); // 4.創建session會話,第一參數表示啟用事務處理,第二個參數表示啟動哪種應答模式,這裏啟用的是自動應答 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); // 5.創建目的地(destination) Destination destination = session.createQueue(QUEUE_NAME); // 6.創建消費者 MessageConsumer messageConsumer = session.createConsumer(destination); // 7.創建一個監聽器 messageConsumer.setMessageListener((message) -> { TextMessage textMessage = (TextMessage) message; try { System.out.println("接收消息: " + textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } }); } }
編寫完代碼後,點擊選項卡上的 “Connections” 進入到如下界面,可以看到現在這裏一個連接都沒有:
我們運行上面編寫的代碼之後,刷新該頁面,可以看到現在就有一個消費者連接了:
消費者運行之後,是一個線程阻塞狀態的,也就是會與ActiveMQ服務器保持連接。現在我們再來啟動一個消費者,如下就有兩個消費者了:
啟動了兩個消費者後,運行生產者的代碼。我們來看隊列模式的一個現象,如下:
控制臺打印信息如上,有沒有發現,消費者1所消費的消息是偶數的,而消費者2消費的消息則是奇數的。這就是隊列模式的一個現象,消費者們會均勻地、盡可能平均地消費隊列中的消息。
主題模式的消息演示
主題模式的代碼和隊列模式的代碼十分類似,只有創建目的地的方法不一樣。消息發布者代碼如下:
package org.zero01.jms.topic;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* @program: jms-test
* @description: 消息發布者
* @author: 01
* @create: 2018-05-26 16:44
**/
public class AppPublisher {
// activemq服務器的url地址,默認通信端口為61616
private static final String URL = "tcp://192.168.190.129:61616";
// 隊列的名稱
private static final String TOPIC_NAME = "topic-test";
public static void main(String[] args) throws JMSException {
// 1.創建連接工廠對象(ConnectionFactory)
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);
// 2.創建連接對象(Connection)
Connection connection = connectionFactory.createConnection();
// 3.啟動連接
connection.start();
// 4.創建session會話,第一參數表示啟用事務處理,第二個參數表示啟動哪種應答模式,這裏啟用的是自動應答
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 5.創建目的地(destination)
Destination destination = session.createTopic(TOPIC_NAME);
// 6.創建生產者
MessageProducer producer = session.createProducer(destination);
// 循環發送消息
for (int i = 0; i < 100; i++) {
// 7.創建消息,這裏創建的是簡單的文本消息體
TextMessage textMessage = session.createTextMessage("test" + i);
// 8.使用消息生產者往目的地發送消息
producer.send(destination, textMessage);
System.out.println("消息發送成功:" + textMessage.getText());
}
// 9.關閉連接
connection.close();
}
}
消息訂閱者代碼如下:
package org.zero01.jms.topic;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* @program: jms-test
* @description: 消息訂閱者
* @author: 01
* @create: 2018-05-26 17:08
**/
public class AppSubscriber {
// activemq服務器的url地址,默認通信端口為61616
private static final String URL = "tcp://192.168.190.129:61616";
// 隊列的名稱
private static final String TOPIC_NAME = "topic-test";
public static void main(String[] args) throws JMSException {
// 1.創建連接工廠對象(ConnectionFactory)
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);
// 2.創建連接對象(Connection)
Connection connection = connectionFactory.createConnection();
// 3.啟動連接
connection.start();
// 4.創建session會話,第一參數表示啟用事務處理,第二個參數表示啟動哪種應答模式,這裏啟用的是自動應答
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 5.創建目的地(destination)
Destination destination = session.createTopic(TOPIC_NAME);
// 6.創建消費者
MessageConsumer messageConsumer = session.createConsumer(destination);
// 7.創建一個監聽器
messageConsumer.setMessageListener((message) -> {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("接收消息: " + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
});
}
}
但主題模式與隊列模式的消費方式不太一樣,隊列模式是先啟動消息生產者去發送消息到隊列裏,然後消費者再去消費。而主題模式則是先啟動消息訂閱者去進行訂閱,然後再啟動消息發布者去發布消息,這樣消息訂閱者才能收到消息發布者所發布的消息。所以我們先啟動消息訂閱者,再啟動消息發布者。完成啟動後,這時到ActiveMQ的 “Topics” 頁面上查看信息如下:
除了以上這個區別外,我們來啟動兩個訂閱者,然後再啟動發布者,看看訂閱者們接收到的消息是怎麽樣的:
控制臺打印信息如上,可以看到兩個訂閱者都各自接收到了同一份消息。也就是說,如果有兩個訂閱者,那麽消息就會有兩份,有多個訂閱者則有多份。
使用Java編寫ActiveMQ的隊列模式和主題模式