1. 程式人生 > >ActiveMQ系列—ActiveMQ效能優化(中2)(處理規則和優化)

ActiveMQ系列—ActiveMQ效能優化(中2)(處理規則和優化)

4、消費者策略:Dispatch Async

討論完了訊息生產者的關鍵效能點,我們再將目光轉向訊息消費者(接收者端);就像本小節開始時描述的那樣,比起訊息生產者來說訊息消費者的效能更能影響ActiveMQ系統的整體效能,因為要成功完成一條訊息的處理,它的工作要遠遠多於訊息生產者。

首先,在預設情況下ActiveMQ服務端採用非同步方式向客戶端推送訊息。也就是說ActiveMQ服務端在向某個消費者會話推送訊息後,不會等待消費者的響應資訊,直到消費者處理完訊息後,主動向服務端返回處理結果。如果您對自己的消費者效能足夠滿意,也可以將這個過程設定為“同步”:

......
// 設定為同步
connectionFactory.setDispatchAsync(false
); ......

5、消費者策略:Prefetch

消費者關鍵策略中,需要重點討論的是消費者“預取數量”——prefetchSize。可以想象,如果消費者端的工作策略是按照某個週期(例如1秒),主動到伺服器端一條一條請求新的訊息,那麼消費者的工作效率一定是極低的;所以ActiveMQ系統中,預設的策略是ActiveMQ服務端一旦有訊息,就主動按照設定的規則推送給當前活動的消費者。其中每次推送都有一定的數量限制,這個限制值就是prefetchSize。

針對Queue工作模型的佇列和Topic工作模型的佇列,ActiveMQ有不同的預設“預取數量”;針對NON_PERSISTENT Message和PERSISTENT Message,ActiveMQ也有不同的預設“預取數量”:

  • PERSISTENT Message—Queue:prefetchSize=1000
  • NON_PERSISTENT Message—Queue:prefetchSize=1000
  • PERSISTENT Message—Topic:prefetchSize=100
  • NON_PERSISTENT Message—Topic:prefetchSize=32766

ActiveMQ中設定的各種預設預取數量一般情況下不需要進行改變。如果您使用預設的非同步方式從伺服器端推送訊息到消費者端,且您對消費者端的效能有足夠的信心,可以加大預取數量的限制。但是非必要情況下,請不要設定prefetchSize=1,因為這樣就是一條一條的取資料;也不要設定為prefetchSize=0,因為這將導致關閉伺服器端的推送機制,改為客戶端主動請求。

  • 可以通過ActiveMQPrefetchPolicy策略物件更改預取數量
......
// 預取策略物件
ActiveMQPrefetchPolicy prefetchPolicy = connectionFactory.getPrefetchPolicy();
// 設定Queue的預取數量為50
prefetchPolicy.setQueuePrefetch(50);
connectionFactory.setPrefetchPolicy(prefetchPolicy);
//進行連線
connection = connectionFactory.createQueueConnection();
connection.start();
......
  • 也可以通過Properties屬性更改(當然還可以加入其他屬性)預取數量:
......
Properties props = new Properties();
props.setProperty("prefetchPolicy.queuePrefetch", "1000");
props.setProperty("prefetchPolicy.topicPrefetch", "1000");
//設定屬性
connectionFactory.setProperties(props);
//進行連線
connection = connectionFactory.createQueueConnection();
connection.start();
......

6、消費者策略:事務和死信

6.1、消費者端事務

JMS規範除了為訊息生產者端提供事務支援以外,還為消費服務端準備了事務的支援。您可以通過在消費者端操作事務的commit和rollback方法,向伺服器告知一組訊息是否處理完成。採用事務的意義在於,一組訊息要麼被全部處理並確認成功,要麼全部被回滾並重新處理。

......
//建立會話(採用commit方式確認一批訊息處理完畢)
session = connection.createSession(true, Session.SESSION_TRANSACTED);
//建立Queue(當然如果有了就不會重複建立)
sendQueue = session.createQueue("/test");
//建立訊息傳送者物件
MessageConsumer consumer = session.createConsumer(sendQueue);
consumer.setMessageListener(new MyMessageListener(session));

......

class MyMessageListener implements MessageListener {
    private int number = 0;

    /**
     * 會話
     */
    private Session session;

    public MyMessageListener(Session session) {
        this.session = session;
    }

    @Override
    public void onMessage(Message message) {
        // 列印這條訊息
        System.out.println("Message = " + message);
        // 如果條件成立,就向伺服器確認這批訊息處理成功
        // 伺服器將從佇列中刪除這些訊息
        if(number++ % 3 == 0) {
            try {
                this.session.commit();
            } catch (JMSException e) {
                e.printStackTrace(System.out);
            }
        }
    }
}

以上程式碼演示的是消費者通過事務commit的方式,向伺服器確認一批訊息正常處理完成的方式。請注意程式碼示例中的“session = connection.createSession(true, Session.SESSION_TRANSACTED);”語句。第一個引數表示連線會話啟用事務支援;第二個引數表示使用commit或者rollback的方式進行向伺服器應答。

這是呼叫commit的情況,那麼如果呼叫rollback方法又會發生什麼情況呢?呼叫rollback方法時,在rollback之前已處理過的訊息(注意,並不是所有預取的訊息)將重新發送一次到消費者端(傳送給同一個連線會話)。並且訊息中redeliveryCounter(重發計數器)屬性將會加1。請看如下所示的程式碼片段和執行結果:

@Override
public void onMessage(Message message) {
    // 列印這條訊息
    System.out.println("Message = " + message);
    // rollback這條訊息
    this.session.rollback();
}

以上程式碼片段中,我們不停的回滾正在處理的這條訊息,通過打印出來的資訊可以看到,這條訊息被不停的重發:

Message = ActiveMQTextMessage {...... redeliveryCounter = 0, text = 這是傳送的訊息內容-------------------20}
Message = ActiveMQTextMessage {...... redeliveryCounter = 1, text = 這是傳送的訊息內容-------------------20}
Message = ActiveMQTextMessage {...... redeliveryCounter = 2, text = 這是傳送的訊息內容-------------------20}
Message = ActiveMQTextMessage {...... redeliveryCounter = 3, text = 這是傳送的訊息內容-------------------20}
Message = ActiveMQTextMessage {...... redeliveryCounter = 4, text = 這是傳送的訊息內容-------------------20}

可以看到同一條記錄被重複的處理,並且其中的redeliveryCounter屬性不斷累加。

6.2、重發和死信佇列

但是訊息處理失敗後,不斷的重發訊息肯定不是一個最好的處理辦法:如果一條訊息被不斷的處理失敗,那麼最可能的情況就是這條訊息承載的業務內容本身就有問題。那麼無論重發多少次,這條訊息還是會處理失敗。

為了解決這個問題,ActiveMQ中引入了“死信佇列”(Dead Letter Queue)的概念。即一條訊息再被重發了多次後(預設為重發6次redeliveryCounter==6),將會被ActiveMQ移入“死信佇列”。開發人員可以在這個Queue中檢視處理出錯的訊息,進行人工干預。

這裡寫圖片描述

預設情況下“死信佇列”只接受PERSISTENT Message,如果NON_PERSISTENT Message超過了重發上限,將直接被刪除。以下配置資訊可以讓NON_PERSISTENT Message在超過重發上限後,也移入“死信佇列”:

<policyEntry queue=">">  
    <deadLetterStrategy>  
        <sharedDeadLetterStrategy processNonPersistent="true" />  
    </deadLetterStrategy>  
</policyEntry>

另外,上文提到的預設重發次數redeliveryCounter的上限也是可以進行設定的,為了保證訊息異常情況下儘可能小的影響消費者端的處理效率,實際工作中建議將這個上限值設定為3。原因上文已經說過,如果訊息本身的業務內容就存在問題,那麼重發多少次也沒有用。

RedeliveryPolicy redeliveryPolicy = connectionFactory.getRedeliveryPolicy();
// 設定最大重發次數
redeliveryPolicy.setMaximumRedeliveries(3);

實際上ActiveMQ的重發機制還有包括以上提到的rollback方式在內的多種方式:

  • 在支援事務的消費者連線會話中呼叫rollback方法;
  • 在支援事務的消費者連線會話中,使用commit方法明確告知伺服器端訊息已處理成功前,會話連線就終止了(最可能是異常終止);
  • 在需要使用ACK模式的會話中,使用訊息的acknowledge方式明確告知伺服器端訊息已處理成功前,會話連線就終止了(最可能是異常終止)。

但是以上幾種重發機制有一些小小的差異,主要體現在redeliveryCounter屬性的作用區域。簡而言之,第一種方法redeliveryCounter屬性的作用區域是本次連線會話,而後兩種redeliveryCounter屬性的作用區域是在整個ActiveMQ系統範圍。

7、消費者策略:ACK

消費者端,除了可以使用事務方式來告知ActiveMQ服務端一批訊息已經成功處理外,還可以通過JMS規範中定義的acknowledge模式來實現同樣功能。事實上acknowledge模式更為常用。

7.1、基本使用

如果選擇使用acknowledge模式,那麼你至少有4種方式使用它,且這四種方式的效能區別很大:

  • AUTO_ACKNOWLEDGE方式:這種方式下,當消費者端通過receive方法或者MessageListener監聽方式從服務端得到訊息後(無論是pul方式還是push方式),消費者連線會話會自動認為消費者端對訊息的處理是成功的。但請注意,這種方式下消費者端不一定是向服務端一條一條ACK訊息;

  • CLIENT_ACKNOWLEDGE方式:這種方式下,當消費者端通過receive方法或者MessageListener監聽方式從服務端得到訊息後(無論是pul方式還是push方式),必須顯示呼叫訊息中的acknowledge方法。如果不這樣做,ActiveMQ伺服器端將不會認為這條訊息處理成功:

public void onMessage(Message message) {
    //====================
    //這裡進行您的業務處理
    //====================
    try {
        // 顯示呼叫ack方法
        message.acknowledge();
    } catch (JMSException e) {
        e.printStackTrace();
    }
}
  • DUPS_OK_ACKNOWLEDGE方式:批量確認方式。消費者端會按照一定的策略向伺服器端間隔傳送一個ack標示,表示某一批訊息已經處理完成。DUPS_OK_ACKNOWLEDGE方式和 AUTO_ACKNOWLEDGE方式在某些情況下是一致的,這個在後文會講到;

  • INDIVIDUAL_ACKNOWLEDGE方式:單條確認方式。這種方式是ActiveMQ單獨提供的一種方式,其常量定義的位置都不在javax.jms.Session規範介面中,而是在org.apache.activemq.ActiveMQSession這個類中。這種方式消費者端將會逐條向ActiveMQ服務端傳送ACK資訊。所以這種ACK方式的效能很差,除非您有特別的業務要求,否則不建議使用。

7.2、工作方式和效能

筆者建議首先考慮使用AUTO_ACKNOWLEDGE方式確認訊息,如果您這樣做,那麼一定請使用optimizeACK優化選項,並且重新設定prefetchSize數量為一個較小值(因為1000條的預設值在這樣的情況下就顯得比較大了):

......

//ack優化選項(實際上預設情況下是開啟的)
connectionFactory.setOptimizeAcknowledge(true);
//ack資訊最大發送週期(毫秒)
connectionFactory.setOptimizeAcknowledgeTimeOut(5000);
connection = connectionFactory.createQueueConnection();
connection.start();
......

AUTO_ACKNOWLEDGE方式的根本意義是“延遲確認”,消費者端在處理訊息後暫時不會發送ACK標示,而是把它快取在連線會話的一個pending 區域,等到這些訊息的條數達到一定的值(或者等待時間超過設定的值),再通過一個ACK指令告知服務端這一批訊息已經處理完成;而optimizeACK選項(指明AUTO_ACKNOWLEDGE採用“延遲確認”方式)只有當消費者端使用AUTO_ACKNOWLEDGE方式時才會起效:

“延遲確認”的數量閥值:prefetch * 0.65
“延遲確認”的時間閥值:> optimizeAcknowledgeTimeOut

DUPS_OK_ACKNOWLEDGE方式也是一種“延遲確認”策略,如果目標佇列是Queue模式,那麼它的工作策略與AUTO_ACKNOWLEDGE方式是一樣的。也就是說,如果這時prefetchSize =1 或者沒有開啟optimizeACK,也會逐條訊息傳送ACK標示;如果目標佇列是Topic模式,那麼無論optimizeACK是否開啟,都會在消費的訊息個數>=prefetch * 0.5時,批量確認這些訊息。