1. 程式人生 > >架構設計:系統間通訊(23)——提高ActiveMQ工作效能(中)

架構設計:系統間通訊(23)——提高ActiveMQ工作效能(中)

6、ActiveMQ處理規則和優化

在ActiveMQ單個服務節點的優化中,除了對ActiveMQ單個服務節點的網路IO模型進行優化外,生產者傳送訊息的策略和消費者處理訊息的策略也關乎整個訊息佇列系統是否能夠高效工作。請看下圖所示的訊息生產者和訊息消費者的簡要工作原理圖:

這裡寫圖片描述

  1. Producer既是訊息生產者,作為一個傳送訊息的客戶端它既可以使用同步訊息傳送模式,也可以使用非同步的訊息傳送模式。另外,訊息生產者在ActiveMQ服務節點產生訊息堆積的情況下,也不能一味的追求傳送效率。還好,這種情況下訊息生產者端有完整的保證機制——Slow Producer。另外,JMS提供事務功能,所以生產者是否開啟事務傳送訊息,將會影響訊息傳送效能。

  2. 在整個訊息處理規則中,ActiveMQ服務節點最極端的情況就是產生訊息堆積(訊息的消費速度持續低於訊息生成速度,就會出現訊息堆積)。那麼ActiveMQ服務節點處理網路IO模型的優化外,最大的優化點就是:如何應對訊息堆積。基本原則是:NON_PERSISTENT Message在記憶體堆積後,轉儲到Temp Store區域(當然也可以設定為不轉儲);PERSISTENT Meaage無論怎樣都會先使用持久化方案儲存到永久儲存區域,再視情況決定是否進行傳送;在這些區域也產生堆積後,通知訊息生產者使用Slow Producer機制

  3. ActiveMQ服務節點在成功完成PERSISTENT Meaage的持久儲存操作後,就會向訊息生產者傳送一個確認資訊,表示該條訊息已處理完成。如果ActiveMQ服務節點接受的是NON_PERSISTENT Message,那麼生產者預設情況下不會等待服務節點的回執

    。我們後續會介紹PERSISTENT Meaage的傳送也可以設定為不等待回執,這樣可以顯著提高生產端的訊息傳送效率。

  4. ActiveMQ服務節點會以一種設定好的策略將訊息傳送給消費者,這個策略的原則是ActiveMQ服務節點主動推送訊息給某一個消費者(不是消費者主動拉取),這樣的方式很好便於ActiveMQ服務節點佔據整個策略的領導地位。在這個策略中,最重要的屬性是prefetchSize:單次獲得的訊息數量。除此以外,消費者端也是可以調整IO網路模型。

  5. 消費者在獲得到訊息後,就可以根據這條訊息進行業務處理了。最後,消費者需要按照處理結果向ActiveMQ服務節點告知這條(或這些)訊息是否處理成功——ACK應答。ActiveMQ中有多種ACK應答方式,它們對效能的影響也不相同。通過這些描述可以看出,消費者的處理效能更能直接影響整個ActiveMQ系統的效能

    。因為消費者不僅要接受訊息、處理訊息還要返回訊息應答。所以如果您的業務有一定的複雜性,造成每一條訊息的處理都會消耗相當的處理時間,那麼最直接的效能改善方式就是採用多個消費者節點

6-1、生產者策略:Send

訊息生產者傳送的訊息主要分為兩種型別:傳送PERSISTENT Meaage和傳送NON_PERSISTENT Message。

6-1-1、傳送NON_PERSISTENT Message

預設情況下,ActiveMQ服務端認為生產者端傳送的是PERSISTENT Message。所以如果要傳送NON_PERSISTENT Message,那麼生產者端就要明確指定。以下語句明確指定訊息傳送者將要傳送NON_PERSISTENT Message:

......
// 設定傳送者傳送的是NON_PERSISTENT Message
MessageProducer sender = session.createProducer(sendQueue);
sender.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
......

傳送NON_PERSISTENT Message時,訊息傳送方預設使用非同步方式:即是說訊息傳送後傳送方不會等待NON_PERSISTENT Message在服務端的任何回執。那麼問題來了:如果這時服務端已經出現了訊息堆積,並且堆積程度已經達到“無法再接收新訊息”的極限情況了,那麼訊息傳送方如果知曉並採取相應的策略呢?

實際上所謂的非同步傳送也並非絕對的非同步,訊息傳送者會在傳送一定大小的訊息後等待服務端進行回執(這個配置只是針對使用非同步方式進行傳送訊息的情況):

......
// 以下語句設定訊息傳送者在累計傳送102400byte大小的訊息後(可能是一條訊息也可能是多條訊息)
// 等待服務端進行回執,以便確定之前傳送的訊息是否被正確處理
// 確定伺服器端是否產生了過量的訊息堆積,需要減慢訊息生產端的生產速度
connectionFactory.setProducerWindowSize(102400);
......

如果您使用的是非同步傳送方式,那麼必須通過這個以上程式碼指明回執點。例如傳送NON_PERSISTENT Message時,這時訊息傳送者預設使用非同步方式。那麼如果您想在傳送NON_PERSISTENT Message時,每次都等待訊息回執,又該如何設定呢?您可以使用connectionFactory提供的“alwaysSyncSend”設定來指定每次都等待服務端的回執:

......
// 設定成:無論怎樣每次都等待伺服器端的回執
// 但關鍵是您確定自己的業務需求真的需要這樣使用嗎?
connectionFactory.setAlwaysSyncSend(true);
......

6-1-2、傳送PERSISTENT Message

如果您不特意指定訊息的傳送型別,那麼訊息生產者預設傳送PERSISTENT Meaage。這樣的訊息傳送到ActiveMQ服務端後將被進行持久化儲存(持久化儲存方案將在後文進行詳細介紹),並且訊息傳送者預設等待ActiveMQ服務端對這條訊息處理情況的回執。

以上這個過程非常耗時,ActiveMQ服務端不但要接受訊息,在記憶體中完成儲存,並且按照ActiveMQ服務端設定的持久化儲存方案對訊息進行儲存(主要的處理時間耗費在這裡)。為了提高ActiveMQ在接受PERSISTENT Meaage時的效能,ActiveMQ允許開發人員遵從JMS API中的設定方式,為訊息傳送端在傳送PERSISTENT Meaage時提供非同步方式:

......
// 使用非同步傳輸
// 上文已經說過,如果傳送的是NON_PERSISTENT Message
// 那麼預設就是非同步方式
connectionFactory.setUseAsyncSend(true);
......

一旦您進行了這樣的設定,就需要設定回執視窗:

......
// 同樣設定訊息傳送者在累計傳送102400byte大小的訊息後
// 等待服務端進行回執,以便確定之前傳送的訊息是否被正確處理
// 確定伺服器端是否產生了過量的訊息堆積,需要減慢訊息生產端的生產速度
connectionFactory.setProducerWindowSize(102400);
......

6-2、生產者策略:事務

JMS規範中支援帶事務的訊息,也就是說您可以啟動一個事務(並由訊息傳送者的連線會話設定一個事務號Transaction ID),然後在事務中傳送多條訊息。這個事務提交前這些訊息都不會進入佇列(無論是Queue還是Topic)。

不進入佇列,並不代表JMS不會在事務提交前將訊息傳送給ActiveMQ服務端。 實際上這些訊息都會發送給服務端,服務端發現這是一條帶有Transaction ID的訊息,就會將先把這條訊息放置在“transaction store”區域中(並且帶有redo日誌,這樣保證在收到rollback指令後能進行取消操作),等待這個Transaction ID被rollback或者commit。

一旦這個Transaction ID被commit,ActiveMQ才會依據自身設定的PERSISTENT Meaage處理規則或者NON_PERSISTENT Meaage處理規則,將Transaction ID對應的message進行入隊操作(無論是Queue還是Topic)。以下程式碼示例瞭如何在生產者端使用事務傳送訊息:

......
//進行連線
connection = connectionFactory.createQueueConnection();
connection.start();

//建立會話(設定一個帶有事務特性的會話)
session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
//建立queue(當然如果有了就不會重複建立)
Queue sendQueue = session.createQueue("/test");
//建立訊息傳送者物件
MessageProducer sender = session.createProducer(sendQueue);

//傳送(JMS是支援事務的)
for(int index = 0 ; index < 10 ; index++) {
    TextMessage outMessage = session.createTextMessage();
    outMessage.setText("這是傳送的訊息內容-------------------" + index);
    // 無論是NON_PERSISTENT message還是PERSISTENT message
    // 都要在commit後才能真正的入隊
    if(index % 2 == 0) {
        sender.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
    } else {
        sender.setDeliveryMode(DeliveryMode.PERSISTENT);
    }

    // 沒有commit的訊息,也是要先發送給服務端的
    sender.send(outMessage);
}

session.commit();
......

以上程式碼中,在“connection.createSession”這個方法中一共有兩個引數(這句程式碼在上文中已經出現過多次)。第一個布林型引數很好理解,就是標示這個連線會話是否啟動事務;第二個整型引數標示了訊息消費者的“應答模型”,我們會在下文中進行詳細介紹。

6-3、生產者策略:ProducerFlowControl

生產流控制,是ActiveMQ訊息生產者端最為重要的效能策略,它主要設定了在ActiveMQ服務節點在產生訊息堆積,並超過限制大小的情況下,如何進行訊息生產者端的限流。

具體來說,如果以上情況在ActiveMQ出現,那麼當生產者端再次接受ActiveMQ的訊息回執時,ActiveMQ就會讓訊息生產者進入等待狀態或者在傳送者端直接丟擲JMSException。當然您也可以配置ActiveMQ不進行ProducerFlowControl,如果您對自己ActiveMQ服務端的底層效能和消費者端的效能足夠自信的話。

在ActiveMQ的主配置檔案activemq.xml中,關於ProducerFlowControl策略的控制標籤是“destinationPolicy”和它的子標籤。請看如下配置示例:

......
<destinationPolicy>
  <policyMap>
    <policyEntries>
      <policyEntry topic=">" producerFlowControl="false"/>
    </policyEntries>
  </policyMap>
</destinationPolicy>
......

以上示例配合所有的Topic模式的佇列不進行producerFlowControl策略控制。當然還可以為佇列配置啟用producerFlowControl策略:

......
<policyEntry queue=">" producerFlowControl="true" memoryLimit="200mb">

</policyEntry>
......

以上配置項表示為ActiveMQ中的所有Queue模式的佇列啟用producerFlowControl策略,並且限制每個Queue資訊的最大記憶體儲存限制(memoryLimit)為200MB;這是指的最多使用200MB的記憶體區域,而不是說Queue中訊息的總大小為200MB。例如,在ActiveMQ 5.X+ 版本中NON_PERSISTENT Message會被轉出到 temp store區域,所以有可能您觀察到的現象是,無論怎樣堆積NON_PERSISTENT Message訊息,Queue的使用記憶體始終無法達到200MB。

......
<policyEntry queue=">" producerFlowControl="true" memoryLimit="200mb">
      <pendingQueuePolicy>
          <vmQueueCursor/>
      </pendingQueuePolicy>
</policyEntry>
......

以上配置表示只使用記憶體儲存Queue中的所有訊息,特別是NON_PERSISTENT Message只儲存在記憶體中,不使用temp store區域進行轉儲。在官方文件中,有關於policyEntry標籤的所有配置選項都有完整說明:http://activemq.apache.org/per-destination-policies.html

6-4、消費者策略:Dispatch Async

討論完了訊息生產者的關鍵效能點,我們再將目光轉向訊息消費者(接收者端);就像本小節開始時描述的那樣,比起訊息生產者來說訊息消費者的效能更能影響ActiveMQ系統的整體效能,因為要成功完成一條訊息的處理,它的工作要遠遠多於訊息生產者。

首先,在預設情況下ActiveMQ服務端採用非同步方式向客戶端推送訊息。也就是說ActiveMQ服務端在向某個消費者會話推送訊息後,不會等待消費者的響應資訊,直到消費者處理完訊息後,主動向服務端返回處理結果。如果您對自己的消費者效能足夠滿意,也可以將這個過程設定為“同步”:

......
// 設定為同步
connectionFactory.setDispatchAsync(false);
......

6-5、消費者策略:Prefetch

消費者關鍵策略中,需要重點討論的是消費者“預取數量”——prefetchSize。可以想象,如果消費者端的工作策略是按照某個週期(例如1秒),主動到伺服器端一條一條請求新的訊息,那麼消費者的工作效率一定是極低的;所以ActiveMQ系統中,預設的策略是ActiveMQ服務端一旦有訊息,就主動按照設定的規則推送給當前活動的消費者。其中每次推送都有一定的數量限制,這個限制值就是prefetchSize。

針對Queue工作模型的佇列和Topic工作模型的佇列,ActiveMQ有不同的預設“預取數量”;針對NON_PERSISTENT Message和PERSISTENT Message,ActiveMQ也有不同的預設“預取數量”:

  • PERSISTENT Message—Queue:prefetchSize=1000
  • NON_PERSISTENT Message—Queue:prefetchSize=1000
  • PERSISTENT Message—Topic:prefetchSize=100
  • NON_PERSISTENT Message—Topic:prefetchSize=32766

ActiveMQ中設定的各種預設預取數量一般情況下不需要進行改變。如果您使用預設的非同步方式從伺服器端推送訊息到消費者端,且您對消費者端的效能有足夠的信心,可以加大預取數量的限制。但是非必要情況下,請不要設定prefetchSize=1,因為這樣就是一條一條的取資料;也不要設定為prefetchSize=0,因為這將導致關閉伺服器端的推送機制,改為客戶端主動請求

  • 可以通過ActiveMQPrefetchPolicy策略物件更改預取數量
......
// 預取策略物件
ActiveMQPrefetchPolicy prefetchPolicy = connectionFactory.getPrefetchPolicy();
// 設定Queue的預取數量為50
prefetchPolicy.setQueuePrefetch(50);
connectionFactory.setPrefetchPolicy(prefetchPolicy);
//進行連線
connection = connectionFactory.createQueueConnection();
connection.start();
......
  • 也可以通過Properties屬性更改(當然還可以加入其他屬性)預取數量:
......
Properties props = new Properties();
props.setProperty("prefetchPolicy.queuePrefetch", "1000");
props.setProperty("prefetchPolicy.topicPrefetch", "1000");
//設定屬性
connectionFactory.setProperties(props);
//進行連線
connection = connectionFactory.createQueueConnection();
connection.start();
......

6-5、消費者策略:事務和死信

6-5-1、消費者端事務

JMS規範除了為訊息生產者端提供事務支援以外,還為消費服務端準備了事務的支援。您可以通過在消費者端操作事務的commit和rollback方法,向伺服器告知一組訊息是否處理完成。採用事務的意義在於,一組訊息要麼被全部處理並確認成功,要麼全部被回滾並重新處理

......
//建立會話(採用commit方式確認一批訊息處理完畢)
session = connection.createSession(true, Session.SESSION_TRANSACTED);
//建立Queue(當然如果有了就不會重複建立)
sendQueue = session.createQueue("/test");
//建立訊息傳送者物件
MessageConsumer consumer = session.createConsumer(sendQueue);
consumer.setMessageListener(new MyMessageListener(session));

......

class MyMessageListener implements MessageListener {
    private int number = 0;

    /**
     * 會話
     */
    private Session session;

    public MyMessageListener(Session session) {
        this.session = session;
    }

    @Override
    public void onMessage(Message message) {
        // 列印這條訊息
        System.out.println("Message = " + message);
        // 如果條件成立,就向伺服器確認這批訊息處理成功
        // 伺服器將從佇列中刪除這些訊息
        if(number++ % 3 == 0) {
            try {
                this.session.commit();
            } catch (JMSException e) {
                e.printStackTrace(System.out);
            }
        }
    }
}

以上程式碼演示的是消費者通過事務commit的方式,向伺服器確認一批訊息正常處理完成的方式。請注意程式碼示例中的“session = connection.createSession(true, Session.SESSION_TRANSACTED);”語句。第一個引數表示連線會話啟用事務支援;第二個引數表示使用commit或者rollback的方式進行向伺服器應答。

這是呼叫commit的情況,那麼如果呼叫rollback方法又會發生什麼情況呢?呼叫rollback方法時,在rollback之前已處理過的訊息(注意,並不是所有預取的訊息)將重新發送一次到消費者端(傳送給同一個連線會話)。並且訊息中redeliveryCounter(重發計數器)屬性將會加1。請看如下所示的程式碼片段和執行結果

@Override
public void onMessage(Message message) {
    // 列印這條訊息
    System.out.println("Message = " + message);
    // rollback這條訊息
    this.session.rollback();
}

以上程式碼片段中,我們不停的回滾正在處理的這條訊息,通過打印出來的資訊可以看到,這條訊息被不停的重發:

Message = ActiveMQTextMessage {...... redeliveryCounter = 0, text = 這是傳送的訊息內容-------------------20}
Message = ActiveMQTextMessage {...... redeliveryCounter = 1, text = 這是傳送的訊息內容-------------------20}
Message = ActiveMQTextMessage {...... redeliveryCounter = 2, text = 這是傳送的訊息內容-------------------20}
Message = ActiveMQTextMessage {...... redeliveryCounter = 3, text = 這是傳送的訊息內容-------------------20}
Message = ActiveMQTextMessage {...... redeliveryCounter = 4, text = 這是傳送的訊息內容-------------------20}

可以看到同一條記錄被重複的處理,並且其中的redeliveryCounter屬性不斷累加。

6-5-2、重發和死信佇列

但是訊息處理失敗後,不斷的重發訊息肯定不是一個最好的處理辦法:如果一條訊息被不斷的處理失敗,那麼最可能的情況就是這條訊息承載的業務內容本身就有問題。那麼無論重發多少次,這條訊息還是會處理失敗。

為了解決這個問題,ActiveMQ中引入了“死信佇列”(Dead Letter Queue)的概念。即一條訊息再被重發了多次後(預設為重發6次redeliveryCounter==6),將會被ActiveMQ移入“死信佇列”。開發人員可以在這個Queue中檢視處理出錯的訊息,進行人工干預。

這裡寫圖片描述

預設情況下“死信佇列”只接受PERSISTENT Message,如果NON_PERSISTENT Message超過了重發上限,將直接被刪除。以下配置資訊可以讓NON_PERSISTENT Message在超過重發上限後,也移入“死信佇列”:

<policyEntry queue=">">  
    <deadLetterStrategy>  
        <sharedDeadLetterStrategy processNonPersistent="true" />  
    </deadLetterStrategy>  
</policyEntry>

另外,上文提到的預設重發次數redeliveryCounter的上限也是可以進行設定的,為了保證訊息異常情況下儘可能小的影響消費者端的處理效率,實際工作中建議將這個上限值設定為3。原因上文已經說過,如果訊息本身的業務內容就存在問題,那麼重發多少次也沒有用。

RedeliveryPolicy redeliveryPolicy = connectionFactory.getRedeliveryPolicy();
// 設定最大重發次數
redeliveryPolicy.setMaximumRedeliveries(3);

實際上ActiveMQ的重發機制還有包括以上提到的rollback方式在內的多種方式:

  • 在支援事務的消費者連線會話中呼叫rollback方法。

  • 在支援事務的消費者連線會話中,使用commit方法明確告知伺服器端訊息已處理成功前,會話連線就終止了(最可能是異常終止)

  • 在需要使用ACK模式的會話中,使用訊息的acknowledge方式明確告知伺服器端訊息已處理成功前,會話連線就終止了(最可能是異常終止)

但是以上幾種重發機制有一些小小的差異,主要體現在redeliveryCounter屬性的作用區域。簡而言之,第一種方法redeliveryCounter屬性的作用區域是本次連線會話,而後兩種redeliveryCounter屬性的作用區域是在整個ActiveMQ系統範圍。

6-6、消費者策略:ACK

消費者端,除了可以使用事務方式來告知ActiveMQ服務端一批訊息已經成功處理外,還可以通過JMS規範中定義的acknowledge模式來實現同樣功能。事實上acknowledge模式更為常用

6-6-1、基本使用

如果選擇使用acknowledge模式,那麼你至少有4種方式使用它,且這四種方式的效能區別很大:

  • AUTO_ACKNOWLEDGE方式:這種方式下,當消費者端通過receive方法或者MessageListener監聽方式從服務端得到訊息後(無論是pul方式還是push方式),消費者連線會話會自動認為消費者端對訊息的處理是成功的。但請注意,這種方式下消費者端不一定是向服務端一條一條ACK訊息

  • CLIENT_ACKNOWLEDGE方式:這種方式下,當消費者端通過receive方法或者MessageListener監聽方式從服務端得到訊息後(無論是pul方式還是push方式),必須顯示呼叫訊息中的acknowledge方法。如果不這樣做,ActiveMQ伺服器端將不會認為這條訊息處理成功:

public void onMessage(Message message) {
    //====================
    //這裡進行您的業務處理
    //====================
    try {
        // 顯示呼叫ack方法
        message.acknowledge();
    } catch (JMSException e) {
        e.printStackTrace();
    }
}
  • DUPS_OK_ACKNOWLEDGE方式:批量確認方式。消費者端會按照一定的策略向伺服器端間隔傳送一個ack標示,表示某一批訊息已經處理完成。DUPS_OK_ACKNOWLEDGE方式 和 AUTO_ACKNOWLEDGE方式在某些情況下是一致的,這個在後文會講到。

  • INDIVIDUAL_ACKNOWLEDGE方式:單條確認方式。這種方式是ActiveMQ單獨提供的一種方式,其常量定義的位置都不在javax.jms.Session規範介面中,而是在org.apache.activemq.ActiveMQSession這個類中。這種方式消費者端將會逐條向ActiveMQ服務端傳送ACK資訊。所以這種ACK方式的效能很差,除非您有特別的業務要求,否則不建議使用

6-6-2、工作方式和效能

筆者建議首先考慮使用AUTO_ACKNOWLEDGE方式確認訊息,如果您這樣做,那麼一定請使用optimizeACK優化選項,並且重新設定prefetchSize數量為一個較小值(因為1000條的預設值在這樣的情況下就顯得比較大了):

......

//ack優化選項(實際上預設情況下是開啟的)
connectionFactory.setOptimizeAcknowledge(true);
//ack資訊最大發送週期(毫秒)
connectionFactory.setOptimizeAcknowledgeTimeOut(5000);
connection = connectionFactory.createQueueConnection();
connection.start();
......

AUTO_ACKNOWLEDGE方式的根本意義是“延遲確認”,消費者端在處理訊息後暫時不會發送ACK標示,而是把它快取在連線會話的一個pending 區域,等到這些訊息的條數達到一定的值(或者等待時間超過設定的值),再通過一個ACK指令告知服務端這一批訊息已經處理完成;而optimizeACK選項(指明AUTO_ACKNOWLEDGE採用“延遲確認”方式)只有當消費者端使用AUTO_ACKNOWLEDGE方式時才會起效

“延遲確認”的數量閥值:prefetch * 0.65
“延遲確認”的時間閥值:> optimizeAcknowledgeTimeOut

DUPS_OK_ACKNOWLEDGE方式也是一種“延遲確認”策略,如果目標佇列是Queue模式,那麼它的工作策略與AUTO_ACKNOWLEDGE方式是一樣的。也就是說,如果這時prefetchSize =1 或者沒有開啟optimizeACK,也會逐條訊息傳送ACK標示;如果目標佇列是Topic模式,那麼無論optimizeACK是否開啟,都會在消費的訊息個數>=prefetch * 0.5時,批量確認這些訊息。

6-7、消費者和生產者效能總結

本小節我們介紹了基於ActiveMQ構建的訊息佇列系統中,生產者和消費者需要關注的重要效能點。但是整個ActiveMQ中的效能還需要各位讀者在實際工作中,一點一點的去挖掘。這裡我們根據已經介紹過的效能關注點進行總結:

  • 傳送NON_PERSISTENT Message和傳送PERSISTENT Message是有效能差異的。引起這種差異的原因是前者不需要進行持久化儲存;但是這樣的效能差異在某些情況下會縮小,例如傳送NON_PERSISTENT Message時,由於消費者效能不夠導致訊息堆積,這時NON_PERSISTENT Message會被轉儲到物理磁碟上的“temp store”區域。

  • 傳送帶有事務的訊息和傳送不帶有事務的訊息,在伺服器端的處理效能也是有顯著區別的。引起這種差異的原因是帶有事務的訊息會首先記錄在伺服器端的“transaction store”區域,並且伺服器端會帶有redo日誌,這樣保證傳送者端在傳送commit指令或者rollback指令時,伺服器會完整相應的處理。

  • ActiveMQ中,為訊息生產者所設定的ProducerFlowControl策略非常重要,它確定訊息在ActiveMQ服務端產生大量堆積的情況下,ActiveMQ將減緩接收訊息,保證了ActiveMQ能夠穩定的工作。您可以通過配置檔案設定ProducerFlowControl策略的生效閥值,甚至可以關閉ProducerFlowControl策略(當然不建議這樣做)。

  • 訊息生產者端和訊息消費者端都可以通過“非同步”方式和伺服器進行通訊(但是意義不一樣)。在生產者端傳送非同步訊息,一定要和ProducerWindowSize(回執視窗期)的設定共同使用;在消費者非同步接受訊息時,要記住有Prefetch這個關鍵的預取數值,並且PrefetchSize在非必要情況下不要設定為1。很顯然適合的PrefetchSize將改善服務端和消費者端的效能。

  • JMS規範中,訊息消費者端也是支援事務的。所謂消費者端的事務是指:一組訊息要麼全部被commit(這時消費者會向服務端傳送ACK表示),要麼全部被rollback(這時同一個消費者端會向自己重發這些訊息,並且這些訊息的redeliveryCounter屬性+1);進行訊息的重發是非常消耗消費者端效能的一件事情,這是因為在這個連線會話中,被Prefetch但是還沒有被處理的訊息將一直等待重發的訊息最終被確認。

  • 為了避免帶有錯誤業務資訊的訊息被無止境的重發,從而影響整個訊息系統的效能。在ActiveMQ中為超過MaximumRedeliveries閥值(預設值為6,但是很明顯預設值太高了,建議設定為3)的訊息準備了“死信佇列”。

  • 只有伺服器收到了一條或者一組訊息的ACK標示,才會認為這條或者這組訊息被成功過的處理了。在消費者端有4種ACK工作模式,建議優先選擇AUTO_ACKNOWLEDGE。如果您這樣做了,那麼請一定重新改小預取數量、設定OptimizeAcknowledge為true、重設OptimizeAcknowledgeTimeOut時間。這樣才能保證AUTO_ACKNOWLEDGE方式工作在“延遲確認”模式下,以便優化ACK效能。

(接下文:我們將開始介紹ActiveMQ的持久化儲存方案和高可用方案)

相關推薦

架構設計系統通訊23——提高ActiveMQ工作效能

6、ActiveMQ處理規則和優化 在ActiveMQ單個服務節點的優化中,除了對ActiveMQ單個服務節點的網路IO模型進行優化外,生產者傳送訊息的策略和消費者處理訊息的策略也關乎整個訊息佇列系統是否能夠高效工作。請看下圖所示的訊息生產者和訊息消費

架構設計系統通訊36——Apache Camel快速入門

架構設計:系統間通訊(36)——Apache Camel快速入門(上) :http://blog.csdn.net/yinwenjie(未經允許嚴禁用於商業用途!) https://blog.csdn.net/yinwenjie/article/details/51692340 1、本專題主

架構設計系統通訊34——被神化的ESB

1、概述 從本篇文章開始,我們將花一到兩篇的篇幅介紹ESB(企業服務匯流排)技術的基本概念,為讀者們理清多個和ESB技術有關名詞。我們還將在其中為讀者闡述什麼情況下應該使用ESB技術。接下來,為了加深讀者對ESB技術的直觀理解,我們將利用Apache Came

架構設計系統通訊16——服務治理與Dubbo 中篇預熱

1、前序 上篇文章中(《架構設計:系統間通訊(15)——服務治理與Dubbo 上篇》),我們以示例的方式講解了阿里DUBBO服務治理框架基本使用。從這節開始我們將對DUBBO的主要模組的設計原理進行講解,從而幫助讀者理解DUBBO是如何工作的。(由於這個章節的內容比較多,包括了知識準備、DUBBO框架概述

架構設計系統通訊6——IO通訊模型和Netty 上篇

1、Netty介紹 在Netty官網上,對於Netty的介紹是: Netty is a NIO client server framework which enables quick and easy development of network ap

架構設計系統通訊15——服務治理與Dubbo 上篇

1、上篇中“自定義服務治理框架”的問題 在之前的文章中(《架構設計:系統間通訊(13)——RPC例項Apache Thrift 下篇(1)》、《架構設計:系統間通訊(14)——RPC例項Apache Thrift 下篇(2)》),我們基於服務治理的基本原理,自

架構設計系統通訊21——ActiveMQ的安裝與使用

1、前言 之前我們通過兩篇文章(架構設計:系統間通訊(19)——MQ:訊息協議(上)、架構設計:系統間通訊(20)——MQ:訊息協議(下))從理論層面上為大家介紹了訊息協議的基本定義,並花了較大篇幅向讀者介紹了三種典型的訊息協議:XMPP協議、Stomp協議和

架構設計系統通訊10——RPC的基本概念

1、概述 經過了詳細的資訊格式、網路IO模型的講解,並且通過JAVA RMI的講解進行了預熱。從這篇文章開始我們將進入這個系列博文的另一個重點知識體系的講解:RPC。在後續的幾篇文章中,我們首先講解RPC的基本概念,一個具體的RPC實現會有哪些基本要素構成,然

架構設計系統通訊40——自己動手設計ESB1

1、概述 在我開始構思這幾篇關於“自己動手設計ESB中介軟體”的文章時,曾有好幾次動過放棄的念頭。原因倒不是因為對冗長的文章產生了惰性,而是ESB中所涉及到的技術知識和需要突破的設計難點實在是比較多,再冗長的幾篇博文甚至無法對它們全部進行概述,另外如果在思路上

架構設計系統通訊39——Apache Camel快速入門下2

4-2-1、LifecycleStrategy LifecycleStrategy介面按照字面的理解是一個關於Camel中元素生命週期的規則管理器,但實際上LifecycleStrategy介面的定義更確切的應該被描述成一個監聽器: 當Camel

架構設計系統通訊24——提高ActiveMQ工作效能

7、ActiveMQ的持久訊息儲存方案 前文已經講過,當ActiveMQ接收到PERSISTENT Message訊息後就需要藉助持久化方案來完成PERSISTENT Message的儲存。這個介質可以是磁碟檔案系統、可以是ActiveMQ的內建資料庫

架構設計系統通訊37——Apache Camel快速入門

(補上文:Endpoint重要的漏講內容) 3-1-2、特殊的Endpoint Direct Endpoint Direct用於在兩個編排好的路由間實現Exchange訊息的連線,上一個路由中由最後一個元素處理完的Exchange物件,將被髮送至由D

架構設計系統通訊28——Kafka及場景應用1

在本月初的寫作計劃中,我本來只打算粗略介紹一下Kafka(同樣是因為進度原因)。但是,最近有很多朋友要求我詳細講講Kafka的設計和使用,另外兩年前我在研究Kafka準備將其應用到生產環境時,由於沒有仔細理解Kafka的設計結構所導致的問題最後也還沒有進行交

架構設計系統通訊26——ActiveMQ叢集方案

3、ActiveMQ熱備方案 ActiveMQ熱備方案,主要保證ActiveMQ的高可用性。這種方案並不像上節中我們主要討論的ActiveMQ高效能方案那樣,同時有多個節點都處於工作狀態,也就是說這種方案並不提高ActiveMQ叢集的效能;而是從叢集中的多

架構設計系統通訊38——Apache Camel快速入門下1

3-5-2-3迴圈動態路由 Dynamic Router 動態迴圈路由的特點是開發人員可以通過條件表示式等方式,動態決定下一個路由位置。在下一路由位置處理完成後Exchange將被重新返回到路由判斷點,並由動態迴圈路由再次做出新路徑的判斷。如此迴圈執行

架構設計系統通訊2——概述從“聊天”開始下篇

【轉】https://blog.csdn.net/yinwenjie/article/details/48344989 4-3、NIO通訊框架 目前流行的NIO框架非常的多。在論壇上、網際網路上大家討論和使用最多的有以下幾種: 原生JAVA NIO框架:

架構設計系統通訊——ActiveMQ叢集方案

1、綜述 通過之前的文章,我們討論了ActiveMQ的基本使用,包括單個ActiveMQ服務節點的效能特徵,關鍵調整引數;我們還介紹了單個ActiveMQ節點上三種不同的持久化儲存方案,並討論了這三種不同的持久化儲存方案的配置和效能特點。但是這還遠遠不夠,因為在生產環境

架構設計系統通訊——MQ訊息協議

1、概述從本文開始,我們介紹另一型別的系統間通訊及輸:MQ訊息佇列。首先我們將討論幾種常用訊息佇列協議的基本原理和工作方式,包括MQTT、XMPP、Stomp、AMQP、OpenWire等。然後在這個基礎上介紹兩款MQ產品:ActiveMQ和RabbitMQ,它們是現在業務系

架構設計系統存儲28——分布式文件系統Ceph掛載

all 兩個文件 原因 之前 來看 大數據 details 失敗 variable (接上文《架構設計:系統存儲(27)——分布式文件系統Ceph(安裝)》) 3. 連接到Ceph系統 3-1. 連接客戶端 完畢Ceph文件系統的創建過程後。就

架構設計系統儲存18——Redis叢集方案高效能

1、概述 通過上一篇文章(《架構設計:系統儲存(17)——Redis叢集方案:高可用》)的內容,Redis主從複製的基本功能和進行Redis高可用叢集監控的Sentinel基本功能基本呈現給了讀者。雖然本人並不清楚上一篇根據筆者實際工作經驗所撰寫的文章有什麼重