1. 程式人生 > >【轉】【Spark】Spark 資料傾斜優化方法

【轉】【Spark】Spark 資料傾斜優化方法

大資料梅峰谷 2017-05-19

--------本節內容--------

1.前言

2.Spark資料傾斜

   2.1 資料傾斜現象

     2.1.1 OOM錯誤

     2.1.2 Spark執行緩慢

2.2 資料傾斜原理

    2.3 資料傾斜原因定位

       2.3.1 通過業務經驗定位

       2.3.2 結合Spark原理定位

       2.3.3 檢視資料傾斜Key的分佈

3.資料傾斜解決方法

    3.1 使用broadcast避免shuffle

    3.2 取樣傾斜key並分拆join操作

    3.3 改變並行度緩解資料傾斜

    3.4 兩階段聚合(區域性+全域性)

    3.5 Hive ETL預處理資料

    3.6 過濾少數導致傾斜的key

    3.7自定義Partitioner

4.參考資料

---------------------- 

1.前言

資料傾斜是一個非常普遍的問題,不管你的Spark應用於什麼場景,碰到資料傾斜的現象近乎是100%,因為太多數業務資料都服從2/8原則。所以熱點資料問題是一個常規現象,如何發現、理解和解決資料傾斜問題是我們必須掌握的技巧。並且,資料傾斜的調優是很多Spark程式調優的基礎,這個問題沒有解決,其他的調優手段顯得都很多餘。

2.資料傾斜

2.1 資料傾斜現象

資料傾斜有2個方面的現象

2.1.1 OOM錯誤

過多的資料在同一個task中執行,將會把executor撐爆,造成OOM,程式終止執行。報記憶體溢位的錯誤,這種錯誤已經非常嚴重了,程式直接沒法執行起來,可想而知。OOM 現象(這幾個圖是網上看到的,我覺得很具有代表性)主要有:

1)任務日誌顯示某節點記憶體超過yarn的限制:xx G,被yarn殺掉。

2)開啟webui(:4040/jobs),開啟executor列表,會顯示只有一個worker在工作

3)日誌報錯java.lang.OutOfMemroError:Java heap space,當然實際開發中,不能一看到這個錯誤,就認為是資料傾斜了,你的程式bug,偶然的資料異常等等,都有可能導致記憶體溢位。

4) 觀察CPU,CPU被打到100%,或者使用率非常高

下圖是阿里雲集群監控圖表中的cpu曲線,黑色的曲線是其中一臺worker,靠前的兩次100%就是兩次問題任務執行,紅色的是修改了部分問題之後重新跑了一遍,還是有部分傾斜,但是成功跑完了。最後一段是正常曲線。 

5)觀察記憶體,記憶體爆表

 這裡需要注意一點,Spark實際資料到了記憶體會變的比原來大很多

2.1.2 Spark程式執行慢

多數task執行速度較快,少數task執行時間非常長,執行速度非常慢,雖然能執行,但要花很長時間。這種情況也是不允許的,生產中對任務的執行時間是有要求的,不可能允許毫無止境的執行下去,而且也不允許執行時間不可估算。

上圖,倒數第三列顯示了每個task的執行時間。明顯可以看到,有的task執行特別快,只需要幾秒鐘就可以執行完;而有的task執行特別慢,需要幾分鐘才能執行完,此時單從執行時間上看就已經能夠確定發生資料傾斜了。此外,倒數第一列顯示了每個task處理的資料量,明顯可以看到,執行時間特別短的task只需要處理幾百KB的資料即可,而執行時間特別長的task需要處理幾千KB的資料,處理的資料量差了10倍。此時更加能夠確定是發生了資料傾斜。此時通過WebUI是可以看到在哪個Stage發生了資料傾斜。

2.2 資料傾斜原理

資料傾斜,字面理解,就是資料分佈不均衡的意思。表現在Spark分散式叢集中,資料在各個節點上分佈不均勻,從而某個節點要處理大量的資料,超出了該節點的處理能力。

資料傾斜原理:主要發生在shuffle階段,因為這個階段要發生大量的網路通訊,資料要重新佈局。【shuffle的過程可以參考前面的博文Spark你媽喊你回家吃飯-13Spark計算引擎剖析,參考文獻的第三篇,對shuffle的原理和調優也介紹的非常到位】,在進行shuffle的時候,Spark將各個節點上相同的key拉取到某個節點上的task來處理,比如執行join、groupByKey、reduceByKey、repartion、distinct、agregateByKey、cogroup等操作,此時,如果某個key對應的資料量特別大的話,task就會出現資料傾斜,執行速度非常慢,導致整體任務執行灌滿,甚至失敗。舉個栗子(這個也是來源於網際網路),80%的資料集中在某個節點上,該節點有不可承受之重。

2.3 資料傾斜原因定位

一個理想的分散式程式應該是將計算任務分攤到各個節點,儘量均衡的分攤下去。發現數據傾斜的時候,不要急於提高executor的資源,修改引數或是修改程式,首先要檢查資料本身,是否存在異常資料,結合WebUI和日誌確定和定位是否發生了資料傾斜,那麼如何定位是不是發生了資料傾斜呢,這個既要靠經驗(對業務的理解)也要靠方法(對Spark執行機制的掌握),方法論+經驗論組合判斷。

資料傾斜的罪魁禍首就是Key的不均與分析,那開始Shuffle之前,那個Key是怎麼劃分,怎麼產生的? Key分佈不均不就是key的劃分不合理麼,所以要搞清楚Key在Spark中時怎麼劃分出來的,Key出來之後,是根據什麼方法將Key進行分類的,有幾個reduce就會將Key分成幾類

1)對於鍵值對型別的RDD,還好理解,執行帶有ByKey操作觸發shuffle時,用的key就是鍵值對RDD的Key。

2)對於不是鍵值對RDD的Key,系統自帶的Hash函式產生Key,這個會比較均勻。

所以在在開發的時候,對於帶有ByKey的運算元使用,就要長點心了,不要掉坑裡去了。

2.3.1 通過業務經驗定位

分析業務資料,大多數業務的資料分佈都服從2/8法則,所以定位的時候有以下幾點給予參考:

1)正常資料:結合業務規則,分析熱點資料的分佈,找出hot key

2)空資料:key為null(空值)或是一些無意義的資料構成

3)異常資料:大量重複的測試資料或是對結果影響不大的異常資料。

單獨把key拉出來觀察資料分佈,或者通過抽樣的方式,分析key的分佈

df.select("key").sample(false,0.1).(k=>(k,1)).reduceBykey(_+_).map(k=>(k._2,k._1)).sortByKey(false).take(100)

如果發現多數資料分佈都較為平均,而個別資料比其他資料大上若干個數量級,則說明發生了資料傾斜。

2.3.2 結合Spark原理定位

主要是spark使用不當導致資料傾斜,例如parittion指定的個數不合理等,一些帶byKey的運算元濫用等,結合shuffle原理來判斷是不是會資料傾斜,就要求對spark的機制理解的非常透徹,比如stage的劃分,舉個例子

-----------

val conf = new SparkConf() val sc = new SparkContext(conf) val lines = sc.textFile("hdfs://...") val words = lines.flatMap(_.split(" ")) val pairs = words.map((_, 1)) val wordCounts = pairs.reduceByKey(_ + _) wordCounts.collect().foreach(println(_))

-------------

這裡我們就以Spark最基礎的入門程式——單詞計數來舉例,如何用最簡單的方法大致推算出一個stage對應的程式碼。如下示例,在整個程式碼中,只有一個reduceByKey是會發生shuffle的運算元,因此就可以認為,以這個運算元為界限,會劃分出前後兩個stage。

·stage0,主要是執行從textFile到map操作,以及執行shuffle write操作。shuffle write操作,我們可以簡單理解為對pairs RDD中的資料進行分割槽操作,每個task處理的資料中,相同的key會寫入同一個磁碟檔案內。

·stage1,主要是執行從reduceByKey到collect操作,stage1的各個task一開始執行,就會首先執行shuffle read操作。執行shuffle read操作的task,會從stage0的各個task所在節點拉取屬於自己處理的那些key,然後對同一個key進行全域性性的聚合或join等操作,在這裡就是對key的value值進行累加。stage1在執行完reduceByKey運算元之後,就計算出了最終的wordCounts RDD,然後會執行collect運算元,將所有資料拉取到Driver上,供我們遍歷和列印輸出。

通過對單詞計數程式的分析,希望能夠讓大家瞭解最基本的stage劃分的原理,以及stage劃分後shuffle操作是如何在兩個stage的邊界處執行的。然後我們就知道如何快速定位出發生資料傾斜的stage對應程式碼的哪一個部分了。比如我們在Spark Web UI或者本地log中發現,stage1的某幾個task執行得特別慢,判定stage1出現了資料傾斜,那麼就可以回到程式碼中定位出stage1主要包括了reduceByKey這個shuffle類運算元,此時基本就可以確定是由educeByKey運算元導致的資料傾斜問題。比如某個單詞出現了100萬次,其他單詞才出現10次,那麼stage1的某個task就要處理100萬資料,整個stage的速度就會被這個task拖慢。

2.3.3 檢視資料傾斜Key的分佈

知道了資料傾斜發生在哪裡之後,通常需要分析一下那個執行了shuffle操作並且導致了資料傾斜的RDD/Hive表,檢視一下其中key的分佈情況。這主要是為之後選擇哪一種技術方案提供依據。針對不同的key分佈與不同的shuffle運算元組合起來的各種情況,可能需要選擇不同的技術方案來解決。

此時根據你執行操作的情況不同,可以有很多種檢視key分佈的方式:

·如果是Spark SQL中的group by、join語句導致的資料傾斜,那麼就查詢一下SQL中使用的表的key分佈情況。

·如果是對Spark RDD執行shuffle運算元導致的資料傾斜,那麼可以在Spark作業中加入檢視key分佈的程式碼,比如RDD.countByKey()。然後對統計出來的各個key出現的次數,collect/take到客戶端列印一下,就可以看到key的分佈情況。

舉例來說,對於上面所說的單詞計數程式,如果確定了是stage1的reduceByKey運算元導致了資料傾斜,那麼就應該看看進行reduceByKey操作的RDD中的key分佈情況,在這個例子中指的就是pairs RDD。如下示例,我們可以先對pairs取樣10%的樣本資料,然後使用countByKey運算元統計出每個key出現的次數,最後在客戶端遍歷和列印樣本資料中各個key的出現次數。

---------------

val sampledPairs = pairs.sample(false, 0.1) val sampledWordCounts = sampledPairs.countByKey() sampledWordCounts.foreach(println(_))

---------------

總結起來,有以下幾點:

1)Web UI,可以清晰看見哪些個Task執行的資料量大小;

2)Log,Log的一個好處是可以清晰的告訴是哪一行出現問題OOM,同時可以清晰的看到在具體哪個Stage出現了資料傾斜(資料傾斜一般是在Shuffle過程中產生的),從而定位具體Shuffle的程式碼;也有可能發現絕大多數Task非常快,但是個別Task非常慢;

3)程式碼走讀,重點看join、groupByKey、reduceByKey等關鍵程式碼;

4)結合業務,對資料特徵分佈進行分析。

3.資料傾斜解決方法

3.1 使用broadcast避免shuffle

1)儘可能的將Reduce端放在Map端,就避免了shuffle,避免了shuffle就在很大情況下化解了資料傾斜的問題

2)什麼叫MappedReduce,整個spark是Rdd的鏈式操作,我們的DAGScheduler根據不同得RDD型別的依賴關係劃分成不同的stage,不同型別依賴關係就是寬依賴和窄依賴。寬依賴的時候把stage換分成更小的stage,我們想做的事是把寬依賴減掉,避免掉shuffle,把操作直接放在map端。從stage角度講,後邊stage都是前面前面stage都是後邊stage的map的。對我們解決資料傾斜很有幫助。

3)使用broadcast避免shuffle

 MapReduce過程:RDD1和RDD2要進行join操作,spark根據key在reduce階段劃分了2個task,然後發生shuffle,資料根據key的情況將資料移動相應的executor所在節點,組成成RDD3,RDD3下有2個task,這2 task拿到shuffle後的資料,各自進行join操作,最後將結果合併。

 分析:分析發現,RDD1的資料非常大,RDD2的資料很小,可以將RDD2廣播到RDD1,這樣計算就發生在Map階段了,有效避免了shuffle

4)舉例說明

用broadcast + filter來代替join,這種優化是一種特定場景的神器,就是拿大的RDD A去join一個小的RDD B,比如有這樣兩個RDD:

·A的結構為(name, age, sex),表示全國人民的RDD,超大

·B的結果為(age, title),表示“年齡 -> 稱號”的對映,比如60歲有稱號“花甲之年”,70歲則是“古稀之年”,這個RDD顯然很小,因為人的年齡範圍在0~200歲之間,而且有的“年齡”還沒有“稱號”

現在我要從全國人民中找出這些有稱號的人來。如果直接寫成:

A.map{case (name, age, sex) => (age, (name, sex))} .join(B) .map{case (age, ((name, sex), title)) => (name, age, sex)}

你就可以想象,執行的時候超大的A被打散和分發到各個節點去。而且更要命的是,為了恢復一開始的(name, age, sex)的結構,又做了一次map,而這次map一樣導致shuffle。兩次shuffle,太瘋狂了。但是如果這樣寫:

val b = sc.broadcast(B.collectAsMap) A.filter{case (name, age, sex) => b.values.contains(age)}

一次shuffle都沒有,A老老實實待著不動,等著全量的B被分發過來。

5)優點和缺點

 優點:1)有效避免shuffle的發生,2)broadcast是程序級別,並且只讀的,spark task是執行緒級別,所以很安全;3)spark sql中有小表,會自動進行broadcast,要配置點引數,配置資訊的分佈也可以broadcast。

 缺點:1)要廣播出去的rdd資料要比較小,否則也會導致OOM,2)並且對GC也會有影響,因為廣播變數是常住記憶體的,很容易變成老年代,不適合兩個rdd資料量都非常大情況

場景:兩個要shuffle的表,有一個rdd資料非常小很適合使用。那兩個表都很大要進行join該怎麼優化,1)優化key,2)取樣

3.2 取樣傾斜key並分拆join操作

適用場景:兩個RDD/Hive表進行join的時候,如果資料量都比較大,那麼此時可以看一下兩個RDD/Hive表中的key分佈情況。如果出現數據傾斜,是因為其中某一個RDD/Hive表中的少數幾個key的資料量過大,而另一個RDD/Hive表中的所有key都分佈比較均勻,那麼採用這個解決方案是比較合適的。

方案實現思路:

1)統計熱門Key

對包含少數幾個資料量過大的key的那RDD,通過sample運算元取樣出一份樣本來,然後統計一下每個key的數量,計算出來資料量最大的是哪幾個key。

2)拆分熱門Key的RDD

然後將這幾個key對應的資料從原來的RDD中拆分出來,形成一個單獨的RDD,並給每個key都打上n以內的隨機數作為字首,而不會導致傾斜的大部分key形成另外一個也會導致傾斜的RDD。

3)擴容被Join的RDD

因為前面是隨機加了隨機數,所以需要將要參與join的另一個RDD,也過濾出來那幾個傾斜key對應的資料並形成一個單獨的RDD,將每條資料膨脹成n條資料,這n條資料都按順序附加一個0~n的字首,不會導致傾斜的大部分key也形成另外一個RDD。

4)執行Join被拆分和被擴容的RDD

再將附加了隨機字首的獨立RDD與另一個膨脹n倍的獨立RDD進行join,此時就可以將原先相同的key打散成n份,分散到多個task中去進行join了。

5)執行非熱門RDDjoin

另外兩個普通的RDD就照常join即可。

6)合併熱門RDD和非熱門RDD join 後的結果

最後將兩次join的結果使用union運算元合併起來即可,就是最終的join結果。

實現原理:對於join導致的資料傾斜,如果只是某幾個key導致了傾斜,可以將少數幾個key分拆成獨立RDD,並附加隨機字首打散成n份去進行join,此時這幾個key對應的資料就不會集中在少數幾個task上,而是分散到多個task進行join了。具體原理見下圖。

方案優點:對於join導致的資料傾斜,如果只是某幾個key導致了傾斜,採用該方式可以用最有效的方式打散key進行join。而且只需要針對少數傾斜key對應的資料進行擴容n倍,不需要對全量資料進行擴容。避免了佔用過多記憶體。

方案缺點:如果導致傾斜的key特別多的話,比如成千上萬個key都導致資料傾斜,那麼這種方式也不適合。

舉個栗子:程式碼demo如下

-----------------

// 首先從包含了少數幾個導致資料傾斜key的rdd1中,取樣10%的樣本資料。 

JavaPairRDD<Long, String> sampledRDD = rdd1.sample(false, 0.1);

// 對樣本資料RDD統計出每個key的出現次數,並按出現次數降序排序。

// 對降序排序後的資料,取出top 1或者top 100的資料,也就是key最多的前n個數據。

// 具體取出多少個數據量最多的key,由大家自己決定,我們這裡就取1個作為示範。 

JavaPairRDD<Long, Long> mappedSampledRDD = sampledRDD.mapToPair(  

new PairFunction<Tuple2<Long,String>, Long, Long>() {  

private static final long serialVersionUID = 1L;   

@Override  

public Tuple2<Long, Long> call(Tuple2<Long, String> tuple) 

throws Exception {

return new Tuple2<Long, Long>(tuple._1, 1L); 

}     

});

JavaPairRDD<Long, Long> countedSampledRDD = mappedSampledRDD.reduceByKey(  

new Function2<Long, Long, Long>() {

private static final long serialVersionUID = 1L;

@Override

public Long call(Long v1, Long v2) throws Exception {

return v1 + v2; } });

JavaPairRDD<Long, Long> reversedSampledRDD = countedSampledRDD.mapToPair(

new PairFunction<Tuple2<Long,Long>, Long, Long>() {

private static final long serialVersionUID = 1L;

@Override

public Tuple2<Long, Long> call(Tuple2<Long, Long> tuple)

throws Exception {

return new Tuple2<Long, Long>(tuple._2, tuple._1); 

}

});

//取出導致資料傾斜的Key出來,只取出了一個 

final Long skewedUserid = reversedSampledRDD.sortByKey(false).take(1).get(0)._2; 

// 從rdd1中分拆出導致資料傾斜的key,形成獨立的RDD。 

JavaPairRDD<Long, String> skewedRDD = rdd1.filter(

new Function<Tuple2<Long,String>, Boolean>() {

private static final long serialVersionUID = 1L;

@Override

public Boolean call(Tuple2<Long, String> tuple) throws Exception {

return tuple._1.equals(skewedUserid);

}

});

// 從rdd1中分拆出不導致資料傾斜的普通key,形成獨立的RDD。 

JavaPairRDD<Long, String> commonRDD = rdd1.filter(

new Function<Tuple2<Long,String>, Boolean>() {

private static final long serialVersionUID = 1L;

@Override

public Boolean call(Tuple2<Long, String> tuple) throws Exception {

return !tuple._1.equals(skewedUserid);

}

});

// rdd2,就是那個所有key的分佈相對較為均勻的rdd。

// 這裡將rdd2中,前面獲取到的key對應的資料,過濾出來,分拆成單獨的rdd,並對rdd中的資料使用flatMap運算元都擴容100倍

// 對擴容的每條資料,都打上0~100的字首。

JavaPairRDD<String, Row> skewedRdd2 = rdd2.filter(

new Function<Tuple2<Long,Row>, Boolean>() {

private static final long serialVersionUID = 1L;

@Override

public Boolean call(Tuple2<Long, Row> tuple) throws Exception {

return tuple._1.equals(skewedUserid);

}

}).flatMapToPair(

new PairFlatMapFunction<Tuple2<Long,Row>, String, Row>() {

private static final long serialVersionUID = 1L;

@Override

public Iterable<Tuple2<String, Row>> call(

Tuple2<Long, Row> tuple) throws Exception {

Random random = new Random();

List<Tuple2<String, Row>> list = new ArrayList<Tuple2<String, Row>>();

for(int i = 0; i < 100; i++) {

list.add(new Tuple2<String, Row>(i + "_" + tuple._1, tuple._2));

}

return list;

}

});

// 將rdd1中分拆出來的導致傾斜的key的獨立rdd,每條資料都打上100以內的隨機字首。 

// 然後將這個rdd1中分拆出來的獨立rdd,與上面rdd2中分拆出來的獨立rdd,進行join。 

JavaPairRDD<Long, Tuple2<String, Row>> joinedRDD1 = skewedRDD.mapToPair(

new PairFunction<Tuple2<Long,String>, String, String>() {

private static final long serialVersionUID = 1L;

@Override 

public Tuple2<String, String> call(Tuple2<Long, String> tuple)

throws Exception {

Random random = new Random();

int prefix = random.nextInt(100);

return new Tuple2<String, String>(prefix + "_" + tuple._1, tuple._2);

}).join(skewedUserid2infoRDD).mapToPair(

new PairFunction<Tuple2<String,Tuple2<String,Row>>, Long, Tuple2<String, Row>>() { 

private static final long serialVersionUID = 1L;

@Override 

public Tuple2<Long, Tuple2<String, Row>> call(

Tuple2<String, Tuple2<String, Row>> tuple)

throws Exception {

long key = Long.valueOf(tuple._1.split("_")[1]);

return new Tuple2<Long, Tuple2<String, Row>>(key, tuple._2);

}); 

// 將rdd1中分拆出來的包含普通key的獨立rdd,直接與rdd2進行join。 

JavaPairRDD<Long, Tuple2<String, Row>> joinedRDD2 = commonRDD.join(rdd2);

// 將傾斜key join後的結果與普通key join後的結果,uinon起來。

// 就是最終的join結果。 

JavaPairRDD<Long, Tuple2<String, Row>> joinedRDD = joinedRDD1.union(joinedRDD2);

-----------------

3.3 改變並行度緩解資料傾斜

原理

Spark在做Shuffle時,預設使用HashPartitioner(非Hash Shuffle)對資料進行分割槽。如果並行度設定的不合適,可能造成大量不相同的Key對應的資料被分配到了同一個Task上,造成該Task所處理的資料遠大於其它Task,從而造成資料傾斜。

如果調整Shuffle時的並行度,使得原本被分配到同一Task的不同Key發配到不同Task上處理,則可降低原Task所需處理的資料量,從而緩解資料傾斜問題造成的短板效應。

案例

現有一張測試表,名為student_external,內有10.5億條資料,每條資料有一個唯一的id值。現從中取出id取值為9億到10.5億的共1.5條資料,並通過一些處理,使得id為9億到9.4億間的所有資料對12取模後餘數為8(即在Shuffle並行度為12時該資料集全部被HashPartition分配到第8個Task),其它資料集對其id除以100取整,從而使得id大於9.4億的資料在Shuffle時可被均勻分配到所有Task中,而id小於9.4億的資料全部分配到同一個Task中。處理過程如下:

--------------

INSERT OVERWRITE TABLE test

SELECT CASE WHEN id < 940000000 THEN (9500000  + (CAST (RAND() * 8 AS INTEGER)) * 12 )

ELSE CAST(id/100 AS INTEGER)

END,

name

FROM student_external

WHERE id BETWEEN 900000000 AND 1050000000;

--------------

通過上述處理,一份可能造成後續資料傾斜的測試資料即以準備好。接下來,使用Spark讀取該測試資料,並通過groupByKey(12)對id分組處理,且Shuffle並行度為12。程式碼如下

--------------------

public class SparkDataSkew {

public static void main(String[] args) {

SparkSession sparkSession = SparkSession.builder()

.appName("SparkDataSkewTunning")

.config("hive.metastore.uris", "thrift://hadoop1:9083")

.enableHiveSupport()

.getOrCreate();

Dataset<Row> dataframe = sparkSession.sql( "select * from test");

dataframe.toJavaRDD()

.mapToPair((Row row) -> new Tuple2<Integer, String>(row.getInt(0),row.getString(1)))

.groupByKey(12)

.mapToPair((Tuple2<Integer, Iterable<String>> tuple) -> {

int id = tuple._1();

AtomicInteger atomicInteger = new AtomicInteger(0);

tuple._2().forEach((String name) -> atomicInteger.incrementAndGet());

return new Tuple2<Integer, Integer>(id, atomicInteger.get());

}).count();

sparkSession.stop();

sparkSession.close();

}

}

-----------------

本次實驗所使用叢集節點數為4,每個節點可被Yarn使用的CPU核數為16,記憶體為16GB。使用如下方式提交上述應用,將啟動4個Executor,每個Executor可使用核數為12(該配置並非生產環境下的最優配置,僅用於本文實驗),可用記憶體為12GB。

spark-submit --queue ambari --num-executors 4 --executor-cores 12 --executor-memory 12g --class com.jasongj.spark.driver.SparkDataSkew --master yarn --deploy-mode client SparkExample-with-dependencies-1.0.jar

GroupBy Stage的Task狀態如下圖所示,Task 8處理的記錄數為4500萬,遠大於(9倍於)其它11個Task處理的500萬記錄。而Task 8所耗費的時間為38秒,遠高於其它11個Task的平均時間(16秒)。整個Stage的時間也為38秒,該時間主要由最慢的Task 8決定。

在這種情況下,可以通過調整Shuffle並行度,使得原來被分配到同一個Task(即該例中的Task 8)的不同Key分配到不同Task,從而降低Task 8所需處理的資料量,緩解資料傾斜。

通過groupByKey(48)將Shuffle並行度調整為48,重新提交到Spark。新的Job的GroupBy Stage所有Task狀態如下圖所示。

在這種場景下,調整並行度,並不意味著一定要增加並行度,也可能是減小並行度。如果通過groupByKey(11)將Shuffle並行度調整為11,重新提交到Spark。新Job的GroupBy Stage的所有Task狀態如下圖所示。

從上圖可見,處理記錄數最多的Task 6所處理的記錄數約為1045萬,耗時為23秒。處理記錄數最少的Task 1處理的記錄數約為545萬,耗時12秒。

適用場景

大量不同的Key被分配到了相同的Task造成該Task資料量過大。

解決方案

調整並行度。一般是增大並行度,但有時如本例減小並行度也可達到效果。

優勢

實現簡單,可在需要Shuffle的操作運算元上直接設定並行度或者使用spark.default.parallelism設定。如果是Spark SQL,還可通過SET spark.sql.shuffle.partitions=[num_tasks]設定並行度。可用最小的代價解決問題。一般如果出現數據傾斜,都可以通過這種方法先試驗幾次,如果問題未解決,再嘗試其它方法。

劣勢

適用場景少,只能將分配到同一Task的不同Key分散開,但對於同一Key傾斜嚴重的情況該方法並不適用。並且該方法一般只能緩解資料傾斜,沒有徹底消除問題。從實踐經驗來看,其效果一般。

3.4 兩階段聚合(區域性+全域性)

方案適用場景:對RDD執行reduceByKey等聚合類shuffle運算元或者在Spark SQL中使用group by語句進行分組聚合時,比較適用這種方案。

方案實現思路:這個方案的核心實現思路就是進行兩階段聚合。第一次是區域性聚合,先給每個key都打上一個隨機數,比如10以內的隨機數,此時原先一樣的key就變成不一樣的了,比如(hello, 1) (hello, 1) (hello, 1) (hello, 1),就會變成(1_hello, 1) (1_hello, 1) (2_hello, 1) (2_hello, 1)。接著對打上隨機數後的資料,執行reduceByKey等聚合操作,進行區域性聚合,那麼區域性聚合結果,就會變成了(1_hello, 2) (2_hello, 2)。然後將各個key的字首給去掉,就會變成(hello,2)(hello,2),再次進行全域性聚合操作,就可以得到最終結果了,比如(hello, 4),和3.2【取樣傾斜key並分拆join操作】的方法類似。

方案實現原理:將原本相同的key通過附加隨機字首的方式,變成多個不同的key,就可以讓原本被一個task處理的資料分散到多個task上去做區域性聚合,進而解決單個task處理資料量過多的問題。接著去除掉隨機字首,再次進行全域性聚合,就可以得到最終的結果。具體原理見下圖。

方案優點:對於聚合類的shuffle操作導致的資料傾斜,效果是非常不錯的。通常都可以解決掉資料傾斜,或者至少是大幅度緩解資料傾斜,將Spark作業的效能提升數倍以上。

方案缺點:僅僅適用於聚合類的shuffle操作,適用範圍相對較窄。如果是join類的shuffle操作,還得用其他的解決方案。

3.5 Hive ETL預處理資料

適用場景:導致資料傾斜的是Hive表。如果該Hive表中的資料本身很不均勻(比如某個key對應了100萬資料,其他key才對應了10條資料),而且業務場景需要頻繁使用Spark對Hive表執行某個分析操作,那麼比較適合使用這種技術方案。

實現思路:此時可以評估一下,是否可以通過Hive來進行資料預處理(即通過Hive ETL預先對資料按照key進行聚合,或者是預先和其他表進行join),然後在Spark作業中針對的資料來源就不是原來的Hive表了,而是預處理後的Hive表。此時由於資料已經預先進行過聚合或join操作了,那麼在Spark作業中也就不需要使用原先的shuffle類運算元執行這類操作了。

實現原理:這種方案從根源上解決了資料傾斜,因為徹底避免了在Spark中執行shuffle類運算元,那麼肯定就不會有資料傾斜的問題了。但是這裡也要提醒一下大家,這種方式屬於治標不治本。因為畢竟資料本身就存在分佈不均勻的問題,所以Hive ETL中進行group by或者join等shuffle操作時,還是會出現資料傾斜,導致Hive ETL的速度很慢。我們只是把資料傾斜的發生提前到了Hive ETL中,避免Spark程式發生資料傾斜而已。

優點:實現起來簡單便捷,效果還非常好,完全規避掉了資料傾斜,Spark作業的效能會大幅度提升。

缺點:治標不治本,Hive ETL中還是會發生資料傾斜。

實踐經驗:在一些Java系統與Spark結合使用的專案中,會出現Java程式碼頻繁呼叫Spark作業的場景,而且對Spark作業的執行效能要求很高,就比較適合使用這種方案。將資料傾斜提前到上游的Hive ETL,每天僅執行一次,只有那一次是比較慢的,而之後每次Java呼叫Spark作業時,執行速度都會很快,能夠提供更好的使用者體驗。

實踐經驗:在美團·點評的互動式使用者行為分析系統中使用了這種方案,該系統主要是允許使用者通過Java Web系統提交資料分析統計任務,後端通過Java提交Spark作業進行資料分析統計。要求Spark作業速度必須要快,儘量在10分鐘以內,否則速度太慢,使用者體驗會很差。所以我們將有些Spark作業的shuffle操作提前到了Hive ETL中,從而讓Spark直接使用預處理的Hive中間表,儘可能地減少Spark的shuffle操作,大幅度提升了效能,將部分作業的效能提升了6倍以上。

3.6 過濾少數導致傾斜的key

適用場景:如果發現導致傾斜的key就少數幾個,而且對計算本身的影響並不大的話,那麼很適合使用這種方案。比如99%的key就對應10條資料,但是隻有一個key對應了100萬資料,從而導致了資料傾斜。

實現思路:如果我們判斷那少數幾個資料量特別多的key,對作業的執行和計算結果不是特別重要的話,那麼幹脆就直接過濾掉那少數幾個key。比如,在Spark SQL中可以使用where子句過濾掉這些key或者在Spark Core中對RDD執行filter運算元過濾掉這些key。如果需要每次作業執行時,動態判定哪些key的資料量最多然後再進行過濾,那麼可以使用sample運算元對RDD進行取樣,然後計算出每個key的數量,取資料量最多的key過濾掉即可。

實現原理:將導致資料傾斜的key給過濾掉之後,這些key就不會參與計算了,自然不可能產生資料傾斜。

優點:實現簡單,而且效果也很好,可以完全規避掉資料傾斜。

缺點:適用場景不多,大多數情況下,導致傾斜的key還是很多的,並不是只有少數幾個。

實踐經驗:在專案中我們也採用過這種方案解決資料傾斜。有一次發現某一天Spark作業在執行的時候突然OOM了,追查之後發現,是Hive表中的某一個key在那天資料異常,導致資料量暴增。因此就採取每次執行前先進行取樣,計算出樣本中資料量最大的幾個key之後,直接在程式中將那些key給過濾掉。

3.7 自定義partitioner

原理

使用自定義的Partitioner(預設為HashPartitioner),將原本被分配到同一個Task的不同Key分配到不同Task。

案例

以上述3.3 例子繼續,將併發度設定為12,但是在groupByKey運算元上,使用自定義的Partitioner(實現如下)

------------------------

.groupByKey(new Partitioner() {

@Override

public int numPartitions() {

return 12;

}

@Override

public int getPartition(Object key) {

int id = Integer.parseInt(key.toString());

if(id >= 9500000 && id <= 9500084 && ((id - 9500000) % 12) == 0) {

return (id - 9500000) / 12;

} else {

return id % 12;

}

}

})

------------------------

由下圖可見,使用自定義Partition後,耗時最長的Task 6處理約1000萬條資料,用時15秒。並且各Task所處理的資料集大小相當。

適用場景

大量不同的Key被分配到了相同的Task造成該Task資料量過大。

解決方案

使用自定義的Partitioner實現類代替預設的HashPartitioner,儘量將所有不同的Key均勻分配到不同的Task中。

優勢

不影響原有的並行度設計。如果改變並行度,後續Stage的並行度也會預設改變,可能會影響後續Stage。

劣勢

適用場景有限,只能將不同Key分散開,對於同一Key對應資料集非常大的場景不適用。效果與調整並行度類似,只能緩解資料傾斜而不能完全消除資料傾斜。而且需要根據資料特點自定義專用的Partitioner,不夠靈活。

4.參考資料

1.https://yq.aliyun.com/articles/62541spark 資料傾斜的一些表現

2.https://www.iteblog.com/archives/1671.html Spark效能優化:資料傾斜調優

3.https://www.iteblog.com/archives/1672.html shuffle調優

4.http://www.jasongj.com/spark/skew/ 解決Spark資料傾斜(Data Skew)的N種姿勢