1. 程式人生 > >ActiveMQ(20):Consumer高級特性之重新投遞(Redelivery Policy)

ActiveMQ(20):Consumer高級特性之重新投遞(Redelivery Policy)

jms activemq 重新投遞

一、簡介

ActiveMQ在接收消息的Client有以下幾種操作的時候,需要重新傳遞消息:

1:Client用了transactions,且在session中調用了rollback()

2:Client用了transactions,且在調用commit()之前關閉

3:Client在CLIENT_ACKNOWLEDGE的傳遞模式下,在session中調用了recover()

二、定制想要的再次傳送策略

可以通過設置ActiveMQConnectionFactory和ActiveMQConnection來定制想要的再次傳送策略,可用的Redelivery屬性如下:

1:collisionAvoidanceFactor:設置防止沖突範圍的正負百分比,只有啟用useCollisionAvoidance參數時才生效。

也就是在延遲時間上再加一個時間波動範圍。默認值為0.15

2:maximumRedeliveries:最大重傳次數,達到最大重連次數後拋出異常。為-1時不限制次數,為0時表示不進行重

傳。默認值為6。

3:maximumRedeliveryDelay:最大傳送延遲,只在useExponentialBackOff為true時有效(V5.5),假設首次重連間

隔為10ms,倍數為2,那麽第二次重連時間間隔為 20ms,第三次重連時間間隔為40ms,當重連時間間隔大的最大重

連時間間隔時,以後每次重連時間間隔都為最大重連時間間隔。默認為-1。

4:initialRedeliveryDelay:初始重發延遲時間,默認1000L

5:redeliveryDelay:重發延遲時間,當initialRedeliveryDelay=0時生效,默認1000L

6:useCollisionAvoidance:啟用防止沖突功能,默認false

7:useExponentialBackOff:啟用指數倍數遞增的方式增加延遲時間,默認false

8:backOffMultiplier:重連時間間隔遞增倍數,只有值大於1和啟用useExponentialBackOff參數時才生


在接受的Client可以如下設置:

ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("failover:(tcp://192.168.175.13:61616,tcp://192.168.175.13:61676)?randomize=false");
RedeliveryPolicy policy = new RedeliveryPolicy();
policy.setMaximumRedeliveries(3);
cf.setRedeliveryPolicy(policy)

三、死隊列

3.1 簡介

當消息試圖被傳遞的次數超過配置中maximumRedeliveries屬性的值時,那麽,broker會認定該消息是一個死消息,並被把該消息發送到

死隊列中。 默認,aciaveMQ中死隊列被聲明為“ActivemMQ.DLQ”,所有不能消費的消息都被傳遞到該死隊列中。 你可以在

acivemq.xml中配置individualDeadLetterStrategy屬性,示例如下:

<policyEntry queue= "> " >
    <deadLetterStrategy>
        <individualDeadLetterStrategy queuePrefix= "DLQ." useQueueForQueueMessages= "true" />
    </deadLetterStrategy>
</policyEntry>

3.2 自動刪除過期消息

有時需要直接刪除過期的消息而不需要發送到死隊列中,可以使用屬性processExpired=false來設置,示例如下:

<policyEntry queue= "> " >
    <deadLetterStrategy>
        <sharedDeadLetterStrategy processExpired= "false" />
    </deadLetterStrategy>
</policyEntry>

3.3 存放非持久消息到死隊列中

默認情況下,Activemq不會把非持久的死消息發送到死隊列中。非持久性如果你想把非持久的消息發送到死隊列中,

需要設置屬性processNonPersistent=“true”,示例如下:

<policyEntry queue= "> " >
    <deadLetterStrategy>
        <sharedDeadLetterStrategy processNonPersistent= "true" />
    </deadLetterStrategy>
</policyEntry>

四、為每一個Destination配置一個Redelivery Policy

在V5.7之後,你可以為每一個Destination配置一個Redelivery Policy。示例如:

ActiveMQConnection connection ... // Create a connection
RedeliveryPolicy queuePolicy = new RedeliveryPolicy();
queuePolicy.setInitialRedeliveryDelay(0);
queuePolicy.setRedeliveryDelay(1000);
queuePolicy.setUseExponentialBackOff(false);
queuePolicy.setMaximumRedeliveries(2);

RedeliveryPolicy topicPolicy = new RedeliveryPolicy();
topicPolicy.setInitialRedeliveryDelay(0);
topicPolicy.setRedeliveryDelay(1000);
topicPolicy.setUseExponentialBackOff(false);
topicPolicy.setMaximumRedeliveries(3);
// Receive a message with the JMS API
RedeliveryPolicyMap map = connection.getRedeliveryPolicyMap();
map.put(new ActiveMQTopic(">"), topicPolicy);
map.put(new ActiveMQQueue(">"), queuePolicy);

五、定制想要的再次傳送策略(spring.xml)

<!--創建連接工廠 -->  
<amq:connectionFactory id="amqConnectionFactory"
    brokerURL="${activemq.brokerURL}" 
    userName="${activemq.userName}" 
    password="${activemq.password}">
    <property name="redeliveryPolicy" ref="activeMQRedeliveryPolicy" />  <!-- 引用重發機制 --> 
</amq:connectionFactory>
    
<!-- 定義ReDelivery(重發機制)機制 ,重發時間間隔是100毫秒,最大重發次數是3次 http://www.kuqin.com/shuoit/20140419/339344.html -->  
<bean id="activeMQRedeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy">  
    <!--是否在每次嘗試重新發送失敗後,增長這個等待時間 -->  
    <property name="useExponentialBackOff" value="true"></property>  
    <!--重發次數,默認為6次   這裏設置為5次 -->  
    <property name="maximumRedeliveries" value="5"></property>  
    <!--重發時間間隔,默認為1秒 -->  
    <property name="initialRedeliveryDelay" value="2000"></property>  
    <!--第一次失敗後重新發送之前等待2000毫秒,第二次失敗再等待2000 * 1毫秒,這裏的1就是value -->  
    <property name="backOffMultiplier" value="2"></property>  
    <!--
      	最大傳送延遲,只在useExponentialBackOff為true時有效(V5.5),假設首次重連間隔為10ms,倍數為2,
       	那麽第   二次重連時間間隔為 20ms,第三次重連時間間隔為40ms,當重連時間間隔大的最大重連時間間隔時,
       	以後每次重連時間間隔都為最大重連時間間隔。
    -->  
<!--         <property name="maximumRecdeliveryDelay" value="10000"></property>   -->
</bean> 

<!-- 定義Queue監聽器 -->
<jms:listener-container transaction-manager="jmsTransactionManager" destination-type="queue" container-type="default" connection-factory="connectionFactory" acknowledge="auto">
    <jms:listener destination="test.queue" ref="queueReceiver1"/>
</jms:listener-container>
    
<bean id="jmsTransactionManager" class="org.springframework.jms.connection.JmsTransactionManager">
    <property name="connectionFactory" ref="amqConnectionFactory" />
</bean>

技術分享




本文出自 “我愛大金子” 博客,請務必保留此出處http://1754966750.blog.51cto.com/7455444/1923649

ActiveMQ(20):Consumer高級特性之重新投遞(Redelivery Policy)