1. 程式人生 > Spark:Executor原理剖析與原始碼分析

Spark:Executor原理剖析與原始碼分析

Executor原理示意圖
(此處原為 Executor 原理示意圖,圖片未保留)

Executor程序的啟動

Worker 為 application 啟動的 executor,實際上啟動的是 CoarseGrainedExecutorBackend 程序。

原始碼分析:
第一步:CoarseGrainedExecutorBackend原始碼
原始碼地址:org.apache.spark.executor.CoarseGrainedExecutorBackend.scala

/**
 * The "executor" that a Worker launches for an application is in fact a
 * CoarseGrainedExecutorBackend process.
 *
 * This RPC endpoint connects to the driver when it starts and sends a
 * `RegisterExecutor` request. When the driver answers with `RegisteredExecutor`,
 * the backend creates the [[Executor]] that does the actual work.
 *
 * @param rpcEnv        RPC environment used to resolve and talk to the driver endpoint
 * @param driverUrl     URL of the driver endpoint to register with
 * @param executorId    id this executor reports to the driver and passes to the Executor
 * @param hostname      host name reported to the driver and passed to the Executor
 * @param cores         number of cores reported to the driver at registration
 * @param userClassPath extra class path entries passed to the Executor
 * @param env           SparkEnv; its closure serializer and the env itself are used here
 */
private[spark] class CoarseGrainedExecutorBackend(
  override val rpcEnv: RpcEnv,
  driverUrl: String,
  executorId: String,
  hostname: String,
  cores: Int,
  userClassPath: Seq[URL],
  env: SparkEnv)
  extends ThreadSafeRpcEndpoint with ExecutorBackend with Logging {

  // NOTE(review): not used in this excerpt; presumably guards against stopping twice.
  // Confirm in the full source.
  private[this] val stopping = new AtomicBoolean(false)
  // Stays null until the driver replies RegisteredExecutor (see `receive`).
  var executor: Executor = null
  // Driver endpoint reference. It is assigned inside a Future callback in onStart
  // rather than in `receive`, hence @volatile.
  @volatile var driver: Option[RpcEndpointRef] = None

  // If this CoarseGrainedExecutorBackend is changed to support multiple threads, then this may need
  // to be changed so that we don't share the serializer instance across threads
  private[this] val ser: SerializerInstance = env.closureSerializer.newInstance()

  /**
   * Endpoint start-up hook.
   *
   * Resolves the driver endpoint from `driverUrl`, stores it in `driver`, and then asks
   * the driver to register this executor by sending `RegisterExecutor`.
   * If either the lookup or the registration ask fails, the executor exits with code 1
   * and does not notify the driver.
   */
  override def onStart() {
    logInfo("Connecting to driver: " + driverUrl)
    rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
      // This is a very fast action so we can use "ThreadUtils.sameThread"
      // Keep a reference to the driver endpoint.
      driver = Some(ref)
      // Ask the driver to register this executor; the reply type is Boolean.
      ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls))
    }(ThreadUtils.sameThread).onComplete {
      // This is a very fast action so we can use "ThreadUtils.sameThread"
      case Success(msg) =>
      // Always receive `true`. Just ignore it
      case Failure(e) =>
        exitExecutor(1, s"Cannot register with driver: $driverUrl", e, notifyDriver = false)
    }(ThreadUtils.sameThread)
  }

  /**
   * Collects log URLs from environment variables named `SPARK_LOG_URL_<NAME>`.
   *
   * @return map from the `<NAME>` suffix, lower-cased with `Locale.ROOT`, to the
   *         variable's value
   */
  def extractLogUrls: Map[String, String] = {
    val prefix = "SPARK_LOG_URL_"
    sys.env.filterKeys(_.startsWith(prefix))
      .map(e => (e._1.substring(prefix.length).toLowerCase(Locale.ROOT), e._2))
  }

  override def receive: PartialFunction[Any, Unit] = {
    /**
     * Sent back by the driver once it has registered this executor.
     * The backend then creates the Executor, which (per the original commentary)
     * implements most of the executor's functionality. If construction fails with a
     * non-fatal error, the process exits with code 1.
     */
    case RegisteredExecutor =>
      logInfo("Successfully registered with driver")
      try {
        executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
      } catch {
        case NonFatal(e) =>
          exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)
      }

......

}

啟動task

原始碼分析
第一步:使用executor控制代碼的launchTask()方法,啟動task

    // LaunchTask carries a serialized task description (presumably sent by the driver's scheduler).
    case LaunchTask(data) =>
      if (executor != null) {
        // Decode the serialized TaskDescription from the message, then hand it to the
        // Executor, passing this backend as the ExecutorBackend.
        val description = TaskDescription.decode(data.value)
        logInfo("Got assigned task " + description.taskId)
        executor.launchTask(this, description)
      } else {
        // In the visible code, `executor` is only assigned on RegisteredExecutor, so a task
        // that arrives before registration completes cannot run. Exit with code 1.
        exitExecutor(1, "Received LaunchTask command but executor was null")
      }

第二步:launchTask()方法

  /**
   * Starts one task asynchronously.
   *
   * Wraps the task in a [[TaskRunner]], records the runner in `runningTasks` under the
   * task's id, and then submits it to `threadPool`. `threadPool.execute` takes the runner,
   * so TaskRunner must be a `Runnable`. Because the runner is recorded before it is
   * submitted, it is already present in `runningTasks` when the pool receives it.
   *
   * NOTE(review): the original commentary says a runner waits in a queue when no pool
   * thread is idle. Whether it queues or gets a new thread depends on how `threadPool`
   * is constructed, which is not shown here.
   *
   * @param context         backend handed to the TaskRunner (the LaunchTask handler passes itself)
   * @param taskDescription decoded description of the task to run
   */
  def launchTask(context: ExecutorBackend, taskDescription: TaskDescription): Unit = {
    val runner = new TaskRunner(context, taskDescription)
    val taskId = taskDescription.taskId
    runningTasks.put(taskId, runner)
    threadPool.execute(runner)
  }