1.2 DStream 生成 RDD 例項詳解
轉自:https://github.com/lw-lin/CoolplaySpark/blob/master/Spark%20Streaming%20%E6%BA%90%E7%A0%81%E8%A7%A3%E6%9E%90%E7%B3%BB%E5%88%97/1.2%20DStream%20%E7%94%9F%E6%88%90%20RDD%20%E5%AE%9E%E4%BE%8B%E8%AF%A6%E8%A7%A3.md
DStream 生成 RDD 例項詳解
[酷玩 Spark] Spark Streaming 原始碼解析系列 ,返回目錄請 猛戳這裡
本文內容適用範圍:
- 2016.02.25 update, Spark 2.0 全系列 √ (2.0.0-SNAPSHOT 尚未正式釋出)
- 2016.03.10 update, Spark 1.6 全系列 √ (1.6.0, 1.6.1)
- 2015.11.09 update, Spark 1.5 全系列 √ (1.5.0, 1.5.1, 1.5.2)
- 2015.07.15 update, Spark 1.4 全系列 √ (1.4.0, 1.4.1)
閱讀本文前,請一定先閱讀 Spark
Streaming 實現思路與模組概述 一文,其中概述了 Spark Streaming 的 4 大模組的基本作用,有了全域性概念後再看本文對 模組 1 DAG 靜態定義
細節的解釋。
引言
我們在前面的文章講過,Spark Streaming 的 模組 1 DAG 靜態定義
在 Spark Streaming 裡,這個 RDD “模板”對應的具體的類是 DStream
,RDD DAG “模板”對應的具體類是 DStreamGraph
。
DStream 的全限定名是:org.apache.spark.streaming.dstream.DStream DStreamGraph 的全限定名是:org.apache.spark.streaming.DStreamGraph
本文我們就來詳解 DStream
最主要的功能:為每個 batch 生成 RDD
例項。
Quick Example
// ssc.socketTextStream() 將建立一個 SocketInputDStream;這個 InputDStream 的 SocketReceiver 將監聽本機 9999 埠 val lines = ssc.socketTextStream("localhost", 9999) val words = lines.flatMap(_.split(" ")) // DStream transformation val pairs = words.map(word => (word, 1)) // DStream transformation val wordCounts = pairs.reduceByKey(_ + _) // DStream transformation wordCounts.print() // DStream output
這裡我們找到 ssc.socketTextStream("localhost", 9999)
的原始碼實現:
def socketStream[T: ClassTag](hostname: String, port: Int, converter: (InputStream) => Iterator[T], storageLevel: StorageLevel): ReceiverInputDStream[T] = { new SocketInputDStream[T](this, hostname, port, converter, storageLevel) }
也就是 ssc.socketTextStream()
將 new
出來一個 DStream
具體子類 SocketInputDStream
的例項。
然後我們繼續找到下一行 lines.flatMap(_.split(" "))
的原始碼實現:
def flatMap[U: ClassTag](flatMapFunc: T => Traversable[U]): DStream[U] = ssc.withScope { new FlatMappedDStream(this, context.sparkContext.clean(flatMapFunc)) }
也就是 lines.flatMap(_.split(" "))
將 new
出來一個 DStream
具體子類 FlatMappedDStream
的例項。
後面幾行也是如此,所以我們如果用 DStream DAG 圖來表示之前那段 quick example 的話,就是這個樣子:
也即,我們給出的那段程式碼,用具體的實現來替換的話,結果如下:
val lines = new SocketInputDStream("localhost", 9999) // 型別是 SocketInputDStream val words = new FlatMappedDStream(lines, _.split(" ")) // 型別是 FlatMappedDStream val pairs = new MappedDStream(words, word => (word, 1)) // 型別是 MappedDStream val wordCounts = new ShuffledDStream(pairs, _ + _) // 型別是 ShuffledDStream new ForeachDStream(wordCounts, cnt => cnt.print()) // 型別是 ForeachDStream
DStream
通過 generatedRDD
管理已生成的 RDD
DStream
內部用一個型別是 HashMap
的變數 generatedRDD
來記錄已經生成過的 RDD
:
private[streaming] var generatedRDDs = new HashMap[Time, RDD[T]] ()
generatedRDD
的 key 是一個 Time
;這個 Time
是與使用者指定的 batchDuration
對齊了的時間
—— 如每 15s 生成一個 batch 的話,那麼這裡的 key 的時間就是 08h:00m:00s
,08h:00m:15s
這種,所以其實也就代表是第幾個
batch。generatedRDD
的 value 就是 RDD
的例項。
需要注意,每一個不同的 DStream
例項,都有一個自己的 generatedRDD
。如在下圖中,DStream
a, b, c, d
各有自己的generatedRDD
變數;圖中也示意了 DStream
a
的 generatedRDD
變數。
DStream
對這個 HashMap
的存取主要是通過 getOrCompute(time:
Time)
方法,實現也很簡單,就是一個 —— 查表,如果有就直接返回,如果沒有就生成了放入表、再返回 —— 的邏輯:
private[streaming] final def getOrCompute(time: Time): Option[RDD[T]] = { // 從 generatedRDDs 裡 get 一下:如果有 rdd 就返回,沒有 rdd 就進行 orElse 下面的 rdd 生成步驟 generatedRDDs.get(time).orElse { // 驗證 time 需要是 valid if (isTimeValid(time)) { // 然後呼叫 compute(time) 方法獲得 rdd 例項,並存入 rddOption 變數 val rddOption = createRDDWithLocalProperties(time) { PairRDDFunctions.disableOutputSpecValidation.withValue(true) { compute(time) } } rddOption.foreach { case newRDD => if (storageLevel != StorageLevel.NONE) { newRDD.persist(storageLevel) logDebug(s"Persisting RDD ${newRDD.id} for time $time to $storageLevel") } if (checkpointDuration != null && (time - zeroTime).isMultipleOf(checkpointDuration)) { newRDD.checkpoint() logInfo(s"Marking RDD ${newRDD.id} for time $time for checkpointing") } // 將剛剛例項化出來的 rddOption 放入 generatedRDDs 對應的 time 位置 generatedRDDs.put(time, newRDD) } // 返回剛剛例項化出來的 rddOption rddOption } else { None } } }
最主要還是呼叫了一個 abstract 的 compute(time)
方法。這個方法用於生成 RDD
例項,生成後被放進 generatedRDD
裡供後續的查詢和使用。這個 compute(time)
方法在 DStream
類裡是
abstract 的,但在每個具體的子類裡都提供了實現。
(a) InputDStream
的 compute(time)
實現
InputDStream
是個有很多子類的抽象類,我們看一個具體的子類 FileInputDStream
。
// 來自 FileInputDStream override def compute(validTime: Time): Option[RDD[(K, V)]] = { // 通過一個 findNewFiles() 方法,找到 validTime 以後產生的新 file 的資料 val newFiles = findNewFiles(validTime.milliseconds) logInfo("New files at time " + validTime + ":\n" + newFiles.mkString("\n")) batchTimeToSelectedFiles += ((validTime, newFiles)) recentlySelectedFiles ++= newFiles // 找到了一些新 file;以新 file 的陣列為引數,通過 filesToRDD() 生成單個 RDD 例項 rdds val rdds = Some(filesToRDD(newFiles)) val metadata = Map( "files" -> newFiles.toList, StreamInputInfo.METADATA_KEY_DESCRIPTION -> newFiles.mkString("\n")) val inputInfo = StreamInputInfo(id, 0, metadata) ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo) // 返回生成的單個 RDD 例項 rdds rdds }
而 filesToRDD()
實現如下:
// 來自 FileInputDStream private def filesToRDD(files: Seq[String]): RDD[(K, V)] = { // 對每個 file,都 sc.newAPIHadoopFile(file) 來生成一個 RDD val fileRDDs = files.map { file => val rdd = serializableConfOpt.map(_.value) match { case Some(config) => context.sparkContext.newAPIHadoopFile( file, fm.runtimeClass.asInstanceOf[Class[F]], km.runtimeClass.asInstanceOf[Class[K]], vm.runtimeClass.asInstanceOf[Class[V]], config) case None => context.sparkContext.newAPIHadoopFile[K, V, F](file) } if (rdd.partitions.size == 0) { logError("File " + file + " has no data in it. Spark Streaming can only ingest " + "files that have been \"moved\" to the directory assigned to the file stream. " + "Refer to the streaming programming guide for more details.") } rdd } // 將每個 file 對應的 RDD 進行 union,返回一個 union 後的 UnionRDD new UnionRDD(context.sparkContext, fileRDDs) }
所以,結合以上 compute(validTime: Time)
和 filesToRDD(files:
Seq[String])
方法,我們得出 FileInputDStream
為每個 batch 生成 RDD 的例項過程如下:
- (1) 先通過一個 findNewFiles() 方法,找到 validTime 以後產生的多個新 file
- (2) 對每個新 file,都將其作為引數呼叫 sc.newAPIHadoopFile(file),生成一個 RDD 例項
- (3) 將 (2) 中的多個新 file 對應的多個 RDD 例項進行 union,返回一個 union 後的 UnionRDD
其它 InputDStream
的為每個 batch 生成 RDD
例項的過程也比較類似了。
(b)
一般 DStream
的 compute(time)
實現
前一小節的 InputDStream
沒有上游依賴的 DStream
,可以直接為每個
batch 產生 RDD
例項。一般 DStream
都是由transofrmation 生成的,都有上游依賴的 DStream
,所以為了為
batch 產生 RDD
例項,就需要在 compute(time)
方法裡先獲取上游依賴的 DStream
產生的 RDD
例項。
具體的,我們看兩個具體 DStream
—— MappedDStream
, FilteredDStream
——
的實現:
MappedDStream
的 compute(time)
實現
MappedDStream
很簡單,全類實現如下:
package org.apache.spark.streaming.dstream import org.apache.spark.streaming.{Duration, Time} import org.apache.spark.rdd.RDD import scala.reflect.ClassTag private[streaming] class MappedDStream[T: ClassTag, U: ClassTag] ( parent: DStream[T], mapFunc: T => U ) extends DStream[U](parent.ssc) { override def dependencies: List[DStream[_]] = List(parent) override def slideDuration: Duration = parent.slideDuration override def compute(validTime: Time): Option[RDD[U]] = { parent.getOrCompute(validTime).map(_.map[U](mapFunc)) } }
可以看到,首先在建構函式裡傳入了兩個重要內容:
- parent,是本
MappedDStream
上游依賴的DStream
- mapFunc,是本次 map() 轉換的具體函式
- 在前文 DStream,
DStreamGraph 詳解 中的 quick example 裡的
val pairs = words.map(word => (word, 1))
的mapFunc
就是word => (word, 1)
- 在前文 DStream,
DStreamGraph 詳解 中的 quick example 裡的
所以在 compute(time)
的具體實現裡,就很簡單了:
- (1) 獲取 parent
DStream
在本 batch 裡對應的RDD
例項 - (2) 在這個 parent
RDD
例項上,以mapFunc
為引數呼叫.map(mapFunc)
方法,將得到的新RDD
例項返回- 完全相當於用 RDD API 寫了這樣的程式碼:
return parentRDD.map(mapFunc)
- 完全相當於用 RDD API 寫了這樣的程式碼:
FilteredDStream
的 compute(time)
實現
再看看 FilteredDStream
的全部實現:
package org.apache.spark.streaming.dstream import org.apache.spark.streaming.{Duration, Time} import org.apache.spark.rdd.RDD import scala.reflect.ClassTag private[streaming] class FilteredDStream[T: ClassTag]( parent: DStream[T], filterFunc: T => Boolean ) extends DStream[T](parent.ssc) { override def dependencies: List[DStream[_]] = List(parent) override def slideDuration: Duration = parent.slideDuration override def compute(validTime: Time): Option[RDD[T]] = { parent.getOrCompute(validTime).map(_.filter(filterFunc)) } }
同 MappedDStream
一樣,FilteredDStream
也在建構函式裡傳入了兩個重要內容:
- parent,是本
FilteredDStream
上游依賴的DStream
- filterFunc,是本次 filter() 轉換的具體函式
所以在 compute(time)
的具體實現裡,就很簡單了:
- (1) 獲取 parent
DStream
在本 batch 裡對應的RDD
例項 - (2) 在這個 parent
RDD
例項上,以filterFunc
為引數呼叫.filter(filterFunc)
方法,將得到的新RDD
例項返回- 完全相當於用 RDD API 寫了這樣的程式碼:
return parentRDD.filter(filterFunc)
- 完全相當於用 RDD API 寫了這樣的程式碼:
總結一般 DStream
的 compute(time)
實現
總結上面 MappedDStream
和 FilteredDStream
的實現,可以看到:
DStream
的.map()
操作生成了MappedDStream
,而MappedDStream
在每個 batch 裡生成RDD
例項時,將對parentRDD
呼叫RDD
的.map()
操作 ——DStream.map()
操作完美複製為每個 batch 的RDD.map()
操作DStream
的.filter()
操作生成了FilteredDStream
,而FilteredDStream
在每個 batch 裡生成RDD
例項時,將對parentRDD
呼叫RDD
的.filter()
操作 ——DStream.filter()
操作完美複製為每個 batch 的RDD.filter()
操作
在最開始, DStream
的 transformation 的 API 設計與 RDD
的 transformation 設計保持了一致,就使得,每一個dStreamA
.transformation()
得到的新