1. 程式人生 > >activemq-重發、去重

activemq-重發、去重

.html 重啟 etc iterator 延遲 隊列 void fetch 維護

activemq的consumer端也有窗口機制,通過prefetchSize就可以設置窗口大小。加入窗口是為了批量獲取數據,同時可以設置optimizeAcknowledge來優化確認回復,優化確認一方面可以減輕client負擔(不需要頻繁的確認消息)、減少通信開銷,另一方面由於延遲了確認(默認ack了0.65*prefetchSize個消息才確認),broker再次發送消息時又可以批量發送,如果只是開啟了prefetchSize,每條消息都去確認的話,broker在收到確認後也只是發送一條消息,當然也可以手動延遲確認。

consumer會維護兩個隊列,pendingList和dispatchedList,前者存放從broker已接受但未消費(未回調onMessage)的message,後者用於存放已消費但未確認的message(可用於recover,即redelivery)。

activemq的重發機制是session為單位的,並且重發只發生在client端,並不會向broker請求重發消息,只會在重發後向broker發送一個redelivered命令,如果某消息的redelivered次數達到閾值,這條消息就會被清除並送入DLQ。

 1 public void recover() throws JMSException {
 2 
 3   checkClosed();
 4   if (getTransacted()) {
 5     throw new IllegalStateException("This session is transacted");
6 } 7 //該session的每個consumer都會recover 8 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) { 9 ActiveMQMessageConsumer c = iter.next(); 10 c.rollback(); 11 } 12 13 }

同樣的,message的確認也是session級別的

1 public void acknowledge() throws JMSException {
2 for (Iterator<ActiveMQMessageConsumer> iter = consumers.iterator(); iter.hasNext();) { 3 ActiveMQMessageConsumer c = iter.next(); 4 c.acknowledge(); 5 } 6 }

通過持久化、確認機制,broker可以保證消息不丟失,即如果consumer未確認消息,consumer都可以再次得到該消息,但broker並不擔保消息被client唯一消費。onMessage處理消息時出錯,consumer會自動發起recover;重啟consumer後,consumer會得到之前未確認的消息;consumer回復了確認,但確認命令還未得到broker處理時,broker掛掉了,broker重啟後,consumer依舊會收到之前確認過的消息。這些情況都會產生重復消息,消息的去重需要client自己保證,最簡單直接的方式就是處理完消息時,將消息業務唯一標識符入庫,每次處理消息時都檢查是否存在該標識符。

參考:http://activemq.apache.org/message-redelivery-and-dlq-handling.html

  http://activemq.apache.org/redelivery-policy.html

activemq-重發、去重