Spark Study Notes 15: Spark Streaming Execution Flow (1)

Each DStream maintains a hash table (generatedRDDs in the Spark source) mapping batch time to the RDD generated for that batch; all DStream transformations and computations ultimately operate on the RDDs in this hash table.
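As a toy illustration of that lookup-or-compute pattern (illustrative only: the real map lives in org.apache.spark.streaming.dstream.DStream, is keyed by batch Time, and holds real RDDs):

import scala.collection.mutable.HashMap

// Toy model of DStream's per-batch RDD cache; a Seq[String] stands in for an RDD.
object GeneratedRddCache {
  type FakeRdd = Seq[String]
  private val generatedRDDs = new HashMap[Long, FakeRdd]()

  // Mirrors the spirit of DStream.getOrCompute: return the cached value
  // for `time`, or compute it once and cache it.
  def getOrCompute(time: Long)(compute: Long => FakeRdd): FakeRdd =
    generatedRDDs.getOrElseUpdate(time, compute(time))

  def main(args: Array[String]): Unit = {
    val first  = getOrCompute(1000L)(t => Seq(s"batch-$t"))
    val second = getOrCompute(1000L)(_ => Seq("never computed")) // cache hit
    println(first eq second)  // true: the same cached instance is returned
  }
}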

1. Receiver Startup

Receiver startup consists of two parts: one is initializing the ReceiverTracker and related components on the Driver side; the other is wrapping all the Receivers into an RDD and sending it to the Executors to run.
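For context, here is a minimal user program that drives this entire startup path (a sketch; the host and port are placeholders):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SocketWordCount {
  def main(args: Array[String]): Unit = {
    // local[2]: one core for the receiver task, one for batch processing
    val conf = new SparkConf().setAppName("SocketWordCount").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(1))

    // Registers a SocketInputDStream whose Receiver is launched by the flow below
    val lines = ssc.socketTextStream("localhost", 9999)
    lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()

    ssc.start()             // the entry point analyzed in 1.1
    ssc.awaitTermination()
  }
}

socketTextStream registers a SocketInputDStream with the DStreamGraph; everything described below happens inside ssc.start().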

1.1. StreamingContext.start

This method is the entry point to the whole flow:
  def start(): Unit = synchronized {
    if (state == Started) {
      throw new SparkException("StreamingContext has already been started")
    }
    if (state == Stopped) {
      throw new SparkException("StreamingContext has already been stopped")
    }
    validate()
    sparkContext.setCallSite(DStream.getCreationSite())
    scheduler.start()
    state = Started
  }
(1) Check the StreamingContext state; (2) start the JobScheduler.

1.2. JobScheduler.start

  def start(): Unit = synchronized {
    if (eventActor != null) return // scheduler has already been started
    logDebug("Starting JobScheduler")
    eventActor = ssc.env.actorSystem.actorOf(Props(new Actor {
      def receive = {
        case event: JobSchedulerEvent => processEvent(event)
      }
    }), "JobScheduler")
    listenerBus.start()
    receiverTracker = new ReceiverTracker(ssc)
    receiverTracker.start()
    jobGenerator.start()
    logInfo("Started JobScheduler")
  }
(1) Create an anonymous Actor to process JobSchedulerEvent messages; (2) create and start the ReceiverTracker; (3) start the JobGenerator.
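The anonymous actor is essentially a single-threaded event loop: events are queued and handled one at a time by processEvent. A self-contained sketch of the same pattern without Akka (the event names are invented for illustration; later Spark versions in fact replaced these actors with a plain event-loop thread):

import java.util.concurrent.LinkedBlockingQueue

sealed trait SchedulerEvent
case class JobStarted(name: String) extends SchedulerEvent
case class JobCompleted(name: String) extends SchedulerEvent

object EventLoopSketch {
  private val queue = new LinkedBlockingQueue[SchedulerEvent]()

  // A single thread drains the queue, so processEvent never runs concurrently
  private val loop = new Thread("event-loop") {
    setDaemon(true)
    override def run(): Unit =
      while (!Thread.currentThread().isInterrupted)
        processEvent(queue.take())
  }

  private def processEvent(event: SchedulerEvent): Unit = event match {
    case JobStarted(n)   => println(s"job started: $n")
    case JobCompleted(n) => println(s"job completed: $n")
  }

  def main(args: Array[String]): Unit = {
    loop.start()
    queue.put(JobStarted("batch-1439548800000"))
    queue.put(JobCompleted("batch-1439548800000"))
    Thread.sleep(200) // let the daemon loop drain the queue before exiting
  }
}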

1.3. ReceiverTracker

1.3.1. ReceiverTracker Initialization

private[streaming]
class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false) extends Logging {
  private val receiverInputStreams = ssc.graph.getReceiverInputStreams()
  private val receiverInputStreamIds = receiverInputStreams.map { _.id }
  private val receiverExecutor = new ReceiverLauncher()
  private val receiverInfo = new HashMap[Int, ReceiverInfo] with SynchronizedMap[Int, ReceiverInfo]
  private val receivedBlockTracker = new ReceivedBlockTracker(
    ssc.sparkContext.conf,
    ssc.sparkContext.hadoopConfiguration,
    receiverInputStreamIds,
    ssc.scheduler.clock,
    Option(ssc.checkpointDir)
  )
  private val listenerBus = ssc.scheduler.listenerBus
  // actor is created when generator starts.
  // This not being null means the tracker has been started and not stopped
  private var actor: ActorRef = null
(1) Read all the ReceiverInputDStreams from the DStreamGraph; the DStreamGraph holds all of the InputDStreams and output DStreams; (2) create a ReceiverLauncher object, an inner class of ReceiverTracker, used to launch the Receivers; (3) create a ReceivedBlockTracker object, which stores the metadata describing the Blocks produced by the Receivers, while the actual Block data is stored in the BlockManagers on the Executors; its role is similar to that of the Driver-side BlockManager.

1.3.2. ReceiverTracker.start

  def start() = synchronized {
    if (actor != null) {
      throw new SparkException("ReceiverTracker already started")
    }
    if (!receiverInputStreams.isEmpty) {
      actor = ssc.env.actorSystem.actorOf(Props(new ReceiverTrackerActor),
        "ReceiverTracker")
      if (!skipReceiverLaunch) receiverExecutor.start()
      logInfo("ReceiverTracker started")
    }
  }
(1) Create a ReceiverTrackerActor object (an inner class) that handles Receiver registration and the Block metadata reported by Receivers; (2) start the ReceiverLauncher; the ReceiverLauncher creates a thread and, in that thread, calls its own startReceivers method to launch all the Receivers.

1.3.3. ReceiverLauncher.startReceivers

    private def startReceivers() {
      val receivers = receiverInputStreams.map(nis => {
        val rcvr = nis.getReceiver()
        rcvr.setReceiverId(nis.id)
        rcvr
      })
      // Right now, we only honor preferences if all receivers have them
      val hasLocationPreferences = receivers.map(_.preferredLocation.isDefined).reduce(_ && _)
      // Create the parallel collection of receivers to distributed them on the worker nodes
      val tempRDD =
        if (hasLocationPreferences) {
          val receiversWithPreferences = receivers.map(r => (r, Seq(r.preferredLocation.get)))
          ssc.sc.makeRDD[Receiver[_]](receiversWithPreferences)
        } else {
          ssc.sc.makeRDD(receivers, receivers.size)
        }
      val checkpointDirOption = Option(ssc.checkpointDir)
      val serializableHadoopConf = new SerializableWritable(ssc.sparkContext.hadoopConfiguration)
      // Function to start the receiver on the worker node
      val startReceiver = (iterator: Iterator[Receiver[_]]) => {
        if (!iterator.hasNext) {
          throw new SparkException(
            "Could not start receiver as object not found.")
        }
        val receiver = iterator.next()
        val supervisor = new ReceiverSupervisorImpl(
          receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)
        supervisor.start()
        supervisor.awaitTermination()
      }
      // Run the dummy Spark job to ensure that all slaves have registered.
      // This avoids all the receivers to be scheduled on the same node.
      if (!ssc.sparkContext.isLocal) {
        ssc.sparkContext.makeRDD(1 to 50, 50).map(x => (x, 1)).reduceByKey(_ + _, 20).collect()
      }
      // Distribute the receivers and start them
      logInfo("Starting " + receivers.length + " receivers")
      running = true
      ssc.sparkContext.runJob(tempRDD, ssc.sparkContext.clean(startReceiver))
      running = false
      logInfo("All of the receivers have been terminated")
    }
(1) Iterate over the ReceiverInputDStreams in receiverInputStreams and create the corresponding Receiver for each; a SocketInputDStream, for example, corresponds to a SocketReceiver; (2) wrap all the Receivers into an RDD via makeRDD, with the number of partitions equal to the number of Receivers, so each Receiver occupies one Task, i.e. one core; (3) create the startReceiver function object, which will execute on the Executor side; (4) submit a job over the Receiver RDD, so that each Receiver runs as a Task on an Executor. (The Receiver class diagram is omitted here.)
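One practical consequence of point (2): every Receiver pins a core for the lifetime of the application, so the application must be given more cores than it has Receivers, or no cores remain for batch processing. A sketch (the core counts are illustrative):

import org.apache.spark.SparkConf

// One receiver permanently occupies one core, so reserve at least one more
// for batch processing; with N receivers, use at least N + 1 cores.
val conf = new SparkConf().setMaster("local[2]").setAppName("one-receiver")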
At this point the Driver-side startup preparation is finished; execution now moves to the Executor side.

1.4. Receiver Task

Each Receiver runs as a Task on the Executor side; the function it executes is the startReceiver function object created above. The code is as follows:
      // Function to start the receiver on the worker node
      val startReceiver = (iterator: Iterator[Receiver[_]]) => {
        if (!iterator.hasNext) {
          throw new SparkException(
            "Could not start receiver as object not found.")
        }
        val receiver = iterator.next()
        val supervisor = new ReceiverSupervisorImpl(
          receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)
        supervisor.start()
        supervisor.awaitTermination()
      }
Each Receiver gets its own ReceiverSupervisorImpl, which is responsible for supervising that Receiver.

1.4.1. ReceiverSupervisor.start

  /** Start the supervisor */
  def start() {
    onStart()
    startReceiver()
  }
(1) The onStart method (in ReceiverSupervisorImpl) starts the BlockGenerator by calling its start method; (2) startReceiver starts the corresponding Receiver. Although the BlockGenerator is started first, the data originates from startReceiver, so the startReceiver method is covered first.

1.5. ReceiverSupervisor.startReceiver

  def startReceiver(): Unit = synchronized {
    try {
      logInfo("Starting receiver")
      receiver.onStart()
      logInfo("Called receiver onStart")
      onReceiverStart()
      receiverState = Started
    } catch {
      case t: Throwable =>
        stop("Error starting receiver " + streamId, Some(t))
    }
  }
This calls the Receiver's onStart method, starting the Receiver's data-reception flow. Below, SocketReceiver serves as the example for describing the reception process.

1.5.1. SocketReceiver.onStart

  def onStart() {
    // Start the thread that receives data over a connection
    new Thread("Socket Receiver") {
      setDaemon(true)
      override def run() { receive() }
    }.start()
  }
Creates and starts a daemon thread that executes the receive method.

1.5.2. SocketReceiver.receive

  def receive() {
    var socket: Socket = null
    try {
      logInfo("Connecting to " + host + ":" + port)
      socket = new Socket(host, port)
      logInfo("Connected to " + host + ":" + port)
      val iterator = bytesToObjects(socket.getInputStream())
      while(!isStopped && iterator.hasNext) {
        store(iterator.next)
      }
      logInfo("Stopped receiving")
      restart("Retrying connecting to " + host + ":" + port)
    } catch {
      ...
    } finally {
      if (socket != null) {
        socket.close()
        logInfo("Closed socket to " + host + ":" + port)
      }
    }
  }
(1) Establish a TCP connection; (2) read data in a loop, calling store on each item to save it.
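A user-defined Receiver follows exactly the same onStart/receive/store pattern. Below is a minimal sketch modeled on SocketReceiver (LineReceiver is hypothetical and simplified; full error handling is omitted):

import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket
import java.nio.charset.StandardCharsets

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class LineReceiver(host: String, port: Int)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  def onStart(): Unit = {
    // Receive on a separate daemon thread, exactly like SocketReceiver
    new Thread("Line Receiver") {
      setDaemon(true)
      override def run(): Unit = receive()
    }.start()
  }

  def onStop(): Unit = { /* the receive thread exits once isStopped is true */ }

  private def receive(): Unit = {
    var socket: Socket = null
    try {
      socket = new Socket(host, port)
      val reader = new BufferedReader(
        new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))
      var line = reader.readLine()
      while (!isStopped && line != null) {
        store(line)               // hands each item to the supervisor
        line = reader.readLine()
      }
      restart("Retrying connecting to " + host + ":" + port)
    } catch {
      case e: java.io.IOException => restart("Error receiving data", e)
    } finally {
      if (socket != null) socket.close()
    }
  }
}

It would be plugged in with ssc.receiverStream(new LineReceiver("localhost", 9999)), after which the startup flow above applies unchanged.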

1.5.3. Receiver.store

  def store(dataItem: T) {
    executor.pushSingle(dataItem)
  }
executor is the ReceiverSupervisorImpl object, set in the ReceiverSupervisor constructor body. The ReceiverSupervisorImpl.pushSingle code is as follows:
  def pushSingle(data: Any) {
    blockGenerator.addData(data)
  }
The BlockGenerator.addData method is as follows:
  def addData (data: Any): Unit = synchronized {
    waitToPush()
    currentBuffer += data
  }
addData first calls waitToPush() (inherited from RateLimiter, which throttles ingestion according to spark.streaming.receiver.maxRate) and then appends the data to the currentBuffer array; a timer thread inside BlockGenerator periodically swaps this buffer out and packages its contents into a Block.

1.6. BlockGenerator

1.6.1. BlockGenerator Initialization and Startup

private[streaming] class BlockGenerator(
    listener: BlockGeneratorListener,
    receiverId: Int,
    conf: SparkConf
  ) extends RateLimiter(conf) with Logging {
  private case class Block(id: StreamBlockId, buffer: ArrayBuffer[Any])
  private val clock = new SystemClock()
  private val blockInterval = conf.getLong("spark.streaming.blockInterval", 200)
  private val blockIntervalTimer =
    new RecurringTimer(clock, blockInterval, updateCurrentBuffer, "BlockGenerator")
  private val blockQueueSize = conf.getInt("spark.streaming.blockQueueSize", 10)
  private val blocksForPushing = new ArrayBlockingQueue[Block](blockQueueSize)
  private val blockPushingThread = new Thread() { override def run() { keepPushingBlocks() } }
  @volatile private var currentBuffer = new ArrayBuffer[Any]
  @volatile private var stopped = false
  /** Start block generating and pushing threads. */
  def start() {
    blockIntervalTimer.start()
    blockPushingThread.start()
    logInfo("Started BlockGenerator")
  }
(1) Create a RecurringTimer object, which contains a thread that, once started, periodically calls the updateCurrentBuffer method; (2) create the blockPushingThread thread, which calls keepPushingBlocks once started; (3) the start method starts these two threads.
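Both intervals above come straight from SparkConf. A configuration sketch (the values are illustrative; in this code base blockInterval and blockQueueSize are read as plain numbers, milliseconds and entries respectively):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  // Cut a block every 100 ms instead of the default 200 ms; each block
  // later becomes one partition of the batch's BlockRDD, so roughly
  // batchInterval / blockInterval partitions are produced per receiver.
  .set("spark.streaming.blockInterval", "100")
  // Let up to 20 finished blocks queue up before put() starts blocking.
  .set("spark.streaming.blockQueueSize", "20")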

1.6.2. BlockGenerator.updateCurrentBuffer

  private def updateCurrentBuffer(time: Long): Unit = synchronized {
    try {
      val newBlockBuffer = currentBuffer
      currentBuffer = new ArrayBuffer[Any]
      if (newBlockBuffer.size > 0) {
        val blockId = StreamBlockId(receiverId, time - blockInterval)
        val newBlock = new Block(blockId, newBlockBuffer)
        listener.onGenerateBlock(blockId)
        blocksForPushing.put(newBlock)  // put is blocking when queue is full
        logDebug("Last element in " + blockId + " is " + newBlockBuffer.last)
      }
    } catch {
      case ie: InterruptedException =>
        logInfo("Block updating timer thread was interrupted")
      case e: Exception =>
        reportError("Error in block updating thread", e)
    }
  }
(1) This is a synchronized method; (2) a new buffer is created and swapped with currentBuffer, which holds the data the Receiver has collected so far; (3) the old buffer's contents are wrapped into a Block object, whose blockId has the form input-${streamId}-${uniqueId} (e.g. input-0-1439548800000), where streamId is the Receiver's id and uniqueId is a timestamp; (4) the Block object is put into the blocksForPushing queue.
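Steps (1) and (2) implement a simple double-buffering handoff: because addData and updateCurrentBuffer synchronize on the same BlockGenerator instance, the swap is atomic with respect to the receiver thread, which then keeps appending into the fresh buffer. A self-contained sketch of just that handoff (names are illustrative):

import java.util.concurrent.ArrayBlockingQueue
import scala.collection.mutable.ArrayBuffer

object DoubleBufferSketch {
  @volatile private var currentBuffer = new ArrayBuffer[Any]
  private val blocksForPushing = new ArrayBlockingQueue[Seq[Any]](10)

  // Called by the producer (receiver) thread.
  def addData(data: Any): Unit = synchronized { currentBuffer += data }

  // Called periodically by the timer thread; like updateCurrentBuffer,
  // the whole method holds the lock, so the swap is atomic w.r.t. addData.
  def cutBlock(): Unit = synchronized {
    val filled = currentBuffer
    currentBuffer = new ArrayBuffer[Any]   // producer appends here from now on
    if (filled.nonEmpty) blocksForPushing.put(filled)  // blocks when queue full
  }

  def main(args: Array[String]): Unit = {
    (1 to 5).foreach(addData)
    cutBlock()
    println(blocksForPushing.poll())  // ArrayBuffer(1, 2, 3, 4, 5)
  }
}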

1.6.3. BlockGenerator.keepPushingBlocks

  private def keepPushingBlocks() {
    logInfo("Started block pushing thread")
    try {
      while(!stopped) {
        Option(blocksForPushing.poll(100, TimeUnit.MILLISECONDS)) match {
          case Some(block) => pushBlock(block)
          case None =>
        }
      }
      ...
    } catch {
      ...
    }
  }
Polls a Block from the blocksForPushing queue, then calls the BlockGenerator.pushBlock method:
  private def pushBlock(block: Block) {
    listener.onPushBlock(block.id, block.buffer)
    logInfo("Pushed block " + block.id)
  }