【Spark調優】聚合操作數據傾斜解決方案
【使用場景】
對RDD執行reduceByKey等聚合類shuffle算子或者在Spark SQL中使用group by語句進行分組聚合時,經過sample或日誌、界面定位,發生了數據傾斜。
【解決方案】
局部聚合+全局聚合,進行兩階段聚合。具體為:
將原本相同的key通過附加隨機前綴的方式,變成多個不同的key,就可以讓原本被一個task處理的數據分散到多個task上去做局部聚合,進而解決單個task處理數據量過多的問題。接著去除掉隨機前綴,再次進行全局聚合,就可以得到最終的結果。
- 第一步:給key傾斜的dataSkewRDD中每個key都打上一個隨機前綴。
例如10以內的隨機數,此時原先一樣的key,包括集中傾斜的key就變成不一樣的了,比如(hello, 1) (hello, 1) (hello, 1) (hello, 1),就會變成(5_hello, 1) (3_hello, 1) (3_hello, 1) (5_hello, 1) (8_hello, 1) (5_hello, 1) ...
- 第二步:對打上隨機前綴的key不再傾斜的randomPrefixRdd進行局部聚合。
接著對打上隨機數後的數據,執行reduceByKey等聚合操作,進行局部聚合時,就不會數據傾斜了。此時,第一步局部聚合的結果,變成了(5_hello, 3) (3_hello, 2) (8_hello, 1)
- 第三步:局部聚合後,去除localAggRdd中每個key的隨機前綴。
此時,第二步局部聚合的結果,變成了(hello, 3) (hello, 2) (hello, 1)
- 第四步:對去除了隨機前綴的removeRandomPrefixRdd進行全局聚合。
得到最終結果(hello, 6)
【方案優點】
對於聚合類的shuffle操作導致的數據傾斜,效果不錯,通常都可以解決數據傾斜問題,至少大幅緩解數據傾斜,將Spark作業的性能提升數倍以上。
【代碼實現】
我對上述方案做了代碼實現,見我的github:https://github.com/wwcom614/Spark
Java版實現
Scala版實現
【Spark調優】聚合操作數據傾斜解決方案