1. 程式人生 > >海勝專訪--MaxCompute 與大資料查詢引擎的技術和故事

海勝專訪--MaxCompute 與大資料查詢引擎的技術和故事

在2019大資料技術公開課第一季《技術人生專訪》中,阿里巴巴雲端計算平臺高階技術專家苑海勝為大家分享了《MaxCompute 與大資料查詢引擎的技術和故事》,主要介紹了MaxCompute與MPP Database的異同點,分散式系統上Join的實現,且詳細講解了MaxCompute針對Join和聚合引入的Hash Clustering Table和Range Clustering Table的優化。

以下內容根據演講視訊以及PPT整理而成。


一、MaxCompute VS MPP Database

MaxCompute 與 MPP Database有非常大的不同,主要體現在效能(Performance)、成本(Cost)、可擴充套件性(Scalability)及靈活性(Flexibility)等度量緯度。

  • 效能(Performance):作為一個數據倉庫,大家首先關心的指標是效能。MPP Database典型的產品有Greenplum,Vertica和Redshift等,它們主要針對的線上實時資料的分析,效能要求一般是毫秒級別。而MaxCompute多數場景應用在離線資料下,MaxCompute需要動態的拉起程序和資料封裝,如果進行MapReduce還涉及資料落地,所以離線資料的分析會比較慢,這也導致MaxCompute無法適用於實時場景。但在大量資料場景下,MaxCompute會展示出優勢,它可以動態調整Instance數量,保證有足夠多的Instance處理資料。 而MPP Database一旦開啟了固定的Cluster和Node之後,資料量較大時會受到叢集計算資源的限制。
  • 成本(Cost):MaxCompute在cost層面佔較大優勢。首先,資料儲存在阿里雲上,計算部分也只需要為所付出的計算資源付費,不計算時只需為儲存資源付費。而MPP Database一旦開啟一定的資源,即使不使用也需要付費。
  • 可擴充套件性(Scalability):阿里雲在起初也使用過MPP Database,MPP Database剛開始就設定了固定的cluster,但是由於阿里雲內部的業務資料在不斷的增加,導致計算資源嚴重不足。MaxCompute可以動態分配資源,根據計算的複雜度實時調整Instance數量,保證較高的可擴充套件性。
  • 靈活性(Flexibility):MaxCompute不僅可以處理SQL的查詢,還可以處理MapReduce,以及能夠查詢Machine Learning節點。由於MaxCompute的高擴充套件性和靈活性,它可以支援阿里雲內部95%的資料計算,承載的任務也非常多。
     

二、分散式系統上Join的實現

Query Plan Generation流程:首先使用者會提交SQL給Parser,Parser將其編譯成Relation節點,然後將Relation的節點交給優化器Optimizer,經過一系列的優化,其中包括根據物理轉化和邏輯轉化。Cost model從中選擇代價最低的物理的執行計劃。Transformer將最優的計劃轉成Physical Operator tree,並且將Physical Operator tree交給Manager。Manager啟動例項,並交給RunTime執行此次query。這過程中,Cost model從Metadata中獲取統計資訊(如表的寬度和行數),來選擇最優的計劃,Apache Calcite被用來作用於Optimizer的框架。

Optimizer Core Components:邏輯運算子(Logical Operator)主要描述要做哪些事情,如LogicalInnerJoin做InnerJoin, LogicalScan做掃描,LogicalFilter做過濾。物理運算子(Physical Operator)主要描述怎麼做,如HashJoin,MergeJoin及NLJoin代表不同的演算法,IndexScan表示索引掃描,TableScan是全標掃描,PhysicalFilter是物理過濾器。邏輯運算子(Logical Operator)可以通過Logical Transformation Rules轉化為新的邏輯運算子,還可以通過Logical Implementation Rules轉化成物理運算子,如從InnerJoin轉化成HashJoin。另外,物理運算子(Physical Operator)可以通過屬性的強化(Physical Property Enforcement)產生新的物理運算子(Physical Operator),如通過Distribution滿足分佈的屬性,通過Sort滿足排序的屬性。

下圖展示了MaxCompute如何生成一個Join Plan。首先,Inner Join通過PartitionedJoinRule產生物理的plan,既Sort Merge Join,它存在盤古系統中,不滿足分佈的屬性,所以MaxCompute需要進行Exchange。

也就是按照T1.a和T2.b進行Shuffle,Shuffle之後進行Sort Merge。有相同T1.a的值和T2.b的值會分在同一個bucket中。不同的bucket啟動多個Instance,每個Instance處理每個bucket,從而進行分散式計算。其中在Shuffle時佔用了較多的資源,它不僅有資料的讀寫,還包括排序。如何儘量減少排序從而加快資料處理速度是優化的關鍵。

假設T1或T2較小,那麼可以將T2的全表廣播到T1進行Hash Join,好處是T1不需要多次Shuffle,T2也不需要進行Hash計算和排序。這時Join Plan只包含兩個stage,M2 stage對T2進行掃描,之後廣播到T1。T1不需要進行Shuffle,使用T2全表的資料建Hash表,再通過T1部分資料進行Hash Build,最後得到Hash Join的結果。

三、MaxCompute針對Join和聚合引入的Hash Clustering Table和Range Clustering的優化


1.Hash Clustering Table

分散式系統上Join的實現會涉及非常多次的Shuffle,為此MaxCompute建立了Hash Clustering Table來實現優化。Hash Clustering Table對選擇的column進行Hash,將不同的資料分配到不同的bucket裡面,這也就說明在建立Hash Clustering Table時,已經進行了Shuffle和排序。基本語法如下圖,clustered by 表明按照column進行Shuffle,sorted by 是按照column進行排序,number of buckets 推薦設定成2的n次方,方便與其它表進行Join。同時也推薦將clustered by和sorted by中的column設定為一樣或者clustered by中的column包含sorted by中的column。因為Hash Clustering Table通常被用來做Join和Shuffle Remove,可以利用它已有的屬性從而去除掉多餘的Shuffle和排序,實現優化的目的。

詳細步驟如下圖,Merge Join對T1傳送請求,拉取T1的屬性。假設T1為Hash Clustering Table,T1反饋是按照T1.a進行Hash,Hash到100個bucket,同時按照T1.a進行排序。T2同理。這時產生的Join Plan就滿足了M1,M2和R3的排序,最後所有的operator只需一個stage(M1),不需要多餘的Shuffle。

與之相反,T2的反饋如果是None,Merge Join會發送請求,使T2按照T2.b進行Hash和排序,設定100個bucket。這時產生的Join Plan包含M1和M2兩個stage,T2需要Shuffle,T1則不需要Shuffle,消除了一個stage的Shuffle。

假如T2的反饋是按照T2.b進行Hash,Hash到100個bucket,但排序不是T2.b。那麼Merge Join 依然請求T2按照T2.b排序。這時Join Plan還是僅僅會有M1一個stage,其中只是多了Sort Operator,但沒有多餘的Shuffle。

如果T2設定了200個bucket,T1的100個bucket會被讀兩遍,進行過濾,T1的1個bucket會對應T2的2個bucket。這時依然沒有Shuffle。

Hash Clustering Table的限制:Hash Clustering Table在Data Skew方面有明顯的限制。當資料量非常大,將這些資料Hash到一個bucket中導致的後果便是拖慢整個cluster的計算速度。Hash Clustering Table只支援等值的bucket pruning,如果按照a分配bucket,a=5,對5獲取Hash值,同時對Hash桶進行取模,那麼Hash Clustering Table可以定位出a=5具體在哪個bucket中。但如果不等值,Hash Clustering Table便無法支援。Hash Clustering Table 要求所有的clustering key出現聚合key或者Join key中。在CLUSTERED BY C1, C2; GROUP BY C1情況下,Hash Clustering Table無法實現優化。同樣,CLUSTERED BY C1, C2; … Join .. ON a.C1 == b.C1 也無法實現優化,Hash Clustering Table 要求Join key 包含C1和C2。

2.Range Clustering Table

Range Clustering Table 顧名思義,按照Range進行排序。MaxCompute自動的決定每個bucket的範圍。

Range Clustering Table怎樣確定bucket的範圍?如下圖,第一層是Mappers,中間是Job Manager,下一層是Reducers。首先在Stage1進行排序,之後從中抽取直方圖,每個Worker將直方圖傳送給Job Manager。Job Manager合併直方圖,根據資料量的大小決定合併成多少個bucket。Job Manager在將Bucket的範圍再發送給Mappers,由Mappers決定每一條資料傳送到具體哪個bucket。最後Reducers會得到具體的Aggregation Stage。

Range Clustering Table的優勢非常明顯,首先Range Clustering Table支援範圍比較(Range Comparison)。 同時它可以支援在prefix keys上的聚合和Join,既在CLUSTERED BY C1, C2; GROUP BY C1 情況下,Range Clustering Table也可以支援優化。

Range Clustering Table如何實現Join:假設T1和T2的Range如下圖,因為範圍不同無法直接Join。這時需要進行範圍的切分,將切分後的範圍交給Join Workers,由它讀取新的範圍。如下圖,w0讀取T1的切分範圍,將T2表的不必要範圍剔除。

Range Clustering如何按照prefix keys進行Join:Join on prefix keys需要直方圖和bucket的重新分配。假設按照a和b進行clustering,從直方圖中可以知道a是從哪個地方切分的。對bucket重新分配之後可以更新bucket的範圍,最後將新的bucket的範圍傳送給Join Worker

下圖展示了在range表和normal表中TPCH的查詢時間的對比。可以發現,速度總體上提升了60-70%,其中query 5, 17和21達到了數倍的速度的提升。


3.Tips for Clustering Table 

如何選擇正確的clustering keys,從而達到節省資源和降低速度的目的?下面有幾點提示可以提供給大家。首先,如果有Join condition keys,Distinct values,Where condition keys(EQUAL/IN, GT/GE/LT/BETWEEN),那麼可以針對這些已有的keys建立 Clustering Table。如果是Aggregate keys ,可以選擇建立Range Clustering Table。對於 Window keys, 可以根據Partition keys和Order keys 建立clustering keys和sort keys。舉例如下,SELECT row_number() OVER (PARTITION BY a ORDER BY b) FROM foo; 那麼Optimizer執行Window時產生的plan是CLUSTERED BY a SORTED BY a,b INTO x BUCKETS,既按照a Shuffle,按照a和b進行排序。在一個bucket中a的值不可能都相同,與a不同的值可以認為是一個frame,在frame中還需要進行b排序,所以每個Instance是按照a和b進行排序。如此便省去了預先的計算,既不需要Shuffle也無需排序。

此外,需要注意即使兩個Hash表是同樣的分佈,排序和bucket數量,但如果型別不同依然需要進行Shuffle,因為它們的binary表達方式不同,所以Hash的結果也會不同。另外,Clustering Table建立時耗費時間較長,假如建立Clustering Table之後並沒有頻繁查詢,也會造成浪費。還需要注意Clustering Table儘量避免Data Skew。再一個,使用FULL OUTER Join增量更新時需要進行改寫。

使用FULL OUTER Join進行增量更新: 如下圖,分別是snapshot表和delta表,Join keys 是s.key和d.key,但在向新的partition插入表示式時無法判斷新的SQL表示式是否滿足資料的排序,所以還需要對資料進行再一次的Shuffle。下圖中對SQL表示式進行了ANTI JOIN和 UNION ALL的改寫,ANTI JOIN可以利用排序的屬性,同時UNION ALL也是按照原來的key的分佈和排序,如此就可以完全做到Shuffle Remove。

Clustering Table分割槽建議:建立Clustering Table時需要考慮分割槽的大小,太小的分割槽本身優化空間就不大反而可能引入小檔案問題。假設設定1000個bucket就會生成1000個小檔案,而這些小檔案會對Mappers造成很大的壓力。另外,分割槽讀寫比越高的表 cluster後可能得到的收益越大。由於建立Clustering Table耗時較多,那麼讀的頻率較多就會有較大的優勢。最後,欄位利用率越高(列裁剪較少)的表,cluster後可能得到的收益越大。如果列裁剪之後使用到資料利用率較低,這表明浪費了較多的時間,所以cluster後的收益也不會很大。

 

原文連結
本文為雲棲社群原創內容,未經