1. 程式人生 > >阿里RocketMQ如何解決訊息的順序&重複兩大硬傷?

阿里RocketMQ如何解決訊息的順序&重複兩大硬傷?

http://www.sohu.com/a/129521820_487514

分散式訊息系統作為實現分散式系統可擴充套件、可伸縮性的關鍵元件,需要具有高吞吐量、高可用等特點。而談到訊息系統的設計,就回避不了兩個問題:

  1. 訊息的順序問題

  2. 訊息的重複問題

RocketMQ作為阿里開源的一款高效能、高吞吐量的訊息中介軟體,它是怎樣來解決這兩個問題的?RocketMQ有哪些關鍵特性?其實現原理是怎樣的?

關鍵特性及其實現原理

一、順序訊息

訊息有序指的是可以按照訊息的傳送順序來消費。例如:一筆訂單產生了 3 條訊息,分別是訂單建立、訂單付款、訂單完成。消費時,要按照順序依次消費才有意義。與此同時多筆訂單之間又是可以並行消費的。首先來看如下示例:

假如生產者產生了2條訊息:M1、M2,要保證這兩條訊息的順序,應該怎樣做?你腦中想到的可能是這樣:

你可能會採用這種方式保證訊息順序

假定M1傳送到S1,M2傳送到S2,如果要保證M1先於M2被消費,那麼需要M1到達消費端被消費後,通知S2,然後S2再將M2傳送到消費端。

這個模型存在的問題是,如果M1和M2分別傳送到兩臺Server上,就不能保證M1先達到MQ叢集,也不能保證M1被先消費。換個角度看,如果M2先於M1達到MQ叢集,甚至M2被消費後,M1才達到消費端,這時訊息也就亂序了,說明以上模型是不能保證訊息的順序的。如何才能在MQ叢集保證訊息的順序?一種簡單的方式就是將M1、M2傳送到同一個Server上:

保證訊息順序,你改進後的方法

這樣可以保證M1先於M2到達MQServer(生產者等待M1傳送成功後再發送M2),根據先達到先被消費的原則,M1會先於M2被消費,這樣就保證了訊息的順序。

這個模型也僅僅是理論上可以保證訊息的順序,在實際場景中可能會遇到下面的問題:

網路延遲問題

只要將訊息從一臺伺服器發往另一臺伺服器,就會存在網路延遲問題。如上圖所示,如果傳送M1耗時大於傳送M2的耗時,那麼M2就仍將被先消費,仍然不能保證訊息的順序。即使M1和M2同時到達消費端,由於不清楚消費端1和消費端2的負載情況,仍然有可能出現M2先於M1被消費的情況。

那如何解決這個問題?將M1和M2發往同一個消費者,且傳送M1後,需要消費端響應成功後才能傳送M2。

聰明的你可能已經想到另外的問題:如果M1被髮送到消費端後,消費端1沒有響應,那是繼續傳送M2呢,還是重新發送M1?一般為了保證訊息一定被消費,肯定會選擇重發M1到另外一個消費端2,就如下圖所示。

保證訊息順序的正確姿勢

這樣的模型就嚴格保證訊息的順序,細心的你仍然會發現問題,消費端1沒有響應Server時有兩種情況,一種是M1確實沒有到達(資料在網路傳送中丟失),另外一種消費端已經消費M1且已經發送響應訊息,只是MQ Server端沒有收到。如果是第二種情況,重發M1,就會造成M1被重複消費。也就引入了我們要說的第二個問題,訊息重複問題,這個後文會詳細講解。

回過頭來看訊息順序問題,嚴格的順序訊息非常容易理解,也可以通過文中所描述的方式來簡單處理。總結起來,要實現嚴格的順序訊息,簡單且可行的辦法就是:

保證生產者 - MQServer - 消費者是一對一對一的關係

這樣的設計雖然簡單易行,但也會存在一些很嚴重的問題,比如:

  1. 並行度就會成為訊息系統的瓶頸(吞吐量不夠)

  2. 更多的異常處理,比如:只要消費端出現問題,就會導致整個處理流程阻塞,我們不得不花費更多的精力來解決阻塞的問題。

但我們的最終目標是要叢集的高容錯性和高吞吐量。這似乎是一對不可調和的矛盾,那麼阿里是如何解決的?

世界上解決一個計算機問題最簡單的方法:“恰好”不需要解決它!——沈詢

有些問題,看起來很重要,但實際上我們可以通過合理的設計或者將問題分解來規避。如果硬要把時間花在解決問題本身,實際上不僅效率低下,而且也是一種浪費。從這個角度來看訊息的順序問題,我們可以得出兩個結論:

  1. 不關注亂序的應用實際大量存在

  2. 佇列無序並不意味著訊息無序

所以從業務層面來保證訊息的順序而不僅僅是依賴於訊息系統,是不是我們應該尋求的一種更合理的方式?

最後我們從原始碼角度分析RocketMQ怎麼實現傳送順序訊息的。

RocketMQ通過輪詢所有佇列的方式來確定訊息被髮送到哪一個佇列(負載均衡策略)。比如下面的示例中,訂單號相同的訊息會被先後傳送到同一個佇列中:

在獲取到路由資訊以後,會根據MessageQueueSelector實現的演算法來選擇一個佇列,同一個OrderId獲取到的肯定是同一個佇列。

二、訊息重複

上面在解決訊息順序問題時,引入了一個新的問題,就是訊息重複。那麼RocketMQ是怎樣解決訊息重複的問題呢?還是“恰好”不解決。

成訊息重複的根本原因是:網路不可達。只要通過網路交換資料,就無法避免這個問題。所以解決這個問題的辦法就是繞過這個問題。那麼問題就變成了:如果消費端收到兩條一樣的訊息,應該怎樣處理?

  1. 消費端處理訊息的業務邏輯保持冪等性

  2. 保證每條訊息都有唯一編號且保證訊息處理成功與去重表的日誌同時出現

第1條很好理解,只要保持冪等性,不管來多少條重複訊息,最後處理的結果都一樣。第2條原理就是利用一張日誌表來記錄已經處理成功的訊息的ID,如果新到的訊息ID已經在日誌表中,那麼就不再處理這條訊息。

第1條解決方案,很明顯應該在消費端實現,不屬於訊息系統要實現的功能。第2條可以訊息系統實現,也可以業務端實現。正常情況下出現重複訊息的概率其實很小,如果由訊息系統來實現的話,肯定會對訊息系統的吞吐量和高可用有影響,所以最好還是由業務端自己處理訊息重複的問題,這也是RocketMQ不解決訊息重複的問題的原因。

RocketMQ不保證訊息不重複,如果你的業務需要保證嚴格的不重複訊息,需要你自己在業務端去重。

三、事務訊息

RocketMQ除了支援普通訊息,順序訊息,另外還支援事務訊息。首先討論一下什麼是事務訊息以及支援事務訊息的必要性。我們以一個轉帳的場景為例來說明這個問題:Bob向Smith轉賬100塊。

在單機環境下,執行事務的情況,大概是下面這個樣子:

單機環境下轉賬事務示意圖

當用戶增長到一定程度,Bob和Smith的賬戶及餘額資訊已經不在同一臺伺服器上了,那麼上面的流程就變成了這樣:

叢集環境下轉賬事務示意圖

這時候你會發現,同樣是一個轉賬的業務,在叢集環境下,耗時居然成倍的增長,這顯然是不能夠接受的。那如何來規避這個問題?

大事務 = 小事務 + 非同步

將大事務拆分成多個小事務非同步執行。這樣基本上能夠將跨機事務的執行效率優化到與單機一致。轉賬的事務就可以分解成如下兩個小事務:

小事務+非同步訊息

圖中執行本地事務(Bob賬戶扣款)和傳送非同步訊息應該保證同時成功或者同時失敗,也就是扣款成功了,傳送訊息一定要成功,如果扣款失敗了,就不能再發送訊息。那問題是:我們是先扣款還是先發送訊息呢?

首先看下先發送訊息的情況,大致的示意圖如下:

事務訊息:先發送訊息

存在的問題是:如果訊息傳送成功,但是扣款失敗,消費端就會消費此訊息,進而向Smith賬戶加錢。

先發訊息不行,那就先扣款吧,大致的示意圖如下:

事務訊息-先扣款

存在的問題跟上面類似:如果扣款成功,傳送訊息失敗,就會出現Bob扣錢了,但是Smith賬戶未加錢。

可能大家會有很多的方法來解決這個問題,比如:直接將發訊息放到Bob扣款的事務中去,如果傳送失敗,丟擲異常,事務回滾。這樣的處理方式也符合“恰好”不需要解決的原則。

這裡需要說明一下:如果使用Spring來管理事物的話,大可以將傳送訊息的邏輯放到本地事物中去,傳送訊息失敗丟擲異常,Spring捕捉到異常後就會回滾此事物,以此來保證本地事物與傳送訊息的原子性。

RocketMQ支援事務訊息,下面來看看RocketMQ是怎樣來實現的。

RocketMQ實現傳送事務訊息

RocketMQ第一階段傳送Prepared訊息時,會拿到訊息的地址,第二階段執行本地事物,第三階段通過第一階段拿到的地址去訪問訊息,並修改訊息的狀態。

細心的你可能又發現問題了,如果確認訊息傳送失敗了怎麼辦?RocketMQ會定期掃描訊息叢集中的事物訊息,如果發現了Prepared訊息,它會向訊息傳送端(生產者)確認,Bob的錢到底是減了還是沒減呢?如果減了是回滾還是繼續傳送確認訊息呢?RocketMQ會根據傳送端設定的策略來決定是回滾還是繼續傳送確認訊息。這樣就保證了訊息傳送與本地事務同時成功或同時失敗。

那我們來看下RocketMQ原始碼,是如何處理事務訊息的。客戶端傳送事務訊息的部分(完整程式碼請檢視:rocketmq-example工程下的com.alibaba.rocketmq.example.transaction.TransactionProducer):

接著檢視sendMessageInTransaction方法的原始碼,總共分為3個階段:傳送Prepared訊息、執行本地事務、傳送確認訊息。

endTransaction方法會將請求發往broker(mq server)去更新事務訊息的最終狀態:

  1. 根據sendResult找到Prepared訊息 ,sendResult包含事務訊息的ID

  2. 根據localTransaction更新訊息的最終狀態

如果endTransaction方法執行失敗,資料沒有傳送到broker,導致事務訊息的 狀態更新失敗,broker會有回查執行緒定時(預設1分鐘)掃描每個儲存事務狀態的表格檔案,如果是已經提交或者回滾的訊息直接跳過,如果是prepared狀態則會向Producer發起CheckTransaction請求,Producer會呼叫DefaultMQProducerImpl.checkTransactionState()方法來處理broker的定時回撥請求,而checkTransactionState會呼叫我們的事務設定的決斷方法來決定是回滾事務還是繼續執行,最後呼叫endTransactionOneway讓broker來更新訊息的最終狀態。

再回到轉賬的例子,如果Bob的賬戶的餘額已經減少,且訊息已經發送成功,Smith端開始消費這條訊息,這個時候就會出現消費失敗和消費超時兩個問題,解決超時問題的思路就是一直重試,直到消費端消費訊息成功,整個過程中有可能會出現訊息重複的問題,按照前面的思路解決即可。

消費事務訊息

這樣基本上可以解決消費端超時問題,但是如果消費失敗怎麼辦?阿里提供給我們的解決方法是:人工解決。大家可以考慮一下,按照事務的流程,因為某種原因Smith加款失敗,那麼需要回滾整個流程。如果訊息系統要實現這個回滾流程的話,系統複雜度將大大提升,且很容易出現Bug,估計出現Bug的概率會比消費失敗的概率大很多。這也是RocketMQ目前暫時沒有解決這個問題的原因,在設計實現訊息系統時,我們需要衡量是否值得花這麼大的代價來解決這樣一個出現概率非常小的問題,這也是大家在解決疑難問題時需要多多思考的地方。

20160321補充:在3.2.6版本中移除了事務訊息的實現,所以此版本不支援事務訊息,具體情況請參考rocketmq的issues:

  • https://github.com/alibaba/RocketMQ/issues/65

  • https://github.com/alibaba/RocketMQ/issues/138

  • https://github.com/alibaba/RocketMQ/issues/156

四、Producer如何傳送訊息

Producer輪詢某topic下的所有佇列的方式來實現傳送方的負載均衡,如下圖所示:

producer傳送訊息負載均衡

首先分析一下RocketMQ的客戶端傳送訊息的原始碼:

在整個應用生命週期內,生產者需要呼叫一次start方法來初始化,初始化主要完成的任務有:

  1. 如果沒有指定namesrv地址,將會自動定址

  2. 啟動定時任務:更新namesrv地址、從namsrv更新topic路由資訊、清理已經掛掉的broker、向所有broker傳送心跳...

  3. 啟動負載均衡的服務

初始化完成後,開始傳送訊息,傳送訊息的主要程式碼如下:

程式碼中需要關注的兩個方法tryToFindTopicPublishInfo和selectOneMessageQueue。前面說過在producer初始化時,會啟動定時任務獲取路由資訊並更新到本地快取,所以tryToFindTopicPublishInfo會首先從快取中獲取topic路由資訊,如果沒有獲取到,則會自己去namesrv獲取路由資訊。selectOneMessageQueue方法通過輪詢的方式,返回一個佇列,以達到負載均衡的目的。

如果Producer傳送訊息失敗,會自動重試,重試的策略:

  1. 重試次數 < retryTimesWhenSendFailed(可配置)

  2. 總的耗時(包含重試n次的耗時) < sendMsgTimeout(傳送訊息時傳入的引數)

  3. 同時滿足上面兩個條件後,Producer會選擇另外一個佇列傳送訊息

五、訊息儲存

RocketMQ的訊息儲存是由consume queue和commit log配合完成的。

1、Consume Queue

consume queue是訊息的邏輯佇列,相當於字典的目錄,用來指定訊息在物理檔案commit log上的位置。

我們可以在配置中指定consumequeue與commitlog儲存的目錄

每個topic下的每個queue都有一個對應的consumequeue檔案,比如:

Consume Queue檔案組織,如圖所示:

Consume Queue檔案組織示意圖

  1. 根據topic和queueId來組織檔案,圖中TopicA有兩個佇列0,1,那麼TopicA和QueueId=0組成一個ConsumeQueue,TopicA和QueueId=1組成另一個ConsumeQueue。

  2. 按照消費端的GroupName來分組重試佇列,如果消費端消費失敗,訊息將被髮往重試佇列中,比如圖中的%RETRY%ConsumerGroupA。

  3. 按照消費端的GroupName來分組死信佇列,如果消費端消費失敗,並重試指定次數後,仍然失敗,則發往死信佇列,比如圖中的%DLQ%ConsumerGroupA。

死信佇列(Dead Letter Queue)一般用於存放由於某種原因無法傳遞的訊息,比如處理失敗或者已經過期的訊息。

Consume Queue中儲存單元是一個20位元組定長的二進位制資料,順序寫順序讀,如下圖所示:

consumequeue檔案儲存單元格式

  1. CommitLog Offset是指這條訊息在Commit Log檔案中的實際偏移量

  2. Size儲存中訊息的大小

  3. Message Tag HashCode儲存訊息的Tag的雜湊值:主要用於訂閱時訊息過濾(訂閱時如果指定了Tag,會根據HashCode來快速查詢到訂閱的訊息)

2、Commit Log

CommitLog:訊息存放的物理檔案,每臺broker上的commitlog被本機所有的queue共享,不做任何區分。

檔案的預設位置如下,仍然可通過配置檔案修改:

${user.home} store${commitlog}${fileName}

CommitLog的訊息儲存單元長度不固定,檔案順序寫,隨機讀。訊息的儲存結構如下表所示,按照編號順序以及編號對應的內容依次儲存。

Commit Log儲存單元結構圖

3、訊息儲存實現

訊息儲存實現,比較複雜,也值得大家深入瞭解,後面會單獨成文來分析(目前正在收集素材),這小節只以程式碼說明一下具體的流程。

4、訊息的索引檔案

如果一個訊息包含key值的話,會使用IndexFile儲存訊息索引,檔案的內容結構如圖:

訊息索引

索引檔案主要用於根據key來查詢訊息的,流程主要是:

  1. 根據查詢的 key 的 hashcode%slotNum 得到具體的槽的位置(slotNum 是一個索引檔案裡面包含的最大槽的數目,例如圖中所示 slotNum=5000000)

  2. 根據 slotValue(slot 位置對應的值)查詢到索引項列表的最後一項(倒序排列,slotValue 總是指向最新的一個索引項)

  3. 遍歷索引項列表返回查詢時間範圍內的結果集(預設一次最大返回的 32 條記錄)

六、訊息訂閱

RocketMQ訊息訂閱有兩種模式,一種是Push模式,即MQServer主動向消費端推送;另外一種是Pull模式,即消費端在需要時,主動到MQServer拉取。但在具體實現時,Push和Pull模式都是採用消費端主動拉取的方式。

首先看下消費端的負載均衡:

消費端負載均衡

消費端會通過RebalanceService執行緒,10秒鐘做一次基於topic下的所有佇列負載:

  1. 遍歷Consumer下的所有topic,然後根據topic訂閱所有的訊息

  2. 獲取同一topic和Consumer Group下的所有Consumer

  3. 然後根據具體的分配策略來分配消費佇列,分配的策略包含:平均分配、消費端配置等

如同上圖所示:如果有 5 個佇列,2 個 consumer,那麼第一個 Consumer 消費 3 個佇列,第二 consumer 消費 2 個佇列。這裡採用的就是平均分配策略,它類似於分頁的過程,TOPIC下面的所有queue就是記錄,Consumer的個數就相當於總的頁數,那麼每頁有多少條記錄,就類似於某個Consumer會消費哪些佇列。

通過這樣的策略來達到大體上的平均消費,這樣的設計也可以很方面的水平擴充套件Consumer來提高消費能力。

消費端的Push模式是通過長輪詢的模式來實現的,就如同下圖:

Push模式示意圖

Consumer端每隔一段時間主動向broker傳送拉訊息請求,broker在收到Pull請求後,如果有訊息就立即返回資料,Consumer端收到返回的訊息後,再回調消費者設定的Listener方法。如果broker在收到Pull請求時,訊息佇列裡沒有資料,broker端會阻塞請求直到有資料傳遞或超時才返回。

當然,Consumer端是通過一個執行緒將阻塞佇列LinkedBlockingQueue<PullRequest>中的PullRequest傳送到broker拉取訊息,以防止Consumer一致被阻塞。而Broker端,在接收到Consumer的PullRequest時,如果發現沒有訊息,就會把PullRequest扔到ConcurrentHashMap中快取起來。broker在啟動時,會啟動一個執行緒不停的從ConcurrentHashMap取出PullRequest檢查,直到有資料返回。

七、RocketMQ的其他特性

前面的6個特性都是基本上都是點到為止,想要深入瞭解,還需要大家多多檢視原始碼,多多在實際中運用。當然除了已經提到的特性外,RocketMQ還支援:

  1. 定時訊息

  2. 訊息的刷盤策略

  3. 主動同步策略:同步雙寫、非同步複製

  4. 海量訊息堆積能力

  5. 高效通訊

  6. .......

其中涉及到的很多設計思路和解決方法都值得我們深入研究:

  1. 訊息的儲存設計:既要滿足海量訊息的堆積能力,又要滿足極快的查詢效率,還要保證寫入的效率。

  2. 高效的通訊元件設計:高吞吐量,毫秒級的訊息投遞能力都離不開高效的通訊。

  3. .......

RocketMQ最佳實踐

一、Producer最佳實踐

  1. 一個應用盡可能用一個 Topic,訊息子型別用 tags 來標識,tags 可以由應用自由設定。只有傳送訊息設定了tags,消費方在訂閱訊息時,才可以利用 tags 在 broker 做訊息過濾。

  2. 每個訊息在業務層面的唯一標識碼,要設定到 keys 欄位,方便將來定位訊息丟失問題。由於是雜湊索引,請務必保證 key 儘可能唯一,這樣可以避免潛在的雜湊衝突。

    訊息傳送成功或者失敗,要列印訊息日誌,務必要列印 sendresult 和 key 欄位。

  3. 對於訊息不可丟失應用,務必要有訊息重發機制。例如:訊息傳送失敗,儲存到資料庫,能有定時程式嘗試重發或者人工觸發重發。

  4. 某些應用如果不關注訊息是否傳送成功,請直接使用sendOneWay方法傳送訊息。

二、Consumer最佳實踐

  1. 消費過程要做到冪等(即消費端去重)

  2. 儘量使用批量方式消費方式,可以很大程度上提高消費吞吐量。

  3. 優化每條訊息消費過程

三、其他配置

線上應該關閉autoCreateTopicEnable,即在配置檔案中將其設定為false。

RocketMQ在傳送訊息時,會首先獲取路由資訊。如果是新的訊息,由於MQServer上面還沒有建立對應的Topic,這個時候,如果上面的配置開啟的話,會返回預設TOPIC的(RocketMQ會在每臺broker上面建立名為TBW102的TOPIC)路由資訊,然後Producer會選擇一臺Broker傳送訊息,選中的broker在儲存訊息時,發現訊息的topic還沒有建立,就會自動建立topic。後果就是:以後所有該TOPIC的訊息,都將傳送到這臺broker上,達不到負載均衡的目的。

所以基於目前RocketMQ的設計,建議關閉自動建立TOPIC的功能,然後根據訊息量的大小,手動建立TOPIC。

RocketMQ設計相關

RocketMQ的設計假定:

  • 每臺PC機器都可能宕機不可服務

  • 任意叢集都有可能處理能力不足

  • 最壞的情況一定會發生

  • 內網環境需要低延遲來提供最佳使用者體驗

RocketMQ的關鍵設計:

  • 分散式叢集化

  • 強資料安全

  • 海量資料堆積

  • 毫秒級投遞延遲(推拉模式)

這是RocketMQ在設計時的假定前提以及需要到達的效果。我想這些假定適用於所有的系統設計。隨著我們系統的服務的增多,每位開發者都要注意自己的程式是否存在單點故障,如果掛了應該怎麼恢復、能不能很好的水平擴充套件、對外的介面是否足夠高效、自己管理的資料是否足夠安全...... 多多規範自己的設計,才能開發出高效健壯的程式。

參考資料

  • RocketMQ使用者指南

    https://pan.baidu.com/s/1kTWsE8J

  • RocketMQ原理簡介

    https://pan.baidu.com/s/1bogcpgN

  • RocketMQ最佳實踐

    https://pan.baidu.com/s/1kTXE4PD

  • 阿里分散式開放訊息服務(ONS)原理與實踐2

    http://v.youku.com/v_show/id_XODY4ODE3OTY0.html?from=s1.8-1-1.2

  • 阿里分散式開放訊息服務(ONS)原理與實踐3

    http://v.youku.com/v_show/id_XODY5ODcxNjI0.html?from=s1.8-1-1.2

  • RocketMQ原理解析

    http://blog.csdn.net/column/details/learningrocketmq.html

水平有限,難免疏漏,如有問題請留言。

經作者同意授權轉載

來源:簡書

作者:CHEN川