1. 程式人生 > >Spark on YARN cluster & client 模式作業執行全過程分析

Spark on YARN cluster & client 模式作業執行全過程分析

原文連結列表如下,致謝:

https://www.iteblog.com/archives/1223.html

https://www.iteblog.com/archives/1189.html

https://www.iteblog.com/archives/1191.html


一、Spark:Yarn-Cluster 與 Yarn-Client 的區別與聯絡

我們都知道Spark支援在yarn上執行,但是Spark on yarn有分為兩種模式yarn-cluster和yarn-client,它們究竟有什麼區別與聯絡?閱讀完本文,你將瞭解。


  Spark支援可插拔的叢集管理模式(Standalone、Mesos以及YARN ),叢集管理負責啟動executor程序,編寫Spark application 的人根本不需要知道Spark用的是什麼叢集管理。Spark支援的三種叢集模式,這三種叢集模式都由兩個元件組成:master和slave。Master服務(YARN ResourceManager,Mesos master和Spark standalone master)決定哪些application可以執行,什麼時候執行以及哪裡去執行。而slave服務( YARN NodeManager, Mesos slave和Spark standalone slave)實際上執行executor程序。


  當在YARN上執行Spark作業,每個Spark executor作為一個YARN容器(container)執行。Spark可以使得多個Tasks在同一個容器(container)裡面執行。這是個很大的優點。

  注意這裡和Hadoop的MapReduce作業不一樣,MapReduce作業為每個Task開啟不同的JVM來執行。雖然說MapReduce可以通過引數來配置。詳見 mapreduce.job.jvm.numtasks。關於這個引數的介紹已經超過本篇文章的介紹。

  從廣義上講,yarn-cluster適用於生產環境;而yarn-client適用於互動和除錯,也就是希望快速地看到application的輸出。

  在我們介紹yarn-cluster和yarn-client的深層次的區別之前,我們先明白一個概念:Application Master。在YARN中,每個Application例項都有一個Application Master程序,它是Application啟動的第一個容器。它負責和ResourceManager打交道,並請求資源。獲取資源之後告訴NodeManager為其啟動container。

  從深層次的含義講,yarn-cluster和yarn-client模式的區別其實就是Application Master程序的區別,yarn-cluster模式下,driver執行在AM(Application Master)中,它負責向YARN申請資源,並監督作業的執行狀況。當用戶提交了作業之後,就可以關掉Client,作業會繼續在YARN上執行。然而yarn-cluster模式不適合執行互動型別的作業。而yarn-client模式下,Application Master僅僅向YARN請求executor,client會和請求的container通訊來排程他們工作,也就是說Client不能離開。看下下面的兩幅圖應該會明白(上圖是yarn-cluster模式,下圖是yarn-client模式):


圖一:yarn cluste


圖二:yarn client



二、Spark on YARN cluster 模式作業執行全過程分析

下面是分析Spark on YARN的Cluster模式,從使用者提交作業到作業執行結束整個執行期間的過程分析。

客戶端進行操作

  1、根據yarnConf來初始化yarnClient,並啟動yarnClient
  2、建立客戶端Application,並獲取Application的ID,進一步判斷叢集中的資源是否滿足executor和ApplicationMaster申請的資源,如果不滿足則丟擲IllegalArgumentException;
  3、設定資源、環境變數:其中包括了設定Application的Staging目錄、準備本地資源(jar檔案、log4j.properties)、設定Application其中的環境變數、建立Container啟動的Context等;
  4、設定Application提交的Context,包括設定應用的名字、佇列、AM的申請的Container、標記該作業的型別為Spark;
  5、申請Memory,並最終通過yarnClient.submitApplication向ResourceManager提交該Application。
  當作業提交到YARN上之後,客戶端就沒事了,甚至在終端關掉那個程序也沒事,因為整個作業執行在YARN叢集上進行,執行的結果將會儲存到HDFS或者日誌中。

提交到YARN叢集,YARN操作

  1、執行ApplicationMaster的run方法;
  2、設定好相關的環境變數。
  3、建立amClient,並啟動;
  4、在Spark UI啟動之前設定Spark UI的AmIpFilter;
  5、在startUserClass函式專門啟動了一個執行緒(名稱為Driver的執行緒)來啟動使用者提交的Application,也就是啟動了Driver。在Driver中將會初始化SparkContext;
  6、等待SparkContext初始化完成,最多等待spark.yarn.applicationMaster.waitTries次數(預設為10),如果等待了的次數超過了配置的,程式將會退出;否則用SparkContext初始化yarnAllocator;

   怎麼知道SparkContext初始化完成?
  其實在5步驟中啟動Application的過程中會初始化SparkContext,在初始化SparkContext的時候將會建立YarnClusterScheduler,在SparkContext初始化完成的時候,會呼叫YarnClusterScheduler類中的postStartHook方法,而該方法會通知ApplicationMaster已經初始化好了SparkContext

  7、當SparkContext、Driver初始化完成的時候,通過amClient向ResourceManager註冊ApplicationMaster
  8、分配並啟動Executeors。在啟動Executeors之前,先要通過yarnAllocator獲取到numExecutors個Container,然後在Container中啟動Executeors。如果在啟動Executeors的過程中失敗的次數達到了maxNumExecutorFailures的次數,maxNumExecutorFailures的計算規則如下:

// Default to numExecutors * 2, with minimum of 3 private val maxNumExecutorFailures = sparkConf.getInt( "spark.yarn.max.executor.failures" ,      sparkConf.getInt( "spark.yarn.max.worker.failures" , math.max(args.numExecutors * 2 , 3 )))

  那麼這個Application將失敗,將Application Status標明為FAILED,並將關閉SparkContext。其實,啟動Executeors是通過ExecutorRunnable實現的,而ExecutorRunnable內部是啟動CoarseGrainedExecutorBackend的。
  9、最後,Task將在CoarseGrainedExecutorBackend裡面執行,然後執行狀況會通過Akka通知CoarseGrainedScheduler,直到作業執行完成。


三、Spark on YARN client 模式作業執行全過程分析

在前篇文章中我介紹了Spark on YARN叢集模式(yarn-cluster)作業從提交到執行整個過程的情況(詳情見《Spark on YARN叢集模式作業執行全過程分析》),我們知道Spark on yarn有兩種模式:yarn-cluster和yarn-client。這兩種模式作業雖然都是在yarn上面執行,但是其中的執行方式很不一樣,今天我就來談談Spark on YARN yarn-client模式作業從提交到執行的過程剖析。
  和yarn-cluster模式一樣,整個程式也是通過spark-submit指令碼提交的。但是yarn-client作業程式的執行不需要通過Client類來封裝啟動,而是直接通過反射機制呼叫作業的main函式。下面就來分析:
  1、通過SparkSubmit類的launch的函式直接呼叫作業的main函式(通過反射機制實現),如果是叢集模式就會呼叫Client的main函式。
  2、而應用程式的main函式一定都有個SparkContent,並對其進行初始化;
  3、在SparkContent初始化中將會依次做如下的事情:設定相關的配置、註冊MapOutputTracker、BlockManagerMaster、BlockManager,建立taskScheduler和dagScheduler;其中比較重要的是建立taskScheduler和dagScheduler。在建立taskScheduler的時候會根據我們傳進來的master來選擇Scheduler和SchedulerBackend。由於我們選擇的是yarn-client模式,程式會選擇YarnClientClusterScheduler和YarnClientSchedulerBackend,並將YarnClientSchedulerBackend的例項初始化YarnClientClusterScheduler,上面兩個例項的獲取都是通過反射機制實現的,YarnClientSchedulerBackend類是CoarseGrainedSchedulerBackend類的子類,YarnClientClusterScheduler是TaskSchedulerImpl的子類,僅僅重寫了TaskSchedulerImpl中的getRackForHost方法。
  4、初始化完taskScheduler後,將建立dagScheduler,然後通過taskScheduler.start()啟動taskScheduler,而在taskScheduler啟動的過程中也會呼叫SchedulerBackend的start方法。在SchedulerBackend啟動的過程中將會初始化一些引數,封裝在ClientArguments中,並將封裝好的ClientArguments傳進Client類中,並client.runApp()方法獲取Application ID。
  5、client.runApp裡面的做是和前面客戶端進行操作那節類似,不同的是在裡面啟動是ExecutorLauncher(yarn-cluster模式啟動的是ApplicationMaster)。
  6、在ExecutorLauncher裡面會初始化並啟動amClient,然後向ApplicationMaster註冊該Application。註冊完之後將會等待driver的啟動,當driver啟動完之後,會建立一個MonitorActor物件用於和CoarseGrainedSchedulerBackend進行通訊(只有事件AddWebUIFilter他們之間才通訊,Task的執行狀況不是通過它和CoarseGrainedSchedulerBackend通訊的)。然後就是設定addAmIpFilter,當作業完成的時候,ExecutorLauncher將通過amClient設定Application的狀態為FinalApplicationStatus.SUCCEEDED。
  7、分配Executors,這裡面的分配邏輯和yarn-cluster裡面類似,就不再說了。
  8、最後,Task將在CoarseGrainedExecutorBackend裡面執行,然後執行狀況會通過Akka通知CoarseGrainedScheduler,直到作業執行完成。
  9、在作業執行的時候,YarnClientSchedulerBackend會每隔1秒通過client獲取到作業的執行狀況,並打印出相應的執行資訊,當Application的狀態是FINISHED、FAILED和KILLED中的一種,那麼程式將退出等待。
  10、最後有個執行緒會再次確認Application的狀態,當Application的狀態是FINISHED、FAILED和KILLED中的一種,程式就執行完成,並停止SparkContext。整個過程就結束了。