1. 程式人生 > >RocketMQ基礎概念及使用總結

RocketMQ基礎概念及使用總結

一.瞭解RocketMQ?

rocketMQ是阿里開源的一款十分優秀的訊息佇列,rocketMQ具有很多其他訊息佇列不具有的特性,更重要的是rocketMQ是用java開發的學習成本較低,並且經歷了雙11的資料洪峰的考驗。rocketMQ已經加入了apache,成為apache的頂級專案,最近阿里的另一款開源專案dubbo也重新開始維護。

阿里在RocketMQ 專案基礎上衍生的專案如下:

com.taobao.metaq v3.0 = RocketMQ + 淘寶個性化需求為淘寶應用提供訊息服務。

om.alipay.zpullmsg v1.0 = RocketMQ + 支付寶個性化需求為支付寶應用提供訊息服務

com.alibaba.commonmq v1.0 = Notify + RocketMQ + B2B 個性化需求為 B2B 應用提供訊息服務

目前RocketMQ的程式碼託管在github上: 老的地址:https://github.com/alibaba/RocketMQ 新的地址:https://github.com/apache/incubator-rocketmq

中介軟體比較:

https://rocketmq.apache.org/docs/motivation/

https://help.aliyun.com/document_detail/52577.html?spm=5176.7946988.881668.1.754942betpCaPZ

二.RocketMQ 是什麼?

 1.是一個佇列模型的訊息中介軟體,具有高效能、高可靠、高實時、分散式特點。

 2.Producer、Consumer佇列都可以分散式。

 3.Producer向一些佇列輪流傳送訊息,佇列集合稱為 Topic,Consumer 如果做廣播消費,則一個consumer例項消費這個Topic 對應的所有佇列,如果做叢集消費,則多個Consumer 例項平均消費這個topic對應的佇列集合。(預設是叢集消費)

 4.能夠保證嚴格的訊息順序(因為效能原因,不能保證訊息不重複,因為總有網路不可達的情況發生,需業務端保證)。

 5.提供豐富的訊息拉取模式

 6.高效的訂閱者水平擴充套件能力

 7.實時的訊息訂閱機制

 8.億級訊息堆積能力

 9.較少的依賴

三.RocketMQ的基本概念 1.Name Server

  它是一個幾乎無狀態節點,可叢集部署,節點之間無任何資訊同步。(2.X版本之前rocketMQ使用zookeeper做topic路由管理)。Name Server 是專為 RocketMQ設計的輕量級名稱服務,程式碼小於1000行,具有簡單、可叢集橫吐擴充套件、無狀態等特點。將要支援的主備自動切換功能會強依賴 Name Server。 2. Broker

Broker 部署相對複雜,Broker分為Master與Slave,一個Master可以對應多個Slave,但是一個Slave只能對應一個Master,Master與Slave的對應關係通過指定相同的BrokerName,不同的BrokerId來定義,BrokerId為0表示Master,非0表示Slave。Master也可以部署多個。每個Broker與Name Server 叢集中的所有節點建立長連線,定時註冊Topic資訊到所有Name Server。

  3. Producer

Producer 與Name Server叢集中的其中一個節點(隨機選擇,但不同於上一次)建立長連線,定期從Name Server取Topic路由資訊,並向提供Topic服務的Master建立長連線,且定時向Master傳送心跳。Producer完全無狀態,可叢集部署。 4. Consumer

Consumer與Name Server叢集中的其中一個節點(隨機選擇,但不同於上一次)建立長連線,定期從Name Server 取Topic路由資訊,並向提供Topic服務的Master、Slave建立長連線,且定時向Master、Slave傳送心跳。Consumer既可以從Master訂閱訊息,也可以從Slave訂閱訊息,訂閱規則由Broker配置決定(目前版本沒有找到可配置的地方,可以在原碼裡修改)。

5.Producer Group

用來表示一個収送訊息應用,一個Producer Group下包含多個Producer例項,可以是多臺機器,也可以是一臺機器的多個迕程,或者一個程序的多個Producer物件。一個Producer Group可以傳送多個Topic訊息,Producer Group作用如下:

1.標識一類 Producer

2.可以通過運維工具查詢這個傳送訊息應用下有多個Producer例項

3.傳送分散式事務訊息時,如果 Producer中途意外宕機,Broker會主動回撥 Producer Group內的任意一臺機器來確認事務狀態。 6.Consumer Group

用來表示一個消費訊息應用,一個Consumer Group下包含多個Consumer例項,可以是多臺機器,也可以是多個程序,或者是一個程序的多個Consumer物件。一個Consumer Group下的多個Consumer以均攤方式消費訊息,如果設定為廣播方式,那麼這個 Consumer Group下的每個例項都消費全量資料。

三.生產訊息

producer配置

傳送訊息注意事項

  1.一個應用盡可能用一個Topic,訊息子型別用tags來標識,tags可以由應用自由設定。只有傳送訊息設定了tags,消費方在訂閱訊息時,才可以利用tags在broker做訊息過濾。 message.setTags("TagA");

  2.每個訊息在業務局面的唯一標識碼,要設定到keys欄位,方便將來定位訊息丟失問題。伺服器會為每個訊息建立索引(雜湊索引),應用可以通過topic,key來查詢這條訊息內 容,以及訊息被誰消費。由於是雜湊索引,請務必保證key儘可能唯一,這樣可以避免潛在的雜湊衝突。 //訂單Id String orderId = "20034568923546"; message.setKeys(orderId);

  3.訊息傳送成功或者失敗,要列印訊息日誌,務必要列印sendresult和key欄位。

  4.send訊息方法,只要不拋異常,就代表傳送成功。但是傳送成功會有多個狀態,在sendResult裡定義。

SEND_OK 訊息傳送成功

FLUSH_DISK_TIMEOUT 訊息傳送成功,但是伺服器刷盤超時,訊息已經迕入伺服器佇列,只有此時伺服器宕機,訊息才會丟失

FLUSH_SLAVE_TIMEOUT 訊息傳送成功,但是伺服器同步到 Slave時超時,訊息已經迕入伺服器佇列,只有此時伺服器宕機,訊息才會丟失

SLAVE_NOT_AVAILABLE 訊息傳送成功,但是此時 slave 不可用,訊息已經迕入伺服器佇列,只有此時伺服器宕機,訊息才會丟失

 對於精確傳送順序訊息的應用,由於順序訊息的侷限性,可能會涉及到主備自動切換問題,所以如果sendresult中的status欄位不等於SEND_OK,就應該嘗試重試。對於其他應用,則沒有必要返樣。

   5. 對於訊息不可丟失應用,務必要有訊息重發機制,例如如果訊息傳送失敗,儲存到資料庫,能有定時程式嘗試重發,或者人工觸發重發。

訊息傳送失敗如何處理

Producer的send方法本身支援內部重試,重試邏輯如下:

 1.至多重試 3 次。

 2.如果傳送失敗,則輪轉到下一個 Broker。

 3.這個方法的總耗時時間不超過 sendMsgTimeout設定的值,預設10s。所以,如果本身向broker傳送訊息產生超時異常,就不會再做重試。

以上策略仍然不能保證訊息一定傳送成功,為保證訊息一定成功,建議應用這樣做:

如果呼叫send同步方法傳送失敗,則嘗試將訊息儲存到db,由後臺執行緒定時重試,保證訊息一定到達Broker。

上述 db 重試方式為什麼沒有整合到 MQ客戶端內部做,而是要求應用自己去完成,阿里主要是基於以下幾點考慮:

1.MQ的客戶端設計為無狀態模式,方便任意的水平擴充套件,且對機器資源的消耗僅僅是cpu、記憶體、網路。

2.如果MQ客戶端內部整合一個KV儲存模組,那麼資料只有同步落盤才能較可靠,而同步落盤本身效能開銷較大,所以通常會採用非同步落盤,又由於應用關閉過程不受MQ運維人員控制,可能經常會發生kill -9 這樣暴力方式關閉,造成資料沒有及時落盤而丟失。

3.Producer所在機器的可靠性較低,一般為虛擬機器,不適合儲存重要資料。

 綜上,建議重試過程交由應用來控制。

四.消費訊息

消費過程要做到冪等(即消費端去重)

RocketMQ 無法避免訊息重複,所以如果業務對消費重複非常敏感,務必要在業務局面去重,有以下幾種去重方式:

1.將訊息的唯一鍵,可以是 msgId,也可以是訊息內容中的唯一標識欄位,例如訂單Id等,消費之前判斷是否在 Db或Tair(全域性KV儲存)中存在,如果不存在則插入入,並消費,否則跳過。(實際過程要考慮原子性問題,判斷是否存在可以嘗試插入,如果報主鍵衝突,則插入失敗,直接跳過) msgId一定是全句唯一識別符號,但是可能會存在同樣的訊息有兩個不同 msgId的情況(有多種原因),這種情況可能會使業務上重複消費,建議最好使用訊息內容中的唯一標識欄位去重。

2.使用業務局面的狀態機去重 。

RocketMQ的Consumer都是從Broker拉訊息來消費,但是為了能做到實時收訊息,RocketMQ使用長輪詢方式,可以保證訊息實時性同Push方式一致。'

push consumer配置

pull consumer配置

message資料結構

針對producer

在Producer端,使用com.alibaba.rocketmq.common.message.Message這個資料結構,由於Broker會為Message增加資料結構,所以訊息到達Consumer後,會在Message基礎之上增加多個欄位,Consumer看到的是com.alibaba.rocketmq.common.message.MessageExt返個數據結構,MessageExt繼承於Message。

批量方式消費

某些業務流程如果支援批量方式消費,則可以很大程度上提高消費吞吏量,例如訂單扣款類應用,一次處理一個訂單耗時1秒鐘,一次處理10個訂單可能也只耗時2秒鐘,這樣即可大幅度提高消費的吞吐量,通過設定consumer的consumeMessageBatchMaxSize這個引數,預設是1,即一次只消費一條訊息,例如設定為N,那麼每次消費的 訊息數小於等於N。

五.RocketMQ 儲存特點

RocketMQ 參考了Kafka的持久化方式,充分利用Linux檔案系統記憶體cache來提高效能。

Consumer消費訊息過程,使用了零拷貝,零拷貝包含以下兩種方式 :

1.使用 mmap + write 方式 優點:即使頻繁呼叫,使用小塊檔案傳輸,效率也很高缺點:不能很好的利用DMA方式,會比sendfile多消耗CPU,記憶體安全性控制複雜,需要避免JVM Crash問題。

2.使用 sendfile 方式優點:可以利用DMA方式,消耗CPU較少,大塊檔案傳輸效率高,無記憶體安全新問題。 缺點:小塊檔案效率低於mmap方式,只能是BIO方式傳輸,不能使用NIO。

RocketMQ 選擇了第一種方式,mmap+write方式,因為有小塊資料傳輸的需求,效果會比sendfile更好。

資料儲存結構

RocketMQ的所有訊息都是持久化的,先寫入系統PAGECACHE,然後刷盤,可以保證記憶體不磁碟都有一份資料,訪問時,直接從記憶體讀取。

叢集管理夠工具

http://192.168.102.35:8080/rocketmq-console/cluster/list.do

參考文件:

rocketmq開發指南-v3.2.4 RocketMQ運維指令

--------------------- 作者:SIMILAR_ZHANG 來源:CSDN 原文:https://blog.csdn.net/qq_32711825/article/details/78579864 版權宣告:本文為博主原創文章,轉載請附上博文連結!