1. 程式人生 > >Spark原始碼解讀之RDD構建和轉換過程

Spark原始碼解讀之RDD構建和轉換過程

上一節講了Spark原始碼解讀之Context的初始化過程,發現其實一行簡單的new SparkContext(sparkConf)程式碼,spark內部會去做很多事情。這節主要講RDD的構建和轉換過程。

一、 RDD概述

    RDD (Resilient Distributed Dataset) ,一個彈性分散式資料集,Spark中的基本抽象。代表一個不變(只讀)的、可以並行操作的元素的分割槽集合。Spark中原生的RDD支援從以下三種方式建立:從scala集合中建立、從檔案系統中建立、現有RDD的transform操作建立。RDD主要有以下五個特點:

1. 分割槽集合

    RDD是一個分割槽(partition)的集合,一個RDD有一個或多個分割槽。分割槽的數量決定了並行度。使用textFile建立RDD時可以不指定分割槽數(採用預設的分割槽數),也可以自己指定分割槽數。



2. 計算函式以分割槽為單位

    RDD在任務計算時是以分割槽為單位的,計算函式為compute函式:def compute(split: Partition, context: TaskContext): Iterator[T]。輸入引數分別為RDD對應的分割槽以及task執行環境。不同的RDD子類可以去實現自己的compute方法。

3. RDD依賴於其他RDD

   每個RDD都有依賴關係(源RDD的依賴關係為空),這些依賴關係成為lineage,可以通過toDebugString方法來獲得lineage。


使用textFile建立的RDD的lineage為HadoopRDD -> MapPartitionsRDD。

4. key-value 型別RDD的 Partitioner

    對於非key-value型別的RDD,Partitioner為None,對應key-value型別的RDD,Partitioner預設為HashPartitioner。在進行shuffle操作時,如reduceByKey,sortByKey,Partitioner決定了父RDD shuffle的輸出時對應的分割槽中的資料是如何進行map的。

5. 分割槽支援資料本地性

    Spark在進行任務排程時,會嘗試將任務分配到資料所在的機器上,從而避免了機器間的資料傳輸。RDD獲取優先位置的方法為getPreferredLocations。

    一般只有涉及到從外部儲存結構中讀取資料時才會有優先位置,比如HadoopRDD,ShuffledRDD。

二、 例項講解RDD構建和轉換

在idea中對

val WordCounts = sc.textFile("/hosts.txt")
  .flatMap(text => text.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _)

進行debug。

1.  textFile

    SparkContext的textFile方法會從HDFS或本地讀取檔案,然後建立一個String型別的MapPartitionsRDD。方法如下:

  def textFile(
      path: String,
      minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
    assertNotStopped()
    hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
      minPartitions).map(pair => pair._2.toString).setName(path)
  }

path用於指定檔案的路徑,minPartitions用於指定最小的分割槽數,如果不指定minPartitions,會使用defaultMinPartitions方法獲得minPartitions。之後使用hadoopFile方法建立一個HadoopRDD,由於HadoopRDD是一個key-value型別的RDD,key表示偏移量,value表示具體的內容,所以需要使用RDD的map方法來得到具體的資料(這樣會將HadoopRDD轉為 MapPartitionsRDD)。

def defaultMinPartitions: Int = math.min(defaultParallelism, 2)

1.1 defaultMinPartitions方法 

defaultMinPartitions方法會取defaultParallelism和2最小值作為預設的minPartitions。defaultParallelism方法如下:

  def defaultParallelism: Int = {
    assertNotStopped()
    taskScheduler.defaultParallelism
  }

仔細一看,發現呼叫的是TaskScheduler的defaultParallelism方法,TaskScheduler是一個trait,實際上呼叫的是TaskSchedulerImpl的defaultParallelism方法:

  override def defaultParallelism(): Int = backend.defaultParallelism()

由於採用本地執行模式,所以呼叫的是LocalBackend類的defaultParallelism方法:

  override def defaultParallelism(): Int =
    scheduler.conf.getInt("spark.default.parallelism", totalCores)

1.2 hadoopFile方法

    hadoopFile用於建立HadoopRDD,實現如下:

  def hadoopFile[K, V](
      path: String,
      inputFormatClass: Class[_ <: InputFormat[K, V]],
      keyClass: Class[K],
      valueClass: Class[V],
      minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope {
    assertNotStopped()
    // A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
    val confBroadcast = broadcast(new SerializableConfiguration(hadoopConfiguration))
    val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path)
    new HadoopRDD(
      this,
      confBroadcast,
      Some(setInputPathsFunc),
      inputFormatClass,
      keyClass,
      valueClass,
      minPartitions).setName(path)
  }

建立HadoopRDD之前,先將hadoopConfiguration進行廣播,然後建立一個setInputPathsFunc方法。HadoopRDD類介紹詳見1.3。

1.3 HadoopRDD類

     HadoopRDD的主構造器如下:

class HadoopRDD[K, V](
    sc: SparkContext,
    broadcastedConf: Broadcast[SerializableConfiguration],
    initLocalJobConfFuncOpt: Option[JobConf => Unit],
    inputFormatClass: Class[_ <: InputFormat[K, V]],
    keyClass: Class[K],
    valueClass: Class[V],
    minPartitions: Int)
  extends RDD[(K, V)](sc, Nil) with Logging {

  if (initLocalJobConfFuncOpt.isDefined) {
    sparkContext.clean(initLocalJobConfFuncOpt.get)
  }

    由於所有型別的RDD都繼承最原始的org.apache.spark.rdd.RDD這個抽象類,所有在呼叫HadoopRDD的主構造器時, 會呼叫RDD這個類,HadoopRDD在初始化時會呼叫RDD的主構造器,將SparkContext物件和Dependency型別的序列(此處為Nil,這是由於HadoopRDD是一個源RDD,沒有依賴)傳入。RDD抽象類的介紹詳見1.4。

    建立了HadoopRDD之後,會呼叫SparkContext的clean方法,實現如下:

  private[spark] def clean[F <: AnyRef](f: F, checkSerializable: Boolean = true): F = {
    ClosureCleaner.clean(f, checkSerializable)
    f
  }

這裡實際上呼叫了ClosureCleaner的clean方法,目的是為了清楚閉包中的不能序列化的物件,防止RDD在網路傳輸中反序列化失敗。

1.4 RDD抽象類

    RDD類在初始化時,會初始化以下的一些變數。

abstract class RDD[T: ClassTag](
    @transient private var _sc: SparkContext,
    @transient private var deps: Seq[Dependency[_]]
  ) extends Serializable with Logging {
  
  // shuffle操作時,會用到partitioner
  @transient val partitioner: Option[Partitioner] = None 
  
  // 一個唯一的ID,標識這個RDD
  val id: Int = sc.newRddId()
  
  // RDD的名字
  @transient var name: String = null
  
  // Our dependencies and partitions will be gotten by calling subclass's methods below, and will
  // be overwritten when we're checkpointed
  // dependencies和partitions會在checkpoint時重寫
  private var dependencies_ : Seq[Dependency[_]] = null
  @transient private var partitions_ : Array[Partition] = null
  
  private var storageLevel: StorageLevel = StorageLevel.NONE
  @transient private[spark] val creationSite = sc.getCallSite()
  
  @transient private[spark] val scope: Option[RDDOperationScope] = {
    Option(sc.getLocalProperty(SparkContext.RDD_SCOPE_KEY)).map(RDDOperationScope.fromJson)
  }
  
  private[spark] var checkpointData: Option[RDDCheckpointData[T]] = None
  
}

此外,在Spark 1.3版本之後,有一個RDD的伴生物件,伴生物件實現了一些隱式轉換的方法,如:implicit def rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)])、implicit def rddToSequenceFileRDDFunctions[K, V](rdd: RDD[(K, V)])、implicit def rddToOrderedRDDFunctions[K : Ordering : ClassTag, V: ClassTag](rdd: RDD[(K, V)])等等。

1.5 RDD map方法

    在使用hadoopFile方法得到HadoopRDD之後,會呼叫RDD的map方法,將HadoopRDD的value提取出來,作為一個新的RDD(MappartitionsRDD)的資料。map方法實現如下:

  def map[U: ClassTag](f: T => U): RDD[U] = withScope {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
  }

先呼叫SparkContext的clean方法,之後使用MapPartitionsRDD的主構造器來建立一個MapPartitionsRDD物件。this表示呼叫者,這兒指的是HadoopRDD。

private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
    var prev: RDD[T],
    f: (TaskContext, Int, Iterator[T]) => Iterator[U],  // (TaskContext, partition index, iterator)
    preservesPartitioning: Boolean = false)
  extends RDD[U](prev) {

初始化MapPartitionsRDD,呼叫的是RDD抽象類的一個輔助構造器,如下:

  /** Construct an RDD with just a one-to-one dependency on one parent */
  def this(@transient oneParent: RDD[_]) =
    this(oneParent.context , List(new OneToOneDependency(oneParent)))

輔助構造器再呼叫主構造器,如下:

abstract class RDD[T: ClassTag](
    @transient private var _sc: SparkContext,
    @transient private var deps: Seq[Dependency[_]]
  )

從這兒可以追溯,deps->List(new OneToOneDependency(oneParent)),oneParent->prev->this->HadoopRDD。也就是說在建立MapPartitionsRDD時,會將HadoopRDD作為它的依賴。這個依賴關係存在MapPartitionsRDD,RDD中的getDependencies可以得到deps。(關於Spark中Dependency的講解詳見Spark原始碼解讀之RDD依賴Dependency)

  protected def getDependencies: Seq[Dependency[_]] = deps

使用map方法後,會將HadoopRDD轉為MapPartitionsRDD。

2. RDD flatMap方法

   flatMap方法會將上一步的MapPartitionsRDD進行變換,得到一個新的MapPartitionsRDD。flatMap方法實現如下:

  def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
  }

3. RDD map方法

    map方法在1.5部分已經詳細講解了。

4. RDD reduceByKey方法

    reduceByKey方法是PairRDDFunctions型別RDD特有的方法,由於上一步map方法產生的是一個MapPartitionsRDD,在這可以使用 toDebugString方法來看下RDD之間的依賴關係。


上圖可以看出使用textFile方法生先後生成了HadoopRDD和MapPartitionsRDD,呼叫flatMap和map方法都新生成了MapPartitionsRDD所以在呼叫reduceByKey方法時,會先呼叫所有RDD都有的一個隱式轉換方法rddToPairRDDFunctions,將非PairRDDFunctions型別的RDD轉為PairRDDFunctions型別的RDD。rddToPairRDDFunctions方法如下:

  implicit def rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)])
    (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null): PairRDDFunctions[K, V] = {
    new PairRDDFunctions(rdd)
  }
class PairRDDFunctions[K, V](self: RDD[(K, V)])
    (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null)
  extends Logging
  with SparkHadoopMapReduceUtil
  with Serializable
{

在構建PairRDDFunctions類時,傳入的self即MapPartitionsRDD。

接下來看下reduceByKey的實現:

  /**
   * Merge the values for each key using an associative reduce function. This will also perform
   * the merging locally on each mapper before sending results to a reducer, similarly to a
   * "combiner" in MapReduce. Output will be hash-partitioned with the existing partitioner/
   * parallelism level.
   */
  def reduceByKey(func: (V, V) => V): RDD[(K, V)] = self.withScope {
    reduceByKey(defaultPartitioner(self), func)
  }
  
  /**
   * Merge the values for each key using an associative reduce function. This will also perform
   * the merging locally on each mapper before sending results to a reducer, similarly to a
   * "combiner" in MapReduce.
   */
  def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
    combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
  }
  
  def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = {
    // 將傳入的RDD序列的按照partitions大小進行降序排序
    val bySize = (Seq(rdd) ++ others).sortBy(_.partitions.size).reverse
    // 任何一個RDD有partitioner,則返回該partitioner
    for (r <- bySize if r.partitioner.isDefined && r.partitioner.get.numPartitions > 0) {
      return r.partitioner.get
    }
    // 返回一個HashPartitioner,如果配置了"spark.default.parallelism"引數,
    // 該HashPartitioner的partitions為配置引數的值,否則partitions為所傳入RDD序列中最大的partitions
    if (rdd.context.conf.contains("spark.default.parallelism")) {
      new HashPartitioner(rdd.context.defaultParallelism)
    } else {
      new HashPartitioner(bySize.head.partitions.size)
    }
  }
}
  

reduceByKey方法最終會呼叫combineByKeyWithClassTag方法,其處理步驟如下:

1. 建立Aggregator 

2. self.partitioner為MapPartitionsRDD的partitioner,而建立MapPartitionsRDD時,它的partitioner為None,因而建立ShuffledRDD。

  def combineByKeyWithClassTag[C](
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C,
      partitioner: Partitioner,
      mapSideCombine: Boolean = true,
      serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
    require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0
    if (keyClass.isArray) {
      if (mapSideCombine) {
        throw new SparkException("Cannot use map-side combining with array keys.")
      }
      if (partitioner.isInstanceOf[HashPartitioner]) {
        throw new SparkException("Default partitioner cannot partition array keys.")
      }
    }
    val aggregator = new Aggregator[K, V, C](
      self.context.clean(createCombiner),
      self.context.clean(mergeValue),
      self.context.clean(mergeCombiners))
    if (self.partitioner == Some(partitioner)) {
      self.mapPartitions(iter => {
        val context = TaskContext.get()
        new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
      }, preservesPartitioning = true)
    } else {
      new ShuffledRDD[K, V, C](self, partitioner)
        .setSerializer(serializer)
        .setAggregator(aggregator)
        .setMapSideCombine(mapSideCombine)
    }
  }

ShuffledRDD的構造方法如下:

class ShuffledRDD[K: ClassTag, V: ClassTag, C: ClassTag](
    @transient var prev: RDD[_ <: Product2[K, V]],
    part: Partitioner)
  extends RDD[(K, C)](prev.context, Nil) {

從這兒可以看出,在構造ShuffledRDD時,並沒有直接建立它的依賴(因為繼承RDD時,使用RDD的主構造器,傳入的deps引數為Nil)。那麼ShuffledRDD的依賴是什麼時候被建立的呢?其實是在getDependencies方法被呼叫時才建立的。

  override def getDependencies: Seq[Dependency[_]] = {
    List(new ShuffleDependency(prev, part, serializer, keyOrdering, aggregator, mapSideCombine))
  }

由此可見,本例中改ShuffledRDD的依賴為MapPartitionsRDD,依賴型別為ShuffledDependency。

下一篇將介紹Spark原始碼解讀之Job提交