1. 程式人生 > >Spark學習筆記(26)在DStream的Action操作之外也可能產生Job操作

Spark學習筆記(26)在DStream的Action操作之外也可能產生Job操作

本期內容: 1. Spark Streaming產生Job的機制 2. Spark Streaming的其它產生Job的方式  1. Spark Streaming產生Job的機制 Scala程式中,函式可以作為引數傳遞,因為函式也是物件。有函式物件不意味著函式馬上就執行。Spark Streaming中,常利用執行緒的run來呼叫函式,從而導致函式的最終執行。 Spark Streaming中,Job物件包含函式成員。 NetworkWordCount程式中,DStream.print導致了Job的產生。 DStream.print:
  def print(num: Int): Unit = ssc.withScope {
    def foreachFunc: (RDD[T], Time) => Unit = {       (rdd: RDD[T], time: Time) => {         val firstNum = rdd.take(num + 1)         // scalastyle:off println         println("-------------------------------------------")         println("Time: " + time)         println("-------------------------------------------")
        firstNum.take(num).foreach(println)         if (firstNum.length > num) println("...")         println()         // scalastyle:on println       }     } foreachRDD(context.sparkContext.clean(foreachFunc), displayInnerRDDOps = false)   } Spark Streaming應用程式中,除了print,saveAsObjectFiles、saveAsTextFiles等也能呼叫foreachRDD,生成ForEachDStream,才能在後面產生Job。
DStream.foreachRDD:  private def foreachRDD(                           foreachFunc: (RDD[T], Time) => Unit,                           displayInnerRDDOps: Boolean): Unit = { new ForEachDStream(this,       context.sparkContext.clean(foreachFunc, false), displayInnerRDDOps).register()   } 通過register註冊,新生成的ForEachDStream加入到DStreamGraph的成員outputDStreams中。 如果沒有print、count、saveAsObjectFiles、saveAsTextFiles等這樣的程式碼,DStreamGraph中outputDStreams就為空,那麼DStreamGraph.generateJobs就產生結果呢?
DStreamGraph.generateJobs:
  def generateJobs(time: Time): Seq[Job] = {     logDebug("Generating jobs for time " + time)     val jobs = this.synchronized { outputStreams.flatMap { outputStream =>         val jobOption = outputStream.generateJob(time)         jobOption.foreach(_.setCallSite(outputStream.creationSite))         jobOption       }     }     logDebug("Generated " + jobs.length + " jobs for time " + time)     jobs   } DStreamGraph.generateJobs就會產生空的Job序列。
通過對DStream(或其子類)定製自己的方法,可以使foreachFunc的定義中不含有RDD.take這樣的語句。 這樣的話,foreachRDD中的foreachFunc不一定會產生Job。如果其中的函式foreachFunc裡面沒有Action操作,就不會觸發Job。 2. Spark Streaming的其它產生Job的方式 

一定要action才會有Job嗎?不是。ForEachDStream.transform就可能產生Job。ForEachDStream.transform有兩個定義,是呼叫關係。 ForEachDStream.transform:   /**    * Return a new DStream in which each RDD is generated by applying a function    * on each RDD of 'this' DStream.    */   def transform[U: ClassTag](transformFunc: RDD[T] => RDD[U]): DStream[U] = ssc.withScope {     // because the DStream is reachable from the outer object here, and because     // DStreams can't be serialized with closures, we can't proactively check     // it for serializability and so we pass the optional false to SparkContext.clean     val cleanedF = context.sparkContext.clean(transformFunc, false)     transform((r: RDD[T], t: Time) => cleanedF(r))   }   /**    * Return a new DStream in which each RDD is generated by applying a function    * on each RDD of 'this' DStream.    */   def transform[U: ClassTag](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U] = ssc.withScope {     // because the DStream is reachable from the outer object here, and because     // DStreams can't be serialized with closures, we can't proactively check     // it for serializability and so we pass the optional false to SparkContext.clean     val cleanedF = context.sparkContext.clean(transformFunc, false)     val realTransformFunc = (rdds: Seq[RDD[_]], time: Time) => {       assert(rdds.length == 1)       cleanedF(rdds.head.asInstanceOf[RDD[T]], time)     } new TransformedDStream[U](Seq(this), realTransformFunc)   } 其中的函式型別的引數transformFunc是輸入RDD併產生一個新的RDD。最終實際會生成TransformedDStream物件。 在第8課中提到過,一般的DStream子類的Compute方法,僅僅是呼叫父類DStream的getOrCompute,而TransformedDStream的compte方法不是這樣。 TransformedDStream.compute:
  override def compute(validTime: Time): Option[RDD[U]] = {     val parentRDDs = parents.map { parent => parent.getOrCompute(validTime).getOrElse(       // Guard out against parent DStream that return None instead of Some(rdd) to avoid NPE       throw new SparkException(s"Couldn't generate RDD from parent at time $validTime"))     }     val transformedRDD = transformFunc(parentRDDs, validTime)     if (transformedRDD == null) {       throw new SparkException("Transform function must not return null. " +         "Return SparkContext.emptyRDD() instead to represent no element " +         "as the result of transformation.")     }     Some(transformedRDD)   } 和別的DStream子類不同,TransformedDStream的compute方法還呼叫了transformFunc,函式transformFunc是被馬上執行的。這就不會等到JobScheduler排程後再執行。 transformFunc其中如果有count、print等action操作,就也會觸發這個Job的執行。這其實可以理解為是個漏洞。 此前說的各種操作是lazy級別,不能馬上拿到結果。而由於transformFunc不接受Spark的統一排程,這樣可以根據計算結果做出判斷再後續操作。不會因為lazy級別而不能必須做後續的transform。