1. 程式人生 > >Spark調優之Shuffle調優

Spark調優之Shuffle調優

原理概述:
什麼樣的情況下,會發生shuffle?
在spark中,主要是以下幾個運算元:groupByKey、reduceByKey、countByKey、join(分情況,先groupByKey後再join是不會發生shuffle的),等等。
什麼是shuffle?
groupByKey,要把分佈在叢集各個節點上的資料中的同一個key,對應的values,都要集中到一塊兒,集中到叢集中同一個節點上,更嚴密一點說,就是集中到一個節點的一個executor的一個task中。
然後呢,集中一個key對應的values之後,才能交給我們來進行處理,<key, Iterable>。reduceByKey,運算元函式去對values集合進行reduce操作,最後變成一個value。countByKey需要在一個task中,獲取到一個key對應的所有的value,然後進行計數,統計一共有多少個value。join,RDD<key, value>,RDD<key, value>,只要是兩個RDD中,key相同對應的2個value,都能到一個節點的executor的task中,給我們進行處理。
shuffle,一定是分為兩個stage來完成的。因為這其實是個逆向的過程,不是stage決定shuffle,是shuffle決定stage。
reduceByKey(+

),在某個action觸發job的時候,DAGScheduler,會負責劃分job為多個stage。劃分的依據,就是,如果發現有會觸發shuffle操作的運算元,比如reduceByKey,就將這個操作的前半部分,以及之前所有的RDD和transformation操作,劃分為一個stage。shuffle操作的後半部分,以及後面的,直到action為止的RDD和transformation操作,劃分為另外一個stage。

3.1、合併map端輸出檔案

3.1.1、如果不合並map端輸出檔案的話,會怎麼樣?
舉例實際生產環境的條件:
100個節點(每個節點一個executor):100個executor
每個executor:2個cpu core
總共1000個task:每個executor平均10個task
每個節點,10個task,每個節點會輸出多少份map端檔案?10 * 1000=1萬個檔案
總共有多少份map端輸出檔案?100 * 10000 = 100萬。
第一個stage,每個task,都會給第二個stage的每個task建立一份map端的輸出檔案
第二個stage,每個task,會到各個節點上面去,拉取第一個stage每個task輸出的,屬於自己的那一份檔案。
shuffle中的寫磁碟的操作,基本上就是shuffle中效能消耗最為嚴重的部分。
通過上面的分析,一個普通的生產環境的spark job的一個shuffle環節,會寫入磁碟100萬個檔案。
磁碟IO對效能和spark作業執行速度的影響,是極其驚人和嚇人的。
基本上,spark作業的效能,都消耗在shuffle中了,雖然不只是shuffle的map端輸出檔案這一個部分,但是這裡也是非常大的一個性能消耗點。

3.1.2、開啟shuffle map端輸出檔案合併的機制
new SparkConf().set(“spark.shuffle.consolidateFiles”, “true”)
預設情況下,是不開啟的,就是會發生如上所述的大量map端輸出檔案的操作,嚴重影響效能。

3.1.3、合併map端輸出檔案,對咱們的spark的效能有哪些方面的影響呢?

1、map task寫入磁碟檔案的IO,減少:100萬檔案 -> 20萬檔案
2、第二個stage,原本要拉取第一個stage的task數量份檔案,1000個task,第二個stage的每個task,都要拉取1000份檔案,走網路傳輸。合併以後,100個節點,每個節點2個cpu core,第二個stage的每個task,主要拉取100 * 2 = 200個檔案即可。此時網路傳輸的效能消耗也大大減少。
分享一下,實際在生產環境中,使用了spark.shuffle.consolidateFiles機制以後,實際的效能調優的效果:對於上述的這種生產環境的配置,效能的提升,還是相當的可觀的。spark作業,5個小時 -> 2~3個小時。
大家不要小看這個map端輸出檔案合併機制。實際上,在資料量比較大,你自己本身做了前面的效能調優,executor上去->cpu core上去->並行度(task數量)上去,shuffle沒調優,shuffle就很糟糕了。大量的map端輸出檔案的產生,對效能有比較惡劣的影響。這個時候,去開啟這個機制,可以很有效的提升效能。

3.2、調節map端記憶體緩衝與reduce端記憶體佔比

3.2.1、預設情況下可能出現的問題

預設情況下,shuffle的map task,輸出到磁碟檔案的時候,統一都會先寫入每個task自己關聯的一個記憶體緩衝區。
這個緩衝區大小,預設是32kb。
每一次,當記憶體緩衝區滿溢之後,才會進行spill溢寫操作,溢寫到磁碟檔案中去。
reduce端task,在拉取到資料之後,會用hashmap的資料格式,來對各個key對應的values進行匯聚。
針對每個key對應的values,執行我們自定義的聚合函式的程式碼,比如_ + _(把所有values累加起來)。
reduce task,在進行匯聚、聚合等操作的時候,實際上,使用的就是自己對應的executor的記憶體,executor(jvm程序,堆),預設executor記憶體中劃分給reduce task進行聚合的比例是0.2。
問題來了,因為比例是0.2,所以,理論上,很有可能會出現,拉取過來的資料很多,那麼在記憶體中,放不下。這個時候,預設的行為就是將在記憶體放不下的資料都spill(溢寫)到磁碟檔案中去。
在資料量比較大的情況下,可能頻繁地發生reduce端的磁碟檔案的讀寫。

3.2.2、調優方式

調節map task記憶體緩衝:spark.shuffle.file.buffer,預設32k(spark 1.3.x不是這個引數,後面還有一個字尾,kb。spark 1.5.x以後,變了,就是現在這個引數)
調節reduce端聚合記憶體佔比:spark.shuffle.memoryFraction,0.2

3.2.3、在實際生產環境中,我們在什麼時候來調節兩個引數?

看Spark UI,如果你的公司是決定採用standalone模式,那麼狠簡單,你的spark跑起來,會顯示一個Spark UI的地址,4040的埠。進去觀察每個stage的詳情,有哪些executor,有哪些task,每個task的shuffle write和shuffle read的量,shuffle的磁碟和記憶體讀寫的資料量。如果是用的yarn模式來提交,從yarn的介面進去,點選對應的application,進入Spark UI,檢視詳情。
如果發現shuffle 磁碟的write和read,很大。這個時候,就意味著最好調節一些shuffle的引數。首先當然是考慮開啟map端輸出檔案合併機制。其次調節上面說的那兩個引數。調節的時候的原則:spark.shuffle.file.buffer每次擴大一倍,然後看看效果,64,128。spark.shuffle.memoryFraction,每次提高0.1,看看效果。不能調節的太大,太大了以後過猶不及,因為記憶體資源是有限的,你這裡調節的太大了,其他環節的記憶體使用就會有問題了。

3.2.4、調節以後的效果

map task記憶體緩衝變大了,減少spill到磁碟檔案的次數。reduce端聚合記憶體變大了,減少spill到磁碟的次數,而且減少了後面聚合讀取磁碟檔案的數量。

3.3、HashShuffleManager與SortShuffleManager

3.3.1、shuffle調優概述

大多數Spark作業的效能主要就是消耗在了shuffle環 節,因為該環節包含了大量的磁碟IO、序列化、網路資料傳輸等操作。因此,如果要讓作業的效能更上一層樓,就有必要對shuffle過程進行調優。但是也 必須提醒大家的是,影響一個Spark作業效能的因素,主要還是程式碼開發、資源引數以及資料傾斜,shuffle調優只能在整個Spark的效能調優中佔 到一小部分而已。因此大家務必把握住調優的基本原則,千萬不要捨本逐末。下面我們就給大家詳細講解shuffle的原理,以及相關引數的說明,同時給出各個引數的調優建議。

3.3.2、ShuffleManager發展概述

在Spark的原始碼中,負責shuffle過程的執行、計算和處理的元件主要就是ShuffleManager,也即shuffle管理器。
在Spark 1.2以前,預設的shuffle計算引擎是HashShuffleManager。該ShuffleManager而HashShuffleManager有著一個非常嚴重的弊端,就是會產生大量的中間磁碟檔案,進而由大量的磁碟IO操作影響了效能。
因此在Spark 1.2以後的版本中,預設的ShuffleManager改成了SortShuffleManager。SortShuffleManager相較於 HashShuffleManager來說,有了一定的改進。主要就在於,每個Task在進行shuffle操作時,雖然也會產生較多的臨時磁碟檔案,但是最後會將所有的臨時檔案合併(merge)成一個磁碟檔案,因此每個Task就只有一個磁碟檔案。在下一個stage的shuffle read task拉取自己的資料時,只要根據索引讀取每個磁碟檔案中的部分資料即可。
在spark 1.5.x以後,對於shuffle manager又出來了一種新的manager,tungsten-sort(鎢絲),鎢絲sort shuffle manager。官網上一般說,鎢絲sort shuffle manager,效果跟sort shuffle manager是差不多的。
但是,唯一的不同之處在於,鎢絲manager,是使用了自己實現的一套記憶體管理機制,效能上有很大的提升, 而且可以避免shuffle過程中產生的大量的OOM,GC,等等記憶體相關的異常。

3.3.3、hash、sort、tungsten-sort。如何來選擇?

1、需不需要資料預設就讓spark給你進行排序?就好像mapreduce,預設就是有按照key的排序。如果不需要的話,其實還是建議搭建就使用最基本的HashShuffleManager,因為最開始就是考慮的是不排序,換取高效能。
2、什麼時候需要用sort shuffle manager?如果你需要你的那些資料按key排序了,那麼就選擇這種吧,而且要注意,reduce task的數量應該是超過200的,這樣sort、merge(多個檔案合併成一個)的機制,才能生效把。但是這裡要注意,你一定要自己考量一下,有沒有必要在shuffle的過程中,就做這個事情,畢竟對效能是有影響的。
3、如果你不需要排序,而且你希望你的每個task輸出的檔案最終是會合併成一份的,你自己認為可以減少效能開銷。可以去調節bypassMergeThreshold這個閾值,比如你的reduce task數量是500,預設閾值是200,所以預設還是會進行sort和直接merge的。可以將閾值調節成550,不會進行sort,按照hash的做法,每個reduce task建立一份輸出檔案,最後合併成一份檔案。(一定要提醒大家,這個引數,其實我們通常不會在生產環境裡去使用,也沒有經過驗證說,這樣的方式,到底有多少效能的提升)
4、如果你想選用sort based shuffle manager,而且你們公司的spark版本比較高,是1.5.x版本的,那麼可以考慮去嘗試使用tungsten-sort shuffle manager。看看效能的提升與穩定性怎麼樣。
總結:
1、在生產環境中,不建議大家貿然使用第三點和第四點:
2、如果你不想要你的資料在shuffle時排序,那麼就自己設定一下,用hash shuffle manager。
3、如果你的確是需要你的資料在shuffle時進行排序的,那麼就預設不用動,預設就是sort shuffle manager。或者是什麼?如果你壓根兒不care是否排序這個事兒,那麼就預設讓他就是sort的。調節一些其他的引數(consolidation機制)。(80%,都是用這種)
spark.shuffle.manager:hash、sort、tungsten-sort
spark.shuffle.sort.bypassMergeThreshold:200。自己可以設定一個閾值,預設是200,當reduce task數量少於等於200,map task建立的輸出檔案小於等於200的,最後會將所有的輸出檔案合併為一份檔案。這樣做的好處,就是避免了sort排序,節省了效能開銷,而且還能將多個reduce task的檔案合併成一份檔案,節省了reduce task拉取資料的時候的磁碟IO的開銷。

總結: 因為在整個Job的執行過程中Shuffle過程佔用了大量的計算資源和記憶體,所以說Shuffle調優在整個job執行過程中也是很關鍵的 ! 好了,今天就分享到這裡,接下來幾篇還會給大家一起分享運算元調優,資料傾斜的解決方案等Spark調優過程,大家一起努力,共同進步 ! !