一、RocketMQ的基本原理

RocketMQ基本架構圖如下

從這個架構圖上我們可以知道,RocketMQ有4塊核心部分:

  • NameServer:管理Broker的資訊,讓使用MQ的系統感知到叢集裡面的broker

  • Broker:主從架構實現資料多副本儲存和高可用

  • producer:生產者

  • consumer:消費者

二、NameServer

2.1 Broker資訊註冊到哪個NameServer?

每臺broker機器需要向所有的NameServer機器上註冊自己的資訊,防止單臺NameServer掛掉導致Broker資訊不全,保證NameServer的叢集高可用。

2.2 Broker資訊怎麼註冊?

基於Netty的網路通訊。

2.3 Broker掛了如何感知?

  • NameServer感知:30s心跳機制和120s故障感知機制

broker會每隔30秒向NameServer傳送一個的心跳 ,NameServer收到一個心跳會更新對應broker的最近一次心跳事件,然後NamServer會每隔十秒執行一個任務,去檢查一下各個broker的最近一次心跳的時間,如果超過120s沒有收到相應broker的心跳,則判定對應的broker已經掛掉。

三、Broker

3.1 Master-Slave模式

為了保證MQ的資料不丟失而且具備一定的高可用性,我們採用的是主從複製模式。

RocketMQ自身的Master-Slave模式主採取的是Slave主動從Master拉取訊息。

3.2 究竟是如何從Master-Slave中進行讀寫呢?

  • 生產者在寫入訊息時,一般寫入到Master

  • 消費者在拉取訊息時,可能從Master拉取,也可能從Slave拉取,根據Master的負載情況和Slave的同步情況, 由Master給出建議

    • Master負載過高,建議下次從Slave獲取訊息
    • Slave未同步完全,建議下次從Master獲取訊息

3.3 Broker宕機分析

3.3.1 Slave宕機

對系統會存在一點影響,但是影響不大,只不過少了Slave Broker,會導致所有的讀寫壓力都集中在Master Broker上

3.3.2 Master宕機:基於Dledger實現RocketMQ高可用自動切換

選舉方式這裡不做重點介紹。

四、生產者

4.1 MessageQueue是什麼?

我們先看看Topic、Broker、Message之間的關係。

如圖比如說一個TopicA有n條訊息,然後一個TopicA中的n條資料分配放入給4個MessageQueue1-4。

所以本質上來說就是一個數據分片機制,通過MessageQueue將一個Topic的資料拆分為很多資料分片,在每個Broker機器上都儲存一些MessageQueue。通過這個方法可以實現分散式儲存。

4.2 生產者傳送訊息寫入哪個MessageQueue?

因為從前面我們知道,生產者會跟NameServer通訊獲取相應Topic的路由資料,從而知道,一個Topic有幾個MessageQueue,哪些MessageQueue在哪臺Broker機器上,通過對應的規則寫入對應的MessageQueue。

4.2.1 Master Broker故障分析

當MasterBroker宕機,此時SlaveBroker正在切換過程中,有一組Broker就沒有Master可以寫入。

此時我們可以開啟Producer的自動容錯機制開關:sendLatencyFaultEnable,比如說訪問其中一個Broker發現網路延遲有1000ms還無法訪問,我們會自動迴避這個Broker一段時間,比如接下來3000ms內,就不會訪問這個Broker。

過一段時間之後,MasterBroker修復好了,或者說SlaveBroker選舉成功了,就可以提供給別人訪問了。

4.3 Broker資料儲存(核心環節)

Broker資料儲存實際上是MQ最核心的環節:

  • 訊息吞吐量
  • 訊息不丟失

4.3.1 磁碟日誌檔案CommitLog

首先,Producer傳送訊息給Broker,Broker接收到訊息後,把這個訊息直接順序寫入寫入到磁碟上的一個日誌檔案,叫做CommitLog。

  • CommitLog是由很多磁碟檔案組成
  • 每個檔案限定最多1GB

4.3.2 ConsumeQueue儲存對應訊息的偏移量

在Broker中,每一個Topic下的每一個MessageQueue都會有對應一系列的ConsumeQueue檔案。

Broker磁碟儲存類似於檔案樹的形式存在:

ConsumeQueue中儲存著對應MessageQueue中的訊息在CommitLog中的物理偏移量地址offset。

如圖:

  1. Broker接受訊息,順序寫入訊息到CommitLog中

  2. 同時找到對應的TopicA/MessageQueue1/ConsumeQueue0寫入對應的實體地址

  3. TopicA/MessageQueue1/ConsumeQueue0的實體地址,即為CommitLog檔案中一個訊息的引用

即:Topic的每個MessageQueue都對應了Broker機器上的多個ConsumeQueue檔案,這些ConsumeQueue共同組成儲存了MessageQueue的所有訊息在CommitLog檔案中的物理offset偏移量。

4.3.3 Broker寫入磁碟CommitLog怎麼近乎記憶體寫效能?

磁碟檔案順序寫+OS PageCache寫入+OS非同步刷盤的策略

如圖:

  1. 資料寫入CommitLog時候,不是直接寫入磁碟,而是寫入OS的PageCache記憶體緩衝中
  2. 後臺開啟執行緒,非同步刷盤到CommitLog中

這樣的話基本上可以讓訊息寫入CommitLog的效能跟直接寫入記憶體裡面是差不多的,所以Broker才能具有高吞吐量。

4.3.4 非同步刷盤和同步刷盤

  • 非同步刷盤:高吞吐寫入+丟失資料風險
  • 同步刷盤:吞吐量下降+資料不丟失

對於日誌型別這種場景,可以允許資料的丟失,但是要求比較高的吞吐量,可以採用非同步刷盤的方式。另外非核心的業務場景,不涉及重要核心資料變更的場景,也可以使用非同步刷盤,比如訂單支付成功,傳送簡訊這種場景。但是對於涉及到核心的資料變更的場景,就需要使用同步刷盤,比如訂單支付成功後扣減庫存。

五、消費者

5.1 一個Topic上多個MessageQueue怎麼被消費?

原則:一個Consumer機器可以消費處理多個MessageQueue,一個MessageQueue只能被一個相同ConsumerGroup中的同一個Consumer消費。

5.2 Broker收到訊息拉取請求,返回給消費者處理提交消費進度

Broker收到訊息拉取請求後,會找到對應的MessageQueue中開始消費的位置,在ConsumeQueue讀取裡面對應位置的的訊息在CommitLog中的offset

如圖:

  1. consumer找到要消費的MessageQueue對應的ConsumeQueue對應要消費的位置

  2. 消費完成之後消費者返回一個消費狀態,broker會儲存我們的消費位置

  3. 接下來可以根據這個消費位置進行下一步消費,不需要從頭拉取

5.3 消費者消費訊息的效能問題

生產者是基於os cache提升寫效能的,broker收到一條訊息,會寫入CommitLog檔案,但是會先把CommitLog檔案中的資料寫入os cache(作業系統管理的快取中),然後os開啟後臺執行緒,非同步的將os cache快取中的CommitLog檔案的資料刷入磁碟。

在消費者消費資訊的時候:

第一步,我們會去讀取ConsumeQueue中的offset偏移量,此時大量的讀取壓力全部都在ConsumeQueue,ConsumeQueue檔案的讀效能是很大程度上會影響訊息拉取的效能和吞吐量。

所以,Broker對ConsumeQueue檔案也是基於os cache來進行優化的。

實際上,ConsumeQueue主要只是存放訊息的offset,所以每個檔案很小,佔不了多少磁碟空間,完全可以被os快取在記憶體裡。所以幾乎可以說訊息的讀取效能達到記憶體級別。

第二步,根據讀取到的offset去CommitLog裡讀取訊息的完整資料。此時會有兩種可能

  • 第一種:如果讀取的是剛剛寫入到CommitLog的資料,那麼大概率他們還停留在os cache中,此時可以順利的直接從os cache中讀取CommitLog中的資料,這個就是直接讀取記憶體,效能很高。
  • 第二種:讀取較早之前的CommitLog的資料,已經被刷入磁碟不在os cache裡面了,此時只能從磁碟上的檔案讀取了,這個效能稍微差一點。

這兩種狀態很好區分,比如說消費者一直在快速的拉取和消費處理,跟上了broker的訊息寫入速率,這麼來說os cache中每次CommitLog的訊息還沒來得及被刷入磁碟中的時候就被消費者消費了;但是比如說broker負載很高,拉取訊息的效能很低,跟不上生產者的速率,那麼資料會儲存在磁碟中進行讀取。

5.4 Master Broker什麼時候通知你去Slave Broker讀取?

根據以上,我們可以判斷了什麼時候Master Broker負載會高,也就是當消費者讀取訊息的時候,要從磁碟中載入大量的資料出來,此時Master Broker就會知道本次的負載會比較高,通知消費者下次從Slave Broker去拉取資料。

本質上就是對比當前沒有拉取訊息的數量和大小,以及最多可以存放在os cache記憶體裡的消****息的大小,如果沒有拉取的訊息超過了最大能使用的記憶體的量,那麼之後會頻繁的從磁碟載入資料,此時就讓你從slave broker去載入資料了!

六、問題分析

舉一個簡單的例子作為分析的入口,將從各個環節可能發生的問題進行深入分析,如圖:

  1. 使用者進行一筆生活繳費

  2. 訂單系統推送繳費訂單支付訊息到RocketMQ

  3. 紅包系統接受訂單訊息

  4. 發紅包給使用者

6.1 訊息傳送失敗

訊息傳送失敗的原因多種多樣,存在於多個環節,我們一一分析。

6.1.1 系統推送訊息丟失

第一個環節就是,訂單系統推送訊息到MQ的過程中,由於網路等因素導致訊息丟失。

6.1.2 RocketMQ的事務訊息原理分析

為了解決系統推送訊息丟失問題,RocketMQ有一個非常強悍的功能就是事務訊息,能夠確保我們訊息一定會成功寫入MQ裡面,不會半路搞丟。

如圖是在本系統中的一個基本事務訊息的流程圖。

  1. 訂單系統先發送half訊息到MQ中,試探MQ是否正常

如果此階段,half訊息傳送給MQ失敗,會執行一系列回滾操作,關閉這個訂單的狀態,因為後續的訊息都操作不了

  1. 當half訊息成功被RocketMQ接收時

    1. 返回half訊息的成功響應,進入第3步
    2. 返回的響應未收到,但是此時MQ已經儲存下來了一條half訊息,進入第5步
  2. 得知half訊息傳送成功之後,訂單系統可以更新資料庫,此時會有兩種情況對應兩種不同的提交

    1. 更新資料庫等操作一切順利,向RocketMQ傳送一個commit請求
    2. 由於網路異常或者資料庫掛了等,為了執行資料庫更新等操作,更新不了訂單狀態,傳送rollback請求
    3. 傳送rollback或者commit失敗,跳轉到第5步
  3. RocketMQ收到commit或者rollback請求

    1. 收到rollback請求刪除half訊息
    2. 收到commit請求改變half訊息狀態為已提交,紅包系統可以開始消費訊息
  4. 未收到commit和rollback請求的訊息,RocketMQ會有補償機制,回撥介面去判斷訂單的狀態是已關閉,則傳送rollback進行回滾。

6.1.3 RocketMQ的事務訊息底層分析

如圖解釋如下:

  • 消費系統對half訊息不可見的原因: 我們知道,消費者是是通過ConsumeQueue獲取到對應的CommitLog裡面的訊息,如圖,消費系統對half訊息不可見的原因是因為half訊息在未提交的時候,MQ維護了一個內部的TRANS_HALF_TOPIC,此時紅包系統只獲取TopicA中的MessageQueue中ConsumeQueue。

  • 返回half訊息成功的響應時機: 當half訊息寫入成功到TRANS_HALF_TOPIC中的ConsumeQueue的時候,就回認為寫入訊息成功,返回給對應的訂單系統成功響應。

  • 補償機制: RocketMQ會啟動一個定時任務,定時掃描half訊息狀態,如果還是為half訊息,則回撥訂單系統介面,判斷狀態。

  • 如何標記訊息回滾或提交: 訊息回滾並不是直接刪除,而是內部維護了一個OP_TOPIC,用一個OP操作來標記half訊息的狀態。

  • 執行commit操作後消費系統可見: 執行commit操作之後,OP操作會標記half為commit狀態,並且把對應訊息在TRANS_HALF_TOPIC中的訊息offset寫入到TOPICA中,此時訊息可見

6.1.4 思考:一定要用事務訊息嗎?

上面這麼複雜的事務訊息機制可能導致整體的效能比較差,而且吞吐量會比較低,我們一定要用事務訊息嗎?

可以基於同步傳送訊息+反覆多次重試的方案

6.1.5 訊息成功傳送到MQ中了,就一定不會丟了嗎?

我們可以分析的到,事務訊息能夠保證我們的訊息從生產者成功傳送到broker中對應的消費者需要消費的Topic中,我們認為他的訊息推送成功。

問題一:

但是這個訊息推送僅僅先是推送到os cache快取中,僅僅只是可以被消費系統看到,由於訊息積壓等原因,還沒來得及去獲取這條訊息,還沒來得及刷到ConsumeQueue的磁碟檔案中去,此時萬一機器突然宕機,os cache中的資料全部丟失,此時訊息必然丟失,消費系統無法讀到這條訊息。

如圖示意:

解決

為了解決這個問題,一定要確保訊息零丟失的話,我們的解決辦法就是將非同步刷盤調整為同步刷盤

放棄了非同步刷盤的高吞吐量,確保訊息資料的零丟失,也就是說只要MQ返回響應half訊息傳送成功了,此時訊息就已經進入了磁碟檔案了。

問題二:

就算os cache的訊息寫入ConsumeQueue的磁碟檔案了,紅包沒來得及消費這條訊息的時候,磁碟突然就壞了,一樣會導致訊息丟失。

所以說,無論是通過同步傳送訊息+反覆多次重試的方案,還是事務訊息的方案,哪怕保證寫入MQ成功了,訊息未必不會丟失。

解決:

對Broker使用主從架構的模式,每一個MasterBroker至少有一個SlaveBroker去同步他的資料,而且一條訊息寫入成功,必須讓SlaveBroker也寫入成功,保證資料有多個副本的冗餘。

6.1.6 紅包系統拿到了訊息就一定會消費訊息嗎?

不一定。

問題分析:

因為當紅包系統拿到訊息資料進記憶體裡時,此時還沒有執行發紅包的邏輯,然後此時紅包系統就已經提交了這條訊息的offset到broker中告訴broker已經消費掉了這條訊息,訊息位置會往後移。然後此時紅包系統宕機,這條訊息就會丟失,永遠執行不了發紅包的邏輯。

RocketMQ解決方案: 利用訊息監聽器同步處理訊息

在RocketMQ的Consumer的預設消費模式下,我們在訊息監聽器中接收到一批訊息之後,會執行處理訊息的邏輯,處理完成之後才會返回SUCCESS狀態提交offset到broker中,如果處理時宕機,不會返回SUCCESS狀態給broker,broker會繼續將這個訊息給下一個Consumer消費。

6.2 訊息傳送全鏈路零丟失方案總結

6.2.1 傳送訊息到MQ的零丟失

  • 同步傳送訊息+反覆多次嘗試
  • 事務訊息機制

6.2.2 MQ收到訊息之後的零丟失

  • 同步刷盤策略:解決os cache未能刷入磁碟問題
  • 主從架構同步機制:解決單個broker磁碟檔案損壞問題

6.2.3 消費訊息的零丟失

  • 採用同步處理訊息方式

6.2.4 適用場景

首先,訊息零丟失方案會必然的導致從頭到尾的效能下降和MQ的吞吐量下降

一般和金錢、交易以及核心資料相關的系統和核心鏈路,可以用這套零訊息丟失方案:比如支付系統、訂單系統等。

6.3 訊息傳送重複

重複發紅包等問題

6.3.1 傳送方重複傳送

如圖:

  • 使用者支付繳費訂單時候,會通知訂單系統傳送訂單支付訊息

  • 此時訂單系統響應超時

  • 支付系統再次重試呼叫訂單介面通知傳送訊息

  • 兩個訂單都成功,推送兩條相同的訊息到MQ

  • 紅包系統收到兩條訊息傳送兩個紅包

有類似很多這種訊息重試,介面重試的情況都會有訊息重複傳送的可能性,還比如說當你傳送訊息成功到MQ,MQ返回的SUCCESS的響應由於網路原因未收到,重試機制會再次傳送訊息,導致訊息重複。

解決方案:冪等性機制

  • 業務判斷法:RocketMQ支援訊息查詢功能

  1. 由於訂單系統呼叫超時,重試呼叫介面

  2. 當訂單系統發訊息之前,傳送請求到MQ查詢是否存在這條訊息

  3. 如果MQ已經存在,則不重複傳送

  • Redis快取:

Redis快取思想也比較簡單,只需要根據對應的訂單資訊去快取裡面查詢一下是否已經發送給MQ了。

但是這種解決方案也不是絕對的安全,因為你訊息傳送成功給MQ了還沒來得及寫Redis系統就掛了,之後也會被重複呼叫。

總結以上兩種解決方案,我們不建議在訊息的傳送環節保證訊息的不重複傳送,會影響介面效能。

6.3.2 消費方重複消費

  • 消費方消費訊息,執行完了發紅包邏輯後,應該返回SUCCESS狀態,提交消費進度

  • 但是剛發完紅包,沒來得及提交offset消費進度,紅包系統重啟

  • MQ沒收到offset消費進度返回,將這個訊息繼續傳送到消費系統進行消費

  • 二次傳送紅包。

解決方案:

  • 依據在生產者方設定訊息的messageKey,然後每一條訊息在消費方依據這個唯一的messageKey,進行冪等判斷:

  • 業務判斷,判斷這個業務的環節是否執行成功,如果沒有成功則消費,成功則捨棄訊息

  • 維護一個訊息表,當新的訊息到達的時候,根據新訊息的id在該表中查詢是否已經存在該id,如果存在則表明訊息已經被消費過,那麼丟棄該訊息不再進行業務操作即可

  • 若是訊息,然後執行insert資料庫方法,可以建立一個唯一主鍵,插入會保證不會重複

6.4 死信佇列

通過以上的學習,我已經基本解決了MQ訊息不丟失以及不會重複處理訊息的問題,在正常流程下基本上沒有什麼問題。但是會出現以下問題。

我們一直都是假設的一個場景就是紅包系統的MessageListener監聽回撥函式接收到訊息都能順利的處理好訊息邏輯,傳送紅包,落庫等操作,返回SUCCESS,提交offset到broker中去,然後從broker中獲取下一批訊息來處理。

如圖:

問題:

但是如果在我們MessageListener處理訊息邏輯時候,紅包資料庫宕機了,沒辦法完成發紅包的邏輯,此時出現對訊息處理的異常,我們應該怎麼處理呢?

解決方案:

在MessageListener中,除了返回SUCCESS狀態,我們還可以返回RECONSUME_LATER狀態,也就是用try-cache包裹住我們的業務程式碼,成功則返回SUCCESS狀態,順利進行後續操作,如果出現異常則返回RECONSUME_LATER狀態。

當RocketMQ收到我們返回的RECONSUME_LATER狀態之後,會將這批訊息放到對應消費組的重試佇列中。

重試佇列裡面的訊息會再次發給消費組,預設最多重試16次,如果重試16次失敗則進入死信佇列。

死信佇列:

對於死信佇列,一般我們可以專門開一個後臺執行緒,訂閱這個死信佇列,對死信佇列中的訊息,一直不停的嘗試。

6.5 訊息亂序

6.5.1 業務場景

大資料團隊要獲取到訂單系統的binlog,然後儲存一份在自己的大資料儲存系統中

資料庫binlog:記錄資料庫的增刪改查操作。

大資料團隊不能直接跑複雜的大SQL在訂單系統的資料庫中跑出來一些資料報表,這樣會嚴重影響訂單系統的效能,為了優化方案,我們採用類似基於Canal這樣的中介軟體去監聽訂單資料的binlog,然後把這個binlog發到MQ中去,然後我們的大資料系統自己用MQ裡獲取binlog,自己在自己的大資料儲存中執行增刪改查操作,得到我們需要的報表,如圖下:

6.5.2 亂序問題原理分析

  • Canal監聽到binlog日誌中,操作資料庫的順序為先執行insert插入操作,後update更新操作。

  • 因為我們訊息可能會發送到不同MessageQueue中的不同的ConsumeQueue中去

  • 然後同一個消費組的大資料消費系統獲取到insert binlog和update binlog,這兩個是並行操作的,所以不能確定哪個訊息先獲取到執行,可能會出現訊息亂序。

6.5.3 訊息亂序解決方案

出現上面問題的原因,根本問題就是一個訂單binlog分別進入了兩個MessageQueue中,解決這個問題的方法其實非常簡單,就是得想辦法讓同一個訂單的binlog進入到一個MessageQueue裡面去。

方法很簡單:我們可以根據訂單id對MessageQueue的數量取模來對應每個訂單究竟去哪個MessageQueue。

訊息亂序解決方案不能和重試佇列混用。

6.6 延遲訊息

6.6.1 業務場景

大量訂單點選提交未支付,超過30min需要自動退款,我們研究需要定時退款掃描問題。

如圖:

當一個訂單下單之後,沒有支付會進入訂單資料庫儲存,如果30分鐘內沒有支付,就必須訂單系統自動關閉這個訂單。

可能我們就需要有一個後臺的執行緒,不停的去掃描訂單資料庫裡所有的未支付狀態的訂單,超過30分鐘了就必須把訂單狀態更新為關閉。這裡會有一個問題,訂單系統的後臺執行緒必須不停的掃描各種未支付的訂單,可能每個未支付的訂單在30分鐘之內會被掃描很多遍。這個掃描訂單的服務是一個麻煩的問題。

針對這種場景,RocketMQ的延遲訊息就登場了。

如圖:

  • 建立一個訂單,傳送一條延遲訊息到MQ中去

  • 需要等待30分鐘之後,才能被訂單掃描服務消費

  • 當訂單掃描服務在30分鐘後消費了一條訊息,就針對這條訊息查詢訂單資料庫

  • 看過了30分鐘了,他的支付狀態如果是未支付,則關閉,這樣只會被掃描到一次

所以RocketMQ的延遲訊息,是非常常用並且非常有用的一個功能

6.7 經驗總結

6.7.1 運用tags過濾資料

在一些真正的生產專案中,我們需要合理的規劃Topic和裡面的tags,一個Topic代表了某一類的業務訊息型別資料,我們可以通過裡面的tags來對同一個topic的一些訊息進行過濾。

6.7.2 基於訊息key來定位訊息是否丟失

我們在訊息零丟失方案中,萬一訊息真的丟失了,我們怎麼去排查呢?在RocketMQ中我們可以給訊息設定對應的Key值,比如設定一個訂單ID:message.setKeys(orderId),這樣這個訊息就和一個key繫結起來,當這個訊息傳送到broker中去,會根據對應message的數量構建hash索引,存放在IndexFile索引檔案中,我們可以通過MQ提供的命令去查詢。

6.7.3 訊息零丟失方案的補充

在我們這種大型的金融級的系統,或者跟錢有關的支付系統等等,需要有超高級別的高可用保障機制,所以對於零訊息丟失解決方案來說,萬一一整個MQ叢集徹底崩潰了,我們需要有更完善的措施來保證我們訊息不會丟失。

此時生產者傳送不了訊息到MQ,所以我們生產者就應該把訊息在本地進行持久化,可以是存在本地磁碟,也可以是在資料庫裡去存起來,MQ恢復之後,再把持久化的訊息投遞到MQ中去。

6.7.4 提高消費者的吞吐量

最簡單的方法去提高消費者的吞吐量,就是提高消費者的並行度,比如說部署更多的Consumer機器去消費訊息。但是我們需要明確的一點就是對應的MessageQueue也要增加,因為一個MessageQueue只能被一個Consumer機器消費。

第二個辦法是我們可以增加Consumer的執行緒數量,消費執行緒樂隊,消費速度越快。

第三個辦法是我們可以開啟消費者的批量消費功能(有對應的引數設定)。

6.7.5 要不要消費歷史記錄

Consumer是支援設定在哪裡開始消費的,常見的有兩種:從Topic的第一條資料消費(CONSUME_FROM_LAST_OFFSET),或者是從最後一次消費過的訊息之後開始消費(CONSUME_FROM_FIRTST_OFFSET),我們一般都是設定選擇後者。

七、百萬訊息積壓問題

7.1 業務場景

如圖所示:在一個系統中,由生產者系統和消費者系統兩個環節組成,生產者不斷的向MQ裡面寫入訊息,消費者不斷的從MQ中消費訊息。突然有一天消費者依賴的一些資料庫掛了,消費者就處理不了當下的業務邏輯,訊息也不能正常的被消費,此時生產者還在正常的向MQ中寫入訊息,結果在高峰期內,就往MQ中寫入了百萬條訊息,都積壓在了MQ裡面了。

7.2 解決方案

第一, 最簡單粗暴的方法,如果我們的訊息能夠容忍百萬訊息的丟失,那麼我們可以直接修改消費者系統的程式碼,丟棄所有的訊息,那麼百萬訊息很快就被處理完了,但是往往對於絕大多數系統而言,我們不能使用這種辦法。

第二, 我們需要等待消費者依賴的資料庫恢復之後,根據線上的Topic的MEssageQueue來判斷後續如何處理。

MessageQueue數量多:

  • 比如說我現在一個Topic中有20個MessageQueue,有4個消費者系統在消費,那麼我每個消費者就會從5個MessageQueue中獲取訊息進行消費,畢竟積壓了百萬訊息,僅僅依賴4個消費者是遠遠不夠的。

  • 所以我們可以臨時申請16臺機器多部署16個消費者,這樣20個消費者去同時消費20個MessageQueue,速度提高了5倍,積壓的百萬訊息很快就能處理完畢。

  • 但是此時我們要考慮的是,增加了5倍的消費能力,那麼資料庫的壓力就增加了5倍,這個是我們需要考慮的

如圖:

MessageQueue數量少:

  • 比如說一個Topic總共就只有4個MessageQueue,然後就只有4個消費者系統,這時候沒辦法擴容消費系統

  • 所以此時我們可以臨時修改那4個消費者系統的程式碼,讓他們獲取的訊息不寫入資料庫,而是寫入一個新的topic

  • 新的Topic有新增的20個MessageQUeue,部署20臺臨時增加的消費者系統去消費新的Topic中的Message。

如圖:

八、MQ叢集資料遷移問題:雙讀+雙寫

要做MQ的叢集遷移,不是簡單的粗暴的把Producer更新停機,新的程式碼重新上線發到新的MQ中去。

一般來說,我們需要做到兩件事情:

  • 雙寫: 要遷移的時候,我們需要在所有的Producer系統中,要引入一個雙寫的程式碼,讓他同時往新老兩個MQ中寫入訊息,多寫幾天,起碼要持續一個星期,我們會發現這兩個MQ的資料幾乎一模一樣了,但是雙寫肯定是不夠的的,我們還要同時進行雙讀。

  • 雙讀: 也就是說我在雙寫的時候,所有的Consumer系統都需要同時從新老兩個MQ裡面獲取訊息,分別都用一模一樣的邏輯處理,只不過從老MQ中還是去走核心邏輯處理,可以落庫儲存之類的操作,但是新的MQ我們可以用一樣的邏輯處理,但是不能把處理的結果具體落庫,我們可以寫入一個臨時的儲存中。

  • 觀察: 雙寫和雙讀一段時間之後,我們通過消費端對比,發現處理訊息數量一致。

  • 切換: 正式實施切換,停機Producer系統,再重新修改上線,全部修改為新MQ,此時他資料並不會丟失,因為之前已經雙寫一段時間了,然後Consumer系統程式碼修改上線。

點關注,不迷路

好了各位,以上就是這篇文章的全部內容了,我後面會每週都更新幾篇高質量的大廠面試和常用技術棧相關的文章。感謝大夥能看到這裡,如果這個文章寫得還不錯, 求三連!!! 感謝各位的支援和認可,我們下篇文章見!

我是 九靈 ,有需要交流的童鞋可以關注公眾號:Java 補習課! 如果本篇部落格有任何錯誤,請批評指教,不勝感激 !