1. 程式人生 > >滴滴出行基於RocketMQ構建企業級訊息佇列服務的實踐

滴滴出行基於RocketMQ構建企業級訊息佇列服務的實踐

本文整理自滴滴出行訊息佇列負責人 江海挺 在Apache RocketMQ開發者沙龍北京站的分享。通過本文,您將瞭解到滴滴出行:

  • 在訊息佇列技術選型方面的思考;
  • 為什麼選擇 RocketMQ 作為出行業務的訊息佇列解決方案;
  • 如何構建自己的訊息佇列服務;
  • 在 RocketMQ 上的擴充套件改造實踐;
  • 在 RocketMQ 上的實踐經驗。

0ff55077723282fea149a085e05dc65c

江海挺:
滴滴出行訊息佇列負責人,Apache RocketMQ Contributor,大學畢業後一直在做訊息佇列領域相關的技術、產品和服務,積累了豐富的實踐經驗,沉澱了不少關於訊息佇列的思考。

滴滴出行的訊息技術選型

1.1 歷史

初期,公司內部沒有專門的團隊維護訊息佇列服務,所以訊息佇列使用方式較多,主要以Kafka為主,有業務直連的,也有通過獨立的服務轉發訊息的。另外有一些團隊也會用RocketMQ、Redis的list,甚至會用比較非主流的beanstalkkd。導致的結果就是,比較混亂,無法維護,資源使用也很浪費。

image_20181014102848377

1.2 為什麼棄用 Kafka

一個核心業務在使用Kafka的時候,出現了叢集資料寫入抖動非常嚴重的情況,經常會有資料寫失敗。

主要有兩點原因:

  • 隨著業務增長,Topic的資料增多,叢集負載增大,效能下降;
  • 我們用的是Kafka0.8.2那個版本,有個bug,會導致副本重新複製,複製的時候有大量的讀,我們儲存盤用的是機械盤,導致磁碟IO過大,影響寫入。

所以我們決定做自己的訊息佇列服務。

image_20181014112905037

首先需要解決業務方訊息生產失敗的問題。因為這個Kafka用的是釋出/訂閱模式,一個topic的訂閱方會有很多,涉及到的下游業務也就非常多,沒辦法一口氣直接替換Kafka,遷移到新的一個訊息佇列服務上。所以我們當時的方案是加了一層代理,然後利用codis作為快取,解決了Kafka不定期寫入失敗的問題,如上圖。當後面的Kafka出現不可寫入的時候,我們就會先把資料寫入到codis中,然後延時進行重試,直到寫成功為止。

1.3 為什麼選擇 RocketMQ

經過一系列的調研和測試之後,我們決定採用RocketMQ,具體原因在後面會介紹。

image_20181014113200764

為了支援多語言環境、解決一些遷移和某些業務的特殊需求,我們又在消費側加上了一個代理服務。然後形成了這麼一個核心框架。業務端只跟代理層互動。中間的訊息引擎,負責訊息的核心儲存。在之前的基本框架之後,我們後面就主要圍繞三個方向做。

  • 遷移,把之前提到的所有五花八門的佇列環境,全部遷移到我們上面。這裡面的遷移方案後面會跟大家介紹一下。
  • 功能迭代和成本效能上的優化。
  • 服務化,業務直接通過平臺介面來申請資源,申請到之後直接使用。

1.4 演進中的架構

image_20181014113633110

這張圖是我們訊息佇列服務的一個比較新的現狀。先縱向看,上面是生產的客戶端,包括了7種語言。然後是我們的生產代理服務。在中間的是我們的訊息儲存層。目前主要的訊息儲存引擎是RocketMQ。然後還有一些在遷移過程中的Kafka。另一個是Chronos,它是我們延遲訊息的一個儲存引擎。

再下面就是消費代理。消費代理同樣提供了多種語言的客戶端,還支援多種協議的訊息主動推送功能,包括HTTP 協議 RESTful方式。結合我們的groovy指令碼功能,還能實現將訊息直接轉存到Redis、Hbase和HDFS上。此外,我們還在陸續接入更多的下游儲存。

除了儲存系統之外,我們也對接了實時計算平臺,例如Flink,Spark,Storm,左邊是我們的使用者控制檯和運維控制檯。這個是我們服務化的重點。使用者在需要使用佇列的時候,就通過介面申請Topic,填寫各種資訊,包括身份資訊,訊息的峰值流量,訊息大小,訊息格式等等。然後消費方通過我們的介面,就可以申請消費。

運維控制檯,主要負責我們叢集的管理,自動化部署,流量排程,狀態顯示之類的功能。最後所有運維和使用者操作會影響線上的配置,都會通過ZooKeeper進行同步。

為什麼選擇RocketMQ

我們圍繞以下兩個緯度進行了對比測試,結果顯示RocketMQ的效果更好。

2.1 測試-topic數量的支援

測試環境:Kafka 0.8.2,RocketMQ 3.4.6,1.0 Gbps Network,16 threads

  • 測試結果如下:

image_20181014114750847

這張圖是Kafka和RocketMQ在不同topic數量下的吞吐測試。橫座標是每秒訊息數,縱座標是測試case。同時覆蓋了有無消費,和不同訊息體的場景。一共8組測試資料,每組資料分別在Topic個數為16、32、64、128、256時獲得的,每個topic包括8個Partition。下面四組資料是傳送訊息大小為128位元組的情況,上面四種是傳送2k訊息大小的情況。on 表示訊息傳送的時候,同時進行訊息消費,off表示僅進行訊息傳送。

先看最上面一組資料,用的是Kafka,開啟消費,每條訊息大小為2048位元組可以看到,隨著Topic數量增加,到256 Topic之後,吞吐極具下降。第二組是是RocketMQ。可以看到,Topic增大之後,影響非常小。第三組和第四組,是上面兩組關閉了消費的情況。結論基本類似,整體吞吐量會高那麼一點點。

下面的四組跟上面的區別是使用了128位元組的小訊息體。可以看到,Kafka吞吐受Topic數量的影響特別明顯。對比來看,雖然topic比較小的時候,RocketMQ吞吐較小,但是基本非常穩定,對於我們這種共享叢集來說比較友好。

2.2 測試-延遲

  • Kafka

測試環境:Kafka 0.8.2.2,topic=1/8/32,Ack=1/all,replica=3

測試結果:

image_20181014153801681

上面的一組的3條線對應Ack=3,需要3個備份都確認後才完成資料的寫入。下面的一組的3條線對應Ack=1,有1個備份收到資料後就可以完成寫入。可以看到下面一組只需要主備份確認的寫入,延遲明顯較低。每組的三條線之間主要是Topic數量的區別,Topic數量增加,延遲也增大了。

  • RocketMQ

測試環境:

RocketMQ 3.4.6,brokerRole=ASYNC/SYNC_MASTER, 2 Slave,

flushDiskType=SYNC_FLUSH/ASYNC_FLUSH

測試結果:

image_20181014153954985

上面兩條是同步刷盤的情況,延遲相對比較高。下面的是非同步刷盤。橙色的線是同步主從,藍色的線是非同步主從。然後可以看到在副本同步複製的情況下,即橙色的線,4w的TPS之內都不超過1ms。用這條橙色的線和上面Kafka的圖中的上面三條線橫向比較來看,Kafka超過1w TPS 就超過1ms了。Kafka的延遲明顯更高。

如何構建自己的訊息佇列

3.1 問題與挑戰

challenge

面臨的挑戰(順時針看)

  • 客戶端語言,需要支援PHP、Go、Java、C++;
  • 只有3個開發人員;
  • 決定用RocketMQ,但是沒看過原始碼;
  • 上線時間緊,線上的Kafka還有問題;
  • 可用性要求高。

使用RocketMQ時的兩個問題:

  • 客戶端語言支援不全,以Java為主,而我們還需要支援PHP、Go、C++;
  • 功能特別多,如tag、property、消費過濾、RETRYtopic、死信佇列、延遲消費之類的功能,但這對我們穩定性維護來說,挑戰非常大。

針對以上兩個問題的解決辦法,如下圖所示:

image_20181014164732619

  • 使用ThriftRPC框架來解決跨語言的問題;
  • 簡化呼叫介面。可以認為只有兩個介面,send用來生產,pull用來消費。

主要策略就是堅持KISS原則(Keep it simple, stupid),保持簡單,先解決最主要的問題,讓訊息能夠流轉起來。然後我們把其他主要邏輯都放在了proxy這一層來做,比如限流、許可權認證、訊息過濾、格式轉化之類的。這樣,我們就能儘可能地簡化客戶端的實現邏輯,不需要把很多功能用各種語言都寫一遍。

3.2 遷移方案

架構確定後,接下來是我們的一個遷移過程。

image_20181014183257694

遷移這個事情,在pub-sub的訊息模型下,會比較複雜。因為下游的資料消費方可能很多,上游的資料沒法做到一刀切流量,這就會導致整個遷移的週期特別長。然後我們為了儘可能地減少業務遷移的負擔,加快遷移的效率,我們在Proxy層提供了雙寫和雙讀的功能。

  • 雙寫:ProcucerProxy同時寫RocketMQ和Kafka;
  • 雙讀:ConsumerProxy同時從RocketMQ和Kafka消費資料。

有了這兩個功能之後,我們就能提供以下兩種遷移方案了。

3.2.1 雙寫

生產端雙寫,同時往Kafka和RocketMQ寫同樣的資料,保證兩邊在整個遷移過程中都有同樣的全量資料。Kafka和RocketMQ有相同的資料,這樣下游的業務也就可以開始遷移。如果消費端不關心丟資料,那麼可以直接切換,切完直接更新消費進度。如果需要保證消費必達,可以先在ConsumerProxy設定消費進度,消費客戶端保證沒有資料堆積後再去遷移,這樣會有一些重複訊息,一般客戶端會保證消費處理的冪等。

生產端的雙寫其實也有兩種方案:

客戶端雙寫,如下圖:

image_20181014184201561

業務那邊不停原來的kafka 客戶端。只是加上我們的客戶端,往RocketMQ裡追加寫。這種方案在整個遷移完成之後,業務還需要把老的寫入停掉。相當於兩次上線。

Producer Proxy雙寫,如下圖:

image_20181014191156474

業務方直接切換生產的客戶端,只往我們的proxy上寫資料。然後我們的proxy負責把資料複製,同時寫到兩個儲存引擎中。這樣在遷移完成之後,我們只需要在Proxy上關掉雙寫功能就可以了。對生產的業務方來說是無感知的,生產方全程只需要改造一次,上一下線就可以了。

所以表面看起來,應該還是第二種方案更加簡單。但是,從整體可靠性的角度來看,一般還是認為第一種相對高一點。因為客戶端到Kafka這一條鏈路,業務之前都已經跑穩定了。一般不會出問題。但是寫我們Proxy就不一定了,在接入過程中,是有可能出現一些使用上的問題,導致資料寫入失敗,這就對業務方測試質量的要求會高一點。然後消費的遷移過程,其實風險是相對比較低的。出問題的時候,可以立即回滾。因為它在老的Kafka上消費進度,是一直保留的,而且在遷移過程中,可以認為是全量雙消費。

以上就是資料雙寫的遷移方案,這種方案的特點就是兩個儲存引擎都有相同的全量資料。

3.2.2 雙讀

特點:保證不會重複消費。對於P2P 或者消費下游不太多,或者對重複消費資料比較敏感的場景比較適用。

image_20181014192956694

這個方案的過程是這樣的,消費先切換。全部遷移到到我們的Proxy上消費,Proxy從Kafka上獲取。這個時候RocketMQ上沒有流量。但是我們的消費Proxy保證了雙消費,一旦RocketMQ有流量了,客戶端同樣也能收到。然後生產方改造客戶端,直接切流到RocketMQ中,這樣就完成了整個流量遷移過程。執行一段時間,比如Kafka裡的資料都過期之後,就可以把消費Proxy上的雙消費關了,下掉Kafka叢集。

整個過程中,生產直接切流,所以資料不會重複儲存。然後在消費遷移的過程中,我們消費Proxy上的group和業務原有的group可以用一個名字,這樣就能實現遷移過程中自動rebalance,這樣就能實現沒有大量重複資料的效果。所以這個方案對重複消費比較敏感的業務會比較適合的。這個方案的整個過程中,消費方和生產方都只需要改造一遍客戶端,上一次線就可以完成。

RocketMQ擴充套件改造

說完遷移方案,這裡再簡單介紹一下,我們在自己的RocketMQ分支上做的一些比較重要的事情。

首先一個非常重要的一點是主從的自動切換。

熟悉RocketMQ的同學應該知道,目前開源版本的RocketMQ broker 是沒有主從自動切換的。如果你的Master掛了,那你就寫不進去了。然後slave只能提供只讀的功能。當然如果你的topic在多個主節點上都建立了,雖然不會完全寫不進去,但是對單分片順序消費的場景,還是會產生影響。所以呢,我們就自己加了一套主從自動切換的功能。

第二個是批量生產的功能。

RocketMQ4.0之後的版本是支援批量生產功能的。但是限制了,只能是同一個ConsumerQueue的。這個對於我們的Proxy服務來說,不太友好,因為我們的proxy是有多個不同的topic的,所以我們就擴充套件了一下,讓它能夠支援不同Topic、不同Consume Queue。原理上其實差不多,只是在傳輸的時候,把Topic和Consumer Queue的資訊都編碼進去。

第三個,元資訊管理的改造。

目前RocketMQ單機能夠支援的Topic數量,基本在幾萬這麼一個量級,在增加上去之後,元資訊的管理就會非常耗時,對整個吞吐的效能影響相對來說就會非常大。然後我們有個場景又需要支援單機百萬左右的Topic數量,所以我們就改造了一下元資訊管理部分,讓RocketMQ單機能夠支撐的Topic數量達到了百萬。

後面一些就不太重要了,比如集成了我們公司內部的一些監控和部署工具,修了幾個bug,也給提了PR。最新版都已經fix掉了。

RocketMQ使用經驗

接下來,再簡單介紹一下,我們在RocketMQ在使用和運維上的一些經驗。主要是涉及在磁碟IO效能不夠的時候,一些引數的調整。

5.1 讀老資料的問題

我們都知道,RocketMQ的資料是要落盤的,一般只有最新寫入的資料才會在PageCache中。比如下游消費資料,因為一些原因停了一天之後,又突然起來消費資料。這個時候就需要讀磁碟上的資料。然後RocketMQ的訊息體是全部儲存在一個append only的 commitlog 中的。如果這個叢集中混雜了很多不同topic的資料的話,要讀的兩條訊息就很有可能間隔很遠。最壞情況就是一次磁碟IO讀一條訊息。這就基本等價於隨機讀取了。如果磁碟的IOPS(Input/Output Operations Per Second)扛不住,還會影響資料的寫入,這個問題就嚴重了。

值得慶幸的是,RocketMQ提供了自動從Slave讀取老資料的功能。這個功能主要由slaveReadEnable這個引數控制。預設是關的(slaveReadEnable = false bydefault)。推薦把它開啟,主從都要開。這個引數開啟之後,在客戶端消費資料時,會判斷,當前讀取訊息的物理偏移量跟最新的位置的差值,是不是超過了記憶體容量的一個百分比(accessMessageInMemoryMaxRatio= 40 by default)。如果超過了,就會告訴客戶端去備機上消費資料。如果採用非同步主從,也就是brokerRole等於ASYNC_AMSTER的時候,你的備機IO打爆,其實影響不太大。但是如果你採用同步主從,那還是有影響。所以這個時候,最好掛兩個備機。因為RocketMQ的主從同步複製,只要一個備機響應了確認寫入就可以了,一臺IO打爆,問題不大。

5.2 過期資料刪除

RocketMQ預設資料保留72個小時(fileReservedTime=72)。然後它預設在凌晨4點開始刪過期資料(deleteWhen="04")。你可以設定多個值用分號隔開。因為資料都是定時刪除的,所以在磁碟充足的情況,資料的最長保留會比你設定的還多一天。又由於預設都是同一時間,刪除一整天的資料,如果用了機械硬碟,一般磁碟容量會比較大,需要刪除的資料會特別多,這個就會導致在刪除資料的時候,磁碟IO被打滿。這個時候又要影響寫入了。

為了解決這個問題,可以嘗試多個方法,一個是設定檔案刪除的間隔,有兩個引數可以設定,

  • deleteCommitLogFilesInterval = 100(毫秒)。每刪除10個commitLog檔案的時間間隔;
  • deleteConsumeQueueFilesInterval=100(毫秒)。每刪除一個ConsumeQueue檔案的時間間隔。

另外一個就是增加刪除頻率,把00-23都寫到deleteWhen,就可以實現每個小時都刪資料。

5.3 索引

預設情況下,所有的broker都會建立索引(messageIndexEnable=true)。這個索引功能可以支援按照訊息的uniqId,訊息的key來查詢訊息體。索引檔案實現的時候,本質上也就是基於磁碟的個一個hashmap。如果broker上訊息數量比較多,查詢的頻率比較高,這也會造成一定的IO負載。所以我們的推薦方案是在Master上關掉了index功能,只在slave上開啟。然後所有的index查詢全部在slave上進行。當然這個需要簡單修改一下MQAdminImpl裡的實現。因為預設情況下,它會向Master發出請求。

歡迎加入企業級網際網路架構交流釘釘群,群號:21704851

-> 歡迎關注“阿里巴巴中介軟體”,加入中介軟體開發者群,與技術同行。

_