1. 程式人生 > >Spark原始碼解析(一)

Spark原始碼解析(一)

RDD之getNarrowAncestors內部方法分析

最近開始spark的原始碼攻關,其實看原始碼一直是我最怕的東西,因為太多、太雜、太深導致不能夠很好的把我脈絡導致每次最後都放棄。有人跟我說看原始碼可以階段性一個方法一個方法的去學習,去看,每天積累一點總會成功,那麼今天開始我的第一天spark原始碼分析。

我這裡從spark最基本的RDD中的方法說起,我感覺這樣會更容易一些。同時我只對其中感覺比較重要的方法進行一些講解。今天我主要講一下getNarrowAncestors方法,也就是所謂的獲取窄依賴的父RDD的方法,這個方法是一個內部方法,方法整體如下:

/**
 * Return the ancestors of the given RDD that are related to it only through a sequence of
* narrow dependencies. This traverses the given RDD's dependency tree using DFS, but maintains * no ordering on the RDDs returned. */ private[spark] def getNarrowAncestors: Seq[RDD[_]] = { val ancestors = new mutable.HashSet[RDD[_]] def visit(rdd: RDD[_]) { val narrowDependencies = rdd.dependencies.filter(_.isInstanceOf[NarrowDependency[_]]) val
narrowParents = narrowDependencies.map(_.rdd) val narrowParentsNotVisited = narrowParents.filterNot(ancestors.contains) narrowParentsNotVisited.foreach { parent => ancestors.add(parent) visit(parent) } } visit(this) // In case there is a cycle, do not include the root itself
ancestors.filterNot(_ == this).toSeq }

方法的返回值為Seq型別,裡面包含RDD,這不難理解,一個RDD他是窄依賴,那麼他有一個父RDD,但是你不能保證他的父RDD沒有依賴,所以返回結果為一個序列,裡面裝有很多的RDD。然後一開始我們看到一個HashSet,他的裡面裝有RDD(不過目前是空的),為什麼用HashSet?好問題,因為它裡面沒有重複,我們不需要獲取一個含有重複的序列(就像別人問你家裡都有誰,你不會說兩次爸爸一樣)。下面定義了一個方法visit,首先這裡很有意思,因為我們在一個方法裡面定義了另外一個方法,所以你不得不佩服scala的靈活性。

這個visit方法需要傳入一個RDD作為引數。然後我們進入方法第一行,rdd呼叫他的dependencies方法,以下是dependencies方法:

/**
 * Get the list of dependencies of this RDD, taking into account whether the
 * RDD is checkpointed or not.
 */
final def dependencies: Seq[Dependency[_]] = {
  checkpointRDD.map(r => List(new OneToOneDependency(r))).getOrElse {
    if (dependencies_ == null) {
      dependencies_ = getDependencies
    }
    dependencies_
}
}

這個方法同樣返回一個Seq裡面是Dependecy的型別,Dependency型別是一個抽象類,很簡單,整體如下:

abstract class Dependency[T] extends Serializable {
  def rdd: RDD[T]
}

只有一個RDD,那麼回到上面dependencies方法中,第一行,我們檢視checkpointRDD。可能有很多人不知道這是什麼,那麼請檢視如下程式碼:

/** An Option holding our checkpoint RDD, if we are checkpointed */
private def checkpointRDD: Option[CheckpointRDD[T]] = checkpointData.flatMap(_.checkpointRDD)

可以看到,checkpointRDD是一個Option型別裡面包含的RDD是我們已經checkpointed的。我的理解是,這句可以理解為一個過濾過程。回到上面的dependencies方法中,我們看到checkpointRDD對其內的每個元素(也就是CheckpointRDD型別的RDD)執行方法OneToOneDependency(),程式碼如下:

/**
 * :: DeveloperApi ::
 * Represents a one-to-one dependency between partitions of the parent and child RDDs.
 */
@DeveloperApi
class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
  override def getParents(partitionId: Int): List[Int] = List(partitionId)
}

這裡為不引起混亂,不在進入更深的方法,這個方法的含義是獲取所有的子RDD與父RDD的對應關係為1對1的父RDD的分割槽號。回到denpendencies方法中,我們看到下面有個方法getDependencies顧名思義,獲取所有的依賴(此處方法不在深入),此時dependencies方法返回結果,返回一開始的visit方法中,為了方便檢視,我再打印出來一遍:

def visit(rdd: RDD[_]) {
  val narrowDependencies = rdd.dependencies.filter(_.isInstanceOf[NarrowDependency[_]])
  val narrowParents = narrowDependencies.map(_.rdd)
  val narrowParentsNotVisited = narrowParents.filterNot(ancestors.contains)
  narrowParentsNotVisited.foreach { parent =>
    ancestors.add(parent)
    visit(parent)
  }
}

獲取到值進行filter過濾,過濾的條件是檢視裡面的RDD是否為NarrowDependency型別。然後我們將過濾後的值儲存變數名字為:narrowDependencies,然後將依賴中的rdd取出儲存變數名字為narrowParents,也就是所謂的父RDD。然後下面幾段程式碼的目的是把父RDD放入到我們一開始建立的HashSet中,重複的去掉,沒有的加入。

再回到getNarrowAncestors方法中,我們看到下面它呼叫了visit方法,把本身作為引數傳入。最後那一句是為了過濾出父類中又包含自身RDD這種迴圈而存在的,把過濾後的轉化為Seq作為結果傳出。