1. 程式人生 > >ActiveMQ學習筆記(19)----Consumer高階特性(一)

ActiveMQ學習筆記(19)----Consumer高階特性(一)

1. Exclusive Consumer

  獨有消費者:Queue中的訊息是按照順序被分發到consumer的,然而,當你有多個consumers同時從相同的queue中提取訊息時,你將失去這個保證。因為這些訊息是被多個執行緒併發的處理。有的時候,保證訊息按照順序處理是很重要的。例如:你可能不希望在插入訂單操作結束之前執行更新這個訂單的操作。

  ActiveMQ從4.x版本開始支援Exclusive Consumer。Broker會從多個Consumers中挑選一個consumer來處理queue中所有的訊息,從而保證了訊息的有序處理。如果這個consumer失效,那麼broker會自動切換到其他的consumer。可以通過destination options來建立一個Exclusive Consumer,如下:

queue = new ActiveMQQueue("TEST.QUEUE?consumer.exclusive=true");
consumer = session.createConsumer(queue);

  還可以給consumer設定優先順序,以便針對網路情況進行優化,如下:

queue = new  ActiveMQQueue("TEST.QUEUE?consumer.exclusive=true&consumer.priority=10");

2. Consumer Dispatche Async

  在activemq4.0以後,你可以選擇broker同步或非同步的把訊息分發給消費者。可以設定dispatchAsync屬性,預設是true,通常情況下這是最佳的。

  你也可以通過如下幾種方式修改:

  1. 在ConnectionFactory層設定

  ActiveMQConnectionFactory.setDispatchAsync(false);

  2. 在Connection上設定,這個設定將會覆蓋ConnectionFactory上的設定

  ActiveMQConnetion.setDispatchAsync(false);

  3. 在Consumer上設定

  queue = new ActiveMQQueue("TEST.QUEUE?consumer.dispatchAsync=false");

  consumer = session.createConsumer(queue);

3. Consumer Priority

  JMS JMSPriority定義了十個訊息優先順序值,0是最低優先順序,9是最高優先順序,另外,客戶端應當將0-4看作普通優先順序,5-9看作加急優先順序。

  自定義Consumer Priority優先順序。配置如下:

  queue = new ActiveMQQueue("TEST.QUEUE?consumer.priority=10");

  consumer = session.createConsumer(queue);

  Consumer的Priority的劃分為0~127個級別,127是最高的級別,0是最低的也是ActiveMQ預設的。這種配置可以讓Broker根據consumer的優先順序來發送訊息到較高的優先順序的Consumer上,如果某個較高的Consumer的訊息轉載慢,則Broker會把訊息傳送到僅次於它優先順序的Consumer上。

4. Manage Durable Subscribers

  訊息持久化,保證了消費者離線之後,再次進入系統,不會錯過訊息,但是這也會消耗很多的資源,從5.6開始,可以對持久化進行如下管理:

  Removing inactive subscribers

  我們還希望可以刪除那些不活動的訂閱者,如下:

<broker name="localhost" offlineDurableSubscriberTimeout="86400000" offlineDurableSubscriberTaskSchedule="3600000">

  1.offlineDurableSubscriberTimeout:離線多長時間就過期刪除,預設是-1,就是不刪除。

  2. offlineDurableSubscriberTaskSchedule: 多長時間檢查一次,預設300000,單位毫秒。

5. Message Groups

  Message Goups就是對訊息分組,它是Exclusive Consumer功能的增強。

  邏輯上Message Groups可以看成是一種併發的Exclusive Consumer。跟所有的訊息都由唯一的consumer處理不同,JMS訊息屬性的JMSXGroupID用來區分message group.

  Message Group特性保證所有具有相同JMSXGroupID的訊息 都會被分發到相同的consumer(只要這個consumer保持active).

  另一方面,Message Groups特性也是一種負載均衡的機制。在一個訊息被分發到consumer之前,broker首先檢查訊息JMSXGroupID屬性。如果存在,那麼broker會檢查是否有某個consumer擁有這個message group.如果沒有,那麼broker會選擇一個consumer,並將它關聯到這個message group.此後,這個consumer會接收到這個message group的所有訊息,直到:

  1. consumer被關閉

  2. Message group被關閉,通過傳送一個訊息,並設定這個訊息的JMSXGroupSeq為-1

  建立一個Message Groups,只需要在message物件上設定屬性即可,如下:

  message.setStringProperty("JMSXGroupID","GroupA");

  關閉一個Message Groups,只需要在message物件上設定屬性即可,如下:

  message.setStringProperty("JMSXGroupID","GroupA");

  message.setIntProperty("JMSXGroupSeq",-1);