1. 程式人生 > >Spark 核心篇-SparkContext

Spark 核心篇-SparkContext

Spark 核心篇-SparkContext

閱讀目錄

 


本章內容:

回到頂部

1、功能描述

本篇文章就要根據原始碼分析SparkContext所做的一些事情,用過Spark的開發者都知道SparkContext是編寫Spark程式用到的第一個類,足以說明SparkContext的重要性;這裡先摘抄SparkContext原始碼註釋來簡單介紹介紹SparkContext,註釋的第一句話就是說SparkContext為Spark的主要入口點,簡明扼要,如把Spark叢集當作服務端那Spark Driver就是客戶端,SparkContext則是客戶端的核心;如註釋所說 SparkContext用於連線Spark叢集、建立RDD、累加器(accumlator)、廣播變數(broadcast variables),所以說SparkContext為Spark程式的根本都不為過。

SparkContext 是 Spark 中元老級的 API,從0.x.x 版本就已經存在。有過 Spark 使用經驗會感覺 SparkContext 已經太老了,然後 SparkContext 始終跟隨著 Spark 的迭代不斷向前。SparkContext 內部雖然已經發生了很大的變化,有些內部元件已經廢棄,有些元件已經優化,還有一些新的元件不斷加入,不斷煥發的強大的魅力,是 Spark 的靈魂。

1

2

3

4

5

6

7

8

9

10

11

/**

 * Main entry point for Spark functionality. A SparkContext represents the connection to a Spark

 * cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster.

 *

 * Only one SparkContext may be active per JVM.  You must `stop()` the active SparkContext before

 * creating a new one.  This limitation may eventually be removed; see SPARK-2243 for more details.

 *

 * @param config a Spark Config object describing the application configuration. Any settings in

 *   this config overrides the default configs as well as system properties.

 */

class SparkContext(config: SparkConf) extends Logging

也就是說SparkContext是Spark的入口,相當於應用程式的main函式。目前在一個JVM程序中可以建立多個SparkContext,但是隻能有一個active級別的。如果你需要建立一個新的SparkContext例項,必須先呼叫stop方法停掉當前active級別的SparkContext例項。

圖1 Spark 架構圖

圖片來自Spark官網,可以看到SparkContext處於DriverProgram核心位置,所有與Cluster、Worker Node互動的操作都需要SparkContext來完成。

圖2 SparkContext 在 Spark 應用程式中的扮演的主要角色

圖3 Driver 上執行的服務元件

 

回到頂部

2、相關元件

名稱

說明

SparkConf

Spark配置類,配置已鍵值對形式儲存,封裝了一個ConcurrentHashMap類例項settings用於儲存Spark的配置資訊。
SparkEnv SparkContext中非常重要的類,它維護著Spark的執行環境,所有的執行緒都可以通過SparkContext訪問到同一個SparkEnv物件。
LiveListenerBus SparkContext 中的事件匯流排,可以接收各種使用方的事件,並且非同步傳遞Spark事件監聽與SparkListeners監聽器的註冊。
SparkUI 為Spark監控Web平臺提供了Spark環境、任務的整個生命週期的監控。
TaskScheduler 為Spark的任務排程器,Spark通過他提交任務並且請求叢集排程任務。因其排程的 Task 由 DAGScheduler 建立,所以 DAGScheduler 是 TaskScheduler 的前置排程。
DAGScheduler 為高階的、基於Stage的排程器, 負責建立 Job,將 DAG 中的 RDD 劃分到不同的 Stage,並將Stage作為Tasksets提交給底層排程器TaskScheduler執行。
HeartbeatReceiver 心跳接收器,所有 Executor 都會向HeartbeatReceiver 傳送心跳,當其接收到 Executor 的心跳資訊後,首先更新 Executor 的最後可見時間,然後將此資訊交給 TaskScheduler 進一步處理。

ExecutorAllocationManager

Executor 動態分配管理器,根據負載動態的分配與刪除Executor,可通過其設定動態分配最小Executor、最大Executor、初始Executor數量等配置。
ContextClearner 上下文清理器,為RDD、shuffle、broadcast狀態的非同步清理器,清理超出應用範圍的RDD、ShuffleDependency、Broadcast物件。
SparkStatusTracker 低級別的狀態報告API,只能提供非常脆弱的一致性機制,對Job(作業)、Stage(階段)的狀態進行監控。
HadoopConfiguration Spark預設使用HDFS來作為分散式檔案系統,用於獲取Hadoop配置資訊。

以上的物件為SparkContext使用到的主要物件,可以看到SparkContext包含了Spark程式用到的幾乎所有核心物件可見SparkContext的重要性;建立SparkContext時會新增一個鉤子到ShutdownHookManager中用於在Spark程式關閉時對上述物件進行清理,在建立RDD等操作也會判斷SparkContext是否已stop;通常情況下一個Driver只會有一個SparkContext例項,但可通過spark.driver.allowMultipleContexts配置來允許driver中存在多個SparkContext例項。

回到頂部

3、程式碼分析

程式碼

說明

SparkContext.markPartiallyConstructed(this, allowMultipleContexts)
用來確保例項的唯一性
try {
  _conf = config.clone()
  _conf.validateSettings()

  if (!_conf.contains("spark.master")) {
    throw new SparkException("A master URL must be set in your configuration")
  }
  if (!_conf.contains("spark.app.name")) {
    throw new SparkException("An application name must be set in your configuration")
  }

  // log out spark.app.name in the Spark driver logs
 logInfo(s"Submitted application: $appName")

  // System property spark.yarn.app.id must be set if user code ran by AM on a YARN cluster
 if (master == "yarn" && deployMode == "cluster" && !_conf.contains("spark.yarn.app.id")) {
    throw new SparkException("Detected yarn cluster mode, but isn't running on a cluster. " +
      "Deployment to YARN is not supported directly by SparkContext. Please use spark-submit.")
  }

  if (_conf.getBoolean("spark.logConf", false)) {
    logInfo("Spark configuration:\n" + _conf.toDebugString)
  }

  // Set Spark driver host and port system properties. This explicitly sets the configuration
 // instead of relying on the default value of the config constant.
 _conf.set(DRIVER_HOST_ADDRESS, _conf.get(DRIVER_HOST_ADDRESS))
  _conf.setIfMissing("spark.driver.port", "0")

  _conf.set("spark.executor.id", SparkContext.DRIVER_IDENTIFIER)

  _jars = Utils.getUserJars(_conf)
  _files = _conf.getOption("spark.files").map(_.split(",")).map(_.filter(_.nonEmpty))
    .toSeq.flatten

  _eventLogDir =
    if (isEventLogEnabled) {
      val unresolvedDir = conf.get("spark.eventLog.dir", EventLoggingListener.DEFAULT_LOG_DIR)
        .stripSuffix("/")
      Some(Utils.resolveURI(unresolvedDir))
    } else {
      None
    }

  _eventLogCodec = {
    val compress = _conf.getBoolean("spark.eventLog.compress", false)
    if (compress && isEventLogEnabled) {
      Some(CompressionCodec.getCodecName(_conf)).map(CompressionCodec.getShortName)
    } else {
      None
    }
  }

  _listenerBus = new LiveListenerBus(_conf)

  // Initialize the app status store and listener before SparkEnv is created so that it gets
 // all events.
 _statusStore = AppStatusStore.createLiveStore(conf)
  listenerBus.addToStatusQueue(_statusStore.listener.get)

  // Create the Spark execution environment (cache, map output tracker, etc)
 _env = createSparkEnv(_conf, isLocal, listenerBus)
  SparkEnv.set(_env)

  // If running the REPL, register the repl's output dir with the file server.
 _conf.getOption("spark.repl.class.outputDir").foreach { path =>
    val replUri = _env.rpcEnv.fileServer.addDirectory("/classes", new File(path))
    _conf.set("spark.repl.class.uri", replUri)
  }

  _statusTracker = new SparkStatusTracker(this, _statusStore)

  _progressBar =
    if (_conf.get(UI_SHOW_CONSOLE_PROGRESS) && !log.isInfoEnabled) {
      Some(new ConsoleProgressBar(this))
    } else {
      None
    }

  _ui =
    if (conf.getBoolean("spark.ui.enabled", true)) {
      Some(SparkUI.create(Some(this), _statusStore, _conf, _env.securityManager, appName, "",
 startTime))
    } else {
      // For tests, do not enable the UI
 None
    }
  // Bind the UI before starting the task scheduler to communicate
 // the bound port to the cluster manager properly
 _ui.foreach(_.bind())

  _hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(_conf)

  // Add each JAR given through the constructor
 if (jars != null) {
    jars.foreach(addJar)
  }

  if (files != null) {
    files.foreach(addFile)
  }

  _executorMemory = _conf.getOption("spark.executor.memory")
    .orElse(Option(System.getenv("SPARK_EXECUTOR_MEMORY")))
    .orElse(Option(System.getenv("SPARK_MEM"))
    .map(warnSparkMem))
    .map(Utils.memoryStringToMb)
    .getOrElse(1024)

  // Convert java options to env vars as a work around
 // since we can't set env vars directly in sbt.
 for { (envKey, propKey) <- Seq(("SPARK_TESTING", "spark.testing"))
    value <- Option(System.getenv(envKey)).orElse(Option(System.getProperty(propKey)))} {
    executorEnvs(envKey) = value
  }
  Option(System.getenv("SPARK_PREPEND_CLASSES")).foreach { v =>
    executorEnvs("SPARK_PREPEND_CLASSES") = v
  }
  // The Mesos scheduler backend relies on this environment variable to set executor memory.
 // TODO: Set this only in the Mesos scheduler.
 executorEnvs("SPARK_EXECUTOR_MEMORY") = executorMemory + "m"
 executorEnvs ++= _conf.getExecutorEnv
  executorEnvs("SPARK_USER") = sparkUser

 // We need to register "HeartbeatReceiver" before "createTaskScheduler" because Executor will
 // retrieve "HeartbeatReceiver" in the constructor. (SPARK-6640)
 _heartbeatReceiver = env.rpcEnv.setupEndpoint(
    HeartbeatReceiver.ENDPOINT_NAME, new HeartbeatReceiver(this))

  // Create and start the scheduler
 val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
  _schedulerBackend = sched
  _taskScheduler = ts
  _dagScheduler = new DAGScheduler(this)
  _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)

  // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
 // constructor
 _taskScheduler.start()

  _applicationId = _taskScheduler.applicationId()
  _applicationAttemptId = taskScheduler.applicationAttemptId()
  _conf.set("spark.app.id", _applicationId)
  if (_conf.getBoolean("spark.ui.reverseProxy", false)) {
    System.setProperty("spark.ui.proxyBase", "/proxy/" + _applicationId)
  }
  _ui.foreach(_.setAppId(_applicationId))
  _env.blockManager.initialize(_applicationId)

  // The metrics system for Driver need to be set spark.app.id to app ID.
 // So it should start after we get app ID from the task scheduler and set spark.app.id.
 _env.metricsSystem.start()
  // Attach the driver metrics servlet handler to the web ui after the metrics system is started.
 _env.metricsSystem.getServletHandlers.foreach(handler => ui.foreach(_.attachHandler(handler)))

  _eventLogger =
    if (isEventLogEnabled) {
      val logger =
        new EventLoggingListener(_applicationId, _applicationAttemptId, _eventLogDir.get,
 _conf, _hadoopConfiguration)
      logger.start()
      listenerBus.addToEventLogQueue(logger)
      Some(logger)
    } else {
      None
    }

  // Optionally scale number of executors dynamically based on workload. Exposed for testing.
 val dynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(_conf)
  _executorAllocationManager =
    if (dynamicAllocationEnabled) {
      schedulerBackend match {
        case b: ExecutorAllocationClient =>
          Some(new ExecutorAllocationManager(
            schedulerBackend.asInstanceOf[ExecutorAllocationClient], listenerBus, _conf,
 _env.blockManager.master))
        case _ =>
          None
      }
    } else {
      None
    }
  _executorAllocationManager.foreach(_.start())

  _cleaner =
    if (_conf.getBoolean("spark.cleaner.referenceTracking", true)) {
      Some(new ContextCleaner(this))
    } else {
      None
    }
  _cleaner.foreach(_.start())

  setupAndStartListenerBus()
  postEnvironmentUpdate()
  postApplicationStart()

  // Post init
 _taskScheduler.postStartHook()
  _env.metricsSystem.registerSource(_dagScheduler.metricsSource)
  _env.metricsSystem.registerSource(new BlockManagerSource(_env.blockManager))
  _executorAllocationManager.foreach { e =>
    _env.metricsSystem.registerSource(e.executorAllocationManagerSource)
  }

  // Make sure the context is stopped if the user forgets about it. This avoids leaving
 // unfinished event logs around after the JVM exits cleanly. It doesn't help if the JVM
 // is killed, though.
 logDebug("Adding shutdown hook") // force eager creation of logger
 _shutdownHookRef = ShutdownHookManager.addShutdownHook(
    ShutdownHookManager.SPARK_CONTEXT_SHUTDOWN_PRIORITY) { () =>
    logInfo("Invoking stop() from shutdown hook")
    stop()
  }
} catch {
  case NonFatal(e) =>
    logError("Error initializing SparkContext.", e)
    try {
      stop()
    } catch {
      case NonFatal(inner) =>
        logError("Error stopping SparkContext after init error.", inner)
    } finally {
      throw e
    }
}
  1. 配置校驗並設定Spark Driver 的 Host 和 Port
  2. 初始化事件日誌目錄和壓縮型別
  3. 初始化App狀態儲存以及事件LiveListenerBus
  4. 建立Spark的執行環境SparkEnv
  5. 初始化狀態跟蹤器SparkStatusTracker
  6. 根據配置建立ConsoleProgressBar
  7. 建立並初始化Spark UI
  8. Hadoop相關配置及Executor環境變數的設定
  9. 註冊HeartbeatReceiver心跳接收器
  10. 建立TaskScheduler
  11. 建立DAGScheduler
  12. 啟動TaskScheduler
  13. 初始化塊管理器BlockManager
  14. 啟動測量系統MetricsSystem
  15. 建立事件日誌監聽器
  16. 建立和啟動Executor分配管ExecutorAllocationManager
  17. 建立和啟動ContextCleaner
  18. 額外的 SparkListenser 與啟動事件匯流排(setupAndStartListenerBus)
  19. Spark環境更新(postEnvironmentUpdate)
  20. 投遞應用程式啟動事件(postApplicationStart)
  21. 建立DAGSchedulerSource、BlockManagerSource和ExecutorAllocationManagerSource

 

 

 

 

 

SparkContext.setActiveContext(this, allowMultipleContexts)
將SparkContext標記為啟用
 

3.1 初始設定

首先儲存了當前的CallSite資訊,並且判斷是否允許建立多個SparkContext例項,使用的是spark.driver.allowMultipleContexts屬性,預設為false。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

// 包名:org.apache.spark

// 類名:SparkContext

class SparkContext(config: SparkConf) extends Logging {

  

  // The call site where this SparkContext was constructed.

  // 獲取當前SparkContext的當前呼叫棧。包含了最靠近棧頂的使用者類及最靠近棧底的Scala或者Spark核心類資訊

  private val creationSite: CallSite = Utils.getCallSite()

  

  // If true, log warnings instead of throwing exceptions when multiple SparkContexts are active

  // SparkContext預設只有一個例項。如果在config(SparkConf)中設定了allowMultipleContexts為true,

  // 當存在多個active級別的SparkContext例項時Spark會發生警告,而不是丟擲異常,要特別注意。

  // 如果沒有配置,則預設為false

  private val allowMultipleContexts: Boolean =

    config.getBoolean("spark.driver.allowMultipleContexts"false)

  

  // In order to prevent multiple SparkContexts from being active at the same time, mark this

  // context as having started construction.

  // NOTE: this must be placed at the beginning of the SparkContext constructor.

  // 用來確保SparkContext例項的唯一性,並將當前的SparkContext標記為正在構建中,以防止多個SparkContext例項同時成為active級別的。

  SparkContext.markPartiallyConstructed(this, allowMultipleContexts)

  

  ......

  

}

接下來是對SparkConf進行復制,然後對各種配置資訊進行校驗,其中最主要的就是SparkConf必須指定 spark.master(用於設定部署模式)和 spark.app.name(應用程式名稱)屬性,否則會丟擲異常。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

// 變數宣告

// 包名:org.apache.spark

// 類名:SparkContext

private var _conf: SparkConf = _

  

  

// 變數處理

// 包名:org.apache.spark

// 類名:SparkContext

_conf = config.clone()

_conf.validateSettings()

  

if (!_conf.contains("spark.master")) {

  throw new SparkException("A master URL must be set in your configuration")

}

if (!_conf.contains("spark.app.name")) {

  throw new SparkException("An application name must be set in your configuration")

}

3.2 建立執行環境 SparkEnv

SparkEnv是Spark的執行環境物件,其中包括與眾多Executor指向相關的物件。在local模式下Driver會建立Executor,local-cluster部署模式或者Standalone部署模式下Worker另起的CoarseGrainedExecutorBackend程序中也會建立Executor,所以SparkEnv存在於Driver或者CoarseGrainedExecutorBackend程序中。

建立SparkEnv主要使用SparkEnv的createDriverEnv方法,有四個引數:conf、isLocal、listenerBus 以及在本地模式下driver執行executor需要的numberCores。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

// 變數宣告

// 包名:org.apache.spark

// 類名:SparkContext

private var _env: SparkEnv = _

  

  

// 變數處理

// 包名:org.apache.spark

// 類名:SparkContext

def isLocal: Boolean = Utils.isLocalMaster(_conf)

private[spark] def listenerBus: LiveListenerBus = _listenerBus

  

// Create the Spark execution environment (cache, map output tracker, etc)

_env = createSparkEnv(_conf, isLocal, listenerBus)

SparkEnv.set(_env)

  

private[spark] def createSparkEnv(

    conf: SparkConf,

    isLocal: Boolean,

    listenerBus: LiveListenerBus): SparkEnv = {

  SparkEnv.createDriverEnv(conf, isLocal, listenerBus, SparkContext.numDriverCores(master))

}

  

/**

 * 獲取在本地模式下執行程式需要的cores個數,否則不需要,為0

 * The number of driver cores to use for execution in local mode, 0 otherwise.

 */

private[spark] def numDriverCores(master: String): Int = {

  def convertToInt(threads: String): Int = {

    if (threads == "*") Runtime.getRuntime.availableProcessors() else threads.toInt

  }

  master match {

    case "local" =1

    case SparkMasterRegex.LOCAL_N_REGEX(threads) => convertToInt(threads)

    case SparkMasterRegex.LOCAL_N_FAILURES_REGEX(threads, _=> convertToInt(threads)

    case _ =0 // driver is not used for execution

  }

}

3.3 建立 SparkUI

SparkUI 提供了用瀏覽器訪問具有樣式及佈局並且提供豐富監控資料的頁面。其採用的是時間監聽機制。傳送的事件會存入快取,由定時排程器取出後分配給監聽此事件的監聽器對監控資料進行更新。如果不需要SparkUI,則可以將spark.ui.enabled置為false。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

// 變數宣告

// 包名:org.apache.spark

// 類名:SparkContext

private var _ui: Option[SparkUI] = None

  

  

// 變數處理

// 包名:org.apache.spark

// 類名:SparkContext

_ui =

  if (conf.getBoolean("spark.ui.enabled"true)) {

    Some(SparkUI.create(Some(this), _statusStore, _conf, _env.securityManager, appName, "",

      startTime))

  else {

    // For tests, do not enable the UI

    None

  }

// Bind the UI before starting the task scheduler to communicate

// the bound port to the cluster manager properly

_ui.foreach(_.bind())

3.4 Hadoop 相關配置

預設情況下,Spark使用HDFS作為分散式檔案系統,所以需要獲取Hadoop相關的配置資訊:

1

2

3

4

5

6

7

8

9

10

// 變數宣告

// 包名:org.apache.spark

// 類名:SparkContext

private var _hadoopConfiguration: Configuration = _

  

  

// 變數處理

// 包名:org.apache.spark

// 類名:SparkContext

_hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(_conf)

獲取的配置資訊包括:

  • 將Amazon S3檔案系統的AWS_ACCESS_KEY_ID和 AWS_SECRET_ACCESS_KEY載入到Hadoop的Configuration;
  • 將SparkConf中所有的以spark.hadoop.開頭的屬性都賦值到Hadoop的Configuration;
  • 將SparkConf的屬性spark.buffer.size複製到Hadoop的Configuration的配置io.file.buffer.size。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

// 第一步

// 包名:org.apache.spark

// 類名:SparkContext

def get: SparkHadoopUtil = instance

  

// 第二步

// 包名:org.apache.spark

// 類名:SparkContext

/**

 * Return an appropriate (subclass) of Configuration. Creating config can initializes some Hadoop

 * subsystems.

 */

def newConfiguration(conf: SparkConf): Configuration = {

  val hadoopConf = SparkHadoopUtil.newConfiguration(conf)

  hadoopConf.addResource(SparkHadoopUtil.SPARK_HADOOP_CONF_FILE)

  hadoopConf

}

  

// 第三步

// 包名:org.apache.spark.deploy

// 類名:SparkHadoopUtil

/**

 * Returns a Configuration object with Spark configuration applied on top. Unlike

 * the instance method, this will always return a Configuration instance, and not a

 * cluster manager-specific type.

 */

private[spark] def newConfiguration(conf: SparkConf): Configuration = {

  val hadoopConf = new Configuration()

  appendS3AndSparkHadoopConfigurations(conf, hadoopConf)

  hadoopConf

}

  

// 第四步

// 包名:org.apache.spark.deploy

// 類名:SparkHadoopUtil

private def appendS3AndSparkHadoopConfigurations(

    conf: SparkConf,

    hadoopConf: Configuration): Unit = {

  // Note: this null check is around more than just access to the "conf" object to maintain

  // the behavior of the old implementation of this code, for backwards compatibility.

  if (conf != null) {

    // Explicitly check for S3 environment variables

    val keyId = System.getenv("AWS_ACCESS_KEY_ID")

    val accessKey = System.getenv("AWS_SECRET_ACCESS_KEY")

    if (keyId != null && accessKey != null) {

      hadoopConf.set("fs.s3.awsAccessKeyId", keyId)

      hadoopConf.set("fs.s3n.awsAccessKeyId", keyId)

      hadoopConf.set("fs.s3a.access.key", keyId)

      hadoopConf.set("fs.s3.awsSecretAccessKey", accessKey)

      hadoopConf.set("fs.s3n.awsSecretAccessKey", accessKey)

      hadoopConf.set("fs.s3a.secret.key", accessKey)

  

      val sessionToken = System.getenv("AWS_SESSION_TOKEN")

      if (sessionToken != null) {

        hadoopConf.set("fs.s3a.session.token", sessionToken)

      }

    }

    appendSparkHadoopConfigs(conf, hadoopConf)

    val bufferSize = conf.get("spark.buffer.size""65536")

    hadoopConf.set("io.file.buffer.size", bufferSize)

  }

}

  

// 第五步

// 包名:org.apache.spark.deploy

// 類名:SparkHadoopUtil

private def appendSparkHadoopConfigs(conf: SparkConf, hadoopConf: Configuration): Unit = {

  // Copy any "spark.hadoop.foo=bar" spark properties into conf as "foo=bar"

  for ((key, value) <- conf.getAll if key.startsWith("spark.hadoop.")) {

    hadoopConf.set(key.substring("spark.hadoop.".length), value)

  }

}

3.5 Executor 環境變數

executorEnvs包含的環境變數將會註冊應用程式的過程中傳送給Master,Master給Worker傳送排程後,Worker最終使用executorEnvs提供的資訊啟動Executor。 
通過配置spark.executor.memory指定Executor佔用的記憶體的大小,也可以配置系統變數SPARK_EXECUTOR_MEMORY或者SPARK_MEM設定其大小。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

// 變數宣告

// 包名:org.apache.spark

// 類名:SparkContext

private var _executorMemory: Int = _

  

// 變數處理

// 包名:org.apache.spark

// 類名:SparkContext

_executorMemory = _conf.getOption("spark.executor.memory")

  .orElse(Option(System.getenv("SPARK_EXECUTOR_MEMORY")))

  .orElse(Option(System.getenv("SPARK_MEM"))

  .map(warnSparkMem))

  .map(Utils.memoryStringToMb)

  .getOrElse(1024)

executorEnvs是由一個HashMap儲存:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

// 變數宣告

// 包名:org.apache.spark

// 類名:SparkContext

// Environment variables to pass to our executors.

private[spark] val executorEnvs = HashMap[String, String]()

  

// 變數處理

// 包名:org.apache.spark

// 類名:SparkContext

// Convert java options to env vars as a work around

// since we can't set env vars directly in sbt.

for { (envKey, propKey) <- Seq(("SPARK_TESTING""spark.testing"))

  value <- Option(System.getenv(envKey)).orElse(Option(System.getProperty(propKey)))} {

  executorEnvs(envKey) = value

}

Option(System.getenv("SPARK_PREPEND_CLASSES")).foreach { v =>

  executorEnvs("SPARK_PREPEND_CLASSES"= v

}

// The Mesos scheduler backend relies on this environment variable to set executor memory.

// TODO: Set this only in the Mesos scheduler.

executorEnvs("SPARK_EXECUTOR_MEMORY"= executorMemory + "m"

executorEnvs ++= _conf.getExecutorEnv

executorEnvs("SPARK_USER"= sparkUser

3.6 註冊 HeartbeatReceiver 心跳接收器

在 Spark 的實際生產環境中,Executor 是執行在不同的節點上的。在 local 模式下的 Driver 與 Executor 屬於同一個程序,所以 Dirver 與 Executor 可以直接使用本地呼叫互動,當 Executor 執行出現問題時,Driver 可以很方便地知道,例如,通過捕獲異常。但是在生產環境下,Driver 與 Executor 很可能不在同一個程序內,他們也許執行在不同的機器上,甚至在不同的機房裡,因此 Driver 對 Executor 失去掌握。為了能夠掌控 Executor,在 Driver 中建立了這個心跳接收器。

建立 HeartbearReceiver 的程式碼:

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

// 變數宣告

// 包名:org.apache.spark

// 類名:SparkContext

// HeartbeatReceiver.ENDPOINT_NAME val ENDPOINT_NAME = "HeartbeatReceiver"

// We need to register "HeartbeatReceiver" before "createTaskScheduler" because Executor will

// retrieve "HeartbeatReceiver" in the constructor. (SPARK-6640)

_heartbeatReceiver = env.rpcEnv.setupEndpoint(

  HeartbeatReceiver.ENDPOINT_NAME, new HeartbeatReceiver(this))

  

// 變數處理

// 包名:org.apache.spark.rpc.netty

// 類名:NettyRpcEnv

override def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef = {

  dispatcher.registerRpcEndpoint(name, endpoint)

}

上面的程式碼中使用了 SparkEnv 的子元件 NettyRpcEnv 的 setupEndpoint 方法,此方法的作用是想 RpcEnv 的 Dispatcher 註冊 HeartbeatReceiver,並返回 HeartbeatReceiver 的 NettyRpcEndpointRef 引用。

3.7 建立任務排程器 TaskScheduler

TaskScheduler也是SparkContext的重要組成部分,負責任務的提交,請求叢集管理器對任務排程,並且負責傳送的任務到叢集,執行它們,任務失敗的重試,以及慢任務的在其他節點上重試。 其中給應用程式分配並執行 Executor為一級排程,而給任務分配 Executor 並執行任務則為二級排程。另外 TaskScheduler 也可以看做任務排程的客戶端。

  • 為 TaskSet建立和維護一個TaskSetManager並追蹤任務的本地性以及錯誤資訊;
  • 遇到Straggle 任務會方到其他的節點進行重試;
  • 向DAGScheduler彙報執行情況, 包括在Shuffle輸出lost的時候報告fetch failed 錯誤等資訊;

TaskScheduler負責任務排程資源分配,SchedulerBackend負責與Master、Worker通訊收集Worker上分配給該應用使用的資源情況。

圖4 SparkContext 建立 Task Scheduler 和 Scheduler Backend

建立 TaskScheduler 的程式碼:

1

2

3

4

5

6

7

8

9

10

11

12

13

// 變數宣告

// 包名:org.apache.spark

// 類名:SparkContext

private var _schedulerBackend: SchedulerBackend = _

private var _taskScheduler: TaskScheduler = _

  

// 變數處理

// 包名:org.apache.spark

// 類名:SparkContext

// Create and start the scheduler

val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)

_schedulerBackend = sched

_taskScheduler = ts

createTaskScheduler方法根據master的配置匹配部署模式,建立TaskSchedulerImpl,並生成不同的SchedulerBackend。

1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

39

40

41

42

43

44

45

46

47

48

49

50

51

52

53

54

55

56

57

58

59

60

61

62

63

64

65

66

67

68

69

70

71

72

73

74

75

76

77

78

79

80

81

82

83

84

85

86

87

88

// 包名:org.apache.spark

// 類名:SparkContext

/**

 * Create a task scheduler based on a given master URL.

 * Return a 2-tuple of the scheduler backend and the task scheduler.

 */

private def createTaskScheduler(

    sc: SparkContext,

    master: String,

    deployMode: String): (SchedulerBackend, TaskScheduler) = {

  import SparkMasterRegex._

  

  // When running locally, don't try to re-execute tasks on failure.

  val MAX_LOCAL_TASK_FAILURES = 1

  

  master match {

    case "local" =>

      val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)

      val backend = new LocalSchedulerBackend(sc.getConf, scheduler, 1)

      scheduler.initialize(backend)

      (backend, scheduler)

  

    case LOCAL_N_REGEX(threads) =>

      def localCpuCount: Int = Runtime.getRuntime.availableProcessors()

      // local[*] estimates the number of cores on the machine; local[N] uses exactly N threads.

      val threadCount = if (threads == "*") localCpuCount else threads.toInt

      if (threadCount <= 0) {

        throw new SparkException(s"Asked to run locally with $threadCount threads")

      }

      val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)

      val backend = new LocalSchedulerBackend(sc.getConf, scheduler, threadCount)

      scheduler.initialize(backend)

      (backend, scheduler)

  

    case LOCAL_N_FAILURES_REGEX(threads, maxFailures) =>

      def localCpuCount: Int = Runtime.getRuntime.availableProcessors()

      // local[*, M] means the number of cores on the computer with M failures

      // local[N, M] means exactly N threads with M failures

      val threadCount = if (threads == "*") localCpuCount else threads.toInt

      val scheduler = new TaskSchedulerImpl(sc, maxFailures.toInt, isLocal = true)

      val backend = new LocalSchedulerBackend(sc.getConf, scheduler, threadCount)

      scheduler.initialize(backend)

      (backend, scheduler)

  

    case SPARK_REGEX(sparkUrl) =>

      val scheduler = new TaskSchedulerImpl(sc)

      val masterUrls = sparkUrl.split(",").map("spark://" _)

      val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)

      scheduler.initialize(backend)

      (backend, scheduler)

  

    case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) =>

      // Check to make sure memory requested <= memoryPerSlave. Otherwise Spark will just hang.

      val memoryPerSlaveInt = memoryPerSlave.toInt

      if (sc.executorMemory > memoryPerSlaveInt) {

        throw new SparkException(

          "Asked to launch cluster with %d MB RAM / worker but requested %d MB/worker".format(

            memoryPerSlaveInt, sc.executorMemory))

      }

  

      val scheduler = new TaskSchedulerImpl(sc)

      val localCluster = new LocalSparkCluster(

        numSlaves.toInt, coresPerSlave.toInt, memoryPerSlaveInt, sc.conf)

      val masterUrls = localCluster.start()

      val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)

      scheduler.initialize(backend)

      backend.shutdownCallback = (backend: StandaloneSchedulerBackend) => {

        localCluster.stop()

      }

      (backend, scheduler)

  

    case masterUrl =>

      val cm = getClusterManager(masterUrl) match {

        case Some(clusterMgr) => clusterMgr

        case None =throw new SparkException("Could not parse Master URL: '" + master + "'")

      }

      try {

        val scheduler = cm.createTaskScheduler(sc, masterUrl)

        val backend = cm.createSchedulerBackend(sc, masterUrl, scheduler)

        cm.initialize(scheduler, backend)

        (backend, scheduler)

      catch {

        case se: SparkException =throw se