1. 程式人生 > >ActiveMQ訊息傳送機制以及ACK機制

ActiveMQ訊息傳送機制以及ACK機制

    AcitveMQ是作為一種訊息儲存和分發元件,涉及到client與broker端資料互動的方方面面,它不僅要擔保訊息的儲存安全性,還要提供額外的手段來確保訊息的分發是可靠的。

一. ActiveMQ訊息傳送機制

    Producer客戶端使用來發送訊息的, Consumer客戶端用來消費訊息;它們的協同中心就是ActiveMQ broker,broker也是讓producer和consumer呼叫過程解耦的工具,最終實現了非同步RPC/資料交換的功能。隨著ActiveMQ的不斷髮展,支援了越來越多的特性,也解決開發者在各種場景下使用ActiveMQ的需求。比如producer支援非同步呼叫;使用flow control機制讓broker協同consumer的消費速率;consumer端可以使用prefetchACK來最大化訊息消費的速率;提供"重發策略"等來提高訊息的安全性等。在此我們不詳細介紹。

    一條訊息的生命週期如下:


三星的Win8平板如何現在就有產品了,不是剛釋出麼
  

     圖片中簡單的描述了一條訊息的生命週期,不過在不同的架構環境中,message的流動行可能更加複雜.將在稍後有關broker的架構中詳解..一條訊息從producer端發出之後,一旦被broker正確儲存,那麼它將會被consumer消費,然後ACK,broker端才會刪除;不過當訊息過期或者儲存裝置溢位時,也會終結它。


三星的Win8平板如何現在就有產品了,不是剛釋出麼
 

     這是一張很複雜,而且有些凌亂的圖片;這張圖片中簡單的描述了:1)producer端如何傳送訊息 2) consumer端如何消費訊息 3) broker端如何排程。如果用文字來描述圖示中的概念,恐怕一言難盡。圖示中,提及到prefetchAck,以及訊息同步、非同步傳送的基本邏輯;這對你瞭解下文中的ACK機制將有很大的幫助。

二. optimizeACK

    "可優化的ACK",這是ActiveMQ對於consumer在訊息消費時,對訊息ACK的優化選項,也是consumer端最重要的優化引數之一,你可以通過如下方式開啟:

    1) 在brokerUrl中增加如下查詢字串: 

String brokerUrl = "tcp://localhost:61616?" + 
                   "jms.optimizeAcknowledge=true" + 
                   "&jms.optimizeAcknowledgeTimeOut=30000" + 
                   "&jms.redeliveryPolicy.maximumRedeliveries=6";
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerUrl);

    2) 在destinationUri中,增加如下查詢字串:

String queueName = "test-queue?customer.prefetchSize";
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination queue = session.createQueue(queueName);

    我們需要在brokerUrl指定optimizeACK選項,在destinationUri中指定prefetchSize(預獲取)選項,其中brokerUrl引數選項是全域性的,即當前factory下所有的connection/session/consumer都會預設使用這些值;而destinationUri中的選項,只會在使用此destination的consumer例項中有效;如果同時指定,brokerUrl中的引數選項值將會被覆蓋。optimizeAck表示是否開啟“優化ACK”,只有在為true的情況下,prefetchSize(下文中將會簡寫成prefetch)以及optimizeAcknowledgeTimeout引數才會有意義。此處需要注意"optimizeAcknowledgeTimeout"選項只能在brokerUrl中配置。

    prefetch值建議在destinationUri中指定,因為在brokerUrl中指定比較繁瑣;在brokerUrl中,queuePrefetchSize和topicPrefetchSize都需要單獨設定:"&jms.prefetchPolicy.queuePrefetch=12&jms.prefetchPolicy.topicPrefetch=12"等來逐個指定。

    如果prefetchACK為true,那麼prefetch必須大於0;當prefetchACK為false時,你可以指定prefetch為0以及任意大小的正數。不過,當prefetch=0是,表示consumer將使用PULL(拉取)的方式從broker端獲取訊息,broker端將不會主動push訊息給client端,直到client端傳送PullCommand時;當prefetch>0時,就開啟了broker push模式,此後只要當client端消費且ACK了一定的訊息之後,會立即push給client端多條訊息。

    當consumer端使用receive()方法同步獲取訊息時,prefetch可以為0和任意正值;當prefetch=0時,那麼receive()方法將會首先發送一個PULL指令並阻塞,直到broker端返回訊息為止,這也意味著訊息只能逐個獲取(類似於Request<->Response),這也是Activemq中PULL訊息模式;當prefetch > 0時,broker端將會批量push給client 一定數量的訊息(<= prefetch),client端會把這些訊息(unconsumedMessage)放入到本地的佇列中,只要此佇列有訊息,那麼receive方法將會立即返回,當一定量的訊息ACK之後,broker端會繼續批量push訊息給client端。

    當consumer端使用MessageListener非同步獲取訊息時,這就需要開發設定的prefetch值必須 >=1,即至少為1;在非同步消費訊息模式中,設定prefetch=0,是相悖的,也將獲得一個Exception。

    此外,我們還可以brokerUrl中配置“redelivery”策略,比如當一條訊息處理異常時,broker端可以重發的最大次數;和下文中提到REDELIVERED_ACK_TYPE互相協同。當訊息需要broker端重發時,consumer會首先在本地的“deliveredMessage佇列”(Consumer已經接收但還未確認的訊息佇列)刪除它,然後向broker傳送“REDELIVERED_ACK_TYPE”型別的確認指令,broker將會把指令中指定的訊息重新新增到pendingQueue(亟待發送給consumer的訊息佇列)中,直到合適的時機,再次push給client。

    到目前為止,或許你知道了optimizeACK和prefeth的大概意義,不過我們可能還會有些疑惑!!optimizeACK和prefetch配合,將會達成一個高效的訊息消費模型:批量獲取訊息,並“延遲”確認(ACK)prefetch表達了“批量獲取”訊息的語義,broker端主動的批量push多條訊息給client端,總比client多次傳送PULL指令然後broker返回一條訊息的方式要優秀很多,它不僅減少了client端在獲取訊息時阻塞的次數和阻塞的時間,還能夠大大的減少網路開支。optimizeACK表達了“延遲確認”的語義(ACK時機),client端在消費訊息後暫且不傳送ACK,而是把它快取下來(pendingACK),等到這些訊息的條數達到一定閥值時,只需要通過一個ACK指令把它們全部確認;這比對每條訊息都逐個確認,在效能上要提高很多。由此可見,prefetch優化了訊息傳送的效能,optimizeACK優化了訊息確認的效能。

    當consumer端訊息消費的速率很高(相對於producer生產訊息),而且訊息的數量也很大時(比如訊息源源不斷的生產),我們使用optimizeACK + prefetch將會極大的提升consumer的效能。不過反過來:

    1) 如果consumer端消費速度很慢(對訊息的處理是耗時的),過大的prefetchSize,並不能有效的提升效能,反而不利於consumer端的負載均衡(只針對queue);按照良好的設計準則,當consumer消費速度很慢時,我們通常會部署多個consumer客戶端,並使用較小的prefetch,同時關閉optimizeACK,可以讓訊息在多個consumer間“負載均衡”(即均勻的傳送給每個consumer);如果較大的prefetchSize,將會導致broker一次性push給client大量的訊息,但是這些訊息需要很久才能ACK(訊息積壓),而且在client故障時,還會導致這些訊息的重發。

    2) 如果consumer端消費速度很快,但是producer端生成訊息的速率較慢,比如生產者10秒鐘生成10條訊息,但是consumer一秒就能消費完畢,而且我們還部署了多個consumer!!這種場景下,建議開啟optimizeACK,但是需要設定較小的prefetchSize;這樣可以保證每個consumer都能有"活幹",否則將會出現一個consumer非常忙碌,但是其他consumer幾乎收不到訊息。

    3) 如果訊息很重要,特別是不原因接收到”redelivery“的訊息,那麼我們需要將optimizeACK=false,prefetchSize=1

    既然optimizeACK是”延遲“確認,那麼就引入一種潛在的風險:在訊息被消費之後還沒有來得及確認時,client端發生故障,那麼這些訊息就有可能會被重新發送給其他consumer,那麼這種風險就需要client端能夠容忍“重複”訊息。

    prefetch值預設為1000,當然這個值可能在很多場景下是偏大的;我們暫且不考慮ACK_MODE(參見下文),通常情況下,我們只需要簡單的統計出單個consumer每秒的最大消費訊息數即可,比如一個consumer每秒可以處理100個訊息,我們期望consumer端每2秒確認一次,那麼我們的prefetchSize可以設定為100 * 2 /0.65大概為300。無論如何設定此值,client持有的訊息條數最大為:prefetch + “DELIVERED_ACK_TYPE訊息條數”(DELIVERED_ACK_TYPE參見下文)

     即使當optimizeACK為true,也只會當session的ACK_MODE為AUTO_ACKNOWLEDGE時才會生效,即在其他型別的ACK_MODE時consumer端仍然不會“延遲確認”,即:

consumer.optimizeAck = connection.optimizeACK && session.isAutoAcknowledge()

    當consumer.optimizeACK有效時,如果客戶端已經消費但尚未確認的訊息(deliveredMessage)達到prefetch * 0.65,consumer端將會自動進行ACK;同時如果離上一次ACK的時間間隔,已經超過"optimizeAcknowledgeTimout"毫秒,也會導致自動進行ACK。

    此外簡單的補充一下,批量確認訊息時,只需要在ACK指令中指明“firstMessageId”和“lastMessageId”即可,即訊息區間,那麼broker端就知道此consumer(根據consumerId識別)需要確認哪些訊息。

 
三. ACK模式與型別介紹


    JMS API中約定了Client端可以使用四種ACK_MODE,在javax.jms.Session介面中:

  • AUTO_ACKNOWLEDGE = 1    自動確認
  • CLIENT_ACKNOWLEDGE = 2    客戶端手動確認   
  • DUPS_OK_ACKNOWLEDGE = 3    自動批量確認
  • SESSION_TRANSACTED = 0    事務提交併確認

    此外AcitveMQ補充了一個自定義的ACK_MODE:

  • INDIVIDUAL_ACKNOWLEDGE = 4    單條訊息確認