1. 程式人生 > >ActiveMQ 重發機制與確認機制 實踐

ActiveMQ 重發機制與確認機制 實踐

方法 Coding ssa this 消息發送 als cer 出隊 引用

一、配置spring-activemq.xml

 1 <?xml version="1.0" encoding="UTF-8"?>
 2 <beans xmlns="http://www.springframework.org/schema/beans"
 3        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 4        xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd"
> 5 6 <!-- 第三方MQ工廠: ConnectionFactory --> 7 <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> 8 <!-- ActiveMQ Address --> 9 <property name="brokerURL" value="${activemq.brokerURL}"/> 10 <property
name="userName" value="${activemq.userName}"/> 11 <property name="password" value="${activemq.password}"/> 12 <!-- 是否異步發送 --> 13 <property name="useAsyncSend" value="true"/> 14 <!-- 引用重發機制 --> 15 <property name="redeliveryPolicy" ref
="activeMQRedeliveryPolicy" /> 16 <!-- 消息傳輸監聽器 處理網絡及服務器異常 --> 17 <!--<property name="transportListener">--> 18 <!--<bean class="com.schooling.activemq.ActiveMQTransportListener"/>--> 19 <!--</property>--> 20 </bean> 21 22 <!-- 23 ActiveMQ為我們提供了一個PooledConnectionFactory,通過往裏面註入一個ActiveMQConnectionFactory 24 可以用來將Connection、Session和MessageProducer池化,這樣可以大大的減少我們的資源消耗,要依賴於 activemq-pool包 25 --> 26 <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory"> 27 <property name="connectionFactory" ref="targetConnectionFactory"/> 28 <property name="maxConnections" value="${activemq.pool.maxConnections}"/> 29 </bean> 30 31 <!-- 定義ReDelivery(重發機制)機制 ,重發時間間隔是100毫秒,最大重發次數是3次 --> 32 <bean id="activeMQRedeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy"> 33 <!--是否在每次嘗試重新發送失敗後,增長這個等待時間 --> 34 <property name="useExponentialBackOff" value="true"/> 35 <!--重發次數,默認為6次 這裏設置為1次 --> 36 <property name="maximumRedeliveries" value="2"/> 37 <!--重發時間間隔,默認為1秒 --> 38 <property name="initialRedeliveryDelay" value="1000"/> 39 <!--第一次失敗後重新發送之前等待500毫秒,第二次失敗再等待500 * 2毫秒,這裏的2就是value --> 40 <property name="backOffMultiplier" value="2"/> 41 <!--最大傳送延遲,只在useExponentialBackOff為true時有效(V5.5),假設首次重連間隔為10ms,倍數為2,那麽第二次重連時間間隔為 20ms, 42 第三次重連時間間隔為40ms,當重連時間間隔大的最大重連時間間隔時,以後每次重連時間間隔都為最大重連時間間隔。 --> 43 <property name="maximumRedeliveryDelay" value="1000"/> 44 </bean> 45 46 <!-- Spring用於管理真正的ConnectionFactory的ConnectionFactory --> 47 <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"> 48 <!-- 目標ConnectionFactory對應真實的可以產生JMS Connection的ConnectionFactory --> 49 <property name="targetConnectionFactory" ref="pooledConnectionFactory"/> 50 </bean> 51 52 <!--這個是目的地--> 53 <bean id="msgQueue" class="org.apache.activemq.command.ActiveMQQueue"> 54 <constructor-arg value="${activemq.queueName}"/> 55 </bean> 56 57 <!-- Spring提供的JMS工具類,它可以進行消息發送、接收等 --> 58 <!-- 隊列模板 --> 59 <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> 60 <!-- 這個connectionFactory對應的是我們定義的Spring提供的那個ConnectionFactory對象 --> 61 <property name="connectionFactory" ref="connectionFactory"/> 62 <property name="defaultDestinationName" value="${activemq.queueName}"/> 63 </bean> 64 65 <!-- 配置自定義監聽:MessageListener --> 66 <bean id="msgQueueMessageListener" class="com.schooling.activemq.consumer.MsgQueueMessageListener"/> 67 68 <!-- 將連接工廠、目標對了、自定義監聽註入jms模板 --> 69 <bean id="sessionAwareListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> 70 <property name="connectionFactory" ref="connectionFactory"/> 71 <property name="destination" ref="msgQueue"/> 72 <property name="messageListener" ref="msgQueueMessageListener"/> 73 <!--應答模式是 INDIVIDUAL_ACKNOWLEDGE--> 74 <property name="sessionAcknowledgeMode" value="4"/> 75 </bean> 76 77 </beans>

二、生產者

 1 @Service("activeMQProducer")
 2 public class ActiveMQProducer {
 3 
 4     private JmsTemplate jmsTemplate;
 5 
 6     public JmsTemplate getJmsTemplate() {
 7         return jmsTemplate;
 8     }
 9 
10     @Autowired
11     public void setJmsTemplate(JmsTemplate jmsTemplate) {
12         this.jmsTemplate = jmsTemplate;
13     }
14 
15     public void sendMessage(final String info) {
16         jmsTemplate.send(new MessageCreator() {
17             public Message createMessage(Session session) throws JMSException {
18                 return session.createTextMessage(info);
19             }
20         });
21     }
22 }

三、消費者(監聽模式)

 1 public class MsgQueueMessageListener implements SessionAwareMessageListener<Message> {
 2 
 3     @Override
 4     public void onMessage(Message message, Session session) throws JMSException {
 5 
 6         if (message instanceof TextMessage) {
 7 
 8             String msg = ((TextMessage) message).getText();
 9 
10             System.out.println("============================================================");
11             System.out.println("消費者收到的消息:" + msg);
12             System.out.println("============================================================");
13 
14             try {
15                 if ("我是隊列消息002".equals(msg)) {
16                     throw new RuntimeException("故意拋出的異常");
17                 }
18                 // 只要被確認後  就會出隊,接受失敗沒有確認成功,會在原隊列裏面
19                 message.acknowledge();
20             } catch (Exception e) {
21                 // 此不可省略 重發信息使用
22                 session.recover();
23             }
24         }
25     }
26 }

四、測試方法

1 @Test
2 public void send() {
3     for (int i = 1; i < 5; i++) {
4         this.activeMQProducer.sendMessage("我是隊列消息00" + i);
5     }
6     while (true) {}
7 }

五、測試結果

技術分享圖片

技術分享圖片

六、測試小結

“我是隊列消息002”由於異常,未接收成功。在重發2次都失敗的情況下被發送到“死信隊列”。其他4條信息都接收成功。

ActiveMQ 重發機制與確認機制 實踐