1. 程式人生 > >1.2 DStream 生成 RDD 例項詳解

1.2 DStream 生成 RDD 例項詳解

轉自:https://github.com/lw-lin/CoolplaySpark/blob/master/Spark%20Streaming%20%E6%BA%90%E7%A0%81%E8%A7%A3%E6%9E%90%E7%B3%BB%E5%88%97/1.2%20DStream%20%E7%94%9F%E6%88%90%20RDD%20%E5%AE%9E%E4%BE%8B%E8%AF%A6%E8%A7%A3.md 

DStream 生成 RDD 例項詳解

[酷玩 Spark] Spark Streaming 原始碼解析系列 ,返回目錄請 猛戳這裡

本文內容適用範圍:

  • 2016.02.25 update, Spark 2.0 全系列 √ (2.0.0-SNAPSHOT 尚未正式釋出)
  • 2016.03.10 update, Spark 1.6 全系列 √ (1.6.0, 1.6.1)
  • 2015.11.09 update, Spark 1.5 全系列 √ (1.5.0, 1.5.1, 1.5.2)
  • 2015.07.15 update, Spark 1.4 全系列 √ (1.4.0, 1.4.1) 

閱讀本文前,請一定先閱讀 Spark Streaming 實現思路與模組概述 一文,其中概述了 Spark Streaming 的 4 大模組的基本作用,有了全域性概念後再看本文對 模組 1 DAG 靜態定義 細節的解釋。

引言

我們在前面的文章講過,Spark Streaming 的 模組 1 DAG 靜態定義

 要解決的問題就是如何把計算邏輯描述為一個 RDD DAG 的“模板”,在後面 Job 動態生成的時候,針對每個 batch,都將根據這個“模板”生成一個 RDD DAG 的例項。

image

在 Spark Streaming 裡,這個 RDD “模板”對應的具體的類是 DStream,RDD DAG “模板”對應的具體類是 DStreamGraph

DStream      的全限定名是:org.apache.spark.streaming.dstream.DStream
DStreamGraph 的全限定名是:org.apache.spark.streaming.DStreamGraph

本文我們就來詳解 DStream 最主要的功能:為每個 batch 生成 RDD 例項。

Quick Example

// ssc.socketTextStream() 將建立一個 SocketInputDStream;這個 InputDStream 的 SocketReceiver 將監聽本機 9999 埠
val lines = ssc.socketTextStream("localhost", 9999)

val words = lines.flatMap(_.split(" "))      // DStream transformation
val pairs = words.map(word => (word, 1))     // DStream transformation
val wordCounts = pairs.reduceByKey(_ + _)    // DStream transformation
wordCounts.print()                           // DStream output

這裡我們找到 ssc.socketTextStream("localhost", 9999) 的原始碼實現:

def socketStream[T: ClassTag](hostname: String, port: Int, converter: (InputStream) => Iterator[T], storageLevel: StorageLevel): ReceiverInputDStream[T] = {
  new SocketInputDStream[T](this, hostname, port, converter, storageLevel)
}

也就是 ssc.socketTextStream() 將 new 出來一個 DStream 具體子類 SocketInputDStream 的例項。

然後我們繼續找到下一行 lines.flatMap(_.split(" ")) 的原始碼實現:

def flatMap[U: ClassTag](flatMapFunc: T => Traversable[U]): DStream[U] = ssc.withScope {
  new FlatMappedDStream(this, context.sparkContext.clean(flatMapFunc))
}

也就是 lines.flatMap(_.split(" ")) 將 new 出來一個 DStream 具體子類 FlatMappedDStream 的例項。

後面幾行也是如此,所以我們如果用 DStream DAG 圖來表示之前那段 quick example 的話,就是這個樣子:

image

也即,我們給出的那段程式碼,用具體的實現來替換的話,結果如下:

val lines = new SocketInputDStream("localhost", 9999)   // 型別是 SocketInputDStream

val words = new FlatMappedDStream(lines, _.split(" "))  // 型別是 FlatMappedDStream
val pairs = new MappedDStream(words, word => (word, 1)) // 型別是 MappedDStream
val wordCounts = new ShuffledDStream(pairs, _ + _)      // 型別是 ShuffledDStream
new ForeachDStream(wordCounts, cnt => cnt.print())      // 型別是 ForeachDStream

DStream 通過 generatedRDD 管理已生成的 RDD

DStream 內部用一個型別是 HashMap 的變數 generatedRDD 來記錄已經生成過的 RDD

private[streaming] var generatedRDDs = new HashMap[Time, RDD[T]] ()

generatedRDD 的 key 是一個 Time;這個 Time 是與使用者指定的 batchDuration 對齊了的時間 —— 如每 15s 生成一個 batch 的話,那麼這裡的 key 的時間就是 08h:00m:00s08h:00m:15s 這種,所以其實也就代表是第幾個 batch。generatedRDD 的 value 就是 RDD 的例項。

需要注意,每一個不同的 DStream 例項,都有一個自己的 generatedRDD。如在下圖中,DStream a, b, c, d 各有自己的generatedRDD 變數;圖中也示意了 DStream a 的 generatedRDD 變數。

image

DStream 對這個 HashMap 的存取主要是通過 getOrCompute(time: Time) 方法,實現也很簡單,就是一個 —— 查表,如果有就直接返回,如果沒有就生成了放入表、再返回 —— 的邏輯:

private[streaming] final def getOrCompute(time: Time): Option[RDD[T]] = {
    // 從 generatedRDDs 裡 get 一下:如果有 rdd 就返回,沒有 rdd 就進行 orElse 下面的 rdd 生成步驟
    generatedRDDs.get(time).orElse {
      // 驗證 time 需要是 valid
      if (isTimeValid(time)) {
        // 然後呼叫 compute(time) 方法獲得 rdd 例項,並存入 rddOption 變數
        val rddOption = createRDDWithLocalProperties(time) {
          PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
            compute(time)
          }
        }

        rddOption.foreach { case newRDD =>
          if (storageLevel != StorageLevel.NONE) {
            newRDD.persist(storageLevel)
            logDebug(s"Persisting RDD ${newRDD.id} for time $time to $storageLevel")
          }
          if (checkpointDuration != null && (time - zeroTime).isMultipleOf(checkpointDuration)) {
            newRDD.checkpoint()
            logInfo(s"Marking RDD ${newRDD.id} for time $time for checkpointing")
          }
          // 將剛剛例項化出來的 rddOption 放入 generatedRDDs 對應的 time 位置
          generatedRDDs.put(time, newRDD)
        }
        // 返回剛剛例項化出來的 rddOption
        rddOption
      } else {
        None
      }
    }
  }

最主要還是呼叫了一個 abstract 的 compute(time) 方法。這個方法用於生成 RDD 例項,生成後被放進 generatedRDD 裡供後續的查詢和使用。這個 compute(time) 方法在 DStream 類裡是 abstract 的,但在每個具體的子類裡都提供了實現。

(a) InputDStream 的 compute(time) 實現

InputDStream 是個有很多子類的抽象類,我們看一個具體的子類 FileInputDStream

// 來自 FileInputDStream
override def compute(validTime: Time): Option[RDD[(K, V)]] = {
    // 通過一個 findNewFiles() 方法,找到 validTime 以後產生的新 file 的資料
    val newFiles = findNewFiles(validTime.milliseconds)
    logInfo("New files at time " + validTime + ":\n" + newFiles.mkString("\n"))
    batchTimeToSelectedFiles += ((validTime, newFiles))
    recentlySelectedFiles ++= newFiles

    // 找到了一些新 file;以新 file 的陣列為引數,通過 filesToRDD() 生成單個 RDD 例項 rdds
    val rdds = Some(filesToRDD(newFiles))

    val metadata = Map(
      "files" -> newFiles.toList,
      StreamInputInfo.METADATA_KEY_DESCRIPTION -> newFiles.mkString("\n"))
    val inputInfo = StreamInputInfo(id, 0, metadata)
    ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)

    // 返回生成的單個 RDD 例項 rdds
    rdds
  }

而 filesToRDD() 實現如下:

// 來自 FileInputDStream
private def filesToRDD(files: Seq[String]): RDD[(K, V)] = {
  // 對每個 file,都 sc.newAPIHadoopFile(file) 來生成一個 RDD
  val fileRDDs = files.map { file =>
    val rdd = serializableConfOpt.map(_.value) match {
      case Some(config) => context.sparkContext.newAPIHadoopFile(
        file,
        fm.runtimeClass.asInstanceOf[Class[F]],
        km.runtimeClass.asInstanceOf[Class[K]],
        vm.runtimeClass.asInstanceOf[Class[V]],
        config)
      case None => context.sparkContext.newAPIHadoopFile[K, V, F](file)
    }
    if (rdd.partitions.size == 0) {
      logError("File " + file + " has no data in it. Spark Streaming can only ingest " +
        "files that have been \"moved\" to the directory assigned to the file stream. " +
        "Refer to the streaming programming guide for more details.")
    }
    rdd
  }
  // 將每個 file 對應的 RDD 進行 union,返回一個 union 後的 UnionRDD
  new UnionRDD(context.sparkContext, fileRDDs)
}

所以,結合以上 compute(validTime: Time) 和 filesToRDD(files: Seq[String]) 方法,我們得出 FileInputDStream 為每個 batch 生成 RDD 的例項過程如下:

  • (1) 先通過一個 findNewFiles() 方法,找到 validTime 以後產生的多個新 file
  • (2) 對每個新 file,都將其作為引數呼叫 sc.newAPIHadoopFile(file),生成一個 RDD 例項
  • (3) 將 (2) 中的多個新 file 對應的多個 RDD 例項進行 union,返回一個 union 後的 UnionRDD

其它 InputDStream 的為每個 batch 生成 RDD 例項的過程也比較類似了。

(b) 一般 DStream 的 compute(time) 實現

前一小節的 InputDStream 沒有上游依賴的 DStream,可以直接為每個 batch 產生 RDD 例項。一般 DStream 都是由transofrmation 生成的,都有上游依賴的 DStream,所以為了為 batch 產生 RDD 例項,就需要在 compute(time) 方法裡先獲取上游依賴的 DStream 產生的 RDD 例項。

具體的,我們看兩個具體 DStream —— MappedDStreamFilteredDStream —— 的實現:

MappedDStream 的 compute(time) 實現

MappedDStream 很簡單,全類實現如下:

package org.apache.spark.streaming.dstream

import org.apache.spark.streaming.{Duration, Time}
import org.apache.spark.rdd.RDD
import scala.reflect.ClassTag

private[streaming]
class MappedDStream[T: ClassTag, U: ClassTag] (
    parent: DStream[T],
    mapFunc: T => U
  ) extends DStream[U](parent.ssc) {

  override def dependencies: List[DStream[_]] = List(parent)

  override def slideDuration: Duration = parent.slideDuration

  override def compute(validTime: Time): Option[RDD[U]] = {
    parent.getOrCompute(validTime).map(_.map[U](mapFunc))
  }
}

可以看到,首先在建構函式裡傳入了兩個重要內容:

  • parent,是本 MappedDStream 上游依賴的 DStream
  • mapFunc,是本次 map() 轉換的具體函式
    • 在前文 DStream, DStreamGraph 詳解 中的 quick example 裡的 val pairs = words.map(word => (word, 1)) 的mapFunc 就是 word => (word, 1)

所以在 compute(time) 的具體實現裡,就很簡單了:

  • (1) 獲取 parent DStream 在本 batch 裡對應的 RDD 例項
  • (2) 在這個 parent RDD 例項上,以 mapFunc 為引數呼叫 .map(mapFunc) 方法,將得到的新 RDD 例項返回
    • 完全相當於用 RDD API 寫了這樣的程式碼:return parentRDD.map(mapFunc)

FilteredDStream 的 compute(time) 實現

再看看 FilteredDStream 的全部實現:

package org.apache.spark.streaming.dstream

import org.apache.spark.streaming.{Duration, Time}
import org.apache.spark.rdd.RDD
import scala.reflect.ClassTag

private[streaming]
class FilteredDStream[T: ClassTag](
    parent: DStream[T],
    filterFunc: T => Boolean
  ) extends DStream[T](parent.ssc) {

  override def dependencies: List[DStream[_]] = List(parent)

  override def slideDuration: Duration = parent.slideDuration

  override def compute(validTime: Time): Option[RDD[T]] = {
    parent.getOrCompute(validTime).map(_.filter(filterFunc))
  }
}

同 MappedDStream 一樣,FilteredDStream 也在建構函式裡傳入了兩個重要內容:

  • parent,是本 FilteredDStream 上游依賴的 DStream
  • filterFunc,是本次 filter() 轉換的具體函式

所以在 compute(time) 的具體實現裡,就很簡單了:

  • (1) 獲取 parent DStream 在本 batch 裡對應的 RDD 例項
  • (2) 在這個 parent RDD 例項上,以 filterFunc 為引數呼叫 .filter(filterFunc) 方法,將得到的新 RDD 例項返回
    • 完全相當於用 RDD API 寫了這樣的程式碼:return parentRDD.filter(filterFunc)

總結一般 DStream 的 compute(time) 實現

總結上面 MappedDStream 和 FilteredDStream 的實現,可以看到:

  • DStream 的 .map() 操作生成了 MappedDStream,而 MappedDStream 在每個 batch 裡生成 RDD 例項時,將對 parentRDD呼叫 RDD 的 .map() 操作 —— DStream.map() 操作完美複製為每個 batch 的 RDD.map() 操作
  • DStream 的 .filter() 操作生成了 FilteredDStream,而 FilteredDStream 在每個 batch 裡生成 RDD 例項時,將對parentRDD 呼叫 RDD 的 .filter() 操作 —— DStream.filter() 操作完美複製為每個 batch 的 RDD.filter() 操作

在最開始, DStream 的 transformation 的 API 設計與 RDD 的 transformation 設計保持了一致,就使得,每一個dStreamA.transformation() 得到的新