1. 程式人生 > >ActiveMQ(24):Consumer高級特性之Slow Consumer Handling(慢消費者的處理)

ActiveMQ(24):Consumer高級特性之Slow Consumer Handling(慢消費者的處理)

jms activemq slow consumer handling 慢消費者的處理

一、Prefetch機制

ActiveMQ通過Prefetch機制來提高性能,方式是在客戶端的內存裏可能會緩存一定數量的消息。緩存消息的數量由prefetch limit來控制。當某個consumer的prefetch buffer已經達到上限,那麽broker不會再向consumer分發消息,直到consumer向broker發送消息的確認,確認後的消息將會從緩存中去掉。


可以通過在ActiveMQConnectionFactory或者ActiveMQConnection上設置ActiveMQPrefetchPolicy對象來配置prefetch policy。也可以通過connection options或者destination options來配置。例如:

tcp://localhost:61616?jms.prefetchPolicy.all=50

tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=1

queue = new ActiveMQQueue("TEST.QUEUE?consumer.prefetchSize=10");


prefetch size的缺省值如下:

1:persistent queues (default value: 1000)

2:non-persistent queues (default value: 1000)

3:persistent topics (default value: 100)

4:non-persistent topics (default value: Short.MAX_VALUE -1)


二、慢Consumer處理

慢消費者會在非持久的topics上導致問題:一旦消息積壓起來,會導致broker把大量消息保存在內存中,broker也會因此而變慢。目前ActiveMQ使用Pending Message Limit Strategy來解決這個問題。除了prefetch buffer之外,你還要配置緩存消息的上限,超過這個上限後,新消息到來時會丟棄舊消息。


通過在配置文件的destination map中配置PendingMessageLimitStrategy,可以為不用的topic namespace配置不同的策略。


Pending Message Limit Strategy(等待消息限制策略)目前有以下兩種:

1: Constant Pending Message Limit Strategy

Limit可以設置0、>0、-1三種方式:

0表示:不額外的增加其預存大小。

>0表示:再額外的增加其預存大小。

-1表示:不增加預存也不丟棄舊的消息。

這個策略使用常量限制,配置如下:

<constantPendingMessageLimitStrategy limit="50"/>

2:Prefetch Rate Pending Message Limit Strategy

這種策略是利用Consumer的之前的預存的大小乘以其倍數等於現在的預存大小。比如:<prefetchRatePendingMessageLimitStrategy multiplier="2.5"/>

說明:在以上兩種方式中,如果設置0意味著除了prefetch之外不再緩存消息;如果設置-1意味著禁止丟棄消息。

三、配置消息的丟棄策略

消息的丟棄策略,目前有三種方式:

1:oldestMessageEvictionStrategy:這個策略丟棄最舊的消息。

2:oldestMessageWithLowestPriorityEvictionStrategy:這個策略丟棄最舊的,而且具有最低優先級的消息。

3:uniquePropertyMessageEvictionStrategy:從5.6開始,可以根據自定義的屬性來進行拋棄,比如<uniquePropertyMessageEvictionStrategy propertyName=“STOCK” />,這就表示拋棄屬性名稱為Stock的消息


配置示例:

<destinationPolicy>
    <policyMap>
        <policyEntries>
            <policyEntry topic="FOO.>">
                <dispatchPolicy>
                    <roundRobinDispatchPolicy />
                </dispatchPolicy>
            </policyEntry>
            <policyEntry topic="ORDERS.>">
                <dispatchPolicy>
                    <strictOrderDispatchPolicy />
                </dispatchPolicy>
            </policyEntry>
            <policyEntry topic="PRICES.>">
                <!-- lets force old messages to be discarded for slow consumers -->
                <pendingMessageLimitStrategy>
                    <constantPendingMessageLimitStrategy limit="10"/>
                </pendingMessageLimitStrategy>
            </policyEntry>
            <policyEntry tempTopic="true" advisoryForConsumed="true" />
            <policyEntry tempQueue="true" advisoryForConsumed="true" />
        </policyEntries>
    </policyMap>
</destinationPolicy>



本文出自 “我愛大金子” 博客,請務必保留此出處http://1754966750.blog.51cto.com/7455444/1924940

ActiveMQ(24):Consumer高級特性之Slow Consumer Handling(慢消費者的處理)