ActiveMQ(24):Consumer高級特性之Slow Consumer Handling(慢消費者的處理)
一、Prefetch機制
ActiveMQ通過Prefetch機制來提高性能,方式是在客戶端的內存裏可能會緩存一定數量的消息。緩存消息的數量由prefetch limit來控制。當某個consumer的prefetch buffer已經達到上限,那麽broker不會再向consumer分發消息,直到consumer向broker發送消息的確認,確認後的消息將會從緩存中去掉。
可以通過在ActiveMQConnectionFactory或者ActiveMQConnection上設置ActiveMQPrefetchPolicy對象來配置prefetch policy。也可以通過connection options或者destination options來配置。例如:
tcp://localhost:61616?jms.prefetchPolicy.all=50
tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=1
queue = new ActiveMQQueue("TEST.QUEUE?consumer.prefetchSize=10");
prefetch size的缺省值如下:
1:persistent queues (default value: 1000)
2:non-persistent queues (default value: 1000)
3:persistent topics (default value: 100)
4:non-persistent topics (default value: Short.MAX_VALUE -1)
二、慢Consumer處理
慢消費者會在非持久的topics上導致問題:一旦消息積壓起來,會導致broker把大量消息保存在內存中,broker也會因此而變慢。目前ActiveMQ使用Pending Message Limit Strategy來解決這個問題。除了prefetch buffer之外,你還要配置緩存消息的上限,超過這個上限後,新消息到來時會丟棄舊消息。
通過在配置文件的destination map中配置PendingMessageLimitStrategy,可以為不用的topic namespace配置不同的策略。
Pending Message Limit Strategy(等待消息限制策略)目前有以下兩種:
1: Constant Pending Message Limit Strategy
Limit可以設置0、>0、-1三種方式:
0表示:不額外的增加其預存大小。
>0表示:再額外的增加其預存大小。
-1表示:不增加預存也不丟棄舊的消息。
這個策略使用常量限制,配置如下:
<constantPendingMessageLimitStrategy limit="50"/>
2:Prefetch Rate Pending Message Limit Strategy
這種策略是利用Consumer的之前的預存的大小乘以其倍數等於現在的預存大小。比如:<prefetchRatePendingMessageLimitStrategy multiplier="2.5"/>
說明:在以上兩種方式中,如果設置0意味著除了prefetch之外不再緩存消息;如果設置-1意味著禁止丟棄消息。
三、配置消息的丟棄策略
消息的丟棄策略,目前有三種方式:
1:oldestMessageEvictionStrategy:這個策略丟棄最舊的消息。
2:oldestMessageWithLowestPriorityEvictionStrategy:這個策略丟棄最舊的,而且具有最低優先級的消息。
3:uniquePropertyMessageEvictionStrategy:從5.6開始,可以根據自定義的屬性來進行拋棄,比如<uniquePropertyMessageEvictionStrategy propertyName=“STOCK” />,這就表示拋棄屬性名稱為Stock的消息
配置示例:
<destinationPolicy> <policyMap> <policyEntries> <policyEntry topic="FOO.>"> <dispatchPolicy> <roundRobinDispatchPolicy /> </dispatchPolicy> </policyEntry> <policyEntry topic="ORDERS.>"> <dispatchPolicy> <strictOrderDispatchPolicy /> </dispatchPolicy> </policyEntry> <policyEntry topic="PRICES.>"> <!-- lets force old messages to be discarded for slow consumers --> <pendingMessageLimitStrategy> <constantPendingMessageLimitStrategy limit="10"/> </pendingMessageLimitStrategy> </policyEntry> <policyEntry tempTopic="true" advisoryForConsumed="true" /> <policyEntry tempQueue="true" advisoryForConsumed="true" /> </policyEntries> </policyMap> </destinationPolicy>
本文出自 “我愛大金子” 博客,請務必保留此出處http://1754966750.blog.51cto.com/7455444/1924940
ActiveMQ(24):Consumer高級特性之Slow Consumer Handling(慢消費者的處理)