Spark原始碼解析(一)
RDD之getNarrowAncestors內部方法分析
最近開始spark的原始碼攻關,其實看原始碼一直是我最怕的東西,因為太多、太雜、太深導致不能夠很好的把我脈絡導致每次最後都放棄。有人跟我說看原始碼可以階段性一個方法一個方法的去學習,去看,每天積累一點總會成功,那麼今天開始我的第一天spark原始碼分析。
我這裡從spark最基本的RDD中的方法說起,我感覺這樣會更容易一些。同時我只對其中感覺比較重要的方法進行一些講解。今天我主要講一下getNarrowAncestors方法,也就是所謂的獲取窄依賴的父RDD的方法,這個方法是一個內部方法,方法整體如下:
/** * Return the ancestors of the given RDD that are related to it only through a sequence of* narrow dependencies. This traverses the given RDD's dependency tree using DFS, but maintains * no ordering on the RDDs returned. */ private[spark] def getNarrowAncestors: Seq[RDD[_]] = { val ancestors = new mutable.HashSet[RDD[_]] def visit(rdd: RDD[_]) { val narrowDependencies = rdd.dependencies.filter(_.isInstanceOf[NarrowDependency[_]]) valnarrowParents = narrowDependencies.map(_.rdd) val narrowParentsNotVisited = narrowParents.filterNot(ancestors.contains) narrowParentsNotVisited.foreach { parent => ancestors.add(parent) visit(parent) } } visit(this) // In case there is a cycle, do not include the root itselfancestors.filterNot(_ == this).toSeq }
方法的返回值為Seq型別,裡面包含RDD,這不難理解,一個RDD他是窄依賴,那麼他有一個父RDD,但是你不能保證他的父RDD沒有依賴,所以返回結果為一個序列,裡面裝有很多的RDD。然後一開始我們看到一個HashSet,他的裡面裝有RDD(不過目前是空的),為什麼用HashSet?好問題,因為它裡面沒有重複,我們不需要獲取一個含有重複的序列(就像別人問你家裡都有誰,你不會說兩次爸爸一樣)。下面定義了一個方法visit,首先這裡很有意思,因為我們在一個方法裡面定義了另外一個方法,所以你不得不佩服scala的靈活性。
這個visit方法需要傳入一個RDD作為引數。然後我們進入方法第一行,rdd呼叫他的dependencies方法,以下是dependencies方法:
/** * Get the list of dependencies of this RDD, taking into account whether the * RDD is checkpointed or not. */ final def dependencies: Seq[Dependency[_]] = { checkpointRDD.map(r => List(new OneToOneDependency(r))).getOrElse { if (dependencies_ == null) { dependencies_ = getDependencies } dependencies_ } }
這個方法同樣返回一個Seq裡面是Dependecy的型別,Dependency型別是一個抽象類,很簡單,整體如下:
abstract class Dependency[T] extends Serializable { def rdd: RDD[T] }
只有一個RDD,那麼回到上面dependencies方法中,第一行,我們檢視checkpointRDD。可能有很多人不知道這是什麼,那麼請檢視如下程式碼:
/** An Option holding our checkpoint RDD, if we are checkpointed */ private def checkpointRDD: Option[CheckpointRDD[T]] = checkpointData.flatMap(_.checkpointRDD)
可以看到,checkpointRDD是一個Option型別裡面包含的RDD是我們已經checkpointed的。我的理解是,這句可以理解為一個過濾過程。回到上面的dependencies方法中,我們看到checkpointRDD對其內的每個元素(也就是CheckpointRDD型別的RDD)執行方法OneToOneDependency(),程式碼如下:
/** * :: DeveloperApi :: * Represents a one-to-one dependency between partitions of the parent and child RDDs. */ @DeveloperApi class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) { override def getParents(partitionId: Int): List[Int] = List(partitionId) }
這裡為不引起混亂,不在進入更深的方法,這個方法的含義是獲取所有的子RDD與父RDD的對應關係為1對1的父RDD的分割槽號。回到denpendencies方法中,我們看到下面有個方法getDependencies顧名思義,獲取所有的依賴(此處方法不在深入),此時dependencies方法返回結果,返回一開始的visit方法中,為了方便檢視,我再打印出來一遍:
def visit(rdd: RDD[_]) { val narrowDependencies = rdd.dependencies.filter(_.isInstanceOf[NarrowDependency[_]]) val narrowParents = narrowDependencies.map(_.rdd) val narrowParentsNotVisited = narrowParents.filterNot(ancestors.contains) narrowParentsNotVisited.foreach { parent => ancestors.add(parent) visit(parent) } }
獲取到值進行filter過濾,過濾的條件是檢視裡面的RDD是否為NarrowDependency型別。然後我們將過濾後的值儲存變數名字為:narrowDependencies,然後將依賴中的rdd取出儲存變數名字為narrowParents,也就是所謂的父RDD。然後下面幾段程式碼的目的是把父RDD放入到我們一開始建立的HashSet中,重複的去掉,沒有的加入。
再回到getNarrowAncestors方法中,我們看到下面它呼叫了visit方法,把本身作為引數傳入。最後那一句是為了過濾出父類中又包含自身RDD這種迴圈而存在的,把過濾後的轉化為Seq作為結果傳出。