來自位元組跳動的管梓越同學一篇關於Apache Hudi在位元組跳動推薦系統中EB級資料量實踐的分享。

接下來將分為場景需求、設計選型、功能支援、效能調優、未來展望五部分介紹Hudi在位元組跳動推薦系統中的實踐。

在推薦系統中,我們在兩個場景下使用資料湖

  1. 我們使用BigTable作為整個系統近線處理的資料儲存,這是一個公司自研的元件TBase,提供了BigTable的語義和搜尋推薦廣告場景下一些需求的抽象,並遮蔽底層儲存的差異。為了更好的理解,這裡可以把它直接看做一個HBase。在這過程中為了能夠服務離線對資料的分析挖掘需求,需要將資料匯出到離線儲存中。在過去使用者或是使用MR/Spark直接訪問儲存,或是通過掃庫的方式獲取資料,不符合OLAP場景下的資料訪問特性。因此我們基於資料湖構建BigTable的CDC,提高資料時效,減少近線系統訪問壓力,提供高效的OLAP訪問和使用者友好的SQL消費方式。
  2. 除此之外,我們還在特徵工程與模型訓練的場景中使用資料湖。我們從內部和外部分別獲得兩類實時資料流,一個是來自系統內部迴流的Instance,包含了推薦系統Serving時獲得的Feature。另一種是來自端上埋點/多種複雜外部資料來源的反饋,這類資料作為Label,和之前的feature共同組成了完整的機器學習樣本。針對這個場景,我們需要實現一個基於主鍵的拼接操作,將Instance和Label Merge到一起。開窗範圍可能長達數十天,千億行量級。需要支援高效得列式選取和謂詞下推。同時還需要支援併發Update等相關能力。

在這兩個場景下存在如下挑戰

  1. 資料的非常不規整。相比Binlog,WAL沒法獲得一行的全部資訊,同時資料大小變化非常大。
  2. 吞吐量比較大,單表吞吐超百GB/s,單表PB級儲存。
  3. 資料Schema 複雜。資料存在高維、稀疏等現象。表列數從1000-10000+都有。並且有大量複雜資料型別。

在引擎選型時,我們考察過Hudi,Iceberg,DeltaLake三個最熱門的資料湖引擎。三者在我們的場景下各有優劣,最終基於Hudi對上下游生態的開放,對全域性索引的支援,對若干儲存邏輯提供了定製化的開發介面等原因,選擇了Hudi作為儲存引擎。

  • 針對實時寫入,選擇了時效性更好的MOR。
  • 考察了索引型別,首先因為WAL不能每次都獲取到資料的分割槽,所以必須要全域性索引。在幾種全域性索引實現中,為了實現高效能的寫入,HBase是唯一的選擇。另外兩種的實現決定了都和HBase在效能有本質上的差距。
  • 在計算引擎上和API上,當時Hudi對Flink的支援還不是特別完善,所以選擇了更為成熟的Spark,為了能靈活實現一些定製功能和邏輯,也因為DataFrame的API語義限制比較多,所以選擇了更底層的RDD API。

功能支援包括儲存語義的MVCC和Schema註冊系統。

首先為了支援WAL語義的寫入,我們實現了針對MVCC的Payload,基於Avro自定義了一套帶時間戳的資料結構實現。並通過檢視訪問的方式對使用者遮蔽了這套邏輯。除此之外還實現了HBase Append的語義,可以實現對List型別的追加寫而非覆蓋寫。

由於Hudi本身的Schema從Write的資料中獲取,這種方式和其他系統對接不是很方便,以及我們需要一些基於Schema的擴充套件功能,所以我們構建了一個元資料中心來提供元資料相關的操作。

  • 首先我們基於一種內部的儲存提供的語義實現了原子變更和異地多活。使用者可以通過介面原子地觸發Schema變更並立刻獲得結果。

  • 並通過加入版本號的方法實現了Schema的多版本,Schema而不是把Json傳來傳去。有了多版本也可以實現Schema更靈活的演進。

  • 我們還支援了列級別的額外資訊編碼,來幫助業務實現一些場景下特有的擴充套件功能。並把列名替換成了數字來節約使用過程中的開銷。

  • Hudi的Spark Job在使用的時候會在JVM級別構建一個local cache並通過pull的方式和元資料中心同步資料,實現Schema的快速訪問和程序內Schema的單例。

在我們場景下效能挑戰比較大,最大單表資料量達400PB+,日增PB級資料量,總資料量達EB級別,因此我們針對性能和資料特性開發做了一些工作來提高效能。

序列化方面包括如下優化

  1. Schema:資料使用Avro序列化開銷特別大,而且消耗資源也非常多。針對這個問題,我們首先借助Schema的JVM單例,規避了序列化過程中很多費CPU的比較操作。
  2. 通過優化Payload邏輯,減少了需要序列化的次數。
  3. 藉助了第三方的Avro序列化實現,通過將序列化過程編譯成位元組碼的方式來提高SerDe的速度以及降低記憶體佔用。對這種序列化形式做了修改,以保證我們的複雜Schema也能夠正常編譯。

對於Compaction流程優化如下

  • Hudi除了預設的Inline/Async compaction選項之外,還支援Compaction的靈活部署。Compaction Job的作業特性和Ingestion作業其實有較大區別。在同一個Spark Application當中不僅不能針對性設定,也存在資源彈性不足的問題。我們首先構建了獨立部署的指令碼,讓Compaction作業可以獨立觸發執行。使用了低成本的混部佇列並可以針對此次Compaction的Plan做資源申請。除此之外還做了基於規則和啟發式的Compaction Strategy,使用者的需求通常是保證天級別或者小時級別的SLA,並針對性地壓縮某些分割槽的資料,所以提供了針對性壓縮的能力。
  • 為了能縮短關鍵Compaction的時間,我們通常會提前做Compaction來避免所有工作都在一個Compaction Job中完成。但是如果一個Compact過的FileGroup又有新的Update,就不得不再次Compact。為了優化整體的效率,我們根據業務資訊對一個FileGroup該在何時被壓縮做了啟發式的排程以減少額外的壓縮損耗。該特性的具體收益還在評估中。
  • 最後我們對Compaction做了一些流程的優化,比如不使用WriteStatus的Cache等等。

HDFS作為一種面向吞吐設計的儲存,在叢集水位比較高的情況下,實時寫入毛刺比較嚴重。通過和HDFS團隊的溝通與合作,做了相關的一些工作。

  • 首先把原有的資料HSync操作替換為HFlush,避免了分散性update導致的磁碟IO寫放大。
  • 針對場景調參做了激進的pipeline切換設定,並且HDFS團隊開發了靈活的可以控制pipeline的api,來實現這個場景下靈活的配置。
  • 最後還通過logfile獨立IO隔離的方式保證了實時寫入的時效性。

還有一些零零碎碎的效能提升,流程修改和Bug Fix,大家感興趣可以找我交流。

未來我們會在以下幾個方面持續迭代。

  • 產品化問題:目前使用的API和調參調優方式對使用者要求很高,尤其是調參和運維,需要對Hudi原理有相當的瞭解才可以完成,不利於使用者推廣使用。

  • 生態對接問題:在我們的場景中,技術棧以Flink為主,未來會探索Flink的使用。除此之外上下游使用的應用和環境也比較複雜,非常需要跨語言和通用的介面實現。目前和Spark繫結過於嚴重。

  • 成本和效能問題:老生常談的話題,由於我們場景比較大,所以在這塊優化上的收益非常可觀。

  • 儲存語義:我們把Hudi當做一種儲存來使用而非一種表格式。所以未來會拓展Hudi的使用場景,需要更豐富的儲存語義,會在這方面做更多的工作。

最後打個廣告,目前我們推薦架構團隊正在招人,工作地包括:北京/上海/杭州/新加坡/山景城等,有興趣的小夥伴可以新增微信qinglingcannotfly或傳送簡歷至郵箱: [email protected]