1. 程式人生 > >Spark 原始碼簡單跟蹤

Spark 原始碼簡單跟蹤

本文介紹下Spark 到底是如何執行sc.TextFile(...).map(....).count() 這種程式碼的,從driver端到executor端。

另外還有pid,iter都是哪來的呢? 如果你照著原始碼點進去你會很困惑。為莫名其妙怎麼就有了這些iterator呢?

Transform 和Action的來源

一般剛接觸Spark 的同學,都會被告知這兩個概念。Transform就是RDD的轉換,從一個RDD轉化到另一個RDD(也有多個的情況)。 Action則是出發實際的執行動作。

標題中的map就是一個典型的tansform操作,看原始碼,無非就是從當前的RDD構建了一個新的MapPartitionsRDD

def map[U: ClassTag](f: T => U): RDD[U] = withScope {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
  }

這個新的RDD 接受了this作為引數,也就記住了他的父RDD。同時接受了一個匿名函式:

 (context, pid, iter) => iter.map(cleanF))

至於這個context,pid,iter是怎麼來的,你當前是不知道的。你只是知道這個新的RDD,有這麼一個函式。至於什麼時候這個函式會被呼叫,我們下面會講解到。

而一個Action是什麼樣的呢?我們看看count:

def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum

發現不一樣了,要真的開始run Job了。sparkContext 的runJob 有很多種形態,這裡你看到的是接受當前這個RDD 以及一個函式(Utils.getIteratorSize _)。

當然,這裡的Utils.getItteratorSize 是一個已經實現好的函式:

  def getIteratorSize[T](iterator: Iterator[T]): Long = {
    var count = 0L
    while (iterator.hasNext) {
      count += 1L
      iterator.next()
    }
    count
  } 

它符合 sc.runJob 需要接受的簽名形態:

 func: Iterator[T] => U

Driver端的工作

這裡你會見到一些熟悉的身影,比如dagScheduler,TaskScheduler,SchedulerBackend等。我們慢慢分解。

我們深入runJob,你馬上就可以看到了dagScheduler了。

dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)

這裡的cleanedFunc 就是前面那個 func: Iterator[T] => U 函式。在我們的例子裡,就是一個計數的函式。

這樣我們就順利的離開SparkContext 進入DAGScheduler的王國了。

dagScheduler會進一步提交任務。

 val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties) 

請記住上面第二個引數,func其實就是前面的 Utils.getItteratorSize 函式,不過簽名略有改變,添加了context,變成了這種形態:

(TaskContext, Iterator[_]) => _

接著會變成一個事件,發到事件佇列裡,其中 func2 還是上面的func,只是被改了名字而已。

eventProcessLoop.post(JobSubmitted(  jobId, rdd, func2, partitions.toArray, callSite, waiter,  SerializationUtils.clone(properties)))

dag會通過handleJobSubmitted 函式處理這個事件。在這裡完成Stage的拆分。這個不是我們這次關注的主題,所以不詳細討論。最後,會把Stage進行提交:

 submitMissingTasks(finalStage)

提交到哪去了呢?會根據Stage的型別,生成實際的任務,然後序列化。序列化後通過廣播機制傳送到所有節點上去。

var taskBinary: Broadcast[Array[Byte]] = null
    try {
      // For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep).
      // For ResultTask, serialize and broadcast (rdd, func).
      val taskBinaryBytes: Array[Byte] = stage match {
        case stage: ShuffleMapStage =>
          closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef).array()
        case stage: ResultStage =>
          closureSerializer.serialize((stage.rdd, stage.resultOfJob.get.func): AnyRef).array()
      }

      taskBinary = sc.broadcast(taskBinaryBytes)

然後生成tasks物件,ShuffleMapTask 或者ResultTask,我們這裡的count是ResultTask,通過下面的方式提交:

 taskScheduler.submitTasks(new TaskSet(  tasks.toArray, stage.id, stage.latestInfo.attemptId, stage.firstJobId, properties))

現在我們進入 TaskSchedulerImpl 的地盤了。在submitTasks裡我們呼叫了backend.我們接著就進入到CoarseGrainedSchedulerBackend.DriverEndpoint裡。這個DriverEndPoint做完應該怎麼把Task分配到哪些Executor的計算後,最後會去做真正的launchTask的工作:

executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))

把序列化好的任務傳送到Executor 上。到這裡,Driver端的工作就完整了。

有一點你可能會比較好奇,為什麼要做兩次序列化,傳送兩次的? 也就是前面的taskBinary,還有serializedTask。 taskBinany 包括一些RDD,函式等資訊。而serializedTask 這是整個Task的任務資訊,比如對應的那個分割槽號等。後面我們還會看到taskBinary的身影。

Executor端

Executor 的入口是org.apache.spark.executor. Executor類。你可以看到夢寐以求的launchTask 方法

 def launchTask(
      context: ExecutorBackend,
      taskId: Long,
      attemptNumber: Int,
      taskName: String,
      serializedTask: ByteBuffer): Unit = {
    val tr = new TaskRunner(context, taskId = taskId, attemptNumber = attemptNumber, taskName,
      serializedTask)
    runningTasks.put(taskId, tr)
    threadPool.execute(tr)
  }

核心你看到了,是TaskRunner方法。進去看看,核心程式碼如下:

 val res = task.run(
            taskAttemptId = taskId,
            attemptNumber = attemptNumber,
            metricsSystem = env.metricsSystem)

這個task(ResultTask).run裡是我們最後的核心,真正的邏輯呼叫發生在這裡:

override def runTask(context: TaskContext): U = {
    // Deserialize the RDD and the func using the broadcast variables.
    val deserializeStartTime = System.currentTimeMillis()
    val ser = SparkEnv.get.closureSerializer.newInstance()
    val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
      ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)
    _executorDeserializeTime = System.currentTimeMillis() - deserializeStartTime

    metrics = Some(context.taskMetrics)
    func(context, rdd.iterator(partition, context))
  }

前面通過taskBinary 還原出RDD,func。 而這裡的func就是我們那個經過改良的Utils.getItteratorSize函式,前面在driver端就被改造成func(context, rdd.iterator(partition, context)) 這種形態了。但是函式體還是下面的

  def getIteratorSize[T](iterator: Iterator[T]): Long = {
    var count = 0L
    while (iterator.hasNext) {
      count += 1L
      iterator.next()
    }
    count
  } 

也就是是一個計數函式。引數iterator則是通過rdd.iterator(partition, context)拿到了。

總結

到此,我們完成了整個程式碼的流轉過程。之所以很多人看到這些地會比較疑惑,是因為看到的程式碼都是在driver端的。但是最後這些任務都要被序列化傳送到Executor端。所以一般我們看到的流程不是連續的。