1. 程式人生 > >【Spark深入學習 -14】Spark應用經驗與程序調優

【Spark深入學習 -14】Spark應用經驗與程序調優

aps 它的 stack 申請 vco 用戶 統一 persist 資料

----本節內容-------

1.遺留問題解答

2.Spark調優初體驗

2.1 利用WebUI分析程序瓶頸

2.2 設置合適的資源

2.3 調整任務的並發度

2.4 修改存儲格式

3.Spark調優經驗

3.1 Spark原理及調優工具

3.2 運行環境優化

3.2.1 防止不必要的分發

3.2.2 提高數據本地性

3.2.3 存儲格式選擇

3.2.4 選擇高配機器

3.3 優化操作符

3.3.1 過濾操作導致多小任務

3.3.2 降低單條記錄開銷

3.3.3 處理數據傾斜或者任務傾斜

3.3.4 復用RDD進行緩存

3.3.5 慎用耗資源操作符

3.3.6 作業並行化

3.4 作業參數調優

3.4.1 設置合適的資源量

3.4.2 設置合理的JVM

3.4.3 啟用更高效的序列化方法

3.4.4 增大off head內存

3.4.5 shuffle參數調優

3.4.6 設置reduce task數目

3.4.7 使用spark sql實現

4.Spark 調優案例

5.參考資料

---------------------

技術分享

1.遺留問題

1)BAT這樣的企業內部是如何開發和運行Spark程序的?

· 開發和測試

intellij進行開發,對scala支持的非常好,java也支持的很好,開發好了在local模式下運行進行測試,intellij是不能遠程提交到集群的,itellij沒有分發jar包功能。

· spark生產環境運行

將所有依賴的包打成assembly.jar包,這樣不再依賴環境裏的任何模式,一般用yarn cluster模式運行(driver可以容錯),可以用yarn client模式進行測試。每天要跑,可以使用工作流調度器,按照你的設置定時定點跑程序,如果出錯了可以報警,工作流調度引擎,可以處理復雜的依賴,用的比較多的開源

airflow:輕量級,很多公司用

oozie:比較復雜

2)編譯spark程序非常慢

修改mavn pom倉庫,將國外的倉庫,改成國內的倉庫,如阿裏雲

3)Spark參數的設置問題

有三種設置方式, 在程序內設置(優先級高),提交參數設置(優先級中),配置文件配置(優先級低),park-default:常用的可以在配置文件中配

2.Spark調優初體驗

調優程序,首先得知道程序慢在哪裏,要定位問題,找到優化的點。定位問題又涉及到很多層面,機器硬件、網絡、操作系統、JVM虛擬機、大數據軟件平臺層、軟件開發層,所以調優是一個綜合工程,Spark程序調優可以分為以下幾個層面:

1)基礎設施層

機器硬件(如磁盤的選擇,SATA盤還是SAS盤,磁盤RAID方式等)、網絡(千兆網卡還是萬兆網卡,網絡峰值期間的帶寬、吞吐、網絡延遲、網絡抖動,很多時候網絡問題導致各種莫名問題,舉個真實的例子,公司網線被老鼠咬了,導致網絡時而可以,時而不行,鬼知道是什麽問題,讓人抓狂)、操作系統(操作系統的穩定性,內核版本的選擇,非常重要,還有一些配置策略得和hadoop生態吻合)。這些都非常底層了,就網絡、linux操作系統就夠花時間學習和了解的了,很多時候需要系統集成部的同事一起配合。

2)大數據平臺層:HDFS、YARN、Hive、Spark、JVM等

JVM的調優,還沒有什麽經驗,而且目前也沒有非常深入,最多也就是GC策略挑挑,大數據平臺調優真是個技術活,需要很多經驗。

3)程序開發層

開發工程師程序開發技巧、對業務的理解等

2.1利用WebUI分析Spark程序瓶頸

選擇合適的資源,前提是了解集群有多少資源 。1)集群內存總量:單臺節點共8G,2G給datanode,6G個spark做計算(yarn共有18G,3個節點);2)集群CPU總合數: 每個機器有8個CPU,共24個core;3)集群總存儲:總存儲空間有多少,一般保證20%左右的剩余空間,要不然會影響集群整體性能,HDFS剩余空間不足也會導致任務執行很慢,甚至失敗。

如何觀察和評估程序的效率怎麽執行shell,跑spark任務就不講了,說明一下如何觀察Spark任務執行的瓶頸,從哪幾個指標觀察任務執行的效率。

指標一:觀察Spark任務解析和提交消耗的時間

主要涉及到的是jar包上傳的優化,這裏面分為2種情況:

1)程序依賴的jar,這種通常是spark lib目錄下的所有jar包,有好幾百兆,spark程序會上傳這些,為了提升效率,可以提前上傳好,

2)程序自身的jar包,如果程序不經常變動,也可以提前上傳到HDFS上。

指標二:觀察WebUI產生的系統監控數據

這裏面有很多指標,羅列如下

1)觀察job監控參數,產生了多少個job,一個action對應一個job,如果action之間沒有依賴關系,資源有富余,可以讓job並行執行。

2)觀察stage監控參數,一個job分解成了結果stage,每個stage執行了多少時間,輸入了多少數據量,shuffle read了多少數據,shuffle write 了多少數據。

3)觀察executor監控參數,driver在哪裏,executor在哪裏,每個executor啟動了幾個CPU核數,起了多少內存,輸入多少數據(可以查看數據是否有傾斜),shuffle了多少數據,內存放了多少數據,GC執行了多少時間。還可以查看stderr,查看日誌。

4)觀察每個task監控參數,task執行的時間,GC執行的時間,讀入的數據總大小和記錄數,shuffle的大小,task執行時的本地行,舉個執行的任務例子,如下所示

技術分享

可以看出來,一共有2個stage,1個stage包含8個task,一個包含2個task,先跑8個的,再跑2個的,一個14秒,一個0.1秒。再看看executor,發現只有2個executor,一個executor只有1個core,也就是一個executor只能處理一個task,集群也就是最多跑2個,10個task要跑5輪

技術分享

再點擊stage的鏈接進去,觀察每個task跑多長時間

技術分享

好了,按照前面羅列的監控指標點,可以看出如下監控參數:

集群資源:內存3個節點18G,VCORE:3個節點24VCORE

1)Job監控參數:只有一個Job

2)Stage監控參數:有2個Stage

· stage1:8個task,這個先跑,執行了14秒

· stage2:2個task,這個後跑,執行了0.1秒,是collect方法,數據匯聚到driver

3)executor:2個executor,1個executor 1G內存

4)task:本地性node_local,GC毫秒級,shuffle也是不足Kb

分析消耗的資源:

1)從stage的執行時間分析,stage1執行時間長,可以考慮優化stage1

2)executor分析:只有2個executor,內存也只有1G,啟動的都是默認參數,和集群資源相比,有很作資源沒有利用起來,有點浪費,可以調整executor個數,增大並發度,或者如果數據量大,也有必要,增大內存。

3)task:本地行還可以,node級別,gc也可以,shuffle也少

總結可能優化的點:

優化stage1,Job中的stage1有8個task,2個executor,需要跑4輪才能跑完第一輪的所有task,調整為8個executor(原來同時跑2個task,現在可以同時跑8個task了),那麽只需要一輪就而已跑完所有的task。內存1G也夠用了,因為輸入數據就不足1G,如果輸入數據很大的話,也可以增大內存。

2.2 設置合適的資源

設置合適的資源,要明確集群的資源總量,然後觀察監控指標,檢查是否充分使用了集群中的資源

資源量關聯度比較大的一個是內存一個是CPU的使用率,當然還有其他,但這兩個指標是比較重點的,這兩個指標對應spark程序主要是Executor的內存和core。所以計算資源的設置單位是Executor

增加Executor個數:--num-executors 4

增加每個Executor同時雲心的task數目:--executor-cores 2

除了Executor消耗資源,還有driver和shuffle等也都是消耗資源大戶,我把幾個常用的和資源使用相關的參數含義及參考值總結如下:

num-executors

參數說明:該參數用於設置Spark作業總共要用多少個Executor進程來執行。Driver在向YARN集群管理器申請資源時,YARN集群管理器會盡可能按照你的設置來在集群的各個工作節點上,啟動相應數量的Executor進程。這個參數非常之重要,如果不設置的話,默認只會給你啟動少量的Executor進程,此時你的Spark作業的運行速度是非常慢的。

參數調優建議:每個Spark作業的運行一般設置50~100個左右的Executor進程比較合適,設置太少或太多的Executor進程都不好。設置的太少,無法充分利用集群資源;設置的太多的話,大部分隊列可能無法給予充分的資源。

executor-memory

參數說明:該參數用於設置每個Executor進程的內存。Executor內存的大小,很多時候直接決定了Spark作業的性能,而且跟常見的JVM OOM異常,也有直接的關聯。

參數調優建議:每個Executor進程的內存設置4G~8G較為合適。但是這只是一個參考值,具體的設置還是得根據不同部門的資源隊列來定。可以看看自己團隊的資源隊列的最大內存限制是多少,num-executors乘以executor-memory,就代表了你的Spark作業申請到的總內存量(也就是所有Executor進程的內存總和),這個量是不能超過隊列的最大內存量的。此外,如果你是跟團隊裏其他人共享這個資源隊列,那麽申請的總內存量最好不要超過資源隊列最大總內存的1/3~1/2,避免你自己的Spark作業占用了隊列所有的資源,導致別的同學的作業無法運行。

executor-cores

參數說明:該參數用於設置每個Executor進程的CPU core數量。這個參數決定了每個Executor進程並行執行task線程的能力。因為每個CPU core同一時間只能執行一個task線程,因此每個Executor進程的CPU core數量越多,越能夠快速地執行完分配給自己的所有task線程。

參數調優建議:Executor的CPU core數量設置為2~4個較為合適。同樣得根據不同部門的資源隊列來定,可以看看自己的資源隊列的最大CPU core限制是多少,再依據設置的Executor數量,來決定每個Executor進程可以分配到幾個CPU core。同樣建議,如果是跟他人共享這個隊列,那麽num-executors * executor-cores不要超過隊列總CPU core的1/3~1/2左右比較合適,也是避免影響其他同學的作業運行。

driver-memory

參數說明:該參數用於設置Driver進程的內存。

參數調優建議:Driver的內存通常來說不設置,或者設置1G左右應該就夠了。唯一需要註意的一點是,如果需要使用collect算子將RDD的數據全部拉取到Driver上進行處理,那麽必須確保Driver的內存足夠大,否則會出現OOM內存溢出的問題。

spark.default.parallelism

參數說明:該參數用於設置每個stage的默認task數量。這個參數極為重要,如果不設置可能會直接影響你的Spark作業性能。

參數調優建議:Spark作業的默認task數量為500~1000個較為合適。很多同學常犯的一個錯誤就是不去設置這個參數,那麽此時就會導致Spark自己根據底層HDFS的block數量來設置task的數量,默認是一個HDFS block對應一個task。通常來說,Spark默認設置的數量是偏少的(比如就幾十個task),如果task數量偏少的話,就會導致你前面設置好的Executor的參數都前功盡棄。試想一下,無論你的Executor進程有多少個,內存和CPU有多大,但是task只有1個或者10個,那麽90%的Executor進程可能根本就沒有task執行,也就是白白浪費了資源!因此Spark官網建議的設置原則是,設置該參數為num-executors * executor-cores的2~3倍較為合適,比如Executor的總CPU core數量為300個,那麽設置1000個task是可以的,此時可以充分地利用Spark集群的資源。

spark.storage.memoryFraction

參數說明:該參數用於設置RDD持久化數據在Executor內存中能占的比例,默認是0.6。也就是說,默認Executor 60%的內存,可以用來保存持久化的RDD數據。根據你選擇的不同的持久化策略,如果內存不夠時,可能數據就不會持久化,或者數據會寫入磁盤。

參數調優建議:如果Spark作業中,有較多的RDD持久化操作,該參數的值可以適當提高一些,保證持久化的數據能夠容納在內存中。避免內存不夠緩存所有的數據,導致數據只能寫入磁盤中,降低了性能。但是如果Spark作業中的shuffle類操作比較多,而持久化操作比較少,那麽這個參數的值適當降低一些比較合適。此外,如果發現作業由於頻繁的gc導致運行緩慢(通過spark web ui可以觀察到作業的gc耗時),意味著task執行用戶代碼的內存不夠用,那麽同樣建議調低這個參數的值。

spark.shuffle.memoryFraction

參數說明:該參數用於設置shuffle過程中一個task拉取到上個stage的task的輸出後,進行聚合操作時能夠使用的Executor內存的比例,默認是0.2。也就是說,Executor默認只有20%的內存用來進行該操作。shuffle操作在進行聚合時,如果發現使用的內存超出了這個20%的限制,那麽多余的數據就會溢寫到磁盤文件中去,此時就會極大地降低性能。

參數調優建議:如果Spark作業中的RDD持久化操作較少,shuffle操作較多時,建議降低持久化操作的內存占比,提高shuffle操作的內存占比比例,避免shuffle過程中數據過多時內存不夠用,必須溢寫到磁盤上,降低了性能。此外,如果發現作業由於頻繁的gc導致運行緩慢,意味著task執行用戶代碼的內存不夠用,那麽同樣建議調低這個參數的值。

資源參數的調優,沒有一個固定的值,需要同學們根據自己的實際情況(包括Spark作業中的shuffle操作數量、RDD持久化操作數量以及spark web ui中顯示的作業gc情況),同時參考本篇文章中給出的原理以及調優建議,合理地設置上述參數。

參數設置demo

/bin/spark-submit \

--master yarn-cluster \

--num-executors 100 \

--executor-memory 6G \

--executor-cores 4 \

--driver-memory 1G \

--conf spark.default.parallelism=1000 \

--conf spark.storage.memoryFraction=0.5 \

--conf spark.shuffle.memoryFraction=0.3 \

優化後的,一共執行了11秒

技術分享

每個task執行時間

技術分享

再觀察每個task花費時間,觀察發現,每個task執行的時間比原來的4秒還長,原因是在資源不變的情況,任務數多了只有2個CPU,CPU變成瓶頸了,要頻繁切換如果CPU很多,優化效果會非常好,CPU是瓶頸

技術分享

2.3 設置合適的並發度

任務的並發度有幾個級別:job級別的並發,stage級別的並發,task級別的並發。這裏談的並發優化是task級別的,主要就是map任務並行度和reduce任務的並行度,這方面的調優其實可以參考MapReduce的調優,原理都是一樣一樣的。Spark的任務數需要註意幾點:

1)Map個數默認是和輸入文件的blok數是一樣的,如hdfs則和 blokc數目一致,hbase則和regio個數一致。

2)rdd之間的map個數如果不修改,後面的和前面個數一樣

3)reduce默認個數也是和map個數一樣

map設置方法:

單個設置: sc.textFile("/input/data",100); //指定100個blokc,那麽就100個map

批量設置:將每個map處理數量調大,map數就少了,默認128M

技術分享

2.4 修改存儲格式

很多人並不明白為什麽文件存儲格式會影響文件的讀取效率,我打個最簡單的比方。我們知道linux 系統是單機版的操作系統,裏面有ext3,ext4這樣的文件,ext的職責就是對linux系統文件進行管理,HDFS是分布式的文件系統,也是對文件進行管理。好的文件系統就像一個勤快的媳婦,在你房子面積不變的情況下,勤快的媳婦會將物品放的井井有條,利用到房子裏面的每個空間,你要拿什麽東西,都能很快找到;而不好的文件系統就像是一個懶媳婦,房間裏面堆滿了東西,找起來很麻煩,房間利用率也非常糟糕,找東西困難,放東西進去也很難,東西越多越臟越亂。(媳婦沒有好壞之分,只有適合不適合,還有看你怎麽和媳婦相處了,互相了不了解,性格和脾氣對不對路,文件系統也是如此)

文件存儲格式和文件系統是一樣的原理,文件系統管理的是文件,而文件儲存格式管理的是文件內容(管理的是文件中每一行每一列的具體內容)。所以低效率的文件存儲格式就像是一個賴媳婦,家裏被管的一塌糊塗,東西越多越臟亂差,高效率的文件存儲格式就是勤快且聰明的媳婦,一切都管的井然有序,取東西方便,放東西也容易,還會根據不同的物品特征進行擺放,完美,6666!!!

csv,txt,json等等都是懶媳婦,parquet,orc都是勤快媳婦,那為什麽文本文件是懶媳婦,parquet是好媳婦,主要有以下幾個原因:

文本文件為什麽不好?

文本文件行存儲,存儲占用空間,而且讀取數據的時候會讀出很多不必要的數據出來,這就好像你叫懶媳婦給你拿一頂帽子,結果她把衣服,鞋子,襪子統統拿出來,然後再從裏面挑出你要的帽子。

parquet為什麽好,Spark使用parquet文件存儲格式意義在哪裏?

1) 如果說HDFS 是大數據時代分布式文件系統首選標準,那麽parquet則是整個大數據時代文件存儲格式實時首選標準

2) 速度更快:從使用spark sql操作普通文件CSV和parquet文件速度對比上看,絕大多數情況

會比使用csv等普通文件速度提升10倍左右,在一些普通文件系統無法在spark上成功運行的情況

下,使用parquet很多時候可以成功運行

3) parquet的壓縮技術非常穩定出色,在spark sql中對壓縮技術的處理可能無法正常的完成工作

(例如會導致lost task,lost executor)但是此時如果使用parquet就可以正常的完成

4) 極大的減少磁盤I/o,通常情況下能夠減少75%的存儲空間,由此可以極大的減少spark sql處理

數據的時候的數據輸入內容,尤其是在spark1.6x中有個下推過濾器在一些情況下可以極大的

減少磁盤的IO和內存的占用,(下推過濾器)

5) spark 1.6x parquet方式極大的提升了掃描的吞吐量,極大提高了數據的查找速度spark1.6和spark1.5x相比而言,提升了大約1倍的速度,在spark1.6X中,操作parquet時候cpu也進行了極大的優化,有效的降低了cpu

6) 采用parquet可以極大的優化spark的調度和執行。我們測試spark如果用parquet可以有效的減少stage的執行消耗,同時可以優化執行路徑

3.Spark調優經驗

3.1 Spark原理及調優工具

· Spark Web UI界面

· jstack、jstat、jprofile

· history server:當Spark應用退出後,仍可以獲得歷史Spark應用的stages和tasks執行信息,便於分析程序不明原因掛掉的情況,Spark的history server依賴mr的history server。

3.2 運行環境優化

3.2.1 防止不必要的分發

每個Application都會上傳一個spark-assembly-x.x.x-SNAPSHOT-hadoopx.x.x-cdhx.x.x.jar的jar包,影響HDFS的性能以及占用HDFS的空間.對於用戶的jar包,有時候體積也非常龐大,我們同樣的方式上傳hdfs上,然後直接使用。

1. 依賴ja包重復上傳

技術分享

執行spark任務有大量jar包上傳HDFS,將系統jar包上傳到hdfs上,直接使用hdfs上的文件,具體下:

1)修改conf/spark-default.conf添加以下配置

spark.yarn.jar hdfs://master:9000/system/spark/jars/spark-assembly-1.6.0-hadoop2.6.0.jar

2)再次執行SparkPi,提交腳本發生了變化,如下:

bin/spark-submit --class org.apache.spark.examples.SparkPi \ --master yarn-cluster \ --num-executors 3 \ --driver-memory 1g \ --executor-memory 1g \ --executor-cores 1 \ lib/spark-examples*.jar 10

2. 用戶jar包重復上傳,避免重復分發

bin/spark-submit --class org.apache.spark.examples.SparkPi \ --master yarn-cluster \ --num-executors 3 \ --driver-memory 1g \ --executor-memory 1g \ --executor-cores 1 \ hdfs://master:9000/user/spark/jars/spark-examples-1.6.0-hadoop2.6.0.jar 10

技術分享

3.2.2 提高數據本地性

技術分享

分布式數據並行環境下,保持數據的本地性是非常重要的內容,事關分布式系統性能高下,涉及到數據本地性的概念有block、partition、worker、rack。

Spark中的數據本地性有三種:

  • PROCESS_LOCAL是指讀取緩存在本地節點的數據

  • NODE_LOCAL是指讀取本地節點

  • RACK_LOCAL是指讀非本機架的節點數據

yarn和hfs盡可能的在一個節點上很多rack local,說明本地性很差,可以通過增加副本數來提升本地新。

3.2.3 存儲格式選擇

BAT等公司80%都是采用列式存儲結構 ,相同的列存儲在一起,只讀取所需的列,io減少,相同的列存在一起,壓縮比會非常高。大概是行存儲的1/22個apache頂級項目ORC:源自於hive,建表最好都搞成orc,hive常用parquet,rdd讀取效率低,spark sql高效率讀取

列式存儲和行式存儲相比有哪些優勢呢?

·可以跳過不符合條件的數據,只讀取需要的數據,降低IO數據量。

·壓縮編碼可以降低磁盤存儲空間。由於同一列的數據類型是一樣的,可以使用更高效的壓縮編碼(例如Run Length Encoding和Delta Encoding)進一步節約存儲空間。

·只讀取需要的列,支持向量運算,能夠獲取更好的掃描性能。

技術分享

從上圖可以很清楚地看到,行式存儲下一張表的數據都是放在一起的,但列式存儲下都被分開保存了。所以它們就有了如下這些優缺點:

技術分享

通過字典表壓縮數據。為了方面後面的講解,這部分也順帶提一下了。

下面中才是那張表本來的樣子。經過字典表進行數據壓縮後,表中的字符串才都變成數字了。正因為每個字符串在字典表裏只出現一次了,所以達到了壓縮的目的(有點像規範化和非規範化Normalize和Denomalize)

下面這個圖,通過一條查詢的執行過程說明列式存儲(以及數據壓縮)的優點:

技術分享

關鍵步驟如下:

1.去字典表裏找到字符串對應數字(只進行一次字符串比較)。

2. 用數字去列表裏匹配,匹配上的位置設為1。

3. 把不同列的匹配結果進行位運算得到符合所有條件的記錄下標。

4. 使用這個下標組裝出最終的結果集。

3.2.4 選擇高配機器

隨著硬件的不斷發展和企業的需求的不斷變化, 大部分企業在集群搭建初期和中期的集群配置都不一樣。而Spark是一個非常消耗內存的,因此對於初期一些配置較低,尤其內存較差的機器,是不適合跑spark任務的,更加適合硬件配置高一點的機器上跑。機器配置的參差不齊,應該如何有區別的調度

,yarn提供了很好的解決方案。

技術分享

yarn支持標簽,根據機器的配置,給機器打相應的標簽,標簽如何打不在討論範圍,目前只有capacity 調度算法支持標簽給隊列支持打標簽,將標簽和隊列綁定在一起,將應用程序提交到指定標簽的隊列,執行的時候就會提交到到相應標簽節點 。

很多時候是基礎平臺的修改,運維負責優化 yarn基於標簽的調度,haodop從hadoop2.6.0開始提供基於標簽的調度策略。

3.3 優化操作符

3.3.1 過濾操作導致多小任務

filter操作使用不當,很容易引發麻煩。假如一個任務有3個parition,經過filger過濾之後,可能導致部分剩下很少,有些剩余很多,剩余很多的在下一步計算量很大,會拖後腿,其他的作業很快就做完了,而剩余很多的要執行很長時間,整個任務都要延誤,而其他很快執行完的作業早就釋放資源了

造成資源還的浪費

對於這種場景有2種優化策略:

1)coalses:合並已有的partiion,性能非常高,但是很有可能還不是很均與,

大的依舊很大,小的進行了合並

2)repartion:根據數據量燈亮劃分,每個partion盡可能均勻,會經過一次shuffle比較均勻

技術分享

3.3.2 降低單條記錄開銷

做過Java連接數據庫操作的人都知道,要盡量避免數據庫鏈接的頻繁建立和斷開,方法很多,比如數據庫連接池的發明。單機版本對數據庫的連接操作比較容易管理和控制,但在分布式環境下,數據庫的連接管理和控制很麻煩,數據的連接是不可序列化的,因此分布式環境下,統一管理數據庫連接顯然是不靠譜的。比如這段代碼,如果寫數據到數據庫,就會頻繁建立和斷開連接,顯然是低效率的。因為數據庫連接的不可序列化,你也不可能把conn拿出來。

技術分享

解決方法是:使用mapPartitions或者mapWith操作符

技術分享

原因在於mapPartitions是map的調用的粒度不同,map的輸入變換函數是應用於RDD中每個元素,而mapPartitions的輸入函數是應用於每個分區。

假設一個rdd有10個元素,分成3個分區。如果使用map方法,map中的輸入函數會被調用10次;而使用mapPartitions方法的話,其輸入函數會只會被調用3次,每個分區調用1次。在大數據集情況下的資源初始化開銷和批處理處理,尤其數據庫鏈接操作,顯得特別好用。

3.3.3 處理數據傾斜或者任務傾斜

spark任務中的數據傾斜可能導致某一臺節點超負荷運轉、內存不足,其他節點都處於空閑等待,加內存不能解決問題。應該找到出問題的shuffle操作,修正它。具體問題具體分析,有如下個方向,但不限於此。

1)改變數據結構,從源頭調整數據的分布,例如分表,分區存放,橫向或者縱向分表等。

2)修改並行度

· 改變並行度可以改善數據傾斜的原因是因為如果某個task有100個key並且數據巨大,那麽有可能導致OOM或者任務運行緩慢; ·此時如果把並行度變大,那麽可以分解每個task的數據量,比如把該task分解給10個task, 那麽每個task的數據量將變小,從而可以解決OOM或者任務執行慢.對應reduceByKey而言可以傳入並行度參數也可以自定義partition. · 增加並行度:改變計算資源並沒有從根本上解決數據傾斜的問題,但是加快了任務運行的速度. · 這是加入有傾斜的key, 加隨機數前綴,reduceByKey聚合操作可以分而治之,產生的結果是代前綴的,因此需

3)提取聚集,預操作join,  把傾斜數據在上遊進行操作.

4)局部聚合+全局聚合

5)盡量避免shuffle

6)啟用推測執行,避免慢節點任務拖後腿,慢磁盤問題在hadoop集群運行了好幾年之後非常明顯,尤其是磁盤,hadoop以及很多監控工具並沒有對慢磁盤進行監控,需要自己寫腳本監控。

下面的例子就是通過對key進行增加隨機數,然後進行局部聚合+全局聚合

技術分享

3.3.4 復用RDD進行緩存

RDD是一系列的數據+計算,每一次計算利用上次計算結果都會重新計算,對於一些常用的計算結果可以緩存起來,避免重復計算

技術分享

技術分享

cache和persist的區別:cache只有一個默認的緩存級別MEMORY_ONLY ,而persist可以根據情況設置其它的緩存級別。

3.3.5 慎用耗資源操作符

選擇 Operator 方案的主要目標是減少 shuffle 的次數以及被 shuffle 的文件的大小。因為 shuffle 是最耗資源的操作,所以有 shuffle 的數據都需要寫到磁盤並且通過網絡傳遞,repartition,join,cogroup,以及任何 *By 或者 *ByKey 的 transformation 都需要 shuffle 數據。不是所有這些 Operator 都是平等的,但是有些常見的性能陷阱是需要註意的。消耗資源的操作盡量少用,能用小砍刀辦到的事情,何須屠龍刀,Spark中比較消耗資源的操作有

· 笛卡爾積操作

· 帶shuffle的各種算子

如果可能,用treeReduce代替reduce,盡量用reduceByKey替代groupByKey,舉個栗子

技術分享

盡量避免差生shuffle,什麽時候不發生 Shuffle 當前一個 transformation 已經用相同的 patitioner 把數據分 patition 了,Spark知道如何避免 shuffle

3.3.6 作業並行化

Job之間如果沒有依賴關系,在資源允許的情況下,當然是能並行就更佳,能高並發的,高吞吐量的時候那還要等什麽,除非你不希望活早點幹完。Job之間並行註意2點

· 啟動FAIR調度器:spark.scheduler.mode=fait

· 將action相關操作放到單獨線程中

技術分享

3.4 作業參數調優

3.4.1 設置合適的資源量

實際開發過程中不好把控需要啟動多少個Executor,每個Executor多少個內存要在測試環境不斷的嘗試,每個應用程序都不一樣,要根據實際針對性的調整。需要把握幾點

· Executor數目並非越多越好

· Task數目不應小於Executor的core總數,要不然有些Executor就白啟動了,跑空任務

假設有1G的數據,默認8個task,是啟動8個Executor,一個Executor1核,還是啟動4個Executor,一個Executor2核,這個需要手動調試去觀察和比較。要一個個的去試試才知道了,沒有固定的方式方法。

3.4.2 設置合理的JVM

JVM調優用的比較多的是調整GC策略,如何調整垃圾的回收機制主要是Executor的垃圾回收機制。可以為spark定制與Hadoop不同的jdk版本,前提是各個節點要安裝了 ,通過Spark Web UI可以查看GC消耗的時間。JVM參數設置主要有2個對象:driver和executor。並且提交的方式不一樣,參數設置和程序讀取的地方也不一樣。

1. Driver的JVM參數

yarn-client模式: (1)-Xmx,-Xms,默認讀取spark-env.sh,文件中的SPARK_DRIVER_MEMORY值,-Xmx,-Xms值一樣大小;(2)PermSize,默認讀取spark-class文件中的JAVA_OPTS="-XX:MaxPermSize=256m $OUR_JAVA_OPTS"值;(3)GC方式,默認讀取的是spark-class文件中的JAVA_OPTS

yarn-cluster模式:(1) -Xmx,-Xms讀取的是spark-default.conf文件中的spark.driver.extraJavaOptions對應的JVM參數值;(2)PermSize,讀取的是spark-default.conf文件中的spark.driver.extraJavaOptions對應的JVM參數值。(3)取的是spark-default.conf文件中的spark.driver.extraJavaOptions對應的參數值

上值最後均可被spark-submit工具中的--driver-java-options參數覆蓋。

2.Executor的JVM參數

-Xmx,-Xms,如果是yarn-client模式,則默認讀取spark-env文件中的SPARK_EXECUTOR_MEMORY值,-Xmx,-Xms值一樣大小;如果是yarn-cluster模式,則讀取的是spark-default.conf文件中的spark.executor.extraJavaOptions對應的JVM參數值。

PermSize,兩種模式都是讀取的是spark-default.conf文件中的spark.executor.extraJavaOptions對應的JVM參數值。GC方式,兩種模式都是讀取的是spark-default.conf文件中的spark.executor.extraJavaOptions對應的JVM參數值。

3.JVM參數說明

-Xms:初始堆的大小,默認是物理內存的1/64

-Xmx:最大堆大小,默認物理內存的1/4

-Xmn:年輕代大小

-XX:PermSize:持久化初始值,默認是物理內存的1/64

-XX:MaxPermSize:設置持久代最大值,默認物理內存的1/4

-XX:+UseConcMarkSweepGC,使用CMS內存收集

技術分享

3.4.3 啟用更高效的序列化方法

序列化常用於網絡傳輸和數據持久化以便於存儲和傳輸,分布式集群有大量的網絡傳輸和數據存儲,序列化的重要性不言而喻。

如果序列化格式序列化過程緩慢,或者需要占用字節很多,都會大大拖慢整體的計算效率。通常,序列化都是Spark應用優化時首先需要關註的地方。Spark著眼於要達到便利性(允許你在計算過程中使用任何Java類型)和性能的一個平衡。Spark主要提供了兩個序列化庫:

Spark主要提供了兩個序列化庫:

  • Java serialization: 默認情況,Spark使用Java自帶的ObjectOutputStream 框架來序列化對象,這樣任何實現了 java.io.Serializable 接口的對象,都能被序列化。同時,你還可以通過擴展 java.io.Externalizable 來控制序列化性能。Java序列化很靈活但性能較差,同時序列化後占用的字節數也較多。

  • Kryo serialization: Spark還可以使用Kryo 庫(版本2)提供更高效的序列化格式。Kryo的序列化速度和字節占用都比Java序列化好很多(通常是10倍左右),但Kryo不支持所有實現了Serializable 接口的類型,它需要你在程序中 register 需要序列化的類型,以得到最佳性能。

要切換到使用 Kryo,你可以在 SparkConf 初始化的時候調用 conf.set(“spark.serializer”, “org.apache.spark.serializer.KryoSerializer”)。這個設置不僅控制各個worker節點之間的混洗數據序列化格式,同時還控制RDD存到磁盤上的序列化格式。目前,Kryo不是默認的序列化格式,因為它需要你在使用前註冊需要序列化的類型,不過我們還是建議在對網絡敏感的應用場景下使用Kryo。

3.4.4 增大off head內存

spark為了引擎更高效,用了部分堆外內存,這種類型的內存用JVM的參數就很難控制住,內存不夠會被殺掉。可能不是jvm的內存不夠了,可能是對外內存不足,設置overhead參數

memory*0.1。Spark On YARN,Executor經常被YARN殺掉報的錯如下所示:

· 解決方法:增大overhead內存大小,默認是內存的10%

driver:spark.yarn.driver.memeoryOverhead

executor:spark.yarn.executor.memoryOverhead

技術分享

3.4.5 shuffle參數調優

1) shuffle實現的選擇

Hash-based Shuffle:每個executor產生R個文件

Sorted-based Shuffle(默認實現):每個Map Task產生一個文件,更省內存的實現

如何選擇?reduce task數目較多時,選擇sort-based實現,修改spark.shuffle.manager,選擇hash或者sort,shuffle的詳細可以參考上一篇文章

技術分享

2)默認情況下

reduce task收到的數據會存到內存(HashTable)中,防止reduce task的OOM可以將spark.shuffle.spill設為true。

spill條件,可通過spark.shuffle.memeoryFraction設置,默認是0.3。

3.4.6 設置reduce task數目

Reduce Task數目過小,運行過慢,且可能導致OOM

Reduce Task數目過大,產生較多小任務,啟動和調度開銷增大

顯示設置reduce task數目,比如groupByKey,reduceByKey等均提供了設置參數

默認修改參數值spark.default.parallelism,模式是跟前一個階段一致。

3.4.7 使用spark sql實現

越來越多的程序會采用spark sql編寫,而不是spark core API,原因是

1)更加簡單,DataFrame/Dataset是更高級的API,而RDD API是比較低級

2)更加高效,spark sql自帶了優化器,可以自動優化你的程序

4.調優案例

略,後面實操了一遍補上

5.參考資料

1.http://www.csdn.net/article/2015-07-08/2825160 Spark性能調優

2.http://blog.csdn.net/xiaolang85/article/details/51705088

3.http://blog.csdn.net/wuxb_2000/article/details/52870198

4..http://www.cnblogs.com/redcreen/archive/2011/05/04/2037057.html,JVM系列三:JVM參數設置、分析

5.董西成ppt

【Spark深入學習 -14】Spark應用經驗與程序調優