
Spark Source Code Walkthrough (2): Job Submission

import org.apache.spark.{SparkConf, SparkContext}

object SparkWordCount{
  def main(args: Array[String]) {
    if (args.length == 0) {
      System.exit(1)
    }

    val conf = new SparkConf().setAppName("SparkWordCount")
    val sc = new SparkContext(conf)

    val file = sc.textFile("xxx")
    val counts = file.flatMap(line => line.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _)
    counts.saveAsTextFile("xxx")
  }
}

Taking the code above as an example, we can follow the thread and see how Spark submits a Job.

SparkContext

The code above first creates a SparkContext object, then uses it to build an RDD (file), and finally runs computations on that RDD and saves the result. As this shows, SparkContext is the entry point of a Spark application: it is responsible for interacting with the entire Spark cluster, and it can create RDDs, accumulators, broadcast variables, and so on (a small usage sketch appears at the end of this section). The official documentation's diagram of how SparkContext interacts with the other components is shown below:
[Figure: cluster mode overview from the official Spark documentation, showing the driver's SparkContext connecting to a cluster manager and to executors on the worker nodes]

The documentation gives a brief description of this figure:

  1. Spark applications run as independent sets of processes on the cluster, coordinated by the SparkContext object in the main program (the Driver Program).
  2. To run an application on a cluster:
    • The SparkContext connects to one of several kinds of Cluster Manager (YARN, Mesos, standalone) to allocate resources for the application.
    • Once connected, Spark acquires executors on the Worker nodes, which run computations and store data for the application.
    • Next, it sends the application code (passed to the SparkContext as a jar or Python files) to the executors; finally, the SparkContext sends tasks to the executors to run.

This description alone shows how important the SparkContext is. The documentation also lists several useful points about this architecture:

  1. Each application gets its own executor processes, which stay up for the whole duration of the application and can run Tasks in multiple threads. The benefit is that applications are isolated from each other, both on the scheduling side (each driver schedules its own tasks) and on the executor side (tasks from different applications run in different JVMs). However, it also means that data cannot be shared between Spark applications (instances of SparkContext) without writing it to an external storage system.
  2. Spark is agnostic to the underlying cluster manager. As long as it can acquire executor processes, and these processes can communicate with each other, it is relatively easy to run Spark even on a cluster manager that also supports other applications (e.g. Mesos/YARN).
  3. The driver program must listen for and accept incoming connections from its executors throughout its lifetime. As such, the driver program must be network addressable from the worker nodes.
  4. Because the driver schedules tasks on the cluster, it should run close to the worker nodes, preferably on the same local area network. If you need to send requests to the cluster remotely, it is better to open an RPC to the driver and have it submit operations from nearby than to run the driver far away from the worker nodes.
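
To make the role of SparkContext concrete, here is a minimal usage sketch (assuming Spark 2.x APIs and a local master; the object name and data are purely illustrative) showing it creating an RDD, a broadcast variable, and an accumulator:

import org.apache.spark.{SparkConf, SparkContext}

object SparkContextSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SparkContextSketch").setMaster("local[*]")
    val sc = new SparkContext(conf)

    // An RDD built from a local collection
    val numbers = sc.parallelize(1 to 100)

    // A read-only broadcast variable, shipped once to every executor
    val names = sc.broadcast(Map(1 -> "one", 2 -> "two"))

    // An accumulator that tasks can only add to; the driver reads its value
    val evenCount = sc.longAccumulator("evenCount")

    numbers.foreach(n => if (n % 2 == 0) evenCount.add(1))
    println(s"even numbers: ${evenCount.value}; 1 is '${names.value(1)}'")

    sc.stop()
  }
}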

Spark Job Submission Flow

RDD operations fall into two categories: transformations and actions. Transformations are lazily evaluated; nothing is actually computed until an action is encountered, as the short sketch below illustrates.
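
A minimal sketch of this laziness (assuming an existing SparkContext named sc; the println is only there to mark when the function actually runs):

// Nothing is computed yet: flatMap / map / reduceByKey only build up the RDD lineage.
val counts = sc.textFile("input.txt")
  .flatMap(_.split(" "))
  .map { word => println(s"mapping $word"); (word, 1) } // does not print yet
  .reduceByKey(_ + _)

// Only when an action is called does Spark actually submit a job and run
// the functions above on the executors.
counts.saveAsTextFile("output")

In the opening word-count example, that action is saveAsTextFile, whose source is: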

/**
   * Save this RDD as a text file, using string representations of elements.
   */
  def saveAsTextFile(path: String): Unit = withScope {
    val nullWritableClassTag = implicitly[ClassTag[NullWritable]]
    val textClassTag = implicitly[ClassTag[Text]]
    val r = this.mapPartitions { iter =>
      val text = new Text()
      iter.map { x =>
        text.set(x.toString)
        (NullWritable.get(), text)
      }
    }
    RDD.rddToPairRDDFunctions(r)(nullWritableClassTag, textClassTag, null)
      .saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path)
  }

Look at the saveAsHadoopFile call at the end of the method. Following it down level by level, saveAsHadoopFile -> saveAsHadoopDataset; at the tail of saveAsHadoopDataset, self.context.runJob kicks off the Job, where context is the SparkContext object.

def saveAsHadoopDataset(conf: JobConf): Unit = self.withScope {
  // Rename this as hadoopConf internally to avoid shadowing (see SPARK-2038).
  val hadoopConf = conf
  val outputFormatInstance = hadoopConf.getOutputFormat
  val keyClass = hadoopConf.getOutputKeyClass
  val valueClass = hadoopConf.getOutputValueClass
  if (outputFormatInstance == null) {
    throw new SparkException("Output format class not set")
  }
  if (keyClass == null) {
    throw new SparkException("Output key class not set")
  }
  if (valueClass == null) {
    throw new SparkException("Output value class not set")
  }
  SparkHadoopUtil.get.addCredentials(hadoopConf)

  logDebug("Saving as hadoop file of type (" + keyClass.getSimpleName + ", " +
    valueClass.getSimpleName + ")")

  if (SparkHadoopWriterUtils.isOutputSpecValidationEnabled(self.conf)) {
    // FileOutputFormat ignores the filesystem parameter
    val ignoredFs = FileSystem.get(hadoopConf)
    hadoopConf.getOutputFormat.checkOutputSpecs(ignoredFs, hadoopConf)
  }

  val writer = new SparkHadoopWriter(hadoopConf)
  writer.preSetup()

  val writeToFile = (context: TaskContext, iter: Iterator[(K, V)]) => {
    // Hadoop wants a 32-bit task attempt ID, so if ours is bigger than Int.MaxValue, roll it
    // around by taking a mod. We expect that no task will be attempted 2 billion times.
    val taskAttemptId = (context.taskAttemptId % Int.MaxValue).toInt

    val (outputMetrics, callback) = SparkHadoopWriterUtils.initHadoopOutputMetrics(context)

    writer.setup(context.stageId, context.partitionId, taskAttemptId)
    writer.open()
    var recordsWritten = 0L

    Utils.tryWithSafeFinallyAndFailureCallbacks {
      while (iter.hasNext) {
        val record = iter.next()
        writer.write(record._1.asInstanceOf[AnyRef], record._2.asInstanceOf[AnyRef])

        // Update bytes written metric every few records
        SparkHadoopWriterUtils.maybeUpdateOutputMetrics(outputMetrics, callback, recordsWritten)
        recordsWritten += 1
      }
    }(finallyBlock = writer.close())
    writer.commit()
    outputMetrics.setBytesWritten(callback())
    outputMetrics.setRecordsWritten(recordsWritten)
  }

  self.context.runJob(self, writeToFile)
  writer.commitJob()
}
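
Before reading DAGScheduler.runJob, it helps to see the step in between. The following is a simplified sketch (not verbatim Spark source; based on Spark 2.x, details vary across versions) of the SparkContext.runJob overload that the call chain reaches: it cleans the user closure so it can be serialized, hands the job to the DAGScheduler, and finally triggers checkpointing of the RDD if requested:

// Simplified sketch of SparkContext.runJob (abridged)
def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    resultHandler: (Int, U) => Unit): Unit = {
  val callSite = getCallSite        // where the action was called in user code
  val cleanedFunc = clean(func)     // make the closure serializable
  dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler,
    localProperties.get)
  rdd.doCheckpoint()                // checkpoint this RDD after the job, if requested
}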

Following runJob down level by level, we find that it ends up calling DAGScheduler's runJob. Its source:

def runJob[T, U](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: CallSite,
    resultHandler: (Int, U) => Unit,
    properties: Properties): Unit = {
  val start = System.nanoTime

  // Submit the job
  val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
  // Note: Do not call Await.ready(future) because that calls `scala.concurrent.blocking`,
  // which causes concurrent SQL executions to fail if a fork-join pool is used. Note that
  // due to idiosyncrasies in Scala, `awaitPermission` is not actually used anywhere so it's
  // safe to pass in null here. For more detail, see SPARK-13747.
  val awaitPermission = null.asInstanceOf[scala.concurrent.CanAwait]
  waiter.completionFuture.ready(Duration.Inf)(awaitPermission)
  waiter.completionFuture.value.get match {
    case scala.util.Success(_) =>
      logInfo("Job %d finished: %s, took %f s".format
        (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
    case scala.util.Failure(exception) =>
      logInfo("Job %d failed: %s, took %f s".format
        (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
      // SPARK-8644: Include user stack trace in exceptions coming from DAGScheduler.
      val callerStackTrace = Thread.currentThread().getStackTrace.tail
      exception.setStackTrace(exception.getStackTrace ++ callerStackTrace)
      throw exception
  }
}

The submitJob called in runJob looks like this:

def submitJob[T, U](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      callSite: CallSite,
      resultHandler: (Int, U) => Unit,
      properties: Properties): JobWaiter[U] = {
    // Check to make sure we are not launching a task on a partition that does not exist.
    val maxPartitions = rdd.partitions.length
    partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
      throw new IllegalArgumentException(
        "Attempting to access a non-existent partition: " + p + ". " +
          "Total number of partitions: " + maxPartitions)
    }

    val jobId = nextJobId.getAndIncrement()
    if (partitions.size == 0) {
      // Return immediately if the job is running 0 tasks
      return new JobWaiter[U](this, jobId, 0, resultHandler)
    }

    assert(partitions.size > 0)
    val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
    // A JobWaiter waits for a DAGScheduler job to complete. As tasks finish,
    // it passes their results to the given handler function.
    val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
    // eventProcessLoop is actually a DAGSchedulerEventProcessLoop; post simply puts the
    // JobSubmitted event into its eventQueue, where it is processed by eventThread in the background
    eventProcessLoop.post(JobSubmitted(
      jobId, rdd, func2, partitions.toArray, callSite, waiter,
      SerializationUtils.clone(properties)))
    waiter
  }
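
The JobWaiter created above is essentially a promise that the caller blocks on: every finished task hands its result to resultHandler, and once all tasks have finished (or the job has failed), the completionFuture that DAGScheduler.runJob waits on is completed. A stripped-down sketch of that pattern (illustrative only, not the actual JobWaiter source):

import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Future, Promise}

// Count finished tasks, pass each result to the caller's handler,
// and complete a future once everything has arrived.
class SimpleJobWaiter[T](totalTasks: Int, resultHandler: (Int, T) => Unit) {
  private val finishedTasks = new AtomicInteger(0)
  private val jobPromise = Promise[Unit]()

  def completionFuture: Future[Unit] = jobPromise.future

  def taskSucceeded(index: Int, result: T): Unit = {
    resultHandler(index, result)
    if (finishedTasks.incrementAndGet() == totalTasks) jobPromise.trySuccess(())
  }

  def jobFailed(exception: Exception): Unit = jobPromise.tryFailure(exception)
}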

The eventProcessLoop used in submitJob is actually a DAGSchedulerEventProcessLoop, and DAGSchedulerEventProcessLoop extends EventLoop[DAGSchedulerEvent]. The EventLoop source is:

private[spark] abstract class EventLoop[E](name: String) extends Logging {

  // The event queue
  private val eventQueue: BlockingQueue[E] = new LinkedBlockingDeque[E]()

  private val stopped = new AtomicBoolean(false)

  private val eventThread = new Thread(name) {
    setDaemon(true)

    override def run(): Unit = {
      try {
        while (!stopped.get) {
          // Take an event from the event queue
          val event = eventQueue.take()
          try {
            // Handle the event; this invokes DAGSchedulerEventProcessLoop's onReceive
            onReceive(event)
          } catch {
            case NonFatal(e) =>
              try {
                onError(e)
              } catch {
                case NonFatal(e) => logError("Unexpected error in " + name, e)
              }
          }
        }
      } catch {
        case ie: InterruptedException => // exit even if eventQueue is not empty
        case NonFatal(e) => logError("Unexpected error in " + name, e)
      }
    }

  }

  def start(): Unit = {
    if (stopped.get) {
      throw new IllegalStateException(name + " has already been stopped")
    }
    // Call onStart before starting the event thread to make sure it happens before onReceive
    onStart()
    eventThread.start()
  }
  // ... remaining source omitted
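
Two small pieces inside DAGScheduler (paraphrased from Spark 2.x, not quoted verbatim) tie this loop back to submitJob: the scheduler owns the loop, and it starts the daemon event thread when the DAGScheduler is constructed, which is why a single post() in submitJob is enough to get the JobSubmitted event handled asynchronously:

// Paraphrased from DAGScheduler: the scheduler owns its event loop ...
val eventProcessLoop = new DAGSchedulerEventProcessLoop(this)

// ... and starts the daemon event thread as part of DAGScheduler construction,
// so events posted by submitJob are picked up by onReceive on that thread.
eventProcessLoop.start()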

The onReceive invoked by the event thread here is DAGSchedulerEventProcessLoop's onReceive; its source is:

/**
 * The main event loop of the DAG scheduler.
 */
override def onReceive(event: DAGSchedulerEvent): Unit = {
  val timerContext = timer.time()
  try {
    doOnReceive(event)
  } finally {
    timerContext.stop()
  }
}

Next, the doOnReceive source:

private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
    // JobSubmitted is handled here
    case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
      dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)

    case MapStageSubmitted(jobId, dependency, callSite, listener, properties) =>
      dagScheduler.handleMapStageSubmitted(jobId, dependency, callSite, listener, properties)

    case StageCancelled(stageId, reason) =>
      dagScheduler.handleStageCancellation(stageId, reason)

    case JobCancelled(jobId, reason) =>
      dagScheduler.handleJobCancellation(jobId, reason)

    case JobGroupCancelled(groupId) =>
      dagScheduler.handleJobGroupCancelled(groupId)

    case AllJobsCancelled =>
      dagScheduler.doCancelAllJobs()

    case ExecutorAdded(execId, host) =>
      dagScheduler.handleExecutorAdded(execId, host)

    case ExecutorLost(execId, reason) =>
      val filesLost = reason match {
        case SlaveLost(_, true) => true
        case _ => false
      }
      dagScheduler.handleExecutorLost(execId, filesLost)

    case BeginEvent(task, taskInfo) =>
      dagScheduler.handleBeginEvent(task, taskInfo)

    case GettingResultEvent(taskInfo) =>
      dagScheduler.handleGetTaskResult(taskInfo)

    case completion: CompletionEvent =>
      dagScheduler.handleTaskCompletion(completion)

    case TaskSetFailed(taskSet, reason, exception) =>
      dagScheduler.handleTaskSetFailed(taskSet, reason, exception)

    case ResubmitFailedStages =>
      dagScheduler.resubmitFailedStages()
  }

From here we can see that the job is ultimately submitted via DAGScheduler's handleJobSubmitted method. Once the job has been submitted, the next steps are dividing it into Stages and submitting tasks.
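
As a preview of that next step, here is an abridged sketch of DAGScheduler.handleJobSubmitted (heavily simplified from Spark 2.x; the real method also handles failures, posts listener events, and does more bookkeeping). It builds the final ResultStage from the action's RDD, wraps it in an ActiveJob, and submits that stage, which in turn submits any missing parent stages first:

// Abridged sketch of handleJobSubmitted, not the full source
private[scheduler] def handleJobSubmitted(
    jobId: Int,
    finalRDD: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    callSite: CallSite,
    listener: JobListener,
    properties: Properties): Unit = {
  // Walk the RDD lineage backwards, cutting a new stage at every shuffle
  // dependency, and build the final ResultStage for this action.
  val finalStage: ResultStage = createResultStage(finalRDD, func, partitions, jobId, callSite)

  // Wrap everything into an ActiveJob and record it.
  val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
  jobIdToActiveJob(jobId) = job
  activeJobs += job
  finalStage.setActiveJob(job)

  // Submit the final stage; missing parent stages get submitted first.
  submitStage(finalStage)
}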