訊息中介軟體解決方案-JMS-ActiveMQ
一、JMS入門
1.1 JMS簡介
JMS是Java平臺上有關面向訊息中介軟體的技術規範,它本身只定義了一系列的介面規範,是一種與廠商無關的API,用來訪問訊息收發系統。訊息是JMS中的一種型別物件,由兩部分組成:報頭和訊息主體。報頭由路由資訊以及有關該訊息的元資料組成。訊息主體則攜帶者應用程式的資料或有效負載。
JMS定義了五中不同的訊息正文格式,以及呼叫的訊息型別,允許你傳送並接受一些不同形式的資料,提供現有訊息格式的一些級別的相容性。
- TextMessage:一個字串物件
- MapMessage:一套名稱值對
- ObjectMessage:一個序列化的Java物件
- ByteMessage:一個位元組的資料流
- StreamMessage:Java原始的資料流
JMS訊息傳遞型別:
一種是點對點的,就是一個生產者和一個消費者一一對應
另一種是釋出/訂閱模式,就是一個生產者產生訊息並進行傳送後,可以由多個消費者進行接收。
1.2 訊息中介軟體
訊息中介軟體利用高效可靠的訊息傳遞機制進行平臺無關的資料交流,並基於資料通訊來進行分散式系統的整合。通過提供訊息傳遞和訊息排隊模型,它可以在分散式環境下擴充套件程序間的通訊。對於訊息中介軟體,常見的角色就有Producer(生產者)、Consumer(消費者)。
常見的訊息中介軟體產品:ActiveMQ、RabbitMQ、ZeroMQ、Kafka。
本次講解使用ActiveMQ。
1.3 ActiveMQ下載與安裝
1.下載
官網:http://activemq.apache.org/
2.安裝(Linux)
- 將安裝包 apache-activemq-5.12.0-bin.tar.gz上傳至伺服器
- 解壓安裝包:tar zxvf apache-activemq-5.12.0-bin.tar.gz
- 為解壓後的目錄賦權:chmod 777 apache-activemq-5.12.0
- 進入apache-activemq-5.12.0\bin目錄啟動:./activemq start
如上圖啟動成功。
訪問自己伺服器中的的服務,我的是:http://192.168.25.128:8161
點選紅色箭頭位置登入,賬號密碼均為admin
因為我使用的是chrome瀏覽器,頁面回自動翻譯成為中文,正常我們看到的是英文的介面。
二、JMS入門Demo
2.1 點對點模式
點對點的模式主要建立在一個佇列上面,當連線一個佇列的時候,傳送端不需要知道接收端是否在接收,可以直接向ActiveMQ傳送訊息,傳送的訊息,會先進入佇列中,如果有接收端在監聽,則會發向接收端,如果沒有接收端接收,則會儲存在ActiveMQ伺服器,直到接收端接收訊息,點對點的訊息模式可以由多個傳送端,多個接收端,但是一條訊息只會被一個接收端給接收到,哪個接收端先連上ActiveMQ,則會先接收到,後面的接收端則接收不到訊息。
案例:向mq傳送一條訊息
1.搭建maven工程並匯入座標依賴
1 <dependency> 2 <groupId>org.apache.activemq</groupId> 3 <artifactId>activemq-client</artifactId> 4 <version>5.13.4</version> 5 </dependency>
2.建立訊息生產者:QueueProducer
1 public class QueueProducer { 2 public static void main(String[] args) { 3 // 1.建立連線工廠,我們前面輸入的客戶端的接收埠號是8161,現在使用的服務端埠號是61616 4 ActiveMQConnectionFactory connectionFactory = 5 new ActiveMQConnectionFactory("tcp://192.168.25.128:61616"); 6 // 預定義物件 7 Connection connection = null; 8 Session session = null; 9 MessageProducer producer = null; 10 try { 11 // 2.獲取連線 12 connection = connectionFactory.createConnection(); 13 // 3.啟動連線 14 connection.start(); 15 // 4.獲取session(引數1:是否啟動事務,引數2:訊息確認模式) 16 // 訊息確認模式:AUTO_ACKNOWLEDGE = 1 自動確認,CLIENT_ACKNOWLEDGE = 2 客戶端手動確認 17 // DUPS_OK_ACKNOWLEDGE = 3 自動批量確認,SESSION_TRANSACTED = 0 事務提交併確認 18 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 19 // 5.建立佇列物件 20 Queue queue = session.createQueue("test-queue"); 21 // 6.建立訊息生產者 22 producer = session.createProducer(queue); 23 // 7.建立訊息 24 TextMessage textMessage = session.createTextMessage("吃了麼?"); 25 // 8.傳送訊息 26 producer.send(textMessage); 27 } catch (JMSException e) { 28 e.printStackTrace(); 29 } finally { 30 // 9.關閉資源 31 try { 32 if (producer!=null){ 33 producer.close(); 34 } 35 if (session!=null){ 36 session.close(); 37 } 38 if (connection!=null){ 39 connection.close(); 40 } 41 } catch (JMSException e) { 42 e.printStackTrace(); 43 } 44 } 45 } 46 }
3.執行main方法 -- 查詢ActiveMQ介面如下
4.建立訊息消費者:QueueConsumer
1 public class QueueConsumer { 2 public static void main(String[] args) { 3 // 1.建立連線工廠,我們前面輸入的客戶端的接收埠號是8161,現在使用的服務端埠號是61616 4 ActiveMQConnectionFactory connectionFactory = 5 new ActiveMQConnectionFactory("tcp://192.168.25.128:61616"); 6 // 預定義物件 7 Connection connection = null; 8 Session session = null; 9 MessageConsumer consumer = null; 10 try { 11 // 2.獲取連線 12 connection = connectionFactory.createConnection(); 13 // 3.啟動連線 14 connection.start(); 15 // 4.獲取session(引數1:是否啟動事務,引數2:訊息確認模式) 16 // 訊息確認模式:AUTO_ACKNOWLEDGE = 1 自動確認,CLIENT_ACKNOWLEDGE = 2 客戶端手動確認 17 // DUPS_OK_ACKNOWLEDGE = 3 自動批量確認,SESSION_TRANSACTED = 0 事務提交併確認 18 session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); 19 // 5.建立佇列物件 20 Queue queue = session.createQueue("test-queue"); 21 // 6.建立訊息消費者 22 consumer = session.createConsumer(queue); 23 // 7.監聽訊息 24 consumer.setMessageListener(new MessageListener() { 25 @Override 26 public void onMessage(Message message) { 27 TextMessage textMessage = (TextMessage) message; 28 try { 29 System.out.println("接收到訊息:"+textMessage.getText()); 30 } catch (JMSException e) { 31 e.printStackTrace(); 32 } 33 } 34 }); 35 // 8.等待鍵盤輸入 -- 為了不讓程式終止,始終進行監聽 36 System.in.read(); 37 } catch (JMSException e) { 38 e.printStackTrace(); 39 } catch (IOException e) { 40 e.printStackTrace(); 41 } finally { 42 // 9.關閉資源 43 try { 44 if (consumer != null) { 45 consumer.close(); 46 } 47 if (session != null) { 48 session.close(); 49 } 50 if (connection != null) { 51 connection.close(); 52 } 53 } catch (JMSException e) { 54 e.printStackTrace(); 55 } 56 } 57 } 58 }
執行後控制檯輸出:
5.測試
同時開啟兩個以上的消費者,再次執行生產者,觀察每個消費者控制檯的輸出,會發現只有一個消費者會接收到訊息。
2.2 釋出訂閱模式
1.建立訊息生產者:TopicProducer
與點對點模式不一樣的部分程式碼如下:
1 // 5.建立佇列物件 2 Topic topic = session.createTopic("test-topic"); 3 // 6.建立訊息生產者 4 producer = session.createProducer(topic);
執行效果如下:
2.建立訊息消費者:TopicConsumer
與點對點模式不一樣部分程式碼如下:
1 // 5.建立主題物件 2 Topic topic = session.createTopic("test-topic"); 3 // 6.建立訊息消費者 4 consumer = session.createConsumer(topic);
執行訊息消費者的main方法,會發現控制檯無輸出內容,這是因為釋出訂閱模式的情況下要想接收到訊息需先啟動訊息消費者,再啟動訊息生產者。
3.測試
首先同時開啟兩個以上的消費者,再次執行生產者,發現每個消費者都會接收到訊息。
三、Spring整合JMS
3.1 點對點模式
1.訊息生產者
- 建立工程spring-jms-producer,pom.xml中引入SpringJms、activeMQ以及Spring核心配置、測試等相關依賴
1 <?xml version="1.0" encoding="UTF-8"?> 2 <project xmlns="http://maven.apache.org/POM/4.0.0" 3 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 4 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 5 <modelVersion>4.0.0</modelVersion> 6 7 <groupId>com.buwei</groupId> 8 <artifactId>spring-jms-producer</artifactId> 9 <version>1.0-SNAPSHOT</version> 10 11 <!-- 集中定義依賴版本號 --> 12 <properties> 13 <junit.version>4.12</junit.version> 14 <spring.version>4.2.4.RELEASE</spring.version> 15 <mysql.version>5.1.32</mysql.version> 16 <activemq.version>5.11.2</activemq.version> 17 </properties> 18 19 <dependencies> 20 <!-- Spring --> 21 <dependency> 22 <groupId>org.springframework</groupId> 23 <artifactId>spring-context</artifactId> 24 <version>${spring.version}</version> 25 </dependency> 26 <dependency> 27 <groupId>org.springframework</groupId> 28 <artifactId>spring-beans</artifactId> 29 <version>${spring.version}</version> 30 </dependency> 31 <dependency> 32 <groupId>org.springframework</groupId> 33 <artifactId>spring-webmvc</artifactId> 34 <version>${spring.version}</version> 35 </dependency> 36 <dependency> 37 <groupId>org.springframework</groupId> 38 <artifactId>spring-jdbc</artifactId> 39 <version>${spring.version}</version> 40 </dependency> 41 <dependency> 42 <groupId>org.springframework</groupId> 43 <artifactId>spring-aspects</artifactId> 44 <version>${spring.version}</version> 45 </dependency> 46 <dependency> 47 <groupId>org.springframework</groupId> 48 <artifactId>spring-jms</artifactId> 49 <version>${spring.version}</version> 50 </dependency> 51 <dependency> 52 <groupId>org.springframework</groupId> 53 <artifactId>spring-context-support</artifactId> 54 <version>${spring.version}</version> 55 </dependency> 56 <dependency> 57 <groupId>org.springframework</groupId> 58 <artifactId>spring-test</artifactId> 59 <version>${spring.version}</version> 60 </dependency> 61 <dependency> 62 <groupId>org.apache.activemq</groupId> 63 <artifactId>activemq-client</artifactId> 64 <version>5.13.4</version> 65 </dependency> 66 <dependency> 67 <groupId>org.apache.activemq</groupId> 68 <artifactId>activemq-all</artifactId> 69 <version>${activemq.version}</version> 70 </dependency> 71 <dependency> 72 <groupId>junit</groupId> 73 <artifactId>junit</artifactId> 74 <version>${junit.version}</version> 75 <scope>test</scope> 76 </dependency> 77 </dependencies> 78 </project>pom.xml
- 1.建立Spring配置檔案:applicationContext-jms-producer-queue.xml
1 <?xml version="1.0" encoding="UTF-8"?> 2 <beans xmlns="http://www.springframework.org/schema/beans" 3 xmlns:context="http://www.springframework.org/schema/context" 4 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 5 xmlns:amq="http://activemq.apache.org/schema/core" 6 xmlns:jms="http://www.springframework.org/schema/jms" 7 xsi:schemaLocation="http://www.springframework.org/schema/beans 8 http://www.springframework.org/schema/beans/spring-beans.xsd 9 http://www.springframework.org/schema/context 10 http://www.springframework.org/schema/context/spring-context.xsd"> 11 12 <!--包掃描--> 13 <context:component-scan base-package="com.buwei"></context:component-scan> 14 15 <!-- 真正可以產生Connection的ConnectionFactory,由對應的 JMS服務廠商提供--> 16 <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> 17 <property name="brokerURL" value="tcp://192.168.25.128:61616"/> 18 </bean> 19 <!-- Spring用於管理真正的ConnectionFactory的ConnectionFactory --> 20 <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"> 21 <!-- 目標ConnectionFactory對應真實的可以產生JMS Connection的ConnectionFactory --> 22 <property name="targetConnectionFactory" ref="targetConnectionFactory"/> 23 </bean> 24 <!-- Spring提供的JMS工具類,它可以進行訊息傳送、接收等 --> 25 <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> 26 <!-- 這個connectionFactory對應的是我們定義的Spring提供的那個ConnectionFactory物件 --> 27 <property name="connectionFactory" ref="connectionFactory"/> 28 </bean> 29 <!--這個是佇列目的地,點對點的 文字資訊--> 30 <bean id="queueTextDestination" class="org.apache.activemq.command.ActiveMQQueue"> 31 <constructor-arg value="jms-queue"/> 32 </bean> 33 </beans>
- 建立訊息生產者類:QueueProducer
1 @Component 2 public class QueueProducer { 3 @Autowired 4 private JmsTemplate jmsTemplate; 5 6 @Autowired // 注意Destination是javax.jms.Destination; 7 private Destination queueTextDestination; 8 9 /** 10 * 點對點方式傳送文字資訊 11 * @param message 12 */ 13 public void sendTestMessage(final String message){ 14 jmsTemplate.send(queueTextDestination, new MessageCreator() { 15 @Override 16 public Message createMessage(Session session) throws JMSException { 17 return session.createTextMessage(message); 18 } 19 }); 20 } 21 }
- 單元測試類,TestQueueProducer
1 @RunWith(SpringJUnit4ClassRunner.class) 2 @ContextConfiguration(locations = "classpath:applicationContext-jms-producer-queue.xml") 3 public class TestQueueProducer { 4 5 @Autowired 6 private QueueProducer queueProducer; 7 8 @Test 9 public void testQueueSend(){ 10 queueProducer.sendTestMessage("SpringJms-點對點模式,吃了嘛?"); 11 } 12 }
2.訊息消費者
- 建立工程spring-jms-consumer,pom檔案中依賴同上一個工程
- 建立訊息監聽者 - - MyMessageListenerQueue
1 public class MyMessageListenerQueue implements MessageListener { 2 @Override 3 public void onMessage(Message message) { 4 TextMessage textMessage = (TextMessage)message; 5 try { 6 System.out.println("接收到新訊息"+textMessage.getText()); 7 } catch (JMSException e) { 8 e.printStackTrace(); 9 } 10 } 11 }
- 建立配置檔案:applicationContext-jms-consumer-queue.xml
1 <?xml version="1.0" encoding="UTF-8"?> 2 <beans xmlns="http://www.springframework.org/schema/beans" 3 xmlns:context="http://www.springframework.org/schema/context" 4 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 5 xmlns:amq="http://activemq.apache.org/schema/core" 6 xmlns:jms="http://www.springframework.org/schema/jms" 7 xsi:schemaLocation="http://www.springframework.org/schema/beans 8 http://www.springframework.org/schema/beans/spring-beans.xsd 9 http://www.springframework.org/schema/context 10 http://www.springframework.org/schema/context/spring-context.xsd"> 11 12 <!--包掃描--> 13 <context:component-scan base-package="com.buwei"></context:component-scan> 14 15 <!-- 真正可以產生Connection的ConnectionFactory,由對應的 JMS服務廠商提供--> 16 <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> 17 <property name="brokerURL" value="tcp://192.168.25.128:61616"/> 18 </bean> 19 <!-- Spring用於管理真正的ConnectionFactory的ConnectionFactory --> 20 <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"> 21 <!-- 目標ConnectionFactory對應真實的可以產生JMS Connection的ConnectionFactory --> 22 <property name="targetConnectionFactory" ref="targetConnectionFactory"/> 23 </bean> 24 <!-- Spring提供的JMS工具類,它可以進行訊息傳送、接收等 --> 25 <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> 26 <!-- 這個connectionFactory對應的是我們定義的Spring提供的那個ConnectionFactory物件 --> 27 <property name="connectionFactory" ref="connectionFactory"/> 28 </bean> 29 30 <!--這個是訊息目的地,點對點的 文字資訊--> 31 <bean id="queueTextDestination" class="org.apache.activemq.command.ActiveMQQueue"> 32 <constructor-arg value="jms-queue"/> 33 </bean> 34 <!--我的監聽類--> 35 <bean id="myMessageListenerQueue" class="com.buwei.MyMessageListenerQueue"></bean> 36 <!--訊息監聽容器--> 37 <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer"> 38 <property name="connectionFactory" ref="connectionFactory" /> 39 <property name="destination" ref="queueTextDestination" /> 40 <property name="messageListener" ref="myMessageListenerQueue" /> 41 </bean> 42 </beans>
- 單元測試類,TestQueueConsumer
1 @RunWith(SpringJUnit4ClassRunner.class) 2 @ContextConfiguration(locations = "classpath:applicationContext-jms-consumer-queue.xml") 3 public class TestQueueConsumer { 4 5 @Test 6 public void testQueueReceive(){ 7 try { 8 System.in.read(); 9 } catch (IOException e) { 10 e.printStackTrace(); 11 } 12 } 13 }
測試:分別執行TestQueueProducer和TestQueueConsumer中的測試方法(先後順序沒有關係),控制檯列印接收到的訊息
3.2 釋出訂閱模式
1.訊息生產者
- 在工程spring-jms-producer中建立配置檔案:applicationContext-jms-producer-topic.xml
1 <?xml version="1.0" encoding="UTF-8"?> 2 <beans xmlns="http://www.springframework.org/schema/beans" 3 xmlns:context="http://www.springframework.org/schema/context" 4 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 5 xmlns:amq="http://activemq.apache.org/schema/core" 6 xmlns:jms="http://www.springframework.org/schema/jms" 7 xsi:schemaLocation="http://www.springframework.org/schema/beans 8 http://www.springframework.org/schema/beans/spring-beans.xsd 9 http://www.springframework.org/schema/context 10 http://www.springframework.org/schema/context/spring-context.xsd"> 11 12 <!--包掃描--> 13 <context:component-scan base-package="com.buwei"></context:component-scan> 14 15 <!-- 真正可以產生Connection的ConnectionFactory,由對應的 JMS服務廠商提供--> 16 <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> 17 <property name="brokerURL" value="tcp://192.168.25.128:61616"/> 18 </bean> 19 <!-- Spring用於管理真正的ConnectionFactory的ConnectionFactory --> 20 <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"> 21 <!-- 目標ConnectionFactory對應真實的可以產生JMS Connection的ConnectionFactory --> 22 <property name="targetConnectionFactory" ref="targetConnectionFactory"/> 23 </bean> 24 <!-- Spring提供的JMS工具類,它可以進行訊息傳送、接收等 --> 25 <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> 26 <!-- 這個connectionFactory對應的是我們定義的Spring提供的那個ConnectionFactory物件 --> 27 <property name="connectionFactory" ref="connectionFactory"/> 28 </bean> 29 <!--這個是訂閱模式 文字資訊--> 30 <bean id="topicTextDestination" class="org.apache.activemq.command.ActiveMQTopic"> 31 <constructor-arg value="jms-topic"/> 32 </bean> 33 </beans>
- 建立訊息生產者類:TopicProducer
1 @Component 2 public class TopicProducer { 3 @Autowired 4 private JmsTemplate jmsTemplate; 5 6 @Autowired // 注意Destination是javax.jms.Destination; 7 private Destination topicTextDestination; 8 9 /** 10 * 釋出訂閱方式傳送 11 * @param message 12 */ 13 public void sendTestMessage(final String message){ 14 jmsTemplate.send(topicTextDestination, new MessageCreator() { 15 @Override 16 public Message createMessage(Session session) throws JMSException { 17 return session.createTextMessage(message); 18 } 19 }); 20 } 21 }
- 單元測試類,TestTopicProducer
1 @RunWith(SpringJUnit4ClassRunner.class) 2 @ContextConfiguration(locations= "classpath:applicationContext-jms-producer-topic.xml ") 3 public class TestTopicProducer { 4 @Autowired 5 private TopicProducer topicProducer; 6 7 @Test 8 public void testTopicSend(){ 9 topicProducer.sendTestMessage("SpringJms-釋出訂閱模式,吃好了"); 10 } 11 }
2.訊息消費者
- 在工程spring-jms-consumer中建立訊息監聽者 - - MyMessageListenerTopic
1 public class MyMessageListenerTopic implements MessageListener { 2 @Override 3 public void onMessage(Message message) { 4 TextMessage textMessage = (TextMessage)message; 5 try { 6 System.out.println("接收到新訊息"+textMessage.getText()); 7 } catch (JMSException e) { 8 e.printStackTrace(); 9 } 10 } 11 }
- 在工程spring-jms-consumer中建立配置檔案:applicationContext-jms-consumer-topic.xml
1 <?xml version="1.0" encoding="UTF-8"?> 2 <beans xmlns="http://www.springframework.org/schema/beans" 3 xmlns:context="http://www.springframework.org/schema/context" 4 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 5 xmlns:amq="http://activemq.apache.org/schema/core" 6 xmlns:jms="http://www.springframework.org/schema/jms" 7 xsi:schemaLocation="http://www.springframework.org/schema/beans 8 http://www.springframework.org/schema/beans/spring-beans.xsd 9 http://www.springframework.org/schema/context 10 http://www.springframework.org/schema/context/spring-context.xsd"> 11 12 <!--包掃描--> 13 <context:component-scan base-package="com.buwei"></context:component-scan> 14 15 <!-- 真正可以產生Connection的ConnectionFactory,由對應的 JMS服務廠商提供--> 16 <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> 17 <property name="brokerURL" value="tcp://192.168.25.128:61616"/> 18 </bean> 19 <!-- Spring用於管理真正的ConnectionFactory的ConnectionFactory --> 20 <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"> 21 <!-- 目標ConnectionFactory對應真實的可以產生JMS Connection的ConnectionFactory --> 22 <property name="targetConnectionFactory" ref="targetConnectionFactory"/> 23 </bean> 24 <!-- Spring提供的JMS工具類,它可以進行訊息傳送、接收等 --> 25 <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> 26 <!-- 這個connectionFactory對應的是我們定義的Spring提供的那個ConnectionFactory物件 --> 27 <property name="connectionFactory" ref="connectionFactory"/> 28 </bean> 29 <!--這個訊息目的地,釋出訂閱的,文字資訊--> 30 <bean id="topicTextDestination" class="org.apache.activemq.command.ActiveMQTopic"> 31 <constructor-arg value="jms-topic"/> 32 </bean> 33 <!--我的監聽類--> 34 <bean id="myMessageListenerTopic" class="com.buwei.MyMessageListenerTopic"></bean> 35 <!--訊息監聽容器--> 36 <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer"> 37 <property name="connectionFactory" ref="connectionFactory" /> 38 <property name="destination" ref="topicTextDestination" /> 39 <property name="messageListener" ref="myMessageListenerTopic" /> 40 </bean> 41 </beans>
- 單元測試類,TestTopicConsumer
1 @RunWith(SpringJUnit4ClassRunner.class) 2 @ContextConfiguration(locations = "classpath:applicationContext-jms-consumer-topic.xml") 3 public class TestTopicConsumer { 4 5 @Test 6 public void testTopicReceive(){ 7 try { 8 System.in.read(); 9 } catch (IOException e) { 10 e.printStackTrace(); 11 } 12 } 13 }
測試:分別執行TestTopicProducer和TestTopicConsumer中的測試方法(消費者中的測試方法要先執行),控制檯列印接收到的訊息
同時執行多個訊息消費者工程,再執行訊息生產者工程,檢視控制檯都可以打印出訊息。