1. 程式人生 > >再談消息隊列技術

再談消息隊列技術

復雜 broker 需要 分享 雙向 並發 關閉 ring sta

上周,我們舉辦了第二屆技術沙龍,我這邊主要演講了消息隊列技術的議題,現分享給大家:

在我們團隊內部,隨著消息應用中心(任務中心)的廣泛應用,有時候我們感覺不到消息隊列的存在,但這不影響消息隊列在高可用、分布式、高並發架構下的核心地位。

消息隊列都應用到了哪些實際的應用場景中?

一、再談消息隊列的應用場景

  1. 異步處理:例如短信通知、終端狀態推送、App推送、用戶註冊等
  2. 數據同步:業務數據推送同步
  3. 重試補償:記賬失敗重試
  4. 系統解耦:通訊上下行、終端異常監控、分布式事件中心
  5. 流量消峰:秒殺場景下的下單處理
  6. 發布訂閱:HSF的服務狀態變化通知、分布式事件中心
  7. 高並發緩沖:日誌服務、監控上報

但是,我們對消息隊列的底層技術和原理還是不了解,那麽我們馬上開始吧…

二、消息隊列的一些基本概念和簡單原理

1. Broker

Broker的概念來自與Apache ActiveMQ,通俗的講就是MQ的服務器。

2. 消息的生產者、消費者

消息生產者Producer:發送消息到消息隊列。

消息消費者Consumer:從消息隊列接收消息。

技術分享

3. 點對點消息隊列模型

消息生產者向一個特定的隊列發送消息,消息消費者從該隊列中接收消息;

消息的生產者和消費者可以不同時處於運行狀態。

每一個成功處理的消息都由消息消費者簽收確認(Acknowledge)。如圖:

技術分享

4. 發布訂閱消息模型-Topic

發布訂閱消息模型中,支持向一個特定的主題Topic發布消息,0個或多個訂閱者接收來自這個消息主題的消息。在這種模型下,發布者和訂閱者彼此不知道對方。實際操作過程中,

發布訂閱消息模型中,支持向一個特定的主題Topic發布消息,0個或多個訂閱者接收來自這個消息主題的消息。在這種模型下,發布者和訂閱者彼此不知道對方。實際操作過程中,

必須先訂閱,再發送消息,而後接收訂閱的消息,這個順序必須保證。

技術分享

5. 消息的順序性保證

基於Queue消息模型,利用FIFO先進先出的特性,可以保證消息的順序性。

6. 消息的ACK確認機制

即消息的Ackownledge

確認機制,

為了保證消息不丟失,消息隊列提供了消息Acknowledge機制,即ACK機制,當Consumer確認消息已經被消費處理,發送一個ACK給消息隊列,此時消息隊列便可以刪除這個消息了。如果Consumer宕機/關閉,沒有發送ACK,消息隊列將認為這個消息沒有被處理,會將這個消息重新發送給其他的Consumer重新消費處理。

7. 消息的持久化

消息的持久化,對於一些關鍵的核心業務來說是非常重要的,啟用消息持久化後,消息隊列宕機重啟後,消息可以從持久化存儲恢復,消息不丟失,可以繼續消費處理。

8. 消息的同步和異步收發

同步:消息的收發支持同步收發的方式。

同時還有另一種同步方式:同步收發場景下,消息生產者和消費者雙向應答模式,例如:張三寫封信送到郵局中轉站,然後李四從中轉站獲得信,然後在寫一份回執信,放到中轉站,然後張三去取,當然張三寫信的時候就得寫明回信地址

消息的接收如果以同步的方式(Pull)進行接收,如果隊列中為空,此時接收將處於同步阻塞狀態,會一直等待,直到消息的到達。

異步:消息的收發同樣支持異步方式:異步發送消息,不需要等待消息隊列的接收確認;異步接收消息,以Push的方式觸發消息消費者接收消息。

9. 消息的事務支持

消息的收發處理支持事務,例如:在任務中心場景中,一次處理可能涉及多個消息的接收、處理,這處於同一個事務範圍內,如果一個消息處理失敗,事務回滾,消息重新回到隊列中。

三、我們對消息隊列的實際使用

我們使用了兩種消息隊列組件:

RabbitMQ:高可用、高可靠消息應用場景,例如記賬失敗重試、通知服務,消息不允許丟

Kafka:高性能消息應用場景,例如日誌、監控,消息允許丟失。

在此之上,我們封裝了消息應用中心、日誌服務等核心組件和服務。那麽,消息應用中心和日誌都用到了消息隊列什麽技術? 幹貨來了…

1. 消息應用中心

消息應用中心(任務中心)使用了消息隊列的異步處理、數據同步、重試補償、系統解耦、流量消峰等特性。其中:

消息應用中心(任務中心),支持RabbitMQ和Kafka兩種消息通道,支持在任務元數據層面設置

任務:就是一個包含了任務執行上下文的消息,同時代表了異步處理

任務發送者(ITaskSender)發送任務:消息的生產者將任務消息發送的消息隊列

任務類型:消息隊列名稱,例如:HaKeepAcco***Queue,充電補償記賬隊列

消息隊列:任務的臨時存儲

任務中心:任務集中處理,消息消費者

任務處理完成:消息Ack確認

任務的多級重試:多個重試消息隊列,HaSysTaskStore2Queue

2. 日誌組件

日誌組件,使用了消息隊列的高並發緩沖和發布訂閱特性。其中:

日誌組件使用Kafka作為消息通道,因為Kafka的性能好,吞吐量大, 可以容忍偶爾的消息數據丟失

日誌組件使用發布訂閱的消息模型

日誌組件包含日誌服務SDK和日誌HSF服務,二者都是消息的生產者Producer

日誌類型:消息的Topic主題

日誌處理器:消息的消費者、Topic的訂閱、日誌數據處理(Hbase\ES\其他)

3. RPC服務狀態變化通知

RPC服務狀態變化通知,使用了消息隊列的發布訂閱特性。其中:

RPC服務狀態變化通知,使用了RabbitMQ消息隊列技術

使用發布訂閱的消息模型

Topic:RPCServiceState

RPCService.Proxy:RPC服務狀態變化消息的訂閱者

RPC服務註冊、發布:消息的生產者,發送RPC服務狀態變化消息。

四、消息隊列使用的最佳實踐

1. RabbitMQ的連接,底層都是Socket連接,長連接 or 短連接?

RabbitMQ每個在創建每個連接的同時,會自動創建一個監視線程來定時(默認60s)偵測連接的狀態,如果連接斷開,觸發ConnectionShutdown事件。

用長連接,還是用短連接??

發送端:建議使用短連接,用完即釋放,避免長連接帶來的端口占用,因為發送端無處不在,發送操作短而急促。

接收端:建議使用長連接,時刻接收處理消息,因為消息的接收消費比較集中,接收操作久而彌堅。

2. 網絡是有抖動的,連接的斷開是正常的,如何應對?

發送端:發送失敗重試

接收端:註冊ConnectionShutdown事件同時捕獲消息接收異常,重新建立連接,接收消費消息

3. RabbitMQ Exchange(Topic)模式下帶來的消息隊列數量激增

只是創建了一個Exchange(Topic),為什麽會增加這麽多Queue。

因為,每個Topic的訂閱都是綁定一個Queue用作消息的消費。

技術分享

4. 需求的演變,消息結構的變更,如何平滑過度?

消息是byte[]數組,我們將復雜對象消息二進制序列化。

接收到消息後,我們將二進制數組反序列化為實體類。

當我們的實體類消息體的結構發生變化後,因為受二進制序列化處理的

影響,導致無法反序列化。

解決方案:

消息體預留一些string類型的擴展字段

消息隊列版本化,支持多個版本的消息體。

5. Kafka Consumer Group

同一Topic的一條消息只能被同一個Group內的一個Consumer消費

多個Consumer Group可同時消費同一條消息

技術分享

6. 消息的積壓

消息的積壓產生的原因:消息接收消費的速率低,發送的速度>接收的速度。

消息積壓後的影響:

消息大量積壓後,當新的消費者連接上MQ並開始接收消息時,發送速率會大幅降低。

消息隊列集群的壓力增加,大量的消息要持久化存儲和同步。

如何減少消息積壓:快速消費消息,同時保持消息體的不要過大

周國慶

2017/7/3

再談消息隊列技術