1. 程式人生 > >Spark記錄-Spark on Yarn框架

Spark記錄-Spark on Yarn框架

ive 變量 進程 app shuf backend 性能 操作 spi

技術分享圖片

一、客戶端進行操作

1、根據yarnConf來初始化yarnClient,並啟動yarnClient
2、創建客戶端Application,並獲取Application的ID,進一步判斷集群中的資源是否滿足executor和ApplicationMaster申請的資源,如果不滿足則拋出IllegalArgumentException;
3、設置資源、環境變量:其中包括了設置Application的Staging目錄、準備本地資源(jar文件、log4j.properties)、設置Application其中的環境變量、創建Container啟動的Context等;
4、設置Application提交的Context,包括設置應用的名字、隊列、AM的申請的Container、標記該作業的類型為Spark;
5、申請Memory,並最終通過yarnClient.submitApplication向ResourceManager提交該Application。
當作業提交到YARN上之後,客戶端就沒事了,甚至在終端關掉那個進程也沒事,因為整個作業運行在YARN集群上進行,運行的結果將會保存到HDFS或者日誌中。

二、提交到YARN集群,YARN操作

1、運行ApplicationMaster的run方法;
2、設置好相關的環境變量。
3、創建amClient,並啟動;
4、在Spark UI啟動之前設置Spark UI的AmIpFilter;
5、在startUserClass函數專門啟動了一個線程(名稱為Driver的線程)來啟動用戶提交的Application,也就是啟動了Driver。在Driver中將會初始化SparkContext;
6、等待SparkContext初始化完成,最多等待spark.yarn.applicationMaster.waitTries次數(默認為10),如果等待了的次數超過了配置的,程序將會退出;否則用SparkContext初始化yarnAllocator;

7、當SparkContext、Driver初始化完成的時候,通過amClient向ResourceManager註冊ApplicationMaster
8、分配並啟動Executeors。在啟動Executeors之前,先要通過yarnAllocator獲取到numExecutors個Container,然後在Container中啟動Executeors。那麽這個Application將失敗,將Application Status標明為FAILED,並將關閉SparkContext。其實,啟動Executeors是通過ExecutorRunnable實現的,而ExecutorRunnable內部是啟動CoarseGrainedExecutorBackend的。

9、最後,Task將在CoarseGrainedExecutorBackend裏面運行,然後運行狀況會通過Akka通知CoarseGrainedScheduler,直到作業運行完成。

三、Spark on Yarn配置參數

1. spark.yarn.applicationMaster.waitTries 5

用於applicationMaster等待Spark master的次數以及SparkContext初始化嘗試的次數 (一般不用設置)

2.spark.yarn.am.waitTime 100s

3.spark.yarn.submit.file.replication 3

應用程序上載到HDFS的復制份數

4.spark.preserve.staging.files false

設置為true,在job結束後,將stage相關的文件保留而不是刪除。 (一般無需保留,設置成false)

5.spark.yarn.scheduler.heartbeat.interal-ms 5000

Spark application master給YARN ResourceManager 發送心跳的時間間隔(ms)

6.spark.yarn.executor.memoryOverhead 1000

此為vm的開銷(根據實際情況調整)

7.spark.shuffle.consolidateFiles true

僅適用於HashShuffleMananger的實現,同樣是為了解決生成過多文件的問題,采用的方式是在不同批次運行的Map任務之間重用Shuffle輸出文件,也就是說合並的是不同批次的Map任務的輸出數據,但是每個Map任務所需要的文件還是取決於Reduce分區的數量,因此,它並不減少同時打開的輸出文件的數量,因此對內存使用量的減少並沒有幫助。只是HashShuffleManager裏的一個折中的解決方案。

8.spark.serializer org.apache.spark.serializer.KryoSerializer

暫時只支持Java serializer和KryoSerializer序列化方式

9.spark.kryoserializer.buffer.max 128m

允許的最大大小的序列化值。

10.spark.storage.memoryFraction 0.3

用來調整cache所占用的內存大小。默認為0.6。如果頻繁發生Full GC,可以考慮降低這個比值,這樣RDD Cache可用的內存空間減少(剩下的部分Cache數據就需要通過Disk Store寫到磁盤上了),會帶來一定的性能損失,但是騰出更多的內存空間用於執行任務,減少Full GC發生的次數,反而可能改善程序運行的整體性能。

11.spark.sql.shuffle.partitions 800

一個partition對應著一個task,如果數據量過大,可以調整次參數來減少每個task所需消耗的內存.

12.spark.sql.autoBroadcastJoinThreshold -1

當處理join查詢時廣播到每個worker的表的最大字節數,當設置為-1廣播功能將失效。

13.spark.speculation false

如果設置成true,倘若有一個或多個task執行相當緩慢,就會被重啟執行。(事實證明,這種做法會造成hdfs中臨時文件的丟失,報找不到文件的錯)

14.spark.shuffle.manager tungsten-sort

tungsten-sort是一種類似於sort的shuffle方式,shuffle data還有其他兩種方式 sort、hash. (不過官網說 tungsten-sort 應用於spark 1.5版本以上)

15.spark.sql.codegen true

Spark SQL在每次執行次,先把SQL查詢編譯JAVA字節碼。針對執行時間長的SQL查詢或頻繁執行的SQL查詢,此配置能加快查詢速度,因為它產生特殊的字節碼去執行。但是針對很短的查詢,可能會增加開銷,因為它必須先編譯每一個查詢

16.spark.shuffle.spill false

如果設置成true,將會把spill的數據存入磁盤

17.spark.shuffle.consolidateFiles true

我們都知道shuffle默認情況下的文件數據為map tasks * reduce tasks,通過設置其為true,可以使spark合並shuffle的中間文件為reduce的tasks數目。

18.代碼中 如果filter過濾後 會有很多空的任務或小文件產生,這時我們使用coalesce或repartition去減少RDD中partition數量。

Spark記錄-Spark on Yarn框架