1. 程式人生 > >spark任務執行過程的原始碼分析

spark任務執行過程的原始碼分析

spark任務執行的原始碼分析

在整個spark任務的編寫、提交、執行分三個部分:
① 編寫程式和提交任務到叢集中
②sparkContext的初始化
③觸發action運算元中的runJob方法,執行任務

(1)程式設計程式並提交到叢集:

①程式設計spark程式的程式碼
②打成jar包到叢集中執行
③使用spark-submit命令提交任務
在提交任務時,需要指定 --class 程式的入口(有main方法的類),
1) spark-submit --class xxx
2) ${SPARK_HOME}/bin/spark-class org.apache.spark.deploy.SparkSubmit

[email protected]
3) org.apache.spark.launcher.Main
submit(appArgs, uninitLog)
doRunMain()
runMain(childArgs, childClasspath, sparkConf, childMainClass, args.verbose)
childMainClass:…./.WordCount (自己編寫的程式碼的主類)
mainClass = Utils.classForName(childMainClass)
val app: SparkApplication = if() {} else {new JavaMainApplication(mainClass)}
app.start(childArgs.toArray, sparkConf) // 通過反射呼叫mainClass執行
// 到此為止,相當於呼叫了我們的自己編寫的任務類的main方法執行了。!!!
val mainMethod = klass.getMethod("main", new ArrayString
.getClass)
mainMethod.invoke(null, args)
④開始執行自己編寫的程式碼

(2)初始化sparkContext:

當自己編寫的程式執行到:new SparkContext()時,就開始了精妙而細緻的sparkContext的初始化。
sparkContext的相關介紹:sparkContext是使用者通往spark叢集的唯一入口,可以用來在spark叢集中建立RDD、累加器和廣播變數。sparkContext也是整個spark應用程式的一個至關重要的物件,是整個應用程式執行排程的核心(不是資源排程的核心)。在初始化sparkContext時,同時的會初始化DAGScheduler、TaskScheduler和SchedulerBackend,這些至關重要的物件。
sparkContext的構建過程


spark任務執行過程的原始碼分析

1)Driver端執行的程式碼:

初始化 TaskScheduler
 初始化 SchedulerBackend
 初始化 DAGScheduler

spark任務執行過程的原始碼分析

2)worker和master端執行的程式碼:

driver向master註冊申請資源。
  Worker負責啟動executor。

spark任務執行過程的原始碼分析

(3)觸發action運算元中的runJob方法:

spark任務執行過程的原始碼分析

spark任務執行總結:

  • 將編寫的程式打成jar包
  • 呼叫spark-submit提交任務到叢集上執行
  • 執行sparkSubmit 的main方法,在這個方法中通過反射的方式建立我們編寫的主類的例項物件,然後呼叫該物件的main方法,開始執行我們編寫的程式碼
  • 當代碼執行到new SparkContext物件的的時候,就開始了複雜和精緻的sparkContext物件的初始化
  • 在初始化SparkContext物件的時候,會建立兩個特別重要的物件,分別是:DAGScheduler 和 TaskScheduler,其中【DAGScheduler 的作用】將RDD的依賴切成一個一個的stage,然後stage作為taskSet提交給Taskscheduler。
  • 在構建TaskScheduler的同時,會建立兩個非常重要的物件,分別是 DriverActor 和 ClientActor,DriverActor負責接收executor的反向註冊,將任務提交給executor執行,clientActor是負責向master註冊並提交任務
  • 當clientActor啟動時,會將使用者提交的任務相關的引數分裝到applicationDescription物件中去,然後提交給master進行任務註冊
  • 當master接收到clientActor提交的任務請求時,會將請求的引數進行分析,並封裝成application,然後將其持久化,然後將其加入到任務佇列waitingApps中。
  • 當輪到我們提交任務的時候,就開始執行schedule(),進行任務資源的排程
  • worker接收到master傳送來的launchExecutor 時,會將其解壓並封裝到ExecutorRunner中,然後呼叫這個物件的start方法,啟動executor
  • executor啟動後會向driver反向註冊
  • driver會發送註冊成功資訊,給executor
  • executor接收到driver actor註冊成功資訊後,就會建立一個執行緒池,用於執行driveractor傳送過來的任務
  • 當屬於這個任務的所有的 Executor 啟動並反向註冊成功後,就意味著執行這個任務的 環境已經準備好了,driver 會結束 SparkContext 物件的初始化,也就意味著 new SparkContext 這句程式碼執行完成
  • 當sparkContext初始化完成之後,就會繼續執行我們的程式碼,直到執行到action運算元時,也就意味著觸發了一個job的提交
  • driver 會將這個 job 提交給 DAGScheduler
  • DAGScheduler將接收到的job,從最後一個運算元開始推導,將DAG根據依賴關係劃分成為一個個stage,然後將stage封裝成一個taskSet,並將taskSet中的task提交給taskScheduler
  • taskScheduler接收到DAGScheduler傳送過來的task,會拿到一個序列化器,對task進行序列化,然後將序列化好的task封裝到launchTask中,然後將launchTask傳送給指定的executor中執行
  • executor接收到了DriverActor 傳送過來的launchTask 時,會拿到一個反序列化器,對launchTask 進行反序列化,封裝到一個TaskRunner 中,然後從executor這個執行緒池中獲取一個執行緒,將反序列化好的任務中的運算元作用在RDD對應的分割槽上。
  • 最終當所有的task任務完成之後,整個application執行完成,關閉sparkContext物件。