1. 程式人生 > >Spark定製班第9課:Spark Streaming原始碼解讀之Receiver在Driver的精妙實現全生命週期徹底研究和思考

Spark定製班第9課:Spark Streaming原始碼解讀之Receiver在Driver的精妙實現全生命週期徹底研究和思考

本期內容: 1. Receiver啟動的方式設想 2. Receiver啟動原始碼徹底分析 1. Receiver啟動的方式設想   Spark Streaming是個執行在Spark Core上的應用程式。這個應用程式既要接收資料,還要處理資料,這些都是在分散式的叢集中進行的,應該啟動多個Job,讓它們分工並能協調。Receiver的工作是接收資料,應該是用Spark Core中的Job來實現。   Receiver啟動的設計,還要解決以下問題:   1. 一個Executor上啟動多個Receiver、而其它Executor卻空閒的負載不均衡問題;   2. Receiver啟動異常導致整個Spark Streaming應用程式失敗的問題。 2. Receiver啟動原始碼徹底分析
  Spark Streaming的應用程式要處理流資料,肯定是在開始階段就要做好接收資料的準備。   Spark Streaming的應用程式程式碼定義DStream時,會定義一個或多個InputDStream。每個InputDStream分別對應有一個Receiver。   Receiver啟動全生命週期主流程圖如下: 

  Receiver的啟動,是在ssc.start()中。   剖析一下StreamingContext的start():
/**  * Start the execution of the streams.  *  * @throws IllegalStateException if the StreamingContext is already stopped.
 */ def start(): Unit = synchronized {   state match {     case INITIALIZED =>       startSite.set(DStream.getCreationSite())       StreamingContext.ACTIVATION_LOCK.synchronized {         StreamingContext.assertNoOtherContextIsActive()         try {           validate()           // Start the streaming scheduler in a new thread, so that thread local properties
          // like call sites and job groups can be reset without affecting those of the           // current thread.           ThreadUtils.runInNewThread("streaming-start") {             sparkContext.setCallSite(startSite.get)             sparkContext.clearJobGroup()             sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false") // 啟動子執行緒,一方面為了本地初始化工作,另外一方面是不要阻塞主執行緒。 scheduler.start()           }           state = StreamingContextState.ACTIVE         } catch {           case NonFatal(e) =>             logError("Error starting the context, marking it as stopped", e)             scheduler.stop(false)             state = StreamingContextState.STOPPED             throw e         }         StreamingContext.setActiveContext(this)       }       shutdownHookRef = ShutdownHookManager.addShutdownHook(         StreamingContext.SHUTDOWN_HOOK_PRIORITY)(stopOnShutdown)       // Registering Streaming Metrics at the start of the StreamingContextassert(env.metricsSystem != null)       env.metricsSystem.registerSource(streamingSource)       uiTab.foreach(_.attach())       logInfo("StreamingContext started")     case ACTIVE =>       logWarning("StreamingContext has already been started")     case STOPPED =>       thrownew IllegalStateException("StreamingContext has already been stopped")   } }   而在JobScheduler的start方法中ReceiverTracker的start方法被呼叫,Receiver就啟動了。   JobScheduler的start:
def start(): Unit = synchronized {   if (eventLoop != null) return // scheduler has already been started   logDebug("Starting JobScheduler")   eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {     override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)     override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)   }   eventLoop.start()   // attach rate controllers of input streams to receive batch completion updates   for {     inputDStream <- ssc.graph.getInputStreams     rateController <- inputDStream.rateController   } ssc.addStreamingListener(rateController)   listenerBus.start(ssc.sparkContext)   receiverTracker = new ReceiverTracker(ssc)   inputInfoTracker = new InputInfoTracker(ssc) //啟動receiverTracker receiverTracker.start()   jobGenerator.start()   logInfo("Started JobScheduler") }   ReceiverTracker的start方法啟動RPC訊息通訊體,為啥呢?因為ReceiverTracker會監控整個叢集中的Receiver,Receiver轉過來要向ReceiverTrackerEndpoint彙報自己的狀態,接收的資料,包括生命週期等資訊。   ReceiverTracker.start:
/** Start the endpoint and receiver execution thread. */ def start(): Unit = synchronized {   if (isTrackerStarted) {     thrownew SparkException("ReceiverTracker already started")   } // Receiver的啟動是依據輸入資料流的。 if (!receiverInputStreams.isEmpty) { endpoint = ssc.env.rpcEnv.setupEndpoint(       "ReceiverTracker", new ReceiverTrackerEndpoint(ssc.env.rpcEnv))     if (!skipReceiverLaunch) launchReceivers()     logInfo("ReceiverTracker started")     trackerState = Started   } }   基於ReceiverInputDStream(是在Driver端)來獲得具體的Receivers例項,然後再把他們分佈到Worker節點上。一個ReceiverInputDStream只對應一個Receiver。   ReceiverTracker.launchReceivers:
/**  * Get the receivers from the ReceiverInputDStreams, distributes them to the  * worker nodes as a parallel collection, and runs them.  */ private def launchReceivers(): Unit = {   val receivers = receiverInputStreams.map(nis => { // 一個數據輸入來源(receiverInputDStream)只對應一個Receiver val rcvr = nis.getReceiver()     rcvr.setReceiverId(nis.id)     rcvr   }) runDummySparkJob()   logInfo("Starting " + receivers.length + " receivers") // 此時的endpoint就是上面程式碼中在ReceiverTracker的start方法中構造的ReceiverTrackerEndpoint endpoint.send(StartAllReceivers(receivers)) }   先看其中的runDummySparkJob()。   runDummySparkJob()是為了確保所有節點活著,而且避免所有的receivers集中在一個節點上。   ReceiverTracker.runDummySparkJob():
/**  * Run the dummy Spark job to ensure that all slaves have registered. This avoids all the  * receivers to be scheduled on the same node.  *  * TODO Should poll the executor number and wait for executors according to  * "spark.scheduler.minRegisteredResourcesRatio"and  * "spark.scheduler.maxRegisteredResourcesWaitingTime" rather than running a dummy job.  */ private def runDummySparkJob(): Unit = {   if (!ssc.sparkContext.isLocal) {     ssc.sparkContext.makeRDD(1 to 50, 50).map(x => (x, 1)).reduceByKey(_ + _, 20).collect()   }   assert(getExecutors.nonEmpty) }   再回去看ReceiverTracker.launchReceivers()中的getReceiver()。   ReceiverInputDStream.getReceiver():
/**  * Gets the receiver object that will be sent to the worker nodes  * to receive data. This method needs to defined by any specific implementation  * of a ReceiverInputDStream.  */ def getReceiver(): Receiver[T] //返回的是Receiver物件   ReceiverInputDStream的getReceiver()方法返回Receiver物件。 該方法實際上要靠ReceiverInputDStream的子類實現。   相應的,ReceiverInputDStream的子類中必須要實現這個getReceiver()方法。ReceiverInputDStream的子類還必須定義自己對應的Receiver子類,因為這個Receiver子類會在getReceiver()方法中用來建立這個Receiver子類的物件。   根據繼承關係,這裡看一下ReceiverInputDStream的子類SocketInputDStream中的getReceiver方法。   SocketInputDStream.getReceiver:
def getReceiver(): Receiver[T] = {     new SocketReceiver(host, port, bytesToObjects, storageLevel)   } }   SocketInputDStream中還定義了相應的Receiver子類SocketReceiver。SocketReceiver類中還必須定義onStart方法。
  onStart方法會啟動後臺執行緒,呼叫receive方法。 private[streaming] class SocketReceiver[T: ClassTag](     host: String,     port: Int,     bytesToObjects: InputStream => Iterator[T],     storageLevel: StorageLevel   ) extends Receiver[T](storageLevel) with Logging {   再回到ReceiverTracker.launchReceivers()中,看最後的程式碼 endpoint.send(StartAllReceivers(receivers))。這個程式碼給ReceiverTrackerEndpoint物件傳送了StartAllReceivers訊息,ReceiverTrackerEndpoint物件接收後所做的處理在ReceiverTrackerEndpoint.receive中。   ReceiverTracker.ReceiverTrackerEndpoint.receive: /** RpcEndpoint to receive messages from the receivers. */ private class ReceiverTrackerEndpoint(override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint { private val submitJobThreadPool = ExecutionContext.fromExecutorService(     ThreadUtils.newDaemonCachedThreadPool("submit-job-thread-pool"))   privateval walBatchingThreadPool = ExecutionContext.fromExecutorService(     ThreadUtils.newDaemonCachedThreadPool("wal-batching-thread-pool"))   @volatile private var active: Boolean = true   override def receive: PartialFunction[Any, Unit] = {     // Local messages     case StartAllReceivers(receivers) => // schedulingPolicy排程策略 // receivers就是要啟動的receiver // getExecutors獲得叢集中的Executors的列表 // scheduleReceivers就可以確定receiver可以執行在哪些Executor上       val scheduledLocations = schedulingPolicy.scheduleReceivers(receivers, getExecutors)       for (receiver <- receivers) { // scheduledLocations根據receiver的Id就找到了當前那些Executors可以執行      Receiverval executors = scheduledLocations(receiver.streamId)         updateReceiverScheduledExecutors(receiver.streamId, executors)         receiverPreferredLocations(receiver.streamId) = receiver.preferredLocation // 上述程式碼之後要啟動的Receiver確定了,具體Receiver執行在哪些Executors上也確定了。 // 迴圈receivers,每次將一個receiver傳入過去。 startReceiver(receiver, executors)       } // 用於接收RestartReceiver訊息,重新啟動Receiver.     case RestartReceiver(receiver) =>       // Old scheduled executors minus the ones that are not active any more // 如果Receiver失敗的話,從可選列表中減去。 // 剛在排程為Receiver分配給哪個Executor的時候會有一些列可選的Executor列表       val oldScheduledExecutors = getStoredScheduledExecutors(receiver.streamId) // 重新獲取Executors      val scheduledLocations = if (oldScheduledExecutors.nonEmpty) {           // Try global scheduling again           oldScheduledExecutors         } else { // 如果可選的Executor使用完了,則會重新執行rescheduleReceiver重新獲取Executor.           val oldReceiverInfo = receiverTrackingInfos(receiver.streamId)           // Clear "scheduledLocations" to indicate we are going to do local scheduling           val newReceiverInfo = oldReceiverInfo.copy(             state = ReceiverState.INACTIVE, scheduledLocations = None)           receiverTrackingInfos(receiver.streamId) = newReceiverInfo           schedulingPolicy.rescheduleReceiver(             receiver.streamId,             receiver.preferredLocation,             receiverTrackingInfos,             getExecutors)         }       // Assume there is one receiver restarting at one time, so we don't need to update       // receiverTrackingInfos // 重複呼叫startReceiver startReceiver(receiver, scheduledLocations)     case c: CleanupOldBlocks =>       receiverTrackingInfos.values.flatMap(_.endpoint).foreach(_.send(c))     case UpdateReceiverRateLimit(streamUID, newRate) =>       for (info <- receiverTrackingInfos.get(streamUID); eP <- info.endpoint) {         eP.send(UpdateRateLimit(newRate))       }     // Remote messagescase ReportError(streamId, message, error) =>       reportError(streamId, message, error)   }   從註釋中可以看到,Spark Streaming指定receiver在哪些Executors上執行,而不是基於Spark Core中的Task來指定。   Spark使用submitJob的方式啟動Receiver,而在應用程式執行的時候會有很多Receiver,這個時候是啟動一個Receiver呢,還是把所有的Receiver通過這一個Job啟動?    在ReceiverTracker的receive方法中startReceiver方法第一個引數就是receiver,從實現中可以看出for迴圈不斷取出receiver,然後呼叫startReceiver。由此就可以得出一個Job只啟動一個Receiver。   如果Receiver啟動失敗,此時並不會認為是作業失敗,會重新發訊息給ReceiverTrackerEndpoint重新啟動Receiver,這樣也就確保了Receivers一定會被啟動,這樣就不會像Task啟動Receiver的話如果失敗受重試次數的影響。   ReceiverTracker.startReceiver:     /**      * Start a receiver along with its scheduled executors      */     private def startReceiver(         receiver: Receiver[_],         // scheduledLocations指定的是在具體的那臺物理機器上執行。         scheduledLocations: Seq[TaskLocation]): Unit = {       // 判斷下Receiver的狀態是否正常。       def shouldStartReceiver: Boolean = {         // It's okay to start when trackerState is Initialized or Started         !(isTrackerStopping || isTrackerStopped)       }       val receiverId = receiver.streamId       if (!shouldStartReceiver) {         // 如果不需要啟動Receiver則會呼叫         onReceiverJobFinish(receiverId)         return       }       val checkpointDirOption = Option(ssc.checkpointDir)       val serializableHadoopConf =         new SerializableConfiguration(ssc.sparkContext.hadoopConfiguration)       // startReceiverFunc封裝了在worker上啟動receiver的動作。       // Function to start the receiver on the worker node       val startReceiverFunc: Iterator[Receiver[_]] => Unit =         (iterator: Iterator[Receiver[_]]) => {           if (!iterator.hasNext) {             throw new SparkException(               "Could not start receiver as object not found.")           }           if (TaskContext.get().attemptNumber() == 0) {             val receiver = iterator.next()             assert(iterator.hasNext == false)             // ReceiverSupervisorImpl是Receiver的監控器,同時負責資料的寫等操作。             val supervisor = new ReceiverSupervisorImpl(               receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption) supervisor.start()             supervisor.awaitTermination()           } else { // 如果你想重新啟動receiver的話,你需要重新完成上面的排程,重新schedule,而不是Task重試。             // It's restarted by TaskScheduler, but we want to reschedule it again. So exit it.           }         }       // Create the RDD using the scheduledLocations to run the receiver in a Spark job       val receiverRDD: RDD[Receiver[_]] =         if (scheduledLocations.isEmpty) {           ssc.sc.makeRDD(Seq(receiver), 1)         } else {           val preferredLocations = scheduledLocations.map(_.toString).distinct           ssc.sc.makeRDD(Seq(receiver -> preferredLocations))         }       // receiverId可以看出,receiver只有一個       receiverRDD.setName(s"Receiver $receiverId")       ssc.sparkContext.setJobDescription(s"Streaming job running receiver $receiverId")       ssc.sparkContext.setCallSite(Option(ssc.getStartSite()).getOrElse(Utils.getCallSite()))       // 每個Receiver的啟動都會觸發一個Job,而不是一個作業的Task去啟動所有的Receiver.       // 應用程式一般會有很多Receiver,       // 呼叫SparkContext的submitJob,為了啟動Receiver,啟動了Spark一個作業。       val future = ssc.sparkContext.submitJob[Receiver[_], Unit, Unit](         receiverRDD, startReceiverFunc, Seq(0), (_, _) => Unit, ())       // We will keep restarting the receiver job until ReceiverTracker is stopped       future.onComplete {         case Success(_) =>           if (!shouldStartReceiver) {             onReceiverJobFinish(receiverId)           } else {             logInfo(s"Restarting Receiver $receiverId")             self.send(RestartReceiver(receiver))           }         case Failure(e) =>           if (!shouldStartReceiver) {             onReceiverJobFinish(receiverId)           } else {             logError("Receiver has been stopped. Try to restart it.", e)             logInfo(s"Restarting Receiver $receiverId")             self.send(RestartReceiver(receiver))           }       // 使用執行緒池的方式提交Job,這樣的好處是可以併發的啟動Receiver。       }(submitJobThreadPool)       logInfo(s"Receiver ${receiver.streamId} started")     }   當Receiver啟動失敗的話,就會觸發ReceiverTrackEndpoint重新啟動一個Spark Job去啟動Receiver. /**  * This message will trigger ReceiverTrackerEndpoint to restart a Spark job for the receiver.  */ private[streaming] case class RestartReceiver(receiver: Receiver[_]) extends ReceiverTrackerLocalMessage // 當Receiver關閉的話,並不需要重新啟動Spark Job. /**  * Call when a receiver is terminated. It means we won't restart its Spark job.  */ private def onReceiverJobFinish(receiverId: Int): Unit = {   receiverJobExitLatch.countDown() // 使用foreach將receiver從receiverTrackingInfo中去掉。   receiverTrackingInfos.remove(receiverId).foreach { receiverTrackingInfo =>     if (receiverTrackingInfo.state == ReceiverState.ACTIVE) {       logWarning(s"Receiver $receiverId exited but didn't deregister")     }   } }   回頭再看ReceiverTracker.startReceiver中的程式碼supervisor.start()。在子類ReceiverSupervisorImpl中並沒有start方法,因此呼叫的是父類ReceiverSupervisor的start方法。   ReceiverSupervisor.start:
/** Start the supervisor */ def start() { onStart() // 具體實現是子類實現的。 startReceiver() }