1. 程式人生 > >從HadoopRDD生成各個階段的RDD 原始碼詳解

從HadoopRDD生成各個階段的RDD 原始碼詳解

1、什麼是RDD?

上一章講了Spark提交作業的過程,這一章我們要講RDD。簡單的講,RDD就是Spark的input,知道input是啥吧,就是輸入的資料。

RDD的全名是Resilient Distributed Dataset,意思是容錯的分散式資料集,每一個RDD都會有5個特徵:

1、有一個分片列表。就是能被切分,和hadoop一樣的,能夠切分的資料才能平行計算。

2、有一個函式計算每一個分片,這裡指的是下面會提到的compute函式。

3、對其他的RDD的依賴列表,依賴還具體分為寬依賴和窄依賴,但並不是所有的RDD都有依賴。

4、可選:key-value型的RDD是根據雜湊來分割槽的,類似於mapreduce當中的Paritioner介面,控制key分到哪個reduce。

5、可選:每一個分片的優先計算位置(preferred locations),比如HDFS的block的所在位置應該是優先計算的位置。

 對應著上面這幾點,我們在RDD裡面能找到這4個方法和1個屬性,彆著急,下面我們會慢慢展開說這5個東東。

  //只計算一次  
  protected def getPartitions: Array[Partition]  
  //對一個分片進行計算,得出一個可遍歷的結果
  def compute(split: Partition, context: TaskContext): Iterator[T]
  //只計算一次,計算RDD對父RDD的依賴
  protected
def getDependencies: Seq[Dependency[_]] = deps //可選的,分割槽的方法,針對第4點,類似於mapreduce當中的Paritioner介面,控制key分到哪個reduce @transient val partitioner: Option[Partitioner] = None //可選的,指定優先位置,輸入引數是split分片,輸出結果是一組優先的節點位置 protected def getPreferredLocations(split: Partition): Seq[String] = Nil

2、多種RDD之間的轉換

下面用一個例項講解一下吧,就拿我們常用的一段程式碼來講吧,然後會把我們常用的RDD都會講到。

val hdfsFile = sc.textFile(args(1))
    val flatMapRdd = hdfsFile.flatMap(s => s.split(" "))
    val filterRdd = flatMapRdd.filter(_.length == 2)
    val mapRdd = filterRdd.map(word => (word, 1))
    val reduce = mapRdd.reduceByKey(_ + _)

這裡涉及到很多個RDD,textFile方法返回的是一個HadoopRDD經過map後的MappredRDD,經過flatMap是一個FlatMappedRDD,經過filter方法之後生成了一個FilteredRDD,經過map函式之後,變成一個MappedRDD,最後經過reduceByKey。

我們首先看textFile的這個方法,進入SparkContext這個方法,找到它。

def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String] = {
    hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], minPartitions).map(pair => pair._2.toString)
}

看它的輸入引數,path,TextInputFormat,LongWritable,Text,同志們聯想到什麼?寫過mapreduce的童鞋都應該知道哈。

1、hdfs的地址

2、InputFormat的型別

3、Mapper的第一個型別

4、Mapper的第二型別

這就不難理解為什麼立馬就對hadoopFile後面加了一個map方法,取pair的第二個引數了,最後在shell裡面我們看到它是一個MappredRDD了。

那麼現在如果大家要用的不是textFile,而是一個別的hadoop檔案型別,大家會不會使用hadoopFile來得到自己要得到的型別呢,不要告訴我不會哈,不會的趕緊回去複習mapreduce。

言歸正傳,預設的defaultMinPartitions的2太小了,我們用的時候還是設定大一點吧。

2.1 HadoopRDD

我們繼續追殺下去,看看hadoopFile方法,裡面我們看到它做了3個操作。

1、把hadoop的配置檔案儲存到廣播變數裡。

2、設定路徑的方法

3、new了一個HadoopRDD返回

好,我們接下去看看HadoopRDD這個類吧,我們重點看看它的getPartitions、compute、getPreferredLocations。

先看getPartitions,它的核心程式碼如下:

val inputSplits = inputFormat.getSplits(jobConf, minPartitions)
    val array = new Array[Partition](inputSplits.size)
    for (i <- 0 until inputSplits.size) {
      array(i) = new HadoopPartition(id, i, inputSplits(i))
    }

它呼叫的是inputFormat自帶的getSplits方法來計算分片,然後把分片HadoopPartition包裝到到array裡面返回。

這裡順便順帶提一下,因為1.0又出來一個NewHadoopRDD,它使用的是mapreduce新api的inputformat,getSplits就不要有minPartitions了,別的邏輯都是一樣的,只是使用的類有點區別。

我們接下來看compute方法,它的輸入值是一個Partition,返回是一個Iterator[(K, V)]型別的資料,這裡面我們只需要關注2點即可。

1、把Partition轉成HadoopPartition,然後通過InputSplit建立一個RecordReader

2、重寫Iterator的getNext方法,通過建立的reader呼叫next方法讀取下一個值。

// 轉換成HadoopPartition
	val split = theSplit.asInstanceOf[HadoopPartition]
	logInfo("Input split: " + split.inputSplit)
	var reader: RecordReader[K, V] = null
	val jobConf = getJobConf()
	val inputFormat = getInputFormat(jobConf)
	  context.stageId, theSplit.index, context.attemptId.toInt, jobConf)
	// 通過Inputform的getRecordReader來建立這個InputSpit的Reader
	reader = inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL)

	// 呼叫Reader的next方法
	val key: K = reader.createKey()
	val value: V = reader.createValue()
	override def getNext() = {
	  try {
	    finished = !reader.next(key, value)
	  } catch {
	    case eof: EOFException =>
		finished = true
	  }
	  (key, value)
	}
View Code

從這裡我們可以看得出來compute方法是通過分片來獲得Iterator介面,以遍歷分片的資料。

getPreferredLocations方法就更簡單了,直接呼叫InputSplit的getLocations方法獲得所在的位置。

2.2 依賴

下面我們看RDD裡面的map方法

def map[U: ClassTag](f: T => U): RDD[U] = new MappedRDD(this, sc.clean(f))

直接new了一個MappedRDD,還把匿名函式f處理了再傳進去,我們繼續追殺到MappedRDD。

private[spark]
class MappedRDD[U: ClassTag, T: ClassTag](prev: RDD[T], f: T => U)
  extends RDD[U](prev) {
  override def getPartitions: Array[Partition] = firstParent[T].partitions
  override def compute(split: Partition, context: TaskContext) =
    firstParent[T].iterator(split, context).map(f)
}

MappedRDD把getPartitions和compute給重寫了,而且都用到了firstParent[T],這個firstParent是何須人也?我們可以先點選進入RDD[U](prev)這個建構函式裡面去。

def this(@transient oneParent: RDD[_]) = this(oneParent.context , List(new OneToOneDependency(oneParent)))

就這樣你會發現它把RDD複製給了deps,HadoopRDD成了MappedRDD的父依賴了,這個OneToOneDependency是一個窄依賴,子RDD直接依賴於父RDD,繼續看firstParent。

protected[spark] def firstParent[U: ClassTag] = {
  dependencies.head.rdd.asInstanceOf[RDD[U]]
}

由此我們可以得出兩個結論:

1、getPartitions直接沿用了父RDD的分片資訊

2、compute函式是在父RDD遍歷每一行資料時套一個匿名函式f進行處理

好吧,現在我們可以理解compute函式真正是在幹嘛的了

它的兩個顯著作用:

1、在沒有依賴的條件下,根據分片的資訊生成遍歷資料的Iterable介面

2、在有前置依賴的條件下,在父RDD的Iterable介面上給遍歷每個元素的時候再套上一個方法

我們看看點選進入map(f)的方法進去看一下

def map[B](f: A => B): Iterator[B] = new AbstractIterator[B] {
    def hasNext = self.hasNext
    def next() = f(self.next())
  }

看黃色的位置,看它的next函式,不得不說,寫得真的很妙!

我們接著看RDD的flatMap方法,你會發現它和map函式幾乎沒什麼區別,只是RDD變成了FlatMappedRDD,但是flatMap和map的效果還是差別挺大的。

比如((1,2),(3,4)), 如果是呼叫了flatMap函式,我們訪問到的就是(1,2,3,4)4個元素;如果是map的話,我們訪問到的就是(1,2),(3,4)兩個元素。

有興趣的可以去看看FlatMappedRDD和FilteredRDD這裡就不講了,和MappedRDD類似。

2.3 reduceByKey

前面的RDD轉換都簡單,可是到了reduceByKey可就不簡單了哦,因為這裡有一個同相同key的內容聚合的一個過程,所以它是最複雜的那一類。

那reduceByKey這個方法在哪裡呢,它的PairRDDFunctions裡面,這是個隱式轉換,所以比較隱蔽哦,你在RDD裡面是找不到的。

def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = {
    combineByKey[V]((v: V) => v, func, func, partitioner)
  }

它呼叫的是combineByKey方法,過程過程蠻複雜的,摺疊起來,喜歡看的人看看吧。

def combineByKey[C](createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C,
      partitioner: Partitioner,
      mapSideCombine: Boolean = true,
      serializer: Serializer = null): RDD[(K, C)] = {

    val aggregator = new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners)
    if (self.partitioner == Some(partitioner)) {
      // 一般的RDD的partitioner是None,這個條件不成立,即使成立只需要對這個資料做一次按key合併value的操作即可
      self.mapPartitionsWithContext((context, iter) => {
        new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
      }, preservesPartitioning = true)
    } else if (mapSideCombine) {
      // 預設是走的這個方法,需要map端的combinber.
      val combined = self.mapPartitionsWithContext((context, iter) => {
        aggregator.combineValuesByKey(iter, context)
      }, preservesPartitioning = true)
      val partitioned = new ShuffledRDD[K, C, (K, C)](combined, partitioner)
        .setSerializer(serializer)
      partitioned.mapPartitionsWithContext((context, iter) => {
        new InterruptibleIterator(context, aggregator.combineCombinersByKey(iter, context))
      }, preservesPartitioning = true)
    } else {
      // 不需要map端的combine,直接就來shuffle
      val values = new ShuffledRDD[K, V, (K, V)](self, partitioner).setSerializer(serializer)
      values.mapPartitionsWithContext((context, iter) => {
        new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
      }, preservesPartitioning = true)
    }
  }
View Code

按照一個比較標準的流程來看的話,應該是走的中間的這條路徑,它幹了三件事:

1、給每個分片的資料在外面套一個combineValuesByKey方法的MapPartitionsRDD。

2、用MapPartitionsRDD來new了一個ShuffledRDD出來。

3、對ShuffledRDD做一次combineCombinersByKey。

下面我們先看MapPartitionsRDD,我把和別的RDD有別的兩行給拿出來了,很明顯的區別,f方法是套在iterator的外邊,這樣才能對iterator的所有資料做一個合併。

  override val partitioner = if (preservesPartitioning) firstParent[T].partitioner else None
  override def compute(split: Partition, context: TaskContext) =
    f(context, split.index, firstParent[T].iterator(split, context))
}

 接下來我們看Aggregator的combineValuesByKey的方法吧。

def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]],
                         context: TaskContext): Iterator[(K, C)] = {
    // 是否使用外部排序,是由引數spark.shuffle.spill,預設是true
    if (!externalSorting) {
      val combiners = new AppendOnlyMap[K,C]
      var kv: Product2[K, V] = null
      val update = (hadValue: Boolean, oldValue: C) => {
        if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2)
      }
      // 用map來去重,用update方法來更新值,如果沒值的時候,返回值,如果有值的時候,通過mergeValue方法來合併
      // mergeValue方法就是我們在reduceByKey裡面寫的那個匿名函式,在這裡就是(_ + _)
      while (iter.hasNext) {
        kv = iter.next()
        combiners.changeValue(kv._1, update)
      }
      combiners.iterator
    } else {  
      // 用了一個外部排序的map來去重,就不停的往裡面插入值即可,基本原理和上面的差不多,區別在於需要外部排序   
      val combiners = new ExternalAppendOnlyMap[K, V, C](createCombiner, mergeValue, mergeCombiners)
      while (iter.hasNext) {
        val (k, v) = iter.next()
        combiners.insert(k, v)
      }
      combiners.iterator
}
View Code

這個就是一個很典型的按照key來做合併的方法了,我們繼續看ShuffledRDD吧。

ShuffledRDD和之前的RDD很明顯的特徵是

1、它的依賴傳了一個Nil(空列表)進去,表示它沒有依賴。

2、它的compute計算方式比較特別,這個在之後的文章說,過程比較複雜。

在new完ShuffledRDD之後又來了一遍mapPartitionsWithContext,不過呼叫的匿名函式變成了combineCombinersByKey。

combineCombinersByKey和combineValuesByKey的邏輯基本相同,只是輸入輸出的型別有區別。combineCombinersByKey只是做單純的合併,不會對輸入輸出的型別進行改變,combineValuesByKey會把iter[K, V]的V值變成iter[K, C]。

case class Aggregator[K, V, C] (
  createCombiner: V => C,
  mergeValue: (C, V) => C,
  mergeCombiners: (C, C) => C)
  ......
}

 這個方法會根據我們傳進去的匿名方法的引數的型別做一個自動轉換。

到這裡,作業都沒有真正執行,只是將RDD各種巢狀,我們通過RDD的id和型別的變化觀測到這一點,RDD[1]->RDD[2]->RDD[3]......

3 、 其它 RDD

平常我們除了從hdfs上面取資料之後,我們還可能從資料庫裡面取資料,那怎麼辦呢?沒關係,有個JdbcRDD!

val rdd = new JdbcRDD(
	sc,
	() => { DriverManager.getConnection("jdbc:derby:target/JdbcRDDSuiteDb") },
	"SELECT DATA FROM FOO WHERE ? <= ID AND ID <= ?",
	1, 100, 3,
	(r: ResultSet) => { r.getInt(1) } 
   ).cache()

前幾個引數大家都懂,我們重點說一下後面1, 100, 3是咋回事?

在這個JdbcRDD裡面它預設我們是會按照一個long型別的欄位對資料進行切分,(1,100)分別是最小值和最大值,3是分片的數量。

比如我們要一次查ID為1-1000,000的的使用者,分成10個分片,我們就天(1, 1000,000, 10)即可,在sql語句裡面還必須有"? <= ID AND ID <= ?"的句式,別嘗試著自己造句哦!

最後是怎麼處理ResultSet的方法,自己愛怎麼處理怎麼處理去吧。不過確實覺著用得不方便的可以自己重寫一個RDD。

小結:

這一章重點介紹了各種RDD那5個特徵,以及RDD之間的轉換,希望大家可以對RDD有更深入的瞭解,下一章我們將要講作業的執行過程,敬請關注!