1. 程式人生 > >【Spark核心原始碼】SparkContext一些方法的解讀

【Spark核心原始碼】SparkContext一些方法的解讀

目錄

建立SchedulerBackend的TaskScheduler方法

設定並啟動事件匯流排

釋出環境更新的方法

釋出應用程式系統的方法


【Spark核心原始碼】SparkContext中的元件和初始化 已經介紹了Spark初始化時是如何執行的,都建立了哪些元件。這些元件具體技術細節後面會慢慢的說,而針對SparkContext,裡面還有一些方法,值得大家品一品。

建立SchedulerBackend的TaskScheduler方法

在SparkContext初始化中有這麼一段程式碼執行了SparkContext.createTaskScheduler方法,用於建立SchedulerBackend和TaskScheduler。TaskScheduler是Spark的重要組成部分,其進行兩級排程,第一級排程是負責向叢集管理器傳送請求給應用程式分配並執行Executor,第二級排程是給具體任務分配Executor並執行相關任務。

createTaskScheduler方法有點長,需要慢慢分析。

首先createTaskScheduler方法需要傳入三個引數,SparkContext,master和deployMode,從呼叫createTaskScheduler方法時可以看出,SparkContext就是使用的本身,"sc = this",master是配置檔案中key=“spark.master”所對應的值,deployMode是配置檔案中key=“spark.submit.deployMode”所對應的值。返回值型別是Tuple,第一個元素是SchedulerBackend,第二個元素是TaskScheduler。

接著就是對master進行模式匹配

如果master與local匹配:

分別建立TaskSchedulerImpl和LocalSchedulerBackend,然後初始化TaskSchedulerImpl,最後返回SchedulerBackend和TaskScheduler。其實後面的幾個模式匹配都是這個模式,只是根據匹配的不同內容做了前置調整。

這裡很值得注意的就是scheduler.initialize(backend)方法,initialize的主要作用就是依據不同的優先策略建立排程池,Spark中的排程模式主要有兩種:FIFO和FAIR。通過配置檔案的spark.scheduler.mode進行設定,預設情況下Spark的排程模式是FIFO(先進先出),誰先提交誰先執行,後面的任務需要等待前面的任務執行。而FAIR(公平排程)模式支援在排程池中為任務進行分組,不同的排程池權重不同,任務可以按照權重來決定執行順序。程式碼如下:

 

如果master與LOCAL_N_REGEX(threads)匹配:

表示通過"local[]"這是使用本地的執行緒數量,如果“[]”中是“*”就採用java虛擬機器可用的處理器數量,如果是具體數字就使用具體個數的處理器個數。然後和local模式一樣,分別建立TaskSchedulerImpl和LocalSchedulerBackend,然後初始化TaskSchedulerImpl,最後返回SchedulerBackend和TaskScheduler。

如果master與LOCAL_N_FAILURES_REGEX(threads)匹配:

與LOCAL_N_REGEX(threads)類似,只不過增加了最大失敗次數的設定

如果master與SPARK_REGEX(threads)匹配:

採用"spark://",表示standalone模式,進行遠端訪問,建立SchedulerBackend的實現類與之前的不一樣了這裡使用的是StandaloneSchedulerBackend。

如果master與LOCAL_CLUSTER_REGEX(threads)匹配:

這是單機叢集模式,通過“local-cluster[x,y,z]”進行設定,x代表executor數量,y代表executor的core的個數,z代表executor擁有的memory,這個memory值不能大於sparkContext設定的executorMemory的值,否則會報錯。

如果master與masterURL匹配:

這裡就指的是yarn模式,yarn-cluster還是yarn-client模式,都是走的這裡。

設定並啟動事件匯流排

設定並啟動時間匯流排是在SparkContext初始化接近尾聲時進行的,該方法通過配置檔案中key=“spark.extraListeners”獲取所有額外的監聽器的類名,然後通過反射機制,顯示載入這些監聽器,並把每個監聽器載入到事件匯流排中,最後啟動事件匯流排ListenerBus。

釋出環境更新的方法

這個方法張這樣:

根據當前排程模式和增加的依賴、檔案的路徑,制定環境更新詳情,並通過事件匯流排傳送跟新資訊。釋出環境更新的方法,在SparkContext中出現了三次,分別在SparkContext初始化接近尾聲時、增加依賴後(addJar方法)和增加檔案後(addFile方法)執行的,也就是說,系統中的資源和依賴發生變化時,都會通過事件匯流排將資源和依賴的變化告訴各個元件。

釋出應用程式系統的方法

事件匯流排釋出Spark應用程式啟動事件,SparkListenerApplicationStart繼承自SparkListenerEvent,是Spark應用程式啟動時的時間,需要設定應用程式的名稱,ID,開始時間,使用者資訊,當且應用程式嘗試的ID以及驅動程式日誌的url。因為applicationId和applicationAttempId都是通過taskScheduler生成的,因此,postAppliactionStart方法一定是在建立TaskScheduler之後執行的。postAppliactionStart方法確定的執行位置是在SparkContext初始化時postEnvironmentUpdate方法之後執行。啟動匯流排、釋出更新事件、釋出啟動應用程式時間按照順序依次執行。

還有一些元件啟動的詳細過程涉及到對應的元件程式,需要對這些元件進行深入的研究。後面會一點一點研究他們。前面的路還很長,這只是一個開始。