1. 程式人生 > >【Spark核心原始碼】SparkContext中的元件和初始化

【Spark核心原始碼】SparkContext中的元件和初始化

目錄

SparkContext概述

SparkContext元件概述

SparkContext初始化過程

第一步:確保當前執行緒中沒有SparkContext在執行

第二步:版本反饋

第三步:真正的初始化

第四步:確認啟動成功


SparkContext概述

【Spark核心原始碼】Word Count程式的簡單分析 當中使用Spark Shell編寫了簡單的Word Count程式,在進入Spark Shell的時候,Spark Shell為我們自動建立了SparkContext sc。

Spark應用程式的第一步就是建立並初始化SparkContext,SparkContext的初始化過程包含了內部元件的建立和準備,主要涉及網路通訊、分散式、訊息、儲存、計算、排程、快取、度量、清理、檔案服務和UI

等方面。它是Spark主要功能的入口點,連結Spark叢集,建立RDD、累加器和廣播變數,一個執行緒只能執行一個SparkContext。SparkContext在應用程式中將外部資料轉換成RDD,因此建立了第一個RDD,也就是說SparkContext建立了RDD血緣關係的根,是DAG的根源。

SparkContext元件概述

SparkContext的元件如下圖所示:

各元件的說明如下:

listenerBus:Spark事件匯流排,接收各個使用方的事件,以非同步的方式對事件進行匹配和處理。

_ui:Spark使用者介面,間接依賴於計算引擎、排程系統、儲存體系、作業、階段、儲存、執行器等元件的監控資料,以SparkListenerEnvent的形式投遞給LiveListener,SparkUI從SparkListener中讀取資料。

_taskScheduler:任務排程器,排程系統中最重要的元件之一,按照排程演算法對叢集管理器已經分配給應用程式的資源進行二次排程後分配任務,TaskScheduler排程的task是DAGScheduler建立的,因此DAGScheduler是TaskScheduler的前置排程器。

_statusTracker:狀態跟蹤器,提供對作業、stage等的監控資訊。

_shutdownHookRef:任務退出時執行清理任務。

_schedulerBackend:用於對接不同的資源管理系統。

_progressBar:進度條,利用SparkStatusTracker的API,在控制檯展示Stage的進度。

_jobProgressListener:作業進度監聽器,會註冊到LiveListenerBus中作為時間監聽器使用。

_cleaner:上下文清理器,用非同步方式清理超出應用程式範圍的RDD、ShuffleDependency和BroadCast。

_conf:Spark配置,以k-v的形式儲存著Spark應用程式的配置資訊。

_dagScheduler:DAG排程器,排程系統中最重要的元件之一,負責建立job,將DAG的RDD劃分為不同的stage,提交stage。

_env:Spark執行時環境。

_eventLogger:將事件日誌的監聽器,Spark可選元件,spark.eventLog.enabled=true時啟動。

_executorAllocationManager:Executor動態分配管理器,根據工作負載動態調整Executor的數量,在spark.dynamicAllocation.enabled=true的前提下,和非local模式下或者spark.dynamicAllocation.testing=true時啟動。

_hadoopConfiguration:hadoop的配置資訊,如果使用的是系統SPARK_YARN_MODE=true或者環境變數SPARK_YARN_MODE=true時,啟用yarn配置,否則啟用hadoop配置。

_heartbeatReceiver:心跳接收器,Executor都會向heartbeatReceiver傳送心跳資訊,heartbeatReceiver接收到資訊後,更新executor最後的可見時間,然後傳遞給taskScheduler做進一步處理。

SparkContext初始化過程

在將SparkContext初始化過程之前,需要先了解SparkContext伴生物件中的兩個變數,它們分別是activeContext: AtomicReference[SparkContext]和contextBeingConstructed: Option[SparkContext]。

activeContext: AtomicReference[SparkContext]記錄了當前SparkContext是否處於活躍狀態,當活躍的時候activeContext的value就是當前SparkContext,否則value就是null。

contextBeingConstructed: Option[SparkContext]則是SparkContext正在啟動時的一個標識,SparkContext初始化時有很多元件需要進行初始化設定,需要消耗一些時間,同時又要保證一個執行緒中只執行一個SparkContext,通過設定SparkContext啟動時的表示,來保證一個執行緒中只執行一個SparkContext,當SparkContext正在啟動時,contextBeingConstructed=Some(sc),否則contextBeingConstructed=None。

下面來看看SparkContext啟動的具體步驟,

第一步:確保當前執行緒中沒有SparkContext在執行

markPartiallyConstructed方法中當assertNoOtherContextIsRunning方法執行通過之後,設定contextBeingConstructed = Some(sc),表示當前執行緒中正在建立SparkContext,如果這個執行緒中再建立SparkContext,就要出問題了,assertNoOtherContextIsRunning就是做這個檢驗用的。

assertNoOtherContextIsRunning方法中確保一個執行緒中吃執行一個SparkContext,如果檢測到右其他的SparkContext在執行,就丟擲異常。如果其他執行緒在構建SparkContext,就提出一個警告,這個警告是為了當SparkContext構建過程中出現錯誤,可以很清楚的區分開是哪個執行緒的SparkContext出的錯誤。

第二步:版本反饋

輸出Spark版本,檢驗java和scala版本,spark 2.1.0需要使用java7和scala2.10會有警告

第三步:真正的初始化

初始化過程分為十幾個步驟:

  /**
    * SparkConext初始化第三步:
    * 通過克隆的方式獲取sparkconf,在sparkContext初始化的過程中做了以下幾件事:
    * 1、會對conf中的配置資訊進行校驗(部署模式、appName、yarn模式校驗等等)
    * 2、處理或設定引數:
    *   2.1、driver的IP、埠號和ID
    *   2.2、處理jar路徑和檔案路徑
    *   2.3、事件日誌路徑、是否壓縮事件
    *   2.4、設定是否啟動yarn配置的屬性
    * 3、初始化並啟動一些元件(以下按照建立順序):
    *   3.1、建立任務進度監聽器,並增加到事件匯流排中
    *   3.2、建立spark執行環境
    *   3.3、建立狀態跟蹤器
    *   3.4、建立進度條
    *   3.5、建立Spark UI
    *   3.6、建立hadoop的配置資訊(SPARK_YARN_MODE=true時,採用yarn配置資訊)
    *   3.7、載入jar和file
    *   3.8、配置Executor執行環境
    *   3.9、建立心跳接收器,在建立taskScheduler之前建立,因為Executor需要再建構函式中檢索heartbeatReceiver
    *   3.10、建立schedulerBackend和taskScheduler
    *   3.11、建立dagScheduler,向dagScheduler引入了taskScheduler
    *   3.12、根據taskScheduler生成的_applicationId啟動度量系統,並且將監控資訊傳送給SparkUI進行展示
    *   3.13、建立事件日誌監聽器,並增加到匯流排中
    *   3.14、建立並啟動Executor動態分配管理器
    *   3.15、建立並啟動上下文清理器
    *   3.16、設定並啟動事件匯流排
    *   3.17、釋出環境更新事件
    *   3.18、釋出應用程式啟動事件
    *   3.19、taskScheduler需要等待SchedulerBackend
    *   3.20、將dagScheduler、BlockManagerSource和ExecutorAllocationManager註冊到度量系統中
    * */

1、通過克隆的方式獲取sparkconf,會對conf中的配置資訊進行校驗(部署模式、appName、yarn模式校驗等等)

2、處理或設定引數:
2.1、driver的IP、埠號和ID


2.2、處理jar路徑和檔案路徑


2.3、事件日誌路徑、是否壓縮事件


2.4、設定是否啟動yarn配置的屬性


3、初始化並啟動一些元件(以下按照建立順序):
3.1、建立任務進度監聽器,並增加到事件匯流排中


3.2、建立spark執行環境


3.3、建立狀態跟蹤器


3.4、建立進度條


3.5、建立Spark UI


3.6、建立hadoop的配置資訊(SPARK_YARN_MODE=true時,採用yarn配置資訊)


3.7、載入jar和file


3.8、配置Executor執行環境


3.9、建立心跳接收器,在建立taskScheduler之前建立,因為Executor需要再建構函式中檢索heartbeatReceiver


3.10、建立schedulerBackend和taskScheduler


3.11、建立dagScheduler,向dagScheduler引入了taskScheduler


3.12、啟動taskScheduler,並根據taskScheduler生成的_applicationId啟動度量系統,並且將監控資訊傳送給SparkUI進行展示


3.13、建立事件日誌監聽器,並增加到匯流排中


3.14、建立並啟動Executor動態分配管理器


3.15、建立並啟動上下文清理器


3.16、設定並啟動事件匯流排


3.17、釋出環境更新事件


3.18、釋出應用程式啟動事件


3.19、taskScheduler需要等待SchedulerBackend


3.20、將dagScheduler、BlockManagerSource和ExecutorAllocationManager註冊到度量系統中

第四步:確認啟動成功

設定activeContext的狀態

這一步的處理邏輯和第一步類似,都是先呼叫assertNoOtherContextIsRunning方法,確保當前執行緒中沒有其他SparkContext在建立,然後將contextBeingConstructed設定為None和activeContext的value設定為當前SparkContext

這裡吐槽一句:老外寫的程式碼真的很奇葩,初始化的程式碼真的是翻山越嶺才構造完,第一步在90多行,第二步在200多行,第三步在500多行,第四步在2400多行,這每一步中間穿插著很多的成員變數、方法,就不能寫在一塊嗎?