1. 程式人生 > >【Spark調優】聚合操作數據傾斜解決方案

【Spark調優】聚合操作數據傾斜解決方案

local fix tar 2pc rand shu spark調優 16px 優點

【使用場景】

  對RDD執行reduceByKey等聚合類shuffle算子或者在Spark SQL中使用group by語句進行分組聚合時,經過sample或日誌、界面定位,發生了數據傾斜。

【解決方案】

  局部聚合+全局聚合,進行兩階段聚合。具體為:

  將原本相同的key通過附加隨機前綴的方式,變成多個不同的key,就可以讓原本被一個task處理的數據分散到多個task上去做局部聚合,進而解決單個task處理數據量過多的問題。接著去除掉隨機前綴,再次進行全局聚合,就可以得到最終的結果。

  •   第一步:給key傾斜的dataSkewRDD中每個key都打上一個隨機前綴。

  例如10以內的隨機數,此時原先一樣的key,包括集中傾斜的key就變成不一樣的了,比如(hello, 1) (hello, 1) (hello, 1) (hello, 1),就會變成(5_hello, 1) (3_hello, 1) (3_hello, 1) (5_hello, 1) (8_hello, 1) (5_hello, 1) ...

  •   第二步:對打上隨機前綴的key不再傾斜的randomPrefixRdd進行局部聚合。

  接著對打上隨機數後的數據,執行reduceByKey等聚合操作,進行局部聚合時,就不會數據傾斜了。此時,第一步局部聚合的結果,變成了(5_hello, 3) (3_hello, 2) (8_hello, 1)

  •   第三步:局部聚合後,去除localAggRdd中每個key的隨機前綴。

  此時,第二步局部聚合的結果,變成了(hello, 3) (hello, 2) (hello, 1)

  •   第四步:對去除了隨機前綴的removeRandomPrefixRdd進行全局聚合。

  得到最終結果(hello, 6)

【方案優點

  對於聚合類的shuffle操作導致的數據傾斜,效果不錯,通常都可以解決數據傾斜問題,至少大幅緩解數據傾斜,將Spark作業的性能提升數倍以上。

【代碼實現】

  我對上述方案做了代碼實現,見我的github:https://github.com/wwcom614/Spark

  Java版實現

  Scala版實現

【Spark調優】聚合操作數據傾斜解決方案