1. 程式人生 > >Spark原始碼分析-spark叢集啟動及任務執行

Spark原始碼分析-spark叢集啟動及任務執行

注: 因為基於Akka的Actor的RPC版本相對容易理解一點,本文分析使用的Spark版本如下:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>1.3.1</version>
</dependency>

叢集啟動過程分析

Master

Master程序通過start-master.sh啟動一個後臺程序,程式入口是Master類的main方法
該main方法的作用是建立一個ActorSystem,並建立一個master的actor,建立master的actor時,執行Master的構造方法做一些初始化的工作,同時執行Actor生命週期的prestart()方法

Worker

Worker過程也同Master,同時由於spark-env.sh的配置檔案中有Master啟動的伺服器地址和埠,Worker在prestart()方法中會獲取Master的AkkaSystem的URL並建立master actor的引用,並向Master註冊自己
大體過程如下圖:
叢集啟動過程

Spark-Submit提交任務

我們以提交簡單的WordCount任務為例:

object WordCount {

  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("WordCount"
) val sc = new SparkContext(conf); sc.textFile("hdfs://xxxx/input").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).saveAsTextFile("hdfs://xxxx/output"); sc.stop() } }

並通過如下方式提交任務:

spark-submit.sh --class com.xxx.xxx.WordCount --master spark://127.0.0.1:7070 —executor-memory 2G —total-executor-cores 4
/xxx/xxx/xxx/wordcount.jar

spark-submit.sh指令碼會啟動程序呼叫SparkSubmit類的入口main方法,SparkSubmit的main方法中主要通過反射實際任務類,在本執行緒中執行該任務類的main方法,從而進入到使用者任務邏輯中

使用者的任務邏輯中,SparkConf是初始化相關的引數,核心邏輯在SparkContext中

SparkContext初始化邏輯

1.Executor排程及啟動過程分析

SparkContext是Spark功能的入口,在new SparkContext物件是,SparkContext做了很多初始化的工作,最主要的是建立taskScheduler和dagScheduler
SparkContext初始化
整個過程簡單描述:

  1. sparkcontext建立taskSchedulerImpl物件,然後根據不同的資源排程模型初始化不同的schedulerBackend及排程模型
  2. 建立DAGScheduler物件,dagScheduler在初始化過程中建立一個DAGSchedulerEventProcessLoop物件,該物件內部包含一個阻塞佇列,dagScheduler初始化的最後一行呼叫eventprocessloop.start()方法,該方法執行啟動一個守護執行緒不斷監控佇列的元素取出處理。
  3. 執行taskSchedulerImpl物件的start方法,同時執行對應的backend的start,此處以SparkDeploySchedulerBackend為例,backend將需要啟動的executor類名及相關的executor的啟動引數封裝到ApplicationDescription物件中,同時建立一個AppClient物件,AppClient裡有一個ClientActor,在AppClient執行start時會初始化ClientActor並給Master傳送RegisterApplication的訊息,同時把ApplicationDesc帶過去。
  4. Master處理Application註冊的訊息後返回註冊完成的訊息給ClientActor,並呼叫自己的scheduler()方法開始按照策略給worker分配該Application的執行Executor,並把對應的訊息傳送給worker讓worker啟動Executor
  5. Worker接收到Master傳送的LaunchExecutor命令後,根據appDesc的描述資訊初始化相關的啟動引數,然後啟動一個執行緒,通過fetchAndRunExecutor()方法啟動一個Executor的子程序,同時向Master彙報當前啟動的Executor的狀態

至此,由Master排程,分配給Worker啟動Executor訊息請求工作完成,下面開始分析Executor程序真正啟動過程中做了哪些事情。

Executor子程序啟動過程分析

以Spark Standalone叢集為例,Spark叢集啟動的Executor是CoarseGrainedExecutorBackend的例項,在執行任務時jps對應的worker機器檢視程序,能看到名為CoarseGrainedExecutorBackend的程序名,CoarseGrainedExecutorBackend程序啟動的入口自然也是對應的main方法。
Executor啟動過程
CoarseGrainedExecutorBackend啟動初始化過程步驟如下:

  1. 入口main方法首先構建自己的ActorSystem,然後根據啟動引數傳進來的dirverUrl構建對應的DriverActor的ActorRef引用,然後向driver傳送RegisterExecutor註冊自己。
  2. driver收到訊息後返回給ExecutorBackend確認註冊訊息後,ExecutorBackend建立一個Executor類的物件,內部構造方法構建一個執行緒池用於任務(TaskRunner封裝)的執行,並持續向driver端的TaskScheduler和DAGScheduler傳送任務心跳資訊。
  3. 建立一個WorkerWatcher的Actor向Worker傳送心跳資訊。

至此,new SparkContext()的過程就處理完成了,下面開始執行具體的任務。

Task執行過程分析

下面著重分析一下這一行語句的執行過程

sc.textFile("hdfs://xxxx/input").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).saveAsTextFile("hdfs://xxxx/output");

構建RDD及關係過程

  1. textFile()會構建一個HadoopRDD利用Hadoop中的FileInputFormat對輸入檔案進行分片處理,然後呼叫map()方法生成一個MapPartitionsRDD對每個分片的資料進行迭代讀取到記憶體。所以textFile()方法會構建兩個RDD:HadoopRDD和MapPartitionsRDD
  2. flatMap()方法會對前面獲取的資料按照分割槽做Scala的flatMap壓平操作,所以也是通過MapPartitionsRDD讀取並處理資料。該方法構建一個MapPartitionsRDD
  3. map()方法獲取壓平後的資料進行進一步迭代處理,也是構建一個MapPartitionsRDD
  4. reduceByKey()方法呼叫時會先通過implicit
    conversion(隱式轉換)把普通的RDD擴充套件成PairRDDFunctions,進而呼叫擴充套件的reduceByKey方法,reduceByKey呼叫底層的combineByKey方法,生成一個ShuffledRDD返回,ShuffleRDD類過載了父類的getDependencies()方法,返回一個ShuffleDependency,也就是常說的寬依賴
  5. saveAsTextFile()方法會呼叫RDD的mapPartitions()方法迭代分片處理資料,內部會構建一個MapPartitionsRDD。到目前還只是構建RDD及對應的依賴關係,構建完成RDD及對應關係後開始真正的提交任務並執行的操作。

任務排程及提交執行

正如spark手冊描述的,Transform方法是定義RDD的操作邏輯和關係,並沒有做任何實際的資料操作;Action方法才開始做真正的任務排程和提交。本例中的saveAsTextFile就是一個Action方法,前面構建RDD部分邏輯暫時忽略,從提交任務邏輯開始分析。
任務提交及執行過程
具體邏輯如下:

  1. 以PairRDDFunctions的saveAsHadoopDataset作為任務提交的入口,呼叫RDD內部成員SparkContext的runJob()方法。
  2. 呼叫對應dagScheduler的runJob()方法,最終是通過在submitJob中例項化一個JobSubmitted的事件物件,扔到DAGSchedulerEventProcessLoop的阻塞佇列,DAGSchedulerEventProcessLoop在建立SparkContext時已經啟動了一個執行緒迴圈處理接收到的事件,事件處理邏輯通過模式匹配,最終呼叫dagScheduler的handleJobSubmitted方法。
  3. handleJobSubmitted內通過迭代RDD的父依賴的遞迴呼叫,判斷父依賴是否為ShuffleDependency來劃分stage,同時生成stage的依賴關係。
  4. 提交stage,迭代從第一個stage開始處理並構建stage對應的task:根據stage是否包含shuffle操作確認是ShuffleMapTask任務還是普通的ResultTask,並根據分割槽數生成對應多個task。然後把該stage分解成的一組tasks封裝到一個taskset並提交給TaskSchedulerImpl處理。
  5. taskScheduler將taskset封裝成TaskSetManager並新增到activeTaskSets的HashMap結構中,然後通過CoarseGrainedSchedulerBackend呼叫reviveOffers方法向其內部類DriveActor傳送ReviveOffers訊息,觸發DriveActor的makeOffers操作進行Executor的任務分配;makeOffers內先呼叫taskScheduler的resourceOffers基於資料就近原則,給每個task分配合適的executor(基於每個task對應partition的位置資訊及executor向給DriverActor上報的資訊)。
  6. 根據前面計算的排程結果,序列化task,並向相應executor傳送對應的LaunchTask資訊
  7. ExecutorBackend接收到LaunchTask的訊息,反序列化task描述資訊,並初始化一個TaskRunner的執行緒物件,交給內部executor的執行緒池處理task;執行緒執行task的run方法,對應到Task具體實現類ShuffleMapTask或ResultTask的runTask()方法,執行具體的業務邏輯。
  8. executorBackend獲取到task的執行情況,並通過StatusUpdate訊息傳送給DriverActor。

至此,任務的提交及執行過程分析完成。