1. 程式人生 > >Spark資料傾斜的完美解決

Spark資料傾斜的完美解決

資料傾斜解決方案

資料傾斜的解決,跟之前講解的效能調優,有一點異曲同工之妙。

效能調優中最有效最直接最簡單的方式就是加資源加並行度,並注意RDD架構(複用同一個RDD,加上cache快取)。相對於前面,shuffle、jvm等是次要的。

6.1、原理以及現象分析

6.1.1、資料傾斜怎麼出現的

在執行shuffle操作的時候,是按照key,來進行values的資料的輸出、拉取和聚合的。

同一個key的values,一定是分配到一個reduce task進行處理的。

多個key對應的values,比如一共是90萬。可能某個key對應了88萬資料,被分配到一個task上去面去執行。

另外兩個task,可能各分配到了1萬資料,可能是數百個key,對應的1萬條資料。

這樣就會出現資料傾斜問題。

想象一下,出現數據傾斜以後的執行的情況。很糟糕!

其中兩個task,各分配到了1萬資料,可能同時在10分鐘內都執行完了。另外一個task有88萬條,88 * 10 =  880分鐘 = 14.5個小時。

大家看,本來另外兩個task很快就執行完畢了(10分鐘),但是由於一個拖後腿的傢伙,第三個task,要14.5個小時才能執行完,就導致整個spark作業,也得14.5個小時才能執行完。

資料傾斜,一旦出現,是不是效能殺手?!

6.1.2、發生資料傾斜以後的現象

Spark資料傾斜,有兩種表現:

1、你的大部分的task,都執行的特別特別快,(你要用client模式,standalone client,yarn client,本地機器一執行spark-submit指令碼,就會開始列印log),task175 finished,剩下幾個task,執行的特別特別慢,前面的task,一般1s可以執行完5個,最後發現1000個task,998,999 task,要執行1個小時,2個小時才能執行完一個task。

出現以上loginfo,就表明出現數據傾斜了。

這樣還算好的,因為雖然老牛拉破車一樣非常慢,但是至少還能跑。

2、另一種情況是,執行的時候,其他task都執行完了,也沒什麼特別的問題,但是有的task,就是會突然間報了一個OOM,JVM Out Of Memory,記憶體溢位了,task failed,task lost,resubmitting task。反覆執行幾次都到了某個task就是跑不通,最後就掛掉。

某個task就直接OOM,那麼基本上也是因為資料傾斜了,task分配的數量實在是太大了!所以記憶體放不下,然後你的task每處理一條資料,還要建立大量的物件,記憶體爆掉了。

這樣也表明出現數據傾斜了。

這種就不太好了,因為你的程式如果不去解決資料傾斜的問題,壓根兒就跑不出來。

作業都跑不完,還談什麼效能調優這些東西?!

6.1.3、定位資料傾斜出現的原因與出現問題的位置

根據log去定位

出現數據傾斜的原因,基本只可能是因為發生了shuffle操作,在shuffle的過程中,出現了資料傾斜的問題。因為某個或者某些key對應的資料,遠遠的高於其他的key。

1、你在自己的程式裡面找找,哪些地方用了會產生shuffle的運算元,groupByKey、countByKey、reduceByKey、join

2、看log

log一般會報是在你的哪一行程式碼,導致了OOM異常。或者看log,看看是執行到了第幾個stage。spark程式碼,是怎麼劃分成一個一個的stage的。哪一個stage生成的task特別慢,就能夠自己用肉眼去對你的spark程式碼進行stage的劃分,就能夠通過stage定位到你的程式碼,到底哪裡發生了資料傾斜。

6.2、聚合源資料以及過濾導致傾斜的key

資料傾斜解決方案,第一個方案和第二個方案,一起來講。這兩個方案是最直接、最有效、最簡單的解決資料傾斜問題的方案。

第一個方案:聚合源資料。

第二個方案:過濾導致傾斜的key。

後面的五個方案,尤其是最後4個方案,都是那種特別狂拽炫酷吊炸天的方案。但沒有第一二個方案簡單直接。如果碰到了資料傾斜的問題。上來就先考慮第一個和第二個方案看能不能做,如果能做的話,後面的5個方案,都不用去搞了。

有效、簡單、直接才是最好的,徹底根除了資料傾斜的問題。

6.2.1、方案一:聚合源資料

一些聚合的操作,比如groupByKey、reduceByKey,groupByKey說白了就是拿到每個key對應的values。reduceByKey說白了就是對每個key對應的values執行一定的計算。

這些操作,比如groupByKey和reduceByKey,包括之前說的join。都是在spark作業中執行的。

spark作業的資料來源,通常是哪裡呢?90%的情況下,資料來源都是hive表(hdfs,大資料分散式儲存系統)。hdfs上儲存的大資料。hive表中的資料通常是怎麼出來的呢?有了spark以後,hive比較適合做什麼事情?hive就是適合做離線的,晚上凌晨跑的,ETL(extract transform load,資料的採集、清洗、匯入),hive sql,去做這些事情,從而去形成一個完整的hive中的資料倉庫。說白了,資料倉庫,就是一堆表。

spark作業的源表,hive表,通常情況下來說,也是通過某些hive etl生成的。hive etl可能是晚上凌晨在那兒跑。今天跑昨天的資料。

資料傾斜,某個key對應的80萬資料,某些key對應幾百條,某些key對應幾十條。現在咱們直接在生成hive表的hive etl中對資料進行聚合。比如按key來分組,將key對應的所有的values全部用一種特殊的格式拼接到一個字串裡面去,比如“key=sessionid, value: action_seq=1|user_id=1|search_keyword=火鍋|category_id=001;action_seq=2|user_id=1|search_keyword=涮肉|category_id=001”。

對key進行group,在spark中,拿到key=sessionid,values<Iterable>。hive etl中,直接對key進行了聚合。那麼也就意味著,每個key就只對應一條資料。在spark中,就不需要再去執行groupByKey+map這種操作了。直接對每個key對應的values字串進行map操作,進行你需要的操作即可。

spark中,可能對這個操作,就不需要執行shffule操作了,也就根本不可能導致資料傾斜。

或者是對每個key在hive etl中進行聚合,對所有values聚合一下,不一定是拼接起來,可能是直接進行計算。reduceByKey計算函式應用在hive etl中,從而得到每個key的values。

聚合源資料方案第二種做法是,你可能沒有辦法對每個key聚合出來一條資料。那麼也可以做一個妥協,對每個key對應的資料,10萬條。有好幾個粒度,比如10萬條裡面包含了幾個城市、幾天、幾個地區的資料,現在放粗粒度。直接就按照城市粒度,做一下聚合,幾個城市,幾天、幾個地區粒度的資料,都給聚合起來。比如說

city_id date area_id

select ... from ... group by city_id

儘量去聚合,減少每個key對應的數量,也許聚合到比較粗的粒度之後,原先有10萬資料量的key,現在只有1萬資料量。減輕資料傾斜的現象和問題。

6.2.2、方案二:過濾導致傾斜的key

如果你能夠接受某些資料在spark作業中直接就摒棄掉不使用。比如說,總共有100萬個key。只有2個key是資料量達到10萬的。其他所有的key,對應的數量都是幾十萬。

這個時候,你自己可以去取捨,如果業務和需求可以理解和接受的話,在你從hive表查詢源資料的時候,直接在sql中用where條件,過濾掉某幾個key。

那麼這幾個原先有大量資料,會導致資料傾斜的key,被過濾掉之後,那麼在你的spark作業中,自然就不會發生資料傾斜了。

6.3、提高shuffle操作reduce並行度

6.3.1、問題描述

第一個和第二個方案,都不適合做,然後再考慮這個方案。

將reduce task的數量變多,就可以讓每個reduce task分配到更少的資料量。這樣的話也許就可以緩解甚至是基本解決掉資料傾斜的問題。

6.3.2、提升shuffle reduce端並行度的操作方法

很簡單,主要給我們所有的shuffle運算元,比如groupByKey、countByKey、reduceByKey。在呼叫的時候,傳入進去一個引數。那個數字,就代表了那個shuffle操作的reduce端的並行度。那麼在進行shuffle操作的時候,就會對應著建立指定數量的reduce task。

這樣的話,就可以讓每個reduce task分配到更少的資料。基本可以緩解資料傾斜的問題。

比如說,原本某個task分配資料特別多,直接OOM,記憶體溢位了,程式沒法執行,直接掛掉。按照log,找到發生資料傾斜的shuffle操作,給它傳入一個並行度數字,這樣的話,原先那個task分配到的資料,肯定會變少。就至少可以避免OOM的情況,程式至少是可以跑的。

6.3.2、提升shuffle reduce並行度的缺陷

治標不治本的意思,因為它沒有從根本上改變資料傾斜的本質和問題。不像第一個和第二個方案(直接避免了資料傾斜的發生)。原理沒有改變,只是說,儘可能地去緩解和減輕shuffle reduce task的資料壓力,以及資料傾斜的問題。

實際生產環境中的經驗:

1、如果最理想的情況下,提升並行度以後,減輕了資料傾斜的問題,或者甚至可以讓資料傾斜的現象忽略不計,那麼就最好。就不用做其他的資料傾斜解決方案了。

2、不太理想的情況下,比如之前某個task執行特別慢,要5個小時,現在稍微快了一點,變成了4個小時。或者是原先執行到某個task,直接OOM,現在至少不會OOM了,但是那個task執行特別慢,要5個小時才能跑完。

那麼,如果出現第二種情況的話,各位,就立即放棄第三種方案,開始去嘗試和選擇後面的四種方案。

6.4、使用隨機key實現雙重聚合

6.4.1、使用場景

groupByKey、reduceByKey比較適合使用這種方式。join咱們通常不會這樣來做,後面會講三種針對不同的join造成的資料傾斜的問題的解決方案。

6.4.2、解決方案

第一輪聚合的時候,對key進行打散,將原先一樣的key,變成不一樣的key,相當於是將每個key分為多組。

先針對多個組,進行key的區域性聚合。接著,再去除掉每個key的字首,然後對所有的key進行全域性的聚合。

對groupByKey、reduceByKey造成的資料傾斜,有比較好的效果。

如果說,之前的第一、第二、第三種方案,都沒法解決資料傾斜的問題,那麼就只能依靠這一種方式了。

6.5、將reduce join轉換為map join

6.5.1、使用方式

普通的join,那麼肯定是要走shuffle。既然是走shuffle,那麼普通的join就肯定是走的是reduce join。那怎麼將reduce join 轉換為mapjoin呢?先將所有相同的key,對應的value匯聚到一個task中,然後再進行join。

6.5.2、使用場景

這種方式適合在什麼樣的情況下來使用?

如果兩個RDD要進行join,其中一個RDD是比較小的。比如一個RDD是100萬資料,一個RDD是1萬資料。(一個RDD是1億資料,一個RDD是100萬資料)。

其中一個RDD必須是比較小的,broadcast出去那個小RDD的資料以後,就會在每個executor的block manager中都儲存一份。要確保你的記憶體足夠存放那個小RDD中的資料。

這種方式下,根本不會發生shuffle操作,肯定也不會發生資料傾斜。從根本上杜絕了join操作可能導致的資料傾斜的問題。

對於join中有資料傾斜的情況,大家儘量第一時間先考慮這種方式,效果非常好。

不適合的情況

兩個RDD都比較大,那麼這個時候,你去將其中一個RDD做成broadcast,就很笨拙了。很可能導致記憶體不足。最終導致記憶體溢位,程式掛掉。

而且其中某些key(或者是某個key),還發生了資料傾斜。此時可以採用最後兩種方式。

對於join這種操作,不光是考慮資料傾斜的問題。即使是沒有資料傾斜問題,也完全可以優先考慮,用我們講的這種高階的reduce join轉map join的技術,不要用普通的join,去通過shuffle,進行資料的join。完全可以通過簡單的map,使用map join的方式,犧牲一點記憶體資源。在可行的情況下,優先這麼使用。

不走shuffle,直接走map,是不是效能也會高很多?這是肯定的。

6.6、sample取樣傾斜key單獨進行join

6.6.1、方案實現思路

將發生資料傾斜的key,單獨拉出來,放到一個RDD中去。就用這個原本會傾斜的key RDD跟其他RDD單獨去join一下,這個時候key對應的資料可能就會分散到多個task中去進行join操作。

就不至於說是,這個key跟之前其他的key混合在一個RDD中時,肯定是會導致一個key對應的所有資料都到一個task中去,就會導致資料傾斜。

6.6.2、使用場景

這種方案什麼時候適合使用?

優先對於join,肯定是希望能夠採用上一個方案,即reduce join轉換map join。兩個RDD資料都比較大,那麼就不要那麼搞了。

針對你的RDD的資料,你可以自己把它轉換成一箇中間表,或者是直接用countByKey()的方式,你可以看一下這個RDD各個key對應的資料量。此時如果你發現整個RDD就一個,或者少數幾個key對應的資料量特別多。儘量建議,比如就是一個key對應的資料量特別多。

此時可以採用這種方案,單拉出來那個最多的key,單獨進行join,儘可能地將key分散到各個task上去進行join操作。

什麼時候不適用呢?

如果一個RDD中,導致資料傾斜的key特別多。那麼此時,最好還是不要這樣了。還是使用我們最後一個方案,終極的join資料傾斜的解決方案。

就是說,咱們單拉出來了一個或者少數幾個可能會產生資料傾斜的key,然後還可以進行更加優化的一個操作。

對於那個key,從另外一個要join的表中,也過濾出來一份資料,比如可能就只有一條資料。userid2infoRDD,一個userid key,就對應一條資料。

然後呢,採取對那個只有一條資料的RDD,進行flatMap操作,打上100個隨機數,作為字首,返回100條資料。

單獨拉出來的可能產生資料傾斜的RDD,給每一條資料,都打上一個100以內的隨機數,作為字首。

再去進行join,是不是效能就更好了。肯定可以將資料進行打散,去進行join。join完以後,可以執行map操作,去將之前打上的隨機數給去掉,然後再和另外一個普通RDD join以後的結果進行union操作。

6.7、使用隨機數以及擴容表進行join

6.7.1、使用場景及步驟

當採用隨機數和擴容表進行join解決資料傾斜的時候,就代表著,你的之前的資料傾斜的解決方案,都沒法使用。

這個方案是沒辦法徹底解決資料傾斜的,更多的,是一種對資料傾斜的緩解。

步驟:

1、選擇一個RDD,要用flatMap,進行擴容,將每條資料,對映為多條資料,每個映射出來的資料,都帶了一個n以內的隨機數,通常來說會選擇10。

2、將另外一個RDD,做普通的map對映操作,每條資料都打上一個10以內的隨機數。

3、最後將兩個處理後的RDD進行join操作。

6.7.2、侷限性

1、因為你的兩個RDD都很大,所以你沒有辦法去將某一個RDD擴的特別大,一般咱們就是10倍。

2、如果就是10倍的話,那麼資料傾斜問題的確是只能說是緩解和減輕,不能說徹底解決。

sample取樣傾斜key並單獨進行join

將key,從另外一個RDD中過濾出的資料,可能只有一條或者幾條,此時,咱們可以任意進行擴容,擴成1000倍。

將從第一個RDD中拆分出來的那個傾斜key RDD,打上1000以內的一個隨機數。

這種情況下,還可以配合上,提升shuffle reduce並行度,join(rdd, 1000)。通常情況下,效果還是非常不錯的。

打散成100份,甚至1000份,2000份,去進行join,那麼就肯定沒有資料傾斜的問題了吧。

相關推薦

Spark資料傾斜解決方案

一.場景   1.絕大多數task執行得都非常快,但個別task執行極慢。比如,總共有100個task,97個task都在1s之內執行完了,但是剩餘的task卻要一兩分鐘。這種情況很常見。   2.原本能夠正常執行的Spark作業,某天突然報出OOM(記憶體溢位),觀察異常棧,是我們寫的業務程式碼造成的。

Spark資料傾斜完美解決

資料傾斜解決方案資料傾斜的解決,跟之前講解的效能調優,有一點異曲同工之妙。效能調優中最有效最直接最簡單的方式就是加資源加並行度,並注意RDD架構(複用同一個RDD,加上cache快取)。相對於前面,shuffle、jvm等是次要的。6.1、原理以及現象分析6.1.1、資料傾斜

spark資料傾斜分析與解決方案

Spark資料傾斜(資料分佈不均勻) 資料傾斜發生時的現象: 絕大多數task(任務)執行得都非常快,但個別task執行極慢。 OOM(記憶體溢位),這種情況比較少見。 資料傾斜發生的原理 資料傾斜的原理很簡單:在進行shuffle的時候,必須將各個節點上相同的k

Spark 執行時常見異常及資料傾斜解決方法

spark執行異常: 現象1: 有時會出現的一種情況非常普遍,在spark的作業中;shuffle file not found。(spark作業中,非常非常常見的)而且,有的時候,它是偶爾才會出現的一種情況。有的時候,出現這種情況以後,會重新去

Spark效能優化之道——解決Spark資料傾斜(Data Skew)的N種姿勢

摘要 本文結合例項詳細闡明瞭Spark資料傾斜的幾種場景以及對應的解決方案,包括避免資料來源傾斜,調整並行度,使用自定義Partitioner,使用Map側Join代替Reduce側Join,給傾斜Key加上隨機字首等。 為何要處理資料傾斜(Da

Spark效能調優之道——解決Spark資料傾斜(Data Skew)的N種姿勢

為何要處理資料傾斜(Data Skew) 什麼是資料傾斜 對Spark/Hadoop這樣的大資料系統來講,資料量大並不可怕,可怕的是資料傾斜。 何謂資料傾斜?資料傾斜指的是,並行處理的資料集中,某一部分(如Spark或Kafka的一個Partition)的資料顯著多於其它

spark2.2.0:記錄一次資料傾斜解決(擴容join)!

前言: 資料傾斜,一個在大資料處理中很常見的名詞,經由前人總結,現已有不少資料傾斜的解決方案(而且會發現大資料的不同框架的資料傾斜解決思想是一致的,只是實現方法不同),本文重點記錄這次遇到spark處理資料中的傾斜問題。 老話: 菜雞一隻,本人會對文中的結論負責,如果有說錯的,還請各位批評指出

Hive資料傾斜解決辦法

轉自:https://blog.csdn.net/xinzhi8/article/details/71455883 操作: 關鍵詞 情形      後果 Join 其中一個表較小,但是key集中

【轉】【SparkSpark 資料傾斜優化方法

大資料梅峰谷 2017-05-19 --------本節內容-------- 1.前言 2.Spark資料傾斜    2.1 資料傾斜現象      2.1.1 OOM錯誤      2.1.2 Spark執行緩慢 2.2 資料傾斜原理     2.3 資

Spark資料傾斜調優

調優概述 有的時候,我們可能會遇到大資料計算中一個最棘手的問題——資料傾斜,此時Spark作業的效能會比期望差很多。資料傾斜調優,就是使用各種技術方案解決不同型別的資料傾斜問題,以保證Spark作業的效能。 資料傾斜發生時的現象 絕大多數task執行得都非常

Hive資料傾斜問題解決方案

最近一段時間主要在用Hive,前幾天終於還是沒有逃過經典的資料傾斜問題,備受煎熬,最後終於成功解決,這裡記錄一下心得。 直接上乾貨:解決資料傾斜問題,最大的難點在於知道為什麼會傾斜!!! 一般會造成資料傾斜的情況無外乎在使用 group by 或者 distinct 或

Spark 資料傾斜 join 調優

前言 繼基礎篇講解了每個Spark開發人員都必須熟知的開發調優與資源調優之後,本文作為《Spark效能優化指南》的高階篇,將深入分析資料傾斜調優與shuffle調優,以解決更加棘手的效能問題。 資料傾斜調優 調優概述 有的時候,我們可能會

資料Spark(二)--- RDD,RDD變換,RDD的Action,解決spark資料傾斜問題,spark整合hadoop的HA

一、Spark叢集執行 ------------------------------------------------------- 1.local //本地模式 2.standalone //獨立模式 3.yarn //yarn模式

Spark專案實戰-資料傾斜解決方案之原理以及現象分析

一、資料傾斜的原理 在執行shuffle操作的時候,大家都知道是按照key來進行values的資料的輸出、拉取和聚合的。同一個key的values,一定是分配到一個reduce task進行處理的。假設多個key對應的values,總共是90萬。但是問題是可能某個key對應

Spark專案實戰-資料傾斜解決方案之將reduce join轉換為map join

一、reduce端join操作原理 二、map端join操作原理  三、適用場景 如果兩個RDD要進行join,其中一個RDD是比較小的。一個RDD是100萬資料,一個RDD是1萬資料。(一個RDD是1億資料,一個RDD是100萬資料) 其中一個RDD必須是比較

spark 大型專案實戰(五十八):資料傾斜解決方案之sample取樣傾斜key進行兩次join

當採用隨機數和擴容表進行join解決資料傾斜的時候,就代表著,你的之前的資料傾斜的解決方案,都沒法使用。 這個方案是沒辦法徹底解決資料傾斜的,更多的,是一種對資料傾斜的緩解。 原理,其實在上一講,已經帶出來了。 步驟: 1、選擇一個RDD,要用flatM

解決spark中遇到的資料傾斜問題

一. 資料傾斜的現象 多數task執行速度較快,少數task執行時間非常長,或者等待很長時間後提示你記憶體不足,執行失敗。 二. 資料傾斜的原因 常見於各種shuffle操作,例如reduceByKey,groupByKey,join等操作。 資

《深入理解Spark》之通過自定義分割槽器解決資料傾斜問題

package com.lyzx.day37 import org.apache.spark.{Partitioner, SparkConf, SparkContext} class D1 { //partitionBy和自定義分割槽器解決資料傾斜的問題 def

spark1.x-spark-sql-資料傾斜解決方案

聚合源資料 過濾導致傾斜的key where條件 提高shuffle並行度 spark.sql.shuffle.partitions sqlContext.setConf("spark.sql.shuffle.partitions","1000")

如何解決spark中的資料傾斜問題

發現數據傾斜的時候,不要急於提高executor的資源,修改引數或是修改程式,首先要檢查資料本身,是否存在異常資料。 1、資料問題造成的資料傾斜 找出異常的key 如果任務長時間卡在最後最後1個(幾個)任務,首先要對key進行抽樣分析,判斷是哪些