1. 程式人生 > >Spark 3.0 新特性 之 自適應查詢與分割槽動態裁剪

Spark 3.0 新特性 之 自適應查詢與分割槽動態裁剪

Spark憋了一年半的大招後,釋出了3.0版本,新特性主要與Spark SQL和Python相關。這也恰恰說明了大資料方向的兩大核心:BI與AI。下面是本次釋出的主要特性,包括效能、API、生態升級、資料來源、SQL相容、監控和除錯等方面的升級。 ![](https://mmbiz.qpic.cn/mmbiz_png/4PPc462eOOPdjv7bTVmctkhe9KNTlHtRWVDvAx5zs5MjlvA9IbgpKHtlXV7qtw5e870DKvyeJofgao8szl8A0A/640?wx_fmt=png&tp=webp&wxfrom=5&wx_lazy=1&wx_co=1) 本次主要整理了效能方面的優化,包括了自適應查詢與動態分割槽裁剪。 # 1 自適應查詢 AQE,Adaptive Query Execution,說的簡單點就是讓Spark在執行中根據蒐集到的資訊靈活採取優化手段,提升效能。 說起這個可以先回想下Spark的發展歷史,在1.x時代Spark通過RDD的程式設計形成DAG圖,這個階段可以說沒啥優化完全是按照規則來執行;在2.x時代,引入了代價計算,Spark會通過提前進行代價計算,選擇代價最小的查詢計劃(跟大部分的資料庫類似,代價計算依賴於資料本身的統計,如資料量、檔案大小、分割槽數等,由於Spark是儲存與計算分離的模式,因此這些統計資訊有時候會缺失或者不準確,那麼得到的查詢代價自然也就不準確了);在3.x時代,引入自適應查詢,即在執行的過程中可以根據得到的快取資料資訊動態調整分割槽策略、join策略等。這樣就保證了剛開始表的統計資訊不準,可能查詢計劃不是最高效的,但是隨著查詢的執行,可以動態優化整個查詢計劃。 那麼到底自適應都可以做什麼呢? ## 1.1 動態分割槽合併 在Spark的經典優化策略裡,調整分割槽數從而改變並行度是最基本的優化手段,可以調整的分割槽數卻不是那麼容易找到最優值的。分割槽數太小,可能導致單個分割槽內的資料太多,單個任務的執行效率低下;分割槽數太大,可能導致碎片太多,任務之間來回切換浪費效能。比如經典的shuffle操作後,每個shuffle資料都需要對應的reduce端接收處理,如果分割槽數過多,有可能導致某幾個任務讀取的資料量很小,造成資源的浪費。 ![](https://mmbiz.qpic.cn/mmbiz_png/4PPc462eOOPdjv7bTVmctkhe9KNTlHtROz6iagff6g8pknJ23K8KwtBnj0iajp4Q4yFU71qCHkhSicKAc8pRpicf0w/640?wx_fmt=png&tp=webp&wxfrom=5&wx_lazy=1&wx_co=1) 引入AQE後,Spark會自動把資料量很小的分割槽進行合併處理: ![](https://mmbiz.qpic.cn/mmbiz_png/4PPc462eOOPdjv7bTVmctkhe9KNTlHtRr9t5u15P7Z5iazcnXd3D4gfxibmqhz4nibHqwQwBqclahmCmIoE9kfccQ/640?wx_fmt=png&tp=webp&wxfrom=5&wx_lazy=1&wx_co=1) ## 1.2 動態join策略選擇 在Spark中支援多種join策略,這些策略在不同的分散式框架中差不多。分別是: - Broadcast Hash Join(BHJ),廣播 join - Shuffle Hash Join(SHJ),雜湊 join - Sort Merge Join(SMJ),排序 join BHJ是當小表與大表關聯時,把小表廣播到大表的每個分割槽中,每個分割槽都與完整的小表進行關聯,最後合併得到結果。像Spark會配置一個引數 spark.sql.autoBroadcastJoinThreshold 來決定小於這個配置的表就認為是小表,然後採用廣播策略(預設10MB)。一般廣播的套路是把小表拷貝到driver端,然後分發到每個executor工作節點上,因此如果表的資料太大,會導致來回複製的資料太多,效能低下,因此BHJ僅適用於廣播小表。 SHJ是針對表的資料量過大時,按照分割槽列進行打散,兩張表按照不同的分割槽重新排列資料。不過這種JOIN方法也有個弊端,就是需要對應分割槽的兩張表資料都同時載入完成,才能開始計算。如果兩張表的資料量都很大,有可能會造成分割槽節點記憶體溢位。 SMJ是針對上述的情況,在確定shuffle分割槽後對資料進行排序,這樣兩張表可以不需要等待資料全部載入到記憶體,只要對應的排序資料部分載入完成後就可以提前開始。 總結完三種join策略後,可以發現假設由於資料統計資訊的缺失或不準確,或者是過濾條件的影響,可能會按照原來表的大小判斷join的策略。比如某個表初始的時候15M,達不到廣播join的要求,但是該表在查詢過程中有個filter條件可以讓表僅保留8M的有效資料,此時就可以採用廣播join了。AQE就是利用這種特性,在執行時動態檢測表的大小,當表的大小達到要求後會優化join為廣播join。 ![](https://mmbiz.qpic.cn/mmbiz_png/4PPc462eOOPdjv7bTVmctkhe9KNTlHtR3fxZoSoibSIQNDBoeibeibQdUOpr4vf3R3kLYPSicM5TCKxGqAMEs3lE5A/640?wx_fmt=png&tp=webp&wxfrom=5&wx_lazy=1&wx_co=1) ## 1.3 資料傾斜優化 在分散式查詢中某個查詢任務會同時分拆成多個任務執行在不同的機器上,假設某個任務對應的資料量很大,就會引發資料傾斜的問題。比如下面的兩張表關聯,但是左表的第一個分割槽資料量很多,就會引發資料傾斜問題. ![](https://mmbiz.qpic.cn/mmbiz_png/4PPc462eOOPdjv7bTVmctkhe9KNTlHtRk6Hvnr8ML9TSUPeWQKYEPO9KmYMwPcSeA02j1zX4HgyF4e6xUctYWg/640?wx_fmt=png&tp=webp&wxfrom=5&wx_lazy=1&wx_co=1) AQE可以在執行時檢測到資料傾斜,並把大分割槽分割成多個小分割槽同時與對應的右表進行關聯。 ![](https://mmbiz.qpic.cn/mmbiz_png/4PPc462eOOPdjv7bTVmctkhe9KNTlHtRnpzJFDVM0E4hiaREOVxYk9Z5rPiaTFw50RRXtCWG2DbN7X6gPmjQyfXA/640?wx_fmt=png&tp=webp&wxfrom=5&wx_lazy=1&wx_co=1) # 2 動態分割槽裁剪 這個比較好理解,正常Spark或Hive在查詢時,會根據查詢條件與分割槽欄位自動過濾底層的資料檔案。但是如果過濾條件沒有及時的反映到查詢上,就會導致資料被冗餘載入。比如左邊的是沒有動態分割槽裁剪的情況,兩張表進行關聯操作,左表包含一個過濾條件,右表需要全表讀取。經過動態分割槽優化後,右表可以直接新增過濾條件,如 id in (select id from lefttable where filter_cond) , 這樣可以提前過濾掉部分資料。 # 3 關聯提示 之前在Flink中看到過這種用法,即在sql中使用某種程式碼提示,讓編譯器根據程式碼提示選擇優化策略執行。語法如:/** xxx */。比如 select /** BROADCAST(a) */ * from a join b on a.id = b.id,可以強制a表廣播與b表進行關聯操作。 以上就是主要的效能方面的優化。其他方面由於工作內容涉及的不多,因此就先不過多整理了,感興趣可以去官網或者觀看上面的分享視訊。需要額外一提的是,官方文件也有兩個很重要的調整: 1 增加了SQL相關的文件 ![](https://mmbiz.qpic.cn/mmbiz_png/4PPc462eOOPdjv7bTVmctkhe9KNTlHtRPgeE0XI46LMJQomZvia0cfOkwL422rLyFnczKDMVER4qJ8SK4VxianxA/640?wx_fmt=png&tp=webp&wxfrom=5&wx_lazy=1&wx_co=1) 2 增加了UI方面的說明 ![](https://mmbiz.qpic.cn/mmbiz_png/4PPc462eOOPdjv7bTVmctkhe9KNTlHtR5dYStsnDia3RV8K4FzuoC17Gblj4RNeSLXAffjuibBEWwMFkLDnxibwHw/640?wx_fmt=png&tp=webp&wxfrom=5&wx_lazy=1&wx_co=1) 後續會分享更多Spark相關的原理和特性