1. 程式人生 > >ActiveMQ中訊息的重發與持久化儲存

ActiveMQ中訊息的重發與持久化儲存

訊息中介軟體解決方案續

  上一篇中我們講到了在Spring工程中基本的使用訊息中介軟體,這裡就不在繼續贅述。

  針對訊息中介軟體,這篇講解兩個我們常遇到的問題。

  問題1:如果我們的訊息的接收過程中發生異常,怎麼解決?

  問題2:釋出訂閱模式(Topic)下如果消費端宕機引起的訊息的丟失,怎麼解決?

  問題解決方案:

  問題1暫時有兩種解決方案:第一種是開啟訊息確認機制,第二種開啟事務。下面會在點對點模式下進行演示。

  問題2的解決方案:實現釋出訂閱訊息的持久化儲存。

  根據上面的解決方案搭建工程如下:測試訊息的重發使用的是點對點模式(queue)

問題一解決方案如下:

  方案一:訊息確認機制

  1.訊息的生產類QueueProducer 

 1 @Component
 2 public class QueueProducer {
 3 
 4     @Autowired
 5     private JmsTemplate jmsTemplate;
 6 
 7     @Autowired // 注意Destination是javax.jms.Destination;
 8     private Destination queueTextDestination;
 9 
10     /**
11      * 點對點方式傳送文字資訊
12 * @param message 13 */ 14 public void sendTestMessage(final String message){ 15 jmsTemplate.send(queueTextDestination, new MessageCreator() { 16 @Override 17 public Message createMessage(Session session) throws JMSException { 18 return session.createTextMessage(message);
19 } 20 }); 21 } 22 }

  2.訊息的監聽類MyMessageListenerQueueAcknowledge 

 1 public class MyMessageListenerQueueAcknowledge implements SessionAwareMessageListener {
 2 
 3     @Override
 4     public void onMessage(Message message, Session session) throws JMSException {
 5         // 為了在點對點模式情況下記錄訊息傳送的次數
 6         System.out.println(System.currentTimeMillis()+"請接收開啟了訊息確認機制的訊息");
 7 
 8         try { // 模擬發生異常
 9             if(1==1){
10                 throw new RuntimeException("出異常了");
11             }
12             TextMessage textMessage = (TextMessage)message;
13             System.out.println(textMessage);
14             System.out.println("queue模式接收到新訊息"+textMessage.getText());
15 
16             message.acknowledge();// 接收完成,通知activeMq我們正常消費完
17         } catch (JMSException e) {
18             session.recover();// 發生異常,通知activeMQ需要恢復訊息傳送重新消費
19             e.printStackTrace();
20         }
21     }
22 }

  3.訊息生產者的配置檔案producer-queue.xml

 1 <?xml version="1.0" encoding="UTF-8"?>
 2 <beans xmlns="http://www.springframework.org/schema/beans"
 3        xmlns:context="http://www.springframework.org/schema/context"
 4        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 5        xmlns:amq="http://activemq.apache.org/schema/core"
 6        xmlns:jms="http://www.springframework.org/schema/jms"
 7        xsi:schemaLocation="http://www.springframework.org/schema/beans
 8         http://www.springframework.org/schema/beans/spring-beans.xsd
 9         http://www.springframework.org/schema/context
10         http://www.springframework.org/schema/context/spring-context.xsd">
11 
12     <!--包掃描-->
13     <context:component-scan base-package="com.buwei"></context:component-scan>
14 
15     <!-- 真正可以產生Connection的ConnectionFactory,由對應的 JMS服務廠商提供-->
16     <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
17         <property name="brokerURL" value="tcp://192.168.25.128:61616"/>
18     </bean>
19 
20     <!-- Spring用於管理真正的ConnectionFactory的ConnectionFactory -->
21     <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
22         <!-- 目標ConnectionFactory對應真實的可以產生JMS Connection的ConnectionFactory -->
23         <property name="targetConnectionFactory" ref="targetConnectionFactory"/>
24     </bean>
25 
26     <!-- Spring提供的JMS工具類,它可以進行訊息傳送、接收等 -->
27     <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
28         <!-- 這個connectionFactory對應的是我們定義的Spring提供的那個ConnectionFactory物件 -->
29         <property name="connectionFactory" ref="connectionFactory"/>
30     </bean>
31 
32     <!--這個是佇列目的地,點對點的  文字資訊-->
33     <bean id="queueTextDestination" class="org.apache.activemq.command.ActiveMQQueue">
34         <constructor-arg value="jms-queue"/>
35     </bean>
36 
37 </beans>

  4.訊息消費者的配置檔案consumer-queue-acknowledge.xml

    這是需要主要第53行配置的訊息的確認模式為CLIENT_ACKNOWLEDGE,有兩種配置在註釋用有指出。

    其中第19行以及22-25註釋掉的配置是對於訊息重發的一些規則的配置,為了不影響測試效果,這裡先註釋掉。

 1 <?xml version="1.0" encoding="UTF-8"?>
 2 <beans xmlns="http://www.springframework.org/schema/beans"
 3        xmlns:context="http://www.springframework.org/schema/context"
 4        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 5        xmlns:amq="http://activemq.apache.org/schema/core"
 6        xmlns:jms="http://www.springframework.org/schema/jms"
 7        xsi:schemaLocation="http://www.springframework.org/schema/beans
 8         http://www.springframework.org/schema/beans/spring-beans.xsd
 9         http://www.springframework.org/schema/context
10         http://www.springframework.org/schema/context/spring-context.xsd">
11 
12     <!--包掃描-->
13     <context:component-scan base-package="com.buwei"></context:component-scan>
14 
15     <!-- 真正可以產生Connection的ConnectionFactory,由對應的 JMS服務廠商提供-->
16     <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
17         <property name="brokerURL" value="tcp://192.168.25.128:61616"/>
18         <!--配置訊息重發的是一些設定-->
19         <!--<property name="redeliveryPolicy" ref="activeMQRedeliveryPolicy"/>-->
20     </bean>
21 
22     <!--實現訊息重發的bean-->
23     <!--<bean id="activeMQRedeliveryPolicy" class = "org.apache.activemq.RedeliveryPolicy">
24         <property name="maximumRedeliveries" value="2"></property>
25     </bean>-->
26 
27     <!-- Spring用於管理真正的ConnectionFactory的ConnectionFactory -->
28     <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
29         <!-- 目標ConnectionFactory對應真實的可以產生JMS Connection的ConnectionFactory -->
30         <property name="targetConnectionFactory" ref="targetConnectionFactory"/>
31     </bean>
32 
33     <!-- Spring提供的JMS工具類,它可以進行訊息傳送、接收等 -->
34     <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
35         <!-- 這個connectionFactory對應的是我們定義的Spring提供的那個ConnectionFactory物件 -->
36         <property name="connectionFactory" ref="connectionFactory"/>
37     </bean>
38 
39     <!--這個是訊息目的地,點對點的  文字資訊-->
40     <bean id="queueTextDestination" class="org.apache.activemq.command.ActiveMQQueue">
41         <constructor-arg value="jms-queue"/>
42     </bean>
43 
44     <!--我的監聽類-->
45     <bean id="myMessageListenerQueue" class="com.buwei.MyMessageListenerQueueAcknowledge"></bean>
46 
47     <!--訊息監聽容器-->
48     <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer" >
49         <property name="connectionFactory" ref="connectionFactory" />
50         <property name="destination" ref="queueTextDestination" />
51         <property name="messageListener" ref="myMessageListenerQueue" />
52         <!--設定訊息的確認模式,數字2對應為ClIENT_ACKNOWLEDGE模式,也可以設定屬性sessionAcknowledgeModeName的名稱來實現-->
53         <property name="sessionAcknowledgeMode" value="2"/>
54   </bean>
55 </beans>

  5.配置訊息生產者的測試類QueueProducerTest

 1 @RunWith(SpringJUnit4ClassRunner.class)
 2 @ContextConfiguration(locations = "classpath:producer-queue.xml")
 3 public class QueueProducerTest {
 4 
 5     @Autowired
 6     private QueueProducer queueProducer;
 7 
 8     @Test
 9     public void queueSendTest(){
10         queueProducer.sendTestMessage("SpringJms-queue模式,吃了嘛?");
11     }
12 }

  6.配置訊息消費者測試類

 1 @RunWith(SpringJUnit4ClassRunner.class)
 2 @ContextConfiguration(locations = "classpath:consumer-queue-acknowledge.xml")
 3 public class QueueConsumerAcknowledgeTest {
 4     @Test
 5     public void queueAcknowledgeReceiveTest(){
 6         try {
 7             // 這裡是為了使訊息監聽持續進行
 8             System.in.read();
 9         } catch (IOException e) {
10             e.printStackTrace();
11         }
12     }
13 }

  7.執行測試

  首先開啟訊息消費者的測試類中的測試方法,然後開啟訊息生產者的測試類中的測試方法

  控制檯列印如下:

 

  也就是在訊息接收發生異常的情況下,訊息的確認機制讓訊息生產者再次重發了6次訊息,這個也是訊息中介軟體預設的重發次數,我們可以通過我在consumer配置檔案中的註釋掉的activeMQRedeliveryPolicy這個bean來設定重發的次數。

  方案二:開啟事務管理

  訊息的生產者QueueProducer類、配置檔案producer-queue.xml、生產者測試類QueueProducerTest繼續沿用上面方案一中的,其他的配置如下:

  1.訊息的監聽類MyMessageListenerQueueTransaction

 1 public class MyMessageListenerQueueTransaction implements SessionAwareMessageListener {
 2 
 3     @Override
 4     public void onMessage(Message message, Session session) throws JMSException {
 5 
 6         // 為了在點對點模式情況下記錄訊息傳送的次數
 7         System.out.println(System.currentTimeMillis()+"請接收開啟了事務管理機制的訊息");
 8 
 9         try {
10             if (1 == 1) {
11                 throw new RuntimeException("出異常了");
12             }
13             TextMessage textMessage = (TextMessage) message;
14             System.out.println(textMessage);
15             System.out.println("queue模式接收到新訊息" + textMessage.getText());
16 
17             session.commit();// 接收成功,提交事務
18         } catch (JMSException e) {
19             session.rollback();// 發生異常,訊息回滾重新發送
20             e.printStackTrace();
21         }
22     }
23 }

  2.配置了事務管理的消費者配置檔案

    — —這裡主要的不同的是第54行以及58-60行,配置了事務管理相關的內容

 1 <?xml version="1.0" encoding="UTF-8"?>
 2 <beans xmlns="http://www.springframework.org/schema/beans"
 3        xmlns:context="http://www.springframework.org/schema/context"
 4        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 5        xmlns:amq="http://activemq.apache.org/schema/core"
 6        xmlns:jms="http://www.springframework.org/schema/jms"
 7        xsi:schemaLocation="http://www.springframework.org/schema/beans
 8         http://www.springframework.org/schema/beans/spring-beans.xsd
 9         http://www.springframework.org/schema/context
10         http://www.springframework.org/schema/context/spring-context.xsd">
11 
12     <!--包掃描-->
13     <context:component-scan base-package="com.buwei"></context:component-scan>
14 
15     <!-- 真正可以產生Connection的ConnectionFactory,由對應的 JMS服務廠商提供-->
16     <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
17         <property name="brokerURL" value="tcp://192.168.25.128:61616"/>
18         <!--配置訊息重發的是一些設定-->
19         <!--<property name="redeliveryPolicy" ref="activeMQRedeliveryPolicy"/>-->
20     </bean>
21 
22     <!--對訊息重發進行屬性設定的bean-->
23     <!--<bean id="activeMQRedeliveryPolicy" class = "org.apache.activemq.RedeliveryPolicy">
24         <property name="maximumRedeliveries" value="2"></property>
25     </bean>-->
26 
27     <!-- Spring用於管理真正的ConnectionFactory的ConnectionFactory -->
28     <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
29         <!-- 目標ConnectionFactory對應真實的可以產生JMS Connection的ConnectionFactory -->
30         <property name="targetConnectionFactory" ref="targetConnectionFactory"/>
31     </bean>
32 
33     <!-- Spring提供的JMS工具類,它可以進行訊息傳送、接收等 -->
34     <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
35         <!-- 這個connectionFactory對應的是我們定義的Spring提供的那個ConnectionFactory物件 -->
36         <property name="connectionFactory" ref="connectionFactory"/>
37     </bean>
38 
39     <!--這個是訊息目的地,點對點的  文字資訊-->
40     <bean id="queueTextDestination" class="org.apache.activemq.command.ActiveMQQueue">
41         <constructor-arg value="jms-queue"/>
42     </bean>
43 
44     <!--我的監聽類-->
45     <bean id="myMessageListenerQueue" class="com.buwei.MyMessageListenerQueueTransaction"></bean>
46 
47     <!--訊息監聽容器-->
48     <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer" >
49         <property name="connectionFactory" ref="connectionFactory" />
50         <property name="destination" ref="queueTextDestination" />
51         <property name="messageListener" ref="myMessageListenerQueue" />
52 
53         <!--設定開啟事務管理-->
54         <property name="transactionManager" ref="transactionManager"/>
55     </bean>
56 
57     <!--配置事務管理的bean-->
58     <bean id="transactionManager" class = "org.springframework.jms.connection.JmsTransactionManager">
59         <property name="connectionFactory" ref="connectionFactory"/>
60     </bean>
61 </beans>

   3.配置了事務管理的訊息消費者的測試類

 1 @RunWith(SpringJUnit4ClassRunner.class)
 2 @ContextConfiguration(locations = "classpath:consumer-queue-transaction.xml")
 3 public class QueueConsumerTransactionTest {
 4 
 5     @Test
 6     public void queueTransactionReceiveTest(){
 7         try {   // 這裡是為了使訊息監聽持續進行
 8             System.in.read();
 9         } catch (IOException e) {
10             e.printStackTrace();
11         }
12     }
13 }

  4.執行測試

  同樣的先開啟消費端的測試類中的方法,再開啟生產者的測試類中的測試方法,控制檯列印如下:

  一樣的,在預設情況下,如果發生異常,訊息會回滾6次。

  總結:訊息的重發可以通過設定訊息確認機制或者事務管理的方式來實現,系統預設的可重發次數是6次,加上原來的1次總共是傳送7次。可以通過配置RedeliveryPolicy類來修改預設值。

問題二解決方案如下:

  在消費端開啟持久的訊息訂閱服務,主要的也是在消費者的配置檔案中進行配置

  1.訊息的生產者TopicProducer類

 1 @Component
 2 public class TopicProducer {
 3     @Autowired
 4     private JmsTemplate jmsTemplate;
 5 
 6     @Autowired // 注意Destination是javax.jms.Destination;
 7     private Destination topicTextDestination;
 8 
 9     /**
10      * 釋出訂閱方式傳送
11      * @param message
12      */
13     public void sendTestMessage(final String message){
14         jmsTemplate.send(topicTextDestination, new MessageCreator() {
15             @Override
16             public Message createMessage(Session session) throws JMSException {
17                 return session.createTextMessage(message);
18             }
19         });
20     }
21 }

  2.訊息的監聽類MyMessageListenerTopic

 1 public class MyMessageListenerTopic implements MessageListener {
 2 
 3     @Override
 4     public void onMessage(Message message) {
 5         TextMessage textMessage = (TextMessage)message;
 6         try {
 7             System.out.println("topic模式接收到新訊息"+textMessage.getText());
 8         } catch (JMSException e) {
 9             e.printStackTrace();
10         }
11     }
12 }

  3.訊息的生產者的配置檔案producer-topic.xml

 1 <?xml version="1.0" encoding="UTF-8"?>
 2 <beans xmlns="http://www.springframework.org/schema/beans"
 3        xmlns:context="http://www.springframework.org/schema/context"
 4        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 5        xmlns:amq="http://activemq.apache.org/schema/core"
 6        xmlns:jms="http://www.springframework.org/schema/jms"
 7        xsi:schemaLocation="http://www.springframework.org/schema/beans
 8         http://www.springframework.org/schema/beans/spring-beans.xsd
 9         http://www.springframework.org/schema/context
10         http://www.springframework.org/schema/context/spring-context.xsd">
11 
12     <!--包掃描-->
13     <context:component-scan base-package="com.buwei"></context:component-scan>
14 
15     <!-- 真正可以產生Connection的ConnectionFactory,由對應的 JMS服務廠商提供-->
16     <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
17         <property name="brokerURL" value="tcp://192.168.25.128:61616"/>
18     </bean>
19 
20     <!-- Spring用於管理真正的ConnectionFactory的ConnectionFactory -->
21     <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
22         <!-- 目標ConnectionFactory對應真實的可以產生JMS Connection的ConnectionFactory -->
23         <property name="targetConnectionFactory" ref="targetConnectionFactory"/>
24     </bean>
25 
26     <!-- Spring提供的JMS工具類,它可以進行訊息傳送、接收等 -->
27     <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
28         <!-- 這個connectionFactory對應的是我們定義的Spring提供的那個ConnectionFactory物件 -->
29         <property name="connectionFactory" ref="connectionFactory"/>
30     </bean>
31 
32     <!--這個是釋出、訂閱模式 文字資訊-->
33     <bean id="topicTextDestination" class="org.apache.activemq.command.ActiveMQTopic">
34         <constructor-arg value="jms-topic"/>
35     </bean>
36 
37 </beans>

  4.訊息的消費者的配置檔案consumer-topic.xml

  主要在第25行以及第49行的配置,宣告訊息的訂閱者的id,以及將訂閱者id加入到到訊息的監聽容器中的持久化主體訂閱者中

 1 <?xml version="1.0" encoding="UTF-8"?>
 2 <beans xmlns="http://www.springframework.org/schema/beans"
 3        xmlns:context="http://www.springframework.org/schema/context"
 4        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 5        xmlns:amq="http://activemq.apache.org/schema/core"
 6        xmlns:jms="http://www.springframework.org/schema/jms"
 7        xsi:schemaLocation="http://www.springframework.org/schema/beans
 8         http://www.springframework.org/schema/beans/spring-beans.xsd
 9         http://www.springframework.org/schema/context
10         http://www.springframework.org/schema/context/spring-context.xsd">
11 
12     <!--包掃描-->
13     <context:component-scan base-package="com.buwei"></context:component-scan>
14 
15     <!-- 真正可以產生Connection的ConnectionFactory,由對應的 JMS服務廠商提供-->
16     <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
17         <property name="brokerURL" value="tcp://192.168.25.128:61616"/>
18     </bean>
19 
20     <!-- Spring用於管理真正的ConnectionFactory的ConnectionFactory -->
21     <bean  id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
22         <property name="targetConnectionFactory" ref="targetConnectionFactory"/>
23         <property name="sessionCacheSize" value="100"/>
24         <!--宣告訊息的訂閱者的id-->
25         <property name="clientId" value="buwei"/>
26     </bean>
27 
28     <!-- Spring提供的JMS工具類,它可以進行訊息傳送、接收等 -->
29     <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
30         <!-- 這個connectionFactory對應的是我們定義的Spring提供的那個ConnectionFactory物件 -->
31         <property name="connectionFactory" ref="connectionFactory"/>
32     </bean>
33 
34     <!--這個訊息目的地,釋出訂閱的,文字資訊-->
35     <bean id="topicTextDestination"  class="org.apache.activemq.command.ActiveMQTopic">
36         <constructor-arg value="jms-topic"/>
37     </bean>
38 
39     <!--我的監聽類-->
40     <bean id="myMessageListenerTopic" class="com.buwei.MyMessageListenerTopic"></bean>
41 
42     <!--訊息監聽容器-->
43     <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
44         <property name="connectionFactory" ref="connectionFactory" />
45         <property name="destination" ref="topicTextDestination" />
46         <property name="messageListener" ref="myMessageListenerTopic" />
47 
48         <!--指明持久化訊息的訂閱者的名稱,對應connectionFactory中的clientId-->
49         <property name="durableSubscriptionName" value="buwei"></property>
50     </bean>
51 </beans>

  5.生產者的測試類TopicProducerTest

 1 @RunWith(SpringJUnit4ClassRunner.class)
 2 @ContextConfiguration(locations= "classpath:producer-topic.xml ")
 3 public class TopicProducerTest {
 4     @Autowired
 5     private TopicProducer topicProducer;
 6 
 7     @Test
 8     public void  topicSendTest(){
 9         topicProducer.sendTestMessage("SpringJms-topic模式,吃好了");
10     }
11 }

  6.消費者的測試類TopicConsumerTest

 1 @RunWith(SpringJUnit4ClassRunner.class)
 2 @ContextConfiguration(locations = "classpath:consumer-topic.xml")
 3 public class TopicConsumerTest {
 4 
 5     @Test
 6     public void topicReceiveTest(){
 7         try {
 8             System.in.read();
 9         } catch (IOException e) {
10             e.printStackTrace();
11         }
12     }
13 }

  7.開啟測試

  這裡為了實現持久化,我們需要先執行訊息的消費者測試類中的方法以實現訊息的訂閱,然後停止消費者的方法,再來執行兩次生產者測試類中的方法,再次執行消費者測試類中的方法,控制檯列印如下:

  我們可以發現在實現了訂閱機制之後,即使消費者宕機,只要再上線仍然可以收到在宕機期間生產者傳送的訊息。

  但是如果沒有實現訂閱的話那在Topic模式下進行的就是廣播形式,即生產者傳送訊息時,消費端線上即可收到訊息,如果錯過了就是一輩子。。。。

補充:

  死信佇列:沒有被我們正常消費的訊息就會存入到死信隊列當中。

  activeMQ中的訊息資料其實都是存放在檔案中的,可以通過修改activeMQ安裝目錄config下的activemq.xml進行配置。例如如果我們需要重新發送死信隊列當中的訊息就可以重新進行讀取來進行重發。

  當然這裡只是簡單的介紹一下,死信佇列的用處如果有了解的話希望大家能夠補充。:)

   查看了一下網上的一些資源發現解決問題的配置很多種,這裡僅作為自己學習的一種記錄,大家共勉。