1. 程式人生 > >Spark學習筆記(18)Spark Streaming中空RDD處理

Spark學習筆記(18)Spark Streaming中空RDD處理

本期內容: 1 Spark Streaming中的空RDD處理 2 Spark Streaming程式的停止 1 Spark Streaming中的空RDD處理 
在Spark Streaming應用程式中,無論使用什麼 DStream,底層實際上就是操作RDD。 從一個應用程式片段開始,進行剖析: ...     val lines = ssc.socketTextStream("Master", 9999)     val words = lines.flatMap(_.split(" "))     val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.foreachRDD { rdd =>       rdd.foreachPartition { partitionOfRecords => {         // ConnectionPool is a static, lazily initialized pool of connections         val connection = ConnectionPool.getConnection()         partitionOfRecords.foreach(record => {           val sql = "insert into streaming_itemcount(item,count) values('" + record._1 + "'," + record._2 + ")"
          val stmt = connection.createStatement();           stmt.executeUpdate(sql);         })         ConnectionPool.returnConnection(connection)  // return to the pool for future reuse       } ... 程式中有一個這樣的問題:wordCounts.foreachRDD裡面,開始時並沒有判斷rdd是否為空,就進行處理了。 rdd為空時,也獲取CPU core等計算資源,並進行裡面的計算。這顯然是不合適的。 雖然Spark中定義了EmptyRDD,且讓其Compute時丟擲異常,但實際Spark應用程式並沒有使用EmptyRDD。 應該對每個rdd進行處理前,應該判斷rdd是否為空。
再看看RDD.isEmpty:   def isEmpty(): Boolean = withScope {     partitions.length == 0 || take(1).length == 0   } 故前面應用程式的程式碼可以在加一行程式碼: wordCounts.foreachRDD { rdd =>       if (!rdd.isEmpty) {
        rdd.foreachPartition { partitionOfRecords => {         ...
      }
    ...
2 Spark Streaming程式的停止 先看StreamingContext.top:   def stop(       stopSparkContext: Boolean = conf.getBoolean("spark.streaming.stopSparkContextByDefault", true)      ): Unit = synchronized {     stop(stopSparkContext, false)   } 真正好的停止一個Spark Streaming應用程式,應該用另一個stop:   def stop(stopSparkContext: Boolean, stopGracefully: Boolean): Unit = {     var shutdownHookRefToRemove: AnyRef = null     if (AsynchronousListenerBus.withinListenerThread.value) {       throw new SparkException("Cannot stop StreamingContext within listener thread of" +         " AsynchronousListenerBus")     }     synchronized {       try {         state match {           case INITIALIZED =>             logWarning("StreamingContext has not been started yet")           case STOPPED =>             logWarning("StreamingContext has already been stopped")           case ACTIVE =>             scheduler.stop(stopGracefully)             // Removing the streamingSource to de-register the metrics on stop()             env.metricsSystem.removeSource(streamingSource)             uiTab.foreach(_.detach())             StreamingContext.setActiveContext(null)             waiter.notifyStop()             if (shutdownHookRef != null) {               shutdownHookRefToRemove = shutdownHookRef               shutdownHookRef = null             }             logInfo("StreamingContext stopped successfully")         }       } finally {         // The state should always be Stopped after calling `stop()`, even if we haven't started yet         state = STOPPED       }     }     if (shutdownHookRefToRemove != null) {       ShutdownHookManager.removeShutdownHook(shutdownHookRefToRemove)     }     // Even if we have already stopped, we still need to attempt to stop the SparkContext because     // a user might stop(stopSparkContext = false) and then call stop(stopSparkContext = true).     if (stopSparkContext) sc.stop()   } stopGracefully引數預設是false,生產環境應該設定為 true,具體做法是配置檔案中把spark.streaming.stopGeacefullyOnShutdown設定為true,這樣能保證已執行的程式執行完再停止,以保證資料處理的完整。 Spark Streaming程式是怎麼做到的呢?StreamingContext.stopShutDown呼叫了上面的stop。 StreamingContext.stopShutDown:   private def stopOnShutdown(): Unit = {     val stopGracefully = conf.getBoolean("spark.streaming.stopGracefullyOnShutdown", false)     logInfo(s"Invoking stop(stopGracefully=$stopGracefully) from shutdown hook")     // Do not stop SparkContext, let its own shutdown hook stop it stop(stopSparkContext = false, stopGracefully = stopGracefully)   } 在StreamingContext.start中,會加一個hook來呼叫stopShutDown: StreamingContext.start:
  def start(): Unit = synchronized {     state match {       case INITIALIZED =>         startSite.set(DStream.getCreationSite())         StreamingContext.ACTIVATION_LOCK.synchronized {           StreamingContext.assertNoOtherContextIsActive()           try {             validate()             // Start the streaming scheduler in a new thread, so that thread local properties             // like call sites and job groups can be reset without affecting those of the             // current thread.             ThreadUtils.runInNewThread("streaming-start") {               sparkContext.setCallSite(startSite.get)               sparkContext.clearJobGroup()               sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false")               scheduler.start()             }             state = StreamingContextState.ACTIVE           } catch {             case NonFatal(e) =>               logError("Error starting the context, marking it as stopped", e)               scheduler.stop(false)               state = StreamingContextState.STOPPED               throw e           }           StreamingContext.setActiveContext(this)         }         shutdownHookRef = ShutdownHookManager.addShutdownHook(           StreamingContext.SHUTDOWN_HOOK_PRIORITY)(stopOnShutdown)         // Registering Streaming Metrics at the start of the StreamingContext         assert(env.metricsSystem != null)         env.metricsSystem.registerSource(streamingSource)         uiTab.foreach(_.attach())         logInfo("StreamingContext started")       case ACTIVE =>         logWarning("StreamingContext has already been started")       case STOPPED =>         throw new IllegalStateException("StreamingContext has already been stopped")     }   } 在StreamingContext啟動時,就用了鉤子,定義了在shutdown時必須呼叫有stopGracefully引數的stop方法。