ActiveMQ學習筆記(19)----Consumer高階特性(一)
1. Exclusive Consumer
獨有消費者:Queue中的訊息是按照順序被分發到consumer的,然而,當你有多個consumers同時從相同的queue中提取訊息時,你將失去這個保證。因為這些訊息是被多個執行緒併發的處理。有的時候,保證訊息按照順序處理是很重要的。例如:你可能不希望在插入訂單操作結束之前執行更新這個訂單的操作。
ActiveMQ從4.x版本開始支援Exclusive Consumer。Broker會從多個Consumers中挑選一個consumer來處理queue中所有的訊息,從而保證了訊息的有序處理。如果這個consumer失效,那麼broker會自動切換到其他的consumer。可以通過destination options來建立一個Exclusive Consumer,如下:
queue = new ActiveMQQueue("TEST.QUEUE?consumer.exclusive=true"); consumer = session.createConsumer(queue);
還可以給consumer設定優先順序,以便針對網路情況進行優化,如下:
queue = new ActiveMQQueue("TEST.QUEUE?consumer.exclusive=true&consumer.priority=10");
2. Consumer Dispatche Async
在activemq4.0以後,你可以選擇broker同步或非同步的把訊息分發給消費者。可以設定dispatchAsync屬性,預設是true,通常情況下這是最佳的。
你也可以通過如下幾種方式修改:
1. 在ConnectionFactory層設定
ActiveMQConnectionFactory.setDispatchAsync(false);
2. 在Connection上設定,這個設定將會覆蓋ConnectionFactory上的設定
ActiveMQConnetion.setDispatchAsync(false);
3. 在Consumer上設定
queue = new ActiveMQQueue("TEST.QUEUE?consumer.dispatchAsync=false");
consumer = session.createConsumer(queue);
3. Consumer Priority
JMS JMSPriority定義了十個訊息優先順序值,0是最低優先順序,9是最高優先順序,另外,客戶端應當將0-4看作普通優先順序,5-9看作加急優先順序。
自定義Consumer Priority優先順序。配置如下:
queue = new ActiveMQQueue("TEST.QUEUE?consumer.priority=10");
consumer = session.createConsumer(queue);
Consumer的Priority的劃分為0~127個級別,127是最高的級別,0是最低的也是ActiveMQ預設的。這種配置可以讓Broker根據consumer的優先順序來發送訊息到較高的優先順序的Consumer上,如果某個較高的Consumer的訊息轉載慢,則Broker會把訊息傳送到僅次於它優先順序的Consumer上。
4. Manage Durable Subscribers
訊息持久化,保證了消費者離線之後,再次進入系統,不會錯過訊息,但是這也會消耗很多的資源,從5.6開始,可以對持久化進行如下管理:
Removing inactive subscribers
我們還希望可以刪除那些不活動的訂閱者,如下:
<broker name="localhost" offlineDurableSubscriberTimeout="86400000" offlineDurableSubscriberTaskSchedule="3600000">
1.offlineDurableSubscriberTimeout:離線多長時間就過期刪除,預設是-1,就是不刪除。
2. offlineDurableSubscriberTaskSchedule: 多長時間檢查一次,預設300000,單位毫秒。
5. Message Groups
Message Goups就是對訊息分組,它是Exclusive Consumer功能的增強。
邏輯上Message Groups可以看成是一種併發的Exclusive Consumer。跟所有的訊息都由唯一的consumer處理不同,JMS訊息屬性的JMSXGroupID用來區分message group.
Message Group特性保證所有具有相同JMSXGroupID的訊息 都會被分發到相同的consumer(只要這個consumer保持active).
另一方面,Message Groups特性也是一種負載均衡的機制。在一個訊息被分發到consumer之前,broker首先檢查訊息JMSXGroupID屬性。如果存在,那麼broker會檢查是否有某個consumer擁有這個message group.如果沒有,那麼broker會選擇一個consumer,並將它關聯到這個message group.此後,這個consumer會接收到這個message group的所有訊息,直到:
1. consumer被關閉
2. Message group被關閉,通過傳送一個訊息,並設定這個訊息的JMSXGroupSeq為-1
建立一個Message Groups,只需要在message物件上設定屬性即可,如下:
message.setStringProperty("JMSXGroupID","GroupA");
關閉一個Message Groups,只需要在message物件上設定屬性即可,如下:
message.setStringProperty("JMSXGroupID","GroupA");
message.setIntProperty("JMSXGroupSeq",-1);