1. 程式人生 > >Adaptive Execution如何讓Spark SQL更高效更好用

Adaptive Execution如何讓Spark SQL更高效更好用

1 背  景

Spark SQL / Catalyst 和 CBO 的優化,從查詢本身與目標資料的特點的角度儘可能保證了最終生成的執行計劃的高效性。但是

  • 執行計劃一旦生成,便不可更改,即使執行過程中發現後續執行計劃可以進一步優化,也只能按原計劃執行;

  • CBO 基於統計資訊生成最優執行計劃,需要提前生成統計資訊,成本較大,且不適合資料更新頻繁的場景;

  • CBO 基於基礎表的統計資訊與操作對資料的影響推測中間結果的資訊,只是估算,不夠精確。

本文介紹的 Adaptive Execution 將可以根據執行過程中的中間資料優化後續執行,從而提高整體執行效率。核心在於兩點:

  • 執行計劃可動態調整

  • 調整的依據是中間結果的精確統計資訊

2 動態設定 Shuffle Partition2.1 Spark Shuffle 原理

Spark Shuffle 一般用於將上游 Stage 中的資料按 Key 分割槽,保證來自不同 Mapper (表示上游 Stage 的 Task)的相同的 Key 進入相同的 Reducer (表示下游 Stage 的 Task)。一般用於 group by 或者 Join 操作。

 

如上圖所示,該 Shuffle 總共有 2 個 Mapper 與 5 個 Reducer。每個 Mapper 會按相同的規則(由 Partitioner 定義)將自己的資料分為五份。每個 Reducer 從這兩個 Mapper 中拉取屬於自己的那一份資料。

2.2 原有 Shuffle 的問題

使用 Spark SQL 時,可通過spark.sql.shuffle.partitions指定 Shuffle 時 Partition 個數,也即 Reducer 個數。

該引數決定了一個 Spark SQL Job 中包含的所有 Shuffle 的 Partition 個數。如下圖所示,當該引數值為 3 時,所有 Shuffle 中 Reducer 個數都為 3。

 

這種方法有如下問題:

  • Partition 個數不宜設定過大;

  • Reducer(代指 Spark Shuffle 過程中執行 Shuffle Read 的 Task) 個數過多,每個 Reducer 處理的資料量過小。大量小 Task 造成不必要的 Task 排程開銷與可能的資源排程開銷(如果開啟了 Dynamic Allocation);

  • Reducer 個數過大,如果 Reducer 直接寫 HDFS 會生成大量小檔案,從而造成大量 addBlock RPC,Name node 可能成為瓶頸,並影響其它使用 HDFS 的應用;

  • 過多 Reducer 寫小檔案,會造成後面讀取這些小檔案時產生大量 getBlock RPC,對 Name node 產生衝擊;

  • Partition 個數不宜設定過小

    • 每個 Reducer 處理的資料量太大,Spill 到磁碟開銷增大;

    • Reducer GC 時間增長;

    • Reducer 如果寫 HDFS,每個 Reducer 寫入資料量較大,無法充分發揮並行處理優勢;

  • 很難保證所有 Shuffle 都最優

    • 不同的 Shuffle 對應的資料量不一樣,因此最優的 Partition 個數也不一樣。使用統一的 Partition 個數很難保證所有 Shuffle 都最優;

    • 定時任務不同時段資料量不一樣,相同的 Partition 數設定無法保證所有時間段執行時都最優;

2.3 自動設定 Shuffle Partition 原理

如 Spark Shuffle 原理 一節圖中所示,Stage 1 的 5 個 Partition 資料量分別為 60MB,40MB,1MB,2MB,50MB。其中 1MB 與 2MB 的 Partition 明顯過小(實際場景中,部分小 Partition 只有幾十 KB 及至幾十位元組)。

開啟 Adaptive Execution 後:

  • Spark 在 Stage 0 的 Shuffle Write 結束後,根據各 Mapper 輸出,統計得到各 Partition 的資料量,即 60MB,40MB,1MB,2MB,50MB;

  • 通過 ExchangeCoordinator 計算出合適的 post-shuffle Partition 個數(即 Reducer)個數(本例中 Reducer 個數設定為 3);

  • 啟動相應個數的 Reducer 任務;

  • 每個 Reducer 讀取一個或多個 Shuffle Write Partition 資料(如下圖所示,Reducer 0 讀取 Partition 0,Reducer 1 讀取 Partition 1、2、3,Reducer 2 讀取 Partition 4)。

 

三個 Reducer 這樣分配是因為:

  • targetPostShuffleInputSize 預設為 64MB,每個 Reducer 讀取資料量不超過 64MB;

  • 如果 Partition 0 與 Partition 2 結合,Partition 1 與 Partition 3 結合,雖然也都不超過 64 MB。但讀完 Partition 0 再讀 Partition 2,對於同一個 Mapper 而言,如果每個 Partition 資料比較少,跳著讀多個 Partition 相當於隨機讀,在HDD 上效能不高;

  • 目前的做法是隻結合相臨的 Partition,從而保證順序讀,提高磁碟 IO 效能;

  • 該方案只會合併多個小的 Partition,不會將大的 Partition 拆分,因為拆分過程需要引入一輪新的 Shuffle;

  • 基於上面的原因,預設 Partition 個數(本例中為 5)可以大一點,然後由 ExchangeCoordinator 合併。如果設定的 Partition 個數太小,Adaptive Execution 在此場景下無法發揮作用。

由上圖可見,Reducer 1 從每個 Mapper 讀取 Partition 1、2、3 都有三根線,是因為原來的 Shuffle 設計中,每個 Reducer 每次通過 Fetch 請求從一個特定 Mapper 讀資料時,只能讀一個 Partition 的資料。也即在上圖中,Reducer 1 讀取 Mapper 0 的資料,需要 3 輪 Fetch 請求。對於 Mapper 而言,需要讀三次磁碟,相當於隨機 IO。

為了解決這個問題,Spark 新增介面,一次 Shuffle Read 可以讀多個 Partition 的資料。如下圖所示,Task 1 通過一輪請求即可同時讀取 Task 0 內 Partition 0、1 和 2 的資料,減少了網路請求數量。同時 Mapper 0 一次性讀取並返回三個 Partition 的資料,相當於順序 IO,從而提升了效能。

 

由於 Adaptive Execution 的自動設定 Reducer 是由 ExchangeCoordinator 根據 Shuffle Write 統計資訊決定的,因此即使在同一個 Job 中不同 Shuffle 的 Reducer 個數都可以不一樣,從而使得每次 Shuffle 都儘可能最優。

上文 原有 Shuffle 的問題 一節中的例子,在啟用 Adaptive Execution 後,三次 Shuffle 的 Reducer 個數從原來的全部為 3 變為 2、4、3。

 

2.4 使用與優化方法

可通過spark.sql.adaptive.enabled=true啟用 Adaptive Execution 從而啟用自動設定 Shuffle Reducer 這一特性。

通過spark.sql.adaptive.shuffle.targetPostShuffleInputSize可設定每個 Reducer 讀取的目標資料量,其單位是位元組,預設值為 64 MB。上文例子中,如果將該值設定為 50 MB,最終效果仍然如上文所示,而不會將 Partition 0 的 60MB 拆分。具體原因上文已說明。

3 動態調整執行計劃3.1 固定執行計劃的不足

在不開啟 Adaptive Execution 之前,執行計劃一旦確定,即使發現後續執行計劃可以優化,也不可更改。如下圖所示,SortMergJoin 的 Shuffle Write 結束後,發現 Join 一方的 Shuffle 輸出只有 46.9KB,仍然繼續執行 SortMergeJoin。

 

此時完全可將 SortMergeJoin 變更為 BroadcastJoin 從而提高整體執行效率。

3.2 SortMergeJoin 原理

SortMergeJoin 是常用的分散式 Join 方式,它幾乎可使用於所有需要 Join 的場景。但有些場景下,它的效能並不是最好的。

SortMergeJoin 的原理如下圖所示:

  • 將 Join 雙方以 Join Key 為 Key 按照 HashPartitioner 分割槽,且保證分割槽數一致;

  • Stage 0 與 Stage 1 的所有 Task 在 Shuffle Write 時,都將資料分為 5 個 Partition,並且每個 Partition 內按 Join Key 排序;

  • Stage 2 啟動 5 個 Task 分別去 Stage 0 與 Stage 1 中所有包含 Partition 分割槽資料的 Task 中取對應 Partition 的資料。(如果某個 Mapper 不包含該 Partition 的資料,則 Redcuer 無須向其發起讀取請求);

  • Stage 2 的 Task 2 分別從 Stage 0 的 Task 0、1、2 中讀取 Partition 2 的資料,並且通過 MergeSort 對其進行排序;

  • Stage 2 的 Task 2 分別從 Stage 1 的 Task 0、1 中讀取 Partition 2 的資料,且通過 MergeSort 對其進行排序;

  • Stage 2 的 Task 2 在上述兩步 MergeSort 的同時,使用 SortMergeJoin 對二者進行 Join。

 

3.3 BroadcastJoin 原理

當參與 Join 的一方足夠小,可全部置於 Executor 記憶體中時,可使用 Broadcast 機制將整個 RDD 資料廣播到每一個 Executor 中,該 Executor 上執行的所有 Task 皆可直接讀取其資料。(本文中,後續配圖,為了方便展示,會將整個 RDD 的資料置於 Task 框內,而隱藏 Executor)。

對於大 RDD,按正常方式,每個 Task 讀取並處理一個 Partition 的資料,同時讀取 Executor 內的廣播資料,該廣播資料包含了小 RDD 的全量資料,因此可直接與每個 Task 處理的大 RDD 的部分資料直接 Join。

 

根據 Task 內具體的 Join 實現的不同,又可分為 BroadcastHashJoin 與 BroadcastNestedLoopJoin。後文不區分這兩種實現,統稱為 BroadcastJoin。

與 SortMergeJoin 相比,BroadcastJoin 不需要 Shuffle,減少了 Shuffle 帶來的開銷,同時也避免了 Shuffle 帶來的資料傾斜,從而極大地提升了 Job 執行效率。

同時,BroadcastJoin 帶來了廣播小 RDD 的開銷。另外,如果小 RDD 過大,無法存於 Executor 記憶體中,則無法使用 BroadcastJoin。

對於基礎表的 Join,可在生成執行計劃前,直接通過 HDFS 獲取各表的大小,從而判斷是否適合使用 BroadcastJoin。但對於中間表的 Join,無法提前準確判斷中間表大小從而精確判斷是否適合使用 BroadcastJoin。

《Spark SQL 效能優化再進一步 CBO 基於代價的優化》一文介紹的 CBO 可通過表的統計資訊與各操作對資料統計資訊的影響,推測出中間表的統計資訊,但是該方法得到的統計資訊不夠準確。同時該方法要求提前分析表,具有較大開銷。

而開啟 Adaptive Execution 後,可直接根據 Shuffle Write 資料判斷是否適用 BroadcastJoin。

3.4 動態調整執行計劃原理

如上文 SortMergeJoin 原理 中配圖所示,SortMergeJoin 需要先對 Stage 0 與 Stage 1 按同樣的 Partitioner 進行 Shuffle Write。

Shuffle Write 結束後,可從每個 ShuffleMapTask 的 MapStatus 中統計得到按原計劃執行時 Stage 2 各 Partition 的資料量以及 Stage 2 需要讀取的總資料量。(一般來說,Partition 是 RDD 的屬性而非 Stage 的屬性,本文為了方便,不區分 Stage 與 RDD。可以簡單認為一個 Stage 只有一個 RDD,此時 Stage 與 RDD 在本文討論範圍內等價)。

如果其中一個 Stage 的資料量較小,適合使用 BroadcastJoin,無須繼續執行 Stage 2 的 Shuffle Read。相反,可利用 Stage 0 與 Stage 1 的資料進行 BroadcastJoin,如下圖所示。

具體做法是:

  • 將 Stage 1 全部 Shuffle Write 結果廣播出去

  • 啟動 Stage 2,Partition 個數與 Stage 0 一樣,都為 3

  • 每個 Stage 2 每個 Task 讀取 Stage 0 每個 Task 的 Shuffle Write 資料,同時與廣播得到的 Stage 1 的全量資料進行 Join

注:廣播資料存於每個 Executor 中,其上所有 Task 共享,無須為每個 Task 廣播一份資料。上圖中,為了更清晰展示為什麼能夠直接 Join 而將 Stage 2 每個 Task 方框內都放置了一份 Stage 1 的全量資料。

雖然 Shuffle Write 已完成,將後續的 SortMergeJoin 改為 Broadcast 仍然能提升執行效率:

  • SortMergeJoin 需要在 Shuffle Read 時對來自 Stage 0 與 Stage 1 的資料進行 Merge Sort,並且可能需要 Spill 到磁碟,開銷較大;

  • SortMergeJoin 時,Stage 2 的所有 Task 需要取 Stage 0 與 Stage 1 的所有 Task 的輸出資料(如果有它要的資料 ),會造成大量的網路連線。且當 Stage 2 的 Task 較多時,會造成大量的磁碟隨機讀操作,效率不高,且影響相同機器上其它 Job 的執行效率;

  • SortMergeJoin 時,Stage 2 每個 Task 需要從幾乎所有 Stage 0 與 Stage 1 的 Task 取資料,無法很好利用 Locality;

  • Stage 2 改用 Broadcast,每個 Task 直接讀取 Stage 0 的每個 Task 的資料(一對一),可很好利用 Locality 特性。最好在 Stage 0 使用的 Executor 上直接啟動 Stage 2 的 Task。如果 Stage 0 的 Shuffle Write 資料並未 Spill 而是在記憶體中,則 Stage 2 的 Task 可直接讀取記憶體中的資料,效率非常高。如果有 Spill,那可直接從本地檔案中讀取資料,且是順序讀取,效率遠比通過網路隨機讀資料效率高。

3.5 使用與優化方法

該特性的使用方式如下:

  • spark.sql.adaptive.enabledspark.sql.adaptive.join.enabled都設定為true時,開啟 Adaptive Execution 的動態調整 Join 功能;

  • spark.sql.adaptiveBroadcastJoinThreshold設定了 SortMergeJoin 轉 BroadcastJoin 的閾值。如果不設定該引數,該閾值與spark.sql.autoBroadcastJoinThreshold的值相等;

  • 除了本文所述 SortMergeJoin 轉 BroadcastJoin,Adaptive Execution 還可提供其它 Join 優化策略。部分優化策略可能會需要增加 Shuffle。spark.sql.adaptive.allowAdditionalShuffle引數決定了是否允許為了優化 Join 而增加 Shuffle。其預設值為 false。

4 自動處理資料傾斜4.1 解決資料傾斜典型方案

《Spark 效能優化之道——解決 Spark 資料傾斜(Data Skew)的 N 種姿勢》一文講述了資料傾斜的危害,產生原因,以及典型解決方法。

  • 保證檔案可 Split 從而避免讀 HDFS 時資料傾斜;

  • 保證 Kafka 各 Partition 資料均衡從而避免讀 Kafka 引起的資料傾斜;

  • 調整並行度或自定義 Partitioner 從而分散分配給同一 Task 的大量不同 Key;

  • 使用 BroadcastJoin 代替 ReduceJoin 消除 Shuffle 從而避免 Shuffle 引起的資料傾斜;

  • 對傾斜 Key 使用隨機字首或字尾從而分散大量傾斜 Key,同時將參與 Join 的小表擴容,從而保證 Join 結果的正確性。

4.2 自動解決資料傾斜

目前 Adaptive Execution 可解決 Join 時資料傾斜問題。其思路可理解為將部分傾斜的 Partition (傾斜的判斷標準為該 Partition 資料是所有 Partition Shuffle Write 中位數的 N 倍) 進行單獨處理,類似於 BroadcastJoin,如下圖所示。

在上圖中,左右兩邊分別是參與 Join 的 Stage 0 與 Stage 1 (實際應該是兩個 RDD 進行 Join,但如同上文所述,這裡不區分 RDD 與 Stage),中間是獲取 Join 結果的 Stage 2。

明顯 Partition 0 的資料量較大,這裡假設 Partition 0 符合“傾斜”的條件,其它 4 個 Partition 未傾斜。

以 Partition 對應的 Task 2 為例,它需獲取 Stage 0 的三個 Task 中所有屬於 Partition 2 的資料,並使用 MergeSort 排序。同時獲取 Stage 1 的兩個 Task 中所有屬於 Partition 2 的資料並使用 MergeSort 排序。然後對二者進行 SortMergeJoin。

對於 Partition 0,可啟動多個 Task:

  • 在上圖中,啟動了兩個 Task 處理 Partition 0 的資料,分別名為 Task 0-0 與 Task 0-1

  • Task 0-0 讀取 Stage 0 Task 0 中屬於 Partition 0 的資料

  • Task 0-1 讀取 Stage 0 Task 1 與 Task 2 中屬於 Partition 0 的資料,並進行 MergeSort

  • Task 0-0 與 Task 0-1 都從 Stage 1 的兩個 Task 中所有屬於 Partition 0 的資料

  • Task 0-0 與 Task 0-1 使用 Stage 0 中屬於 Partition 0 的部分資料與 Stage 1中屬於 Partition 0 的全量資料進行 Join

通過該方法,原本由一個 Task 處理的 Partition 0 的資料由多個 Task 共同處理,每個 Task 需處理的資料量減少,從而避免了 Partition 0 的傾斜。

對於 Partition 0 的處理,有點類似於 BroadcastJoin 的做法。但區別在於,Stage 2 的 Task 0-0 與 Task 0-1 同時獲取 Stage 1 中屬於 Partition 0 的全量資料,是通過正常的 Shuffle Read 機制實現,而非 BroadcastJoin 中的變數廣播實現。

4.3 使用與優化方法

開啟與調優該特性的方法如下:

  • spark.sql.adaptive.skewedJoin.enabled設定為 true 即可自動處理 Join 時資料傾斜;

  • spark.sql.adaptive.skewedPartitionMaxSplits控制處理一個傾斜 Partition 的 Task 個數上限,預設值為 5;

  • spark.sql.adaptive.skewedPartitionRowCountThreshold設定了一個 Partition 被視為傾斜 Partition 的行數下限,也即行數低於該值的 Partition 不會被當作傾斜 Partition 處理。其預設值為 10L * 1000 * 1000 即一千萬;

  • spark.sql.adaptive.skewedPartitionSizeThreshold設定了一個 Partition 被視為傾斜 Partition 的大小下限,也即大小小於該值的 Partition 不會被視作傾斜 Partition。其預設值為 64 * 1024 * 1024 也即 64MB;

  • spark.sql.adaptive.skewedPartitionFactor該引數設定了傾斜因子。如果一個 Partition 的大小大於spark.sql.adaptive.skewedPartitionSizeThreshold的同時大於各 Partition 大小中位數與該因子的乘積,或者行數大於spark.sql.adaptive.skewedPartitionRowCountThreshold的同時大於各 Partition 行數中位數與該因子的乘積,則它會被視為傾斜的 Partition。