1. 程式人生 > >大資料之效能調優方面(資料傾斜、shuffle、JVM等方面)

大資料之效能調優方面(資料傾斜、shuffle、JVM等方面)

一、對於資料傾斜的發生一般都是一個key對應的資料過大,而導致Task執行過慢,或者記憶體溢位(OOM),一般是發生在shuffle的時候,比如reduceByKey,groupByKey,sortByKey等,容易產生資料傾斜。

那麼針對資料傾斜我們如何解決呢?我們可以首先觀看log日誌,以為log日誌報錯的時候會提示在哪些行,然後就去檢查發生shuffle的地方,這些地方比較容易發生資料傾斜。

其次,因為我們都是測試的,所以都是在client端進行的,也可以觀察WebUI,上面也會有所有對應的stage的劃分等。

解決方案:

①聚合源資料,我們的資料一般來自Hive表,那麼在生成Hive表的時候對資料進行聚合,按照key進行分組,將key所對應的value以另一種方式儲存,比如拼接成一個字串這樣的,我們就可以省略groupByKey和reduceByKey的操作,那麼我們就避免了shuffle的產生,如果不能完美的拼接成字串,那我們也至少可以減少資料量,提高一點效能

②過濾key操作,這種方式就有點粗暴了,如果你老大允許的話,這也是一種不錯的方案。

③提高並行度,我們可以通過提高shuffle的reduce的並行度來提高reduce端的task執行數量,從而分擔資料壓力,但是如果出現之前執行時OOM了,加大了reduce端的task的數量,可以運行了,但是執行時間一長就要放棄這種方案。

④雙重聚合,用於groupByKey和reduceByKey,比較使用join(hive中join優化也有類似的雙重聚合操作設定引數hive.mao.aggr=true和hive.groupby.skewindata=true具體過程這裡不做介紹)先加入隨機數進行分組,然後字首去掉在進行分組

⑤將reduce的join轉換成map的join,如果兩個RDD進行join,有一個表比較小的話,可以將小表進行broadcast,這樣每個節點都會用一個小表,如果兩個表都很大,可以先將兩個表按相同的方式進行分割槽操作,最後合併。雖然map join替換了reduce join ,但這是我們消耗了部分記憶體換來的,所以我們需要考慮OOM現象。

⑥sample抽樣分解聚合,也就是說將傾斜的key單獨拉出來,然後用一個RDD進行打亂join

⑦隨機數+擴容表,也就是說通過flatMap進行擴容,然後將隨機數打入進去,再進行join,但是這樣不能從根本上解決資料傾斜,只能緩解這種現象。

⑧如果你所在公司很有錢,可以直接加機器,硬體足夠高,這些也就不是問題了。

 

------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

shuffle方面的效能調優:

設定map輸出檔案的合併引數  .set(“spark.shuffle.consolidation”,"true"),預設是不開啟的,設定為true,則無法併發執行。

設定map端的記憶體緩衝區大小,和reduce端記憶體的大小,這個主要針對檔案過大,導致效能低,但是調優效果不是很明顯。  

  map端引數:  .set("spark.shuffle.file.buffer","64k")

  reduce端引數  .set("spark.shuffle.memoryFraction","0.3")

這兩個引數根據我們觀察日誌的讀寫檔案的多少來調節的,適量調節,如果是stand-alone模式觀察4040頁面,如果是yarn模式直接進入yarn Ulog日誌檢視。

shuffle中有以下幾種shuffle,具體情況根據你的業務要求來取捨。hashshuffle+consolidation、sortshuffle、鎢絲shuffle(tungsten-sort他裡面有自己的記憶體機制,可以有效的防止記憶體溢位現象)

mappartitions的使用必須要慎重!因為mappartition是取出一個分割槽的資料進行操作,那麼如果資料過大我們就有可能造成OOM溢位現象的發生,所以mappartitions的使用應該只能在資料量小的情況下。

預設shuffle的記憶體佔用為20%,持久化佔用60%,根據具體業務調整。

----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

JVM方面效能調優:

記憶體不足的時候會導致minor gc的頻繁,導致spark停止工作。

頻繁進行gc的時候,可能有些年輕代裡面的物件被回收,但是因為記憶體效能不足的問題,導致傳入了老年代,而如果老年代裡面記憶體溢滿,就會進行full gc操作,也就是全域性清理機制,這個過程時間會很長,十幾秒中至幾十分鐘,這樣就導致了spark的效能變低。

調優:

降低cache操作記憶體的佔比,大不了用persist操作,將一部分資料寫入磁碟或者是進行序列化操作,減少RDD快取的記憶體佔用,降低cache操作的記憶體佔用,那麼運算元函式的記憶體佔比就上去了,可以減少頻繁的gc操作,簡單來說就是讓task執行運算元函式的時候擁有更多可用的記憶體。

spark.storage.memoryFraction=0.6  cache的預設佔用記憶體是60%。上述已說明

executor對外記憶體的設定,如果我們發現shuffle output  file  not  found的錯誤,那麼我們就需要調節一下對外記憶體了

(寫在spark-submit引數中)--conf  spark.yarn.executor.memoryOverhead=2048(基於yarn模式)

這個引數不是在sparkContext中調節的,而是在spark-submit中指定的引數,可以防止OOM溢位現象。

偶爾也有可能出現連線等待超時的現象,因為executor跨接點拉取資料的時候,可能另一個executor正在進行JVM垃圾回收(所有執行緒停止),導致連線報錯,not found,這時候我們就需要調節連線引數,要在spark-submit中指定spark.core.connection.ack.wait.timeout=300設定等待時間稍微長一點即可。