1. 程式人生 > >spark原始碼分析之worker原理篇

spark原始碼分析之worker原理篇

這裡寫圖片描述
解釋:
1、master要求worker啟動driver和executor
2、worker啟動driver的一個基本的原理,worker會啟動一個執行緒DriverRunner,然後DriverRunner會去負責啟動driver程序,然後在之後對driver程序進行管理
3、worker啟動executor的一個基本的原理,worker會啟動一個執行緒ExecutorRunner,然後ExecutorRunner會去負責啟動executor程序,然後在之後對executor程序進行管理
4、driver首先建立driver的工作目錄,封裝啟動driver的命令,用ProcessBuilder啟動Driver
5、executor首先建立executor的工作目錄,封裝啟動executor的命令,用ProcessBuilder啟動executor,executor找到對應的driver,去反向註冊自己,然後就可以啟動executor

schedule方法:
原始碼位置:org/apache/spark/deploy/master/Master.scala

/**
 * 資源排程演算法
 */
private def schedule(): Unit = {
  // 首先判斷master的狀態不是alive的話直接返回
  // 也就是說,standby master是不會進行application等資源的排程的
  if (state != RecoveryState.ALIVE) { return }

  // Drivers take strict precedence over executors
  // random shuffle的原理是對傳入的集合的元素進行隨機的打亂
// 這裡是對取出workers中所有註冊上來上的worker,進行過濾,必須是狀態為alive的worker // 對狀態為alive的worker,呼叫rondom的shuffle方法進行隨機的打亂 // 意思就是從ArrayBuffer的最後一個元素開始到第三個元素,對於每個元素,都會取出該範圍內的隨機數, // 比如說buf.length為10,然後next會取0到10的一個隨機數,然後就會把buf隨機的一個位置和該數字進行交換 val shuffledWorkers = Random.shuffle(workers) // Randomization helps balance drivers
// 遍歷活著的worker for (worker <- shuffledWorkers if worker.state == WorkerState.ALIVE) { // 遍歷等待的driver,只有yarn-cluster和standalone的cluster模式提交的時候,才會註冊driver,其他方式都是在 // 本地啟動driver,而不是來註冊driver,更不可能讓master來排程driver for (driver <- waitingDrivers) { // 判斷當前的worker的空閒記憶體量大於等於driver需要的記憶體量和判斷worker的空閒cpu數大於等於driver需要的cpu數量 if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) { // 啟動driver launchDriver(worker, driver) // 將driver從等待的driver的佇列中刪除 waitingDrivers -= driver } } } // 在workers上啟動和排程executor startExecutorsOnWorkers() }

master啟動並管理driver程序原始碼分析:
第一步:呼叫schedule方法的launchDriver方法
原始碼位置:org/apache/spark/deploy/master/Master.scala

/**
 * 在某一個worker上啟動driver
 */
private def launchDriver(worker: WorkerInfo, driver: DriverInfo) {
  logInfo("Launching driver " + driver.id + " on worker " + worker.id)
  // 將driver加入到worker內部的緩衝結構中
  // 將worker中使用的記憶體和cpu的數量,都加上driver需要的記憶體和cpu的數量
  worker.addDriver(driver)
  // 將worker加入到driver的記憶體緩衝結構中
  driver.worker = Some(worker)
  // 呼叫worker的actor,給worker傳送註冊driver的資訊
  worker.endpoint.send(LaunchDriver(driver.id, driver.desc))
  driver.state = DriverState.RUNNING
}

第二步:呼叫worker的launchDriver方法
原始碼位置:org/apache/spark/deploy/worker/Worker.scala

// 啟動driver
case LaunchDriver(driverId, driverDesc) => {
  logInfo(s"Asked to launch driver $driverId")
  val driver = new DriverRunner(
    conf,
    driverId,
    workDir,
    sparkHome,
    driverDesc.copy(command = Worker.maybeUpdateSSLSettings(driverDesc.command, conf)),
    self,
    workerUri,
    securityMgr)
  drivers(driverId) = driver
  driver.start()

  coresUsed += driverDesc.cores
  memoryUsed += driverDesc.mem
}

第三步:呼叫第二步的DriverRunner方法
原始碼位置:org/apache/spark/deploy/worker/DriverRunner.scala

/** Starts a thread to run and manage the driver. */
private[worker] def start() = {
  /**
   * 建立執行緒
   */
  new Thread("DriverRunner for " + driverId) {
    override def run() {
      try {
        // 第一步:建立driver的工作目錄
        val driverDir = createWorkingDirectory()

        // 第二步:下載使用者上傳的jar
        val localJarFilename = downloadUserJar(driverDir)

        def substituteVariables(argument: String): String = argument match {
          case "{{WORKER_URL}}" => workerUrl
          case "{{USER_JAR}}" => localJarFilename
          case other => other
        }

        // TODO: If we add ability to submit multiple jars they should also be added here
        // 構建ProcessBuilder
        // 傳入了driver的啟動命令、需要的記憶體大小等資訊
        val builder = CommandUtils.buildProcessBuilder(driverDesc.command, securityManager,
          driverDesc.mem, sparkHome.getAbsolutePath, substituteVariables)
        // 通過ProcessBuilder啟動driver
        launchDriver(builder, driverDir, driverDesc.supervise)
      }
      catch {
        case e: Exception => finalException = Some(e)
      }

      // 對driver的退出狀態做一些處理
      val state =
        if (killed) {
          DriverState.KILLED
        } else if (finalException.isDefined) {
          DriverState.ERROR
        } else {
          finalExitCode match {
            case Some(0) => DriverState.FINISHED
            case _ => DriverState.FAILED
          }
        }

      finalState = Some(state)

      // 這個DriverRunner執行緒,向它所屬的worker的actor,傳送一個DriverStateChanged的事件
      worker.send(DriverStateChanged(driverId, state, finalException))
    }
  }.start()
}

第四步:呼叫第三步的launchDriver方法
原始碼位置:org/apache/spark/deploy/worker/DriverRunner.scala

private def launchDriver(builder: ProcessBuilder, baseDir: File, supervise: Boolean) {
  builder.directory(baseDir)
  def initialize(process: Process): Unit = {
    // Redirect stdout and stderr to files
    // 重定向stdout和stderr檔案
    val stdout = new File(baseDir, "stdout")
    CommandUtils.redirectStream(process.getInputStream, stdout)

    val stderr = new File(baseDir, "stderr")
    val formattedCommand = builder.command.asScala.mkString("\"", "\" \"", "\"")
    val header = "Launch Command: %s\n%s\n\n".format(formattedCommand, "=" * 40)
    Files.append(header, stderr, UTF_8)
    CommandUtils.redirectStream(process.getErrorStream, stderr)
  }
  // 呼叫waitFor函式,把driver程序啟動起來
  runCommandWithRetry(ProcessBuilderLike(builder), initialize, supervise)
}

第五步:呼叫第四步的runCommandWithRetry方法
原始碼位置:org/apache/spark/deploy/worker/DriverRunner.scala

def runCommandWithRetry(
    command: ProcessBuilderLike, initialize: Process => Unit, supervise: Boolean): Unit = {
  // Time to wait between submission retries.
  // 在提交重試之間等待時間。
  var waitSeconds = 1
  // A run of this many seconds resets the exponential back-off.
  val successfulRunDuration = 5

  var keepTrying = !killed

  while (keepTrying) {
    logInfo("Launch Command: " + command.command.mkString("\"", "\" \"", "\""))

    synchronized {
      if (killed) { return }
      process = Some(command.start())
      initialize(process.get)
    }

    val processStart = clock.getTimeMillis()
    // 啟動程序
    val exitCode = process.get.waitFor()
    if (clock.getTimeMillis() - processStart > successfulRunDuration * 1000) {
      waitSeconds = 1
    }

    if (supervise && exitCode != 0 && !killed) {
      logInfo(s"Command exited with status $exitCode, re-launching after $waitSeconds s.")
      sleeper.sleep(waitSeconds)
      waitSeconds = waitSeconds * 2 // exponential back-off
    }

    keepTrying = supervise && exitCode != 0 && !killed
    finalExitCode = Some(exitCode)
  }
}
}

master啟動並管理executor程序原始碼分析:
第一步:呼叫schedule方法的startExecutorsOnWorkers
原始碼位置:org/apache/spark/deploy/master/Master.scala

/**
 * Schedule and launch executors on workers
 */
/**
 * Application的排程機制,預設是SpreadOutApps演算法,另外一種是非SpreadOutApps演算法
 */
private def startExecutorsOnWorkers(): Unit = {
  // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
  // in the queue, then the second app, etc.

  // 首先遍歷waitingApps中ApplicationInfo,並且還需要判斷程式中定義的使用cpu的數量-啟動執行application上
  // worker上的excutor所使用的的cpu的要大於0
  for (app <- waitingApps if app.coresLeft > 0) {

    val coresPerExecutor: Option[Int] = app.desc.coresPerExecutor

    // Filter out workers that don't have enough resources to launch an executor
    // 從workers中,過濾出worker的狀態為alive的,按照cpu的數量進行倒序排序
    val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
      .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&
        worker.coresFree >= coresPerExecutor.getOrElse(1))
      .sortBy(_.coresFree).reverse

    // 在worker上排程executor
    val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)

   // SpreadOut演算法,會將每一個application要啟動的executor都平均分佈到各個worker上去
   // 比如說20個cpu core,分配到10個worker上,實際會迴圈兩遍worker,每次迴圈,
   // 給每個worker分配一個 core,最後每個worker分配了2個core
   // 總體概括:平均分佈

    // 非SpreadOut演算法,將每一個application,儘可能少的分配到worker上去
    // 每個application,都儘可能的分配到儘量少的worker上,比如說10個worker
    // 每個worker10個cpu,application要分配20個core,那麼其實,只會分配到2個worker上
    // 每個worker都佔滿10個core,其餘的app,就只能分配到下一個worker上
    // 總體概括:儘可能資源大的分配

   // Now that we've decided how many cores to allocate on each worker, let's allocate them
   // 給每個worker分配完application要求的cpu core之後,遍歷worker,只要判斷之前給這個worker分配到了core
    for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {
      // 就在worker上啟動executor
      allocateWorkerResourceToExecutors(
        app, assignedCores(pos), coresPerExecutor, usableWorkers(pos))
    }
  }
}

第二步:呼叫第一步的allocateWorkerResourceToExecutors方法
原始碼位置:org/apache/spark/deploy/master/Master.scala

private def allocateWorkerResourceToExecutors(
    app: ApplicationInfo,
    assignedCores: Int,
    coresPerExecutor: Option[Int],
    worker: WorkerInfo): Unit = {
  // If the number of cores per executor is specified, we divide the cores assigned
  // to this worker evenly among the executors with no remainder.
  // Otherwise, we launch a single executor that grabs all the assignedCores on this worker.
  val numExecutors = coresPerExecutor.map { assignedCores / _ }.getOrElse(1)
  val coresToAssign = coresPerExecutor.getOrElse(assignedCores)

  for (i <- 1 to numExecutors) {
    // 首先,在application內部快取結構中,新增executor
    // 並且建立ExecutorDesc物件,其中封裝了,給這個executor分配了多少個cpu core
    // 基於我們的機制,實際上,最後,executor的實際數量,以及executor所對應的cpu是不一致的
    // 我們這裡是根據總的機制來分配的,比如說要求啟動3個executor,每一個executor3個cpu,9個worker,
    // 根據我們的演算法來說的話,就是每一個worker啟動一個executor,一個executor對應一個cpu core
    val exec = app.addExecutor(worker, coresToAssign)
    launchExecutor(worker, exec)
    // 將app的狀態設定為running
    app.state = ApplicationState.RUNNING
  }
}

第三步:呼叫第二步的launchExecutor方法
原始碼位置:org/apache/spark/deploy/master/Master.scala

private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc): Unit = {
  logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
  // 將executor加入worker的記憶體快取
  worker.addExecutor(exec)
  // 向worker的actor發生LaunchExecutor訊息
  worker.endpoint.send(LaunchExecutor(masterUrl,
    exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory))
  // 向executor對應的application的driver,發生executorAdded訊息
  exec.application.driver.send(
    ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, exec.memory))
}

第四步:呼叫worker類的LaunchExecutor這個case class
原始碼位置:org/apache/spark/deploy/worker/Worker.scala

  case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
      if (masterUrl != activeMasterUrl) {
        logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.")
      } else {
        try {
          logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))

          // Create the executor's working directory
          // 建立exector的本地目錄
          val executorDir = new File(workDir, appId + "/" + execId)
          if (!executorDir.mkdirs()) {
            throw new IOException("Failed to create directory " + executorDir)
          }

          // Create local dirs for the executor. These are passed to the executor via the
          // SPARK_EXECUTOR_DIRS environment variable, and deleted by the Worker when the
          // application finishes.
          val appLocalDirs = appDirectories.get(appId).getOrElse {
            Utils.getOrCreateLocalRootDirs(conf).map { dir =>
              val appDir = Utils.createDirectory(dir, namePrefix = "executor")
              Utils.chmod700(appDir)
              appDir.getAbsolutePath()
            }.toSeq
          }
          appDirectories(appId) = appLocalDirs

          // 建立exectorRunner
          val manager = new ExecutorRunner(
            appId,
            execId,
            appDesc.copy(command = Worker.maybeUpdateSSLSettings(appDesc.command, conf)),
            cores_,
            memory_,
            self,
            workerId,
            host,
            webUi.boundPort,
            publicAddress,
            sparkHome,
            executorDir,
            workerUri,
            conf,
            appLocalDirs, ExecutorState.RUNNING)

          // 將executorRunner加入本地快取
          executors(appId + "/" + execId) = manager

          // 啟動ExecutorRunner
          manager.start()

          // 加上executor要使用的資源
          coresUsed += cores_
          memoryUsed += memory_

          // 向master傳送ExecutorStateChanged這個事件
          sendToMaster(ExecutorStateChanged(appId, execId, manager.state, None, None))
        } catch {
          case e: Exception => {
            logError(s"Failed to launch executor $appId/$execId for ${appDesc.name}.", e)
            if (executors.contains(appId + "/" + execId)) {
              executors(appId + "/" + execId).kill()
              executors -= appId + "/" + execId
            }
            sendToMaster(ExecutorStateChanged(appId, execId, ExecutorState.FAILED,
              Some(e.toString), None))
          }
        }
      }

第五步:呼叫第四步的ExecutorRunner
原始碼位置:org/apache/spark/deploy/worker/ExecutorRunner.scala

private[worker] def start() {
  workerThread = new Thread("ExecutorRunner for " + fullId) {
    //
    override def run() { fetchAndRunExecutor() }
  }
  workerThread.start()
  // Shutdown hook that kills actors on shutdown.
  shutdownHook = ShutdownHookManager.addShutdownHook { () =>
    // It's possible that we arrive here before calling `fetchAndRunExecutor`, then `state` will
    // be `ExecutorState.RUNNING`. In this case, we should set `state` to `FAILED`.
    if (state == ExecutorState.RUNNING) {
      state = ExecutorState.FAILED
    }
    killProcess(Some("Worker shutting down")) }
}

第六步:呼叫第五步的fetchAndRunExecutor
原始碼位置:org/apache/spark/deploy/worker/ExecutorRunner.scala

  /**
   * Download and run the executor described in our ApplicationDescription
   */
  private def fetchAndRunExecutor() {
    try {
      // Launch the process
      // 封裝一個ProcessBuilder
      val builder = CommandUtils.buildProcessBuilder(appDesc.command, new SecurityManager(conf),
        memory, sparkHome.getAbsolutePath, substituteVariables)

      val command = builder.command()
      val formattedCommand = command.asScala.mkString("\"", "\" \"", "\"")
      logInfo(s"Launch command: $formattedCommand")

      builder.directory(executorDir)
      builder.environment.put("SPARK_EXECUTOR_DIRS", appLocalDirs.mkString(File.pathSeparator))
      // In case we are running this from within the Spark Shell, avoid creating a "scala"
      // parent process for the executor command
      builder.environment.put("SPARK_LAUNCH_WITH_SCALA", "0")

      // Add webUI log urls
      val baseUrl =
        s"http://$publicAddress:$webUiPort/logPage/?appId=$appId&executorId=$execId&logType="
      builder.environment.put("SPARK_LOG_URL_STDERR", s"${baseUrl}stderr")
      builder.environment.put("SPARK_LOG_URL_STDOUT", s"${baseUrl}stdout")

      process = builder.start()

      // 重定向檔案輸出流,將stdout和stderr報錯到本地目錄
      val header = "Spark Executor Command: %s\n%s\n\n".format(
        formattedCommand, "=" * 40)

      // Redirect its stdout and stderr to files
      val stdout = new File(executorDir, "stdout")
      stdoutAppender = FileAppender(process.getInputStream, stdout, conf)

      val stderr = new File(executorDir, "stderr")
      Files.write(header, stderr, UTF_8)
      stderrAppender = FileAppender(process.getErrorStream, stderr, conf)

      // Wait for it to exit; executor may exit with code 0 (when driver instructs it to shutdown)
      // or with nonzero exit code

      // 呼叫Process的waitFor()方法,啟動executor程序
      val exitCode = process.waitFor()

      // executor執行完之後拿到返回狀態
      state = ExecutorState.EXITED
      val message = "Command exited with code " + exitCode

      // 向worker傳送ExecutorStateChanged訊息
      worker.send(ExecutorStateChanged(appId, execId, state, Some(message), Some(exitCode)))
    } catch {
      case interrupted: InterruptedException => {
        logInfo("Runner thread for executor " + fullId + " interrupted")
        state = ExecutorState.KILLED
        killProcess(None)
      }
      case e: Exception => {
        logError("Error running executor", e)
        state = ExecutorState.FAILED
        killProcess(Some(e.toString))
      }
    }
  }
}