
[spark streaming] ReceiverTracker: data generation and storage

Preface

In Spark Streaming, the JobScheduler is responsible for the overall dynamic scheduling of jobs, and it has two important members: JobGenerator and ReceiverTracker. JobGenerator is responsible for generating the concrete RDD DAG for each batch, while ReceiverTracker is responsible for the data source.

Every InputDStream whose receiver must run on an executor to receive data has to extend ReceiverInputDStream. ReceiverInputDStream declares a def getReceiver(): Receiver[T] method that every subclass must implement. For example, KafkaInputDStream corresponds to KafkaReceiver, FlumeInputDStream corresponds to FlumeReceiver, TwitterInputDStream corresponds to TwitterReceiver, and so on.
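
As a minimal illustration (the class name is hypothetical, not part of Spark), a custom input stream only has to return its receiver from getReceiver(); the ReceiverTracker takes care of everything else:

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.receiver.Receiver

// Hypothetical example class, for illustration only.
class MyInputDStream(_ssc: StreamingContext) extends ReceiverInputDStream[String](_ssc) {
  // Called on the driver by ReceiverTracker; the returned Receiver is
  // serialized, shipped to an executor and started there.
  override def getReceiver(): Receiver[String] =
    new Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {
      def onStart(): Unit = { /* start threads that call store(...) */ }
      def onStop(): Unit = { }
    }
}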

Process overview (a driver-side sketch follows the list):

  • The ReceiverTracker starts and obtains the receivers of all InputDStreams
  • The scheduling policy determines each Receiver's preferred locations (i.e. on which executors it may run)
  • Each Receiver is wrapped into an RDD and submitted as a job via sc; the function that runs on the executor creates a supervisor instance and calls its start() method, which in turn calls the Receiver's onStart() method
  • The Receiver's onStart keeps receiving data; through the store methods, the supervisor is eventually asked to store the data as blocks
  • After storing a block, the supervisor reports the Block's info to the ReceiverTracker
  • The ReceiverTracker hands the Block info over to the ReceivedBlockTracker for management
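
From the user's point of view, the whole pipeline above is triggered by registering a receiver-based input stream and starting the context. A minimal driver-side sketch (MyReceiver is the hypothetical receiver sketched later in this article):

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("receiver-demo")
val ssc = new StreamingContext(conf, Seconds(2))

// receiverStream registers a ReceiverInputDStream backed by a custom Receiver.
val lines = ssc.receiverStream(new MyReceiver(StorageLevel.MEMORY_AND_DISK_2))
lines.count().print()

ssc.start()            // JobScheduler -> ReceiverTracker.start() -> receivers launched
ssc.awaitTermination()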

Starting the Receivers

Let's first look at how the receiverTracker is started:

ssc.start()
    scheduler.start()
        receiverTracker.start()
        jobGenerator.start()
----
 def start(): Unit = synchronized {
    if (isTrackerStarted) {
      throw new SparkException("ReceiverTracker already started")
    }

    if (!receiverInputStreams.isEmpty) {
      endpoint = ssc.env.rpcEnv.setupEndpoint(
        "ReceiverTracker", new ReceiverTrackerEndpoint(ssc.env.rpcEnv))
      if (!skipReceiverLaunch) launchReceivers()
      logInfo("ReceiverTracker started")
      trackerState = Started
    }
  }

In the start method, the ReceiverTracker's Endpoint is created first, and then launchReceivers() is called to start the Receivers:

 private def launchReceivers(): Unit = {
    val receivers = receiverInputStreams.map { nis =>
      val rcvr = nis.getReceiver()
      rcvr.setReceiverId(nis.id)
      rcvr
    }

    runDummySparkJob()

    logInfo("Starting " + receivers.length + " receivers")
    endpoint.send(StartAllReceivers(receivers))
  }

It iterates over all the InputStreams to collect the corresponding receivers, then sends a StartAllReceivers message to the ReceiverTrackerEndpoint. Let's see how that message is handled:

 case StartAllReceivers(receivers) =>
        val scheduledLocations = schedulingPolicy.scheduleReceivers(receivers, getExecutors)
        for (receiver <- receivers) {
          val executors = scheduledLocations(receiver.streamId)
          updateReceiverScheduledExecutors(receiver.streamId, executors)
          receiverPreferredLocations(receiver.streamId) = receiver.preferredLocation
          startReceiver(receiver, executors)
        }

The scheduling policy computes a set of preferred locations for each receiver, i.e. on which executor node a Receiver should be launched. The main scheduling principles are (a simplified sketch follows the list):

  • Satisfy the Receiver's preferredLocation.
  • Beyond that, distribute the Receivers as evenly as possible.
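
A much-simplified illustration of these two principles (this is not the actual ReceiverSchedulingPolicy code, which also handles multiple candidate executors per receiver):

// Simplified sketch only; the real logic lives in ReceiverSchedulingPolicy.scheduleReceivers.
def naiveScheduleReceivers(
    receivers: Seq[(Int, Option[String])],      // (streamId, preferredLocation host)
    executors: Seq[String]): Map[Int, Seq[String]] = {
  val load = scala.collection.mutable.Map(executors.map(_ -> 0): _*)
  receivers.map { case (streamId, preferred) =>
    // Principle 1: honour preferredLocation if some executor runs on that host.
    // Principle 2: otherwise pick the currently least-loaded executor, spreading receivers evenly.
    val chosen = preferred.filter(executors.contains).getOrElse(load.minBy(_._2)._1)
    load(chosen) += 1
    streamId -> Seq(chosen)
  }.toMap
}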

It then iterates over all receivers and calls startReceiver(receiver, executors) to start each one:

 private def startReceiver(
        receiver: Receiver[_],
        scheduledLocations: Seq[TaskLocation]): Unit = {
      def shouldStartReceiver: Boolean = {
        // It's okay to start when trackerState is Initialized or Started
        !(isTrackerStopping || isTrackerStopped)
      }

      val receiverId = receiver.streamId
      if (!shouldStartReceiver) {
        onReceiverJobFinish(receiverId)
        return
      }

      val checkpointDirOption = Option(ssc.checkpointDir)
      val serializableHadoopConf =
        new SerializableConfiguration(ssc.sparkContext.hadoopConfiguration)

      // Function to start the receiver on the worker node
      val startReceiverFunc: Iterator[Receiver[_]] => Unit =
        (iterator: Iterator[Receiver[_]]) => {
          if (!iterator.hasNext) {
            throw new SparkException(
              "Could not start receiver as object not found.")
          }
          if (TaskContext.get().attemptNumber() == 0) {
            val receiver = iterator.next()
            assert(iterator.hasNext == false)
            val supervisor = new ReceiverSupervisorImpl(
              receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)
            supervisor.start()
            supervisor.awaitTermination()
          } else {
            // It's restarted by TaskScheduler, but we want to reschedule it again. So exit it.
          }
        }

      // Create the RDD using the scheduledLocations to run the receiver in a Spark job
      val receiverRDD: RDD[Receiver[_]] =
        if (scheduledLocations.isEmpty) {
          ssc.sc.makeRDD(Seq(receiver), 1)
        } else {
          val preferredLocations = scheduledLocations.map(_.toString).distinct
          ssc.sc.makeRDD(Seq(receiver -> preferredLocations))
        }
      receiverRDD.setName(s"Receiver $receiverId")
      ssc.sparkContext.setJobDescription(s"Streaming job running receiver $receiverId")
      ssc.sparkContext.setCallSite(Option(ssc.getStartSite()).getOrElse(Utils.getCallSite()))

      val future = ssc.sparkContext.submitJob[Receiver[_], Unit, Unit](
        receiverRDD, startReceiverFunc, Seq(0), (_, _) => Unit, ())
      // We will keep restarting the receiver job until ReceiverTracker is stopped
      future.onComplete {
        case Success(_) =>
          if (!shouldStartReceiver) {
            onReceiverJobFinish(receiverId)
          } else {
            logInfo(s"Restarting Receiver $receiverId")
            self.send(RestartReceiver(receiver))
          }
        case Failure(e) =>
          if (!shouldStartReceiver) {
            onReceiverJobFinish(receiverId)
          } else {
            logError("Receiver has been stopped. Try to restart it.", e)
            logInfo(s"Restarting Receiver $receiverId")
            self.send(RestartReceiver(receiver))
          }
      }(ThreadUtils.sameThread)
      logInfo(s"Receiver ${receiver.streamId} started")
    }

Note how the receiver is cleverly wrapped into an RDD, with scheduledLocations used as the RDD's preferred locations (locationPrefs).

A Spark Core job is then submitted via sc, with startReceiverFunc as the function to run (i.e. the code that executes on the executor). That function creates a ReceiverSupervisorImpl and calls its start() method, which calls the receiver's onStart and then returns immediately.

A receiver's onStart method usually creates a thread or thread pool to receive data; for example, KafkaReceiver creates a thread pool and receives data for the topics inside it.

After supervisor.start() returns, supervisor.awaitTermination() blocks the thread so that the task never exits and data can be received continuously.
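
To make onStart's contract concrete, here is a minimal custom receiver in the spirit of Spark's custom-receiver guide (MyReceiver is an illustrative name): onStart() only spawns a thread and returns, the thread keeps calling store(), and the supervisor's awaitTermination() keeps the task alive.

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Illustrative receiver: generates dummy records on a background thread.
class MyReceiver(storageLevel: StorageLevel) extends Receiver[String](storageLevel) {

  def onStart(): Unit = {
    // Must not block: start a thread and return, so that supervisor.start()
    // can return and supervisor.awaitTermination() keeps the task alive.
    new Thread("MyReceiver") {
      override def run(): Unit = receive()
    }.start()
  }

  def onStop(): Unit = {
    // Threads started in onStart() should notice isStopped() and exit.
  }

  private def receive(): Unit = {
    var i = 0L
    while (!isStopped()) {
      i += 1
      store(s"record-$i")   // goes to supervisor.pushSingle -> BlockGenerator
      Thread.sleep(10)
    }
  }
}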

Receiver data processing

As mentioned above, the receiver's onStart() method creates a thread or thread pool to receive data. So how is the received data handled? It always goes through one of the receiver's store() methods, which in turn call methods on the supervisor. These supervisor methods come in several forms:

  • pushSingle: for a single small record; multiple records are aggregated by the BlockGenerator before being stored as a block
  • pushArrayBuffer: for data in array form
  • pushIterator: for data in iterator form
  • pushBytes: for block data in ByteBuffer form

Except for pushSingle, which relies on the BlockGenerator to aggregate records into a block before storing it, the other methods store a whole block directly.
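
Roughly, the Receiver.store overloads map one-to-one onto these supervisor methods; paraphrased from the Receiver class (signatures simplified):

// Paraphrased from org.apache.spark.streaming.receiver.Receiver (simplified):
def store(dataItem: T): Unit =                     // single record -> BlockGenerator
  supervisor.pushSingle(dataItem)

def store(dataBuffer: ArrayBuffer[T]): Unit =      // a batch of records stored as one block
  supervisor.pushArrayBuffer(dataBuffer, None, None)

def store(dataIterator: Iterator[T]): Unit =       // iterator stored as one block
  supervisor.pushIterator(dataIterator, None, None)

def store(bytes: ByteBuffer): Unit =               // already-serialized block
  supervisor.pushBytes(bytes, None, None)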

Let's see how pushSingle stores blocks by aggregation:

def pushSingle(data: Any) {
    defaultBlockGenerator.addData(data)
  }
------
def addData(data: Any): Unit = {
    if (state == Active) {
      waitToPush()
      synchronized {
        if (state == Active) {
          currentBuffer += data
        } else {
          throw new SparkException(
            "Cannot add data as BlockGenerator has not been started or has been stopped")
        }
      }
    } else {
      throw new SparkException(
        "Cannot add data as BlockGenerator has not been started or has been stopped")
    }
  }

Here waitToPush() is called first: a rateLimiter checks the rate to prevent records from being added too fast. If the rate is exceeded, the call blocks until the next second. The number of records that can be added per second is controlled by spark.streaming.receiver.maxRate, i.e. how many records a single Receiver may add per second.
After the check, the record is appended to a growable array, currentBuffer.
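
The limit itself is just a Spark configuration entry, e.g. (the value 1000 is only an example):

import org.apache.spark.SparkConf

// Allow each receiver to add at most 1000 records per second (example value);
// waitToPush() blocks once this rate is exceeded within the current second.
val conf = new SparkConf().set("spark.streaming.receiver.maxRate", "1000")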

In addition, when the BlockGenerator is initialized it creates a recurring timer:

private val blockIntervalMs = conf.getTimeAsMs("spark.streaming.blockInterval", "200ms")
  require(blockIntervalMs > 0, s"'spark.streaming.blockInterval' should be a positive value")

  private val blockIntervalTimer =
    new RecurringTimer(clock, blockIntervalMs, updateCurrentBuffer, "BlockGenerator")

The default interval is 200ms and can be configured via spark.streaming.blockInterval. On every tick the timer runs updateCurrentBuffer:

private def updateCurrentBuffer(time: Long): Unit = {
    try {
      var newBlock: Block = null
      synchronized {
        if (currentBuffer.nonEmpty) {
          val newBlockBuffer = currentBuffer
          currentBuffer = new ArrayBuffer[Any]
          val blockId = StreamBlockId(receiverId, time - blockIntervalMs)
          listener.onGenerateBlock(blockId)
          newBlock = new Block(blockId, newBlockBuffer)
        }
      }

      if (newBlock != null) {
        blocksForPushing.put(newBlock)  // put is blocking when queue is full
      }
    } catch {
      case ie: InterruptedException =>
        logInfo("Block updating timer thread was interrupted")
      case e: Exception =>
        reportError("Error in block updating thread", e)
    }
  }

  • Assign currentBuffer to newBlockBuffer
  • Allocate a new object for currentBuffer so it can hold subsequent records
  • Wrap the old buffer into a Block and add it to blocksForPushing, a queue with a default capacity of 10, configurable via spark.streaming.blockQueueSize (a configuration sketch follows the list)
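
Because each block later becomes one partition of the batch's BlockRDD, the block interval also controls parallelism: with a 2 s batch interval and the default 200 ms block interval, a receiver produces about 2000 / 200 = 10 blocks, i.e. roughly 10 partitions per batch. A configuration sketch (example values):

import org.apache.spark.SparkConf

// Example values only: a smaller blockInterval yields more (smaller) blocks per batch,
// a larger blockQueueSize lets more generated-but-not-yet-pushed blocks be buffered.
val conf = new SparkConf()
  .set("spark.streaming.blockInterval", "200ms")
  .set("spark.streaming.blockQueueSize", "10")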

When the BlockGenerator is initialized, it also starts a thread that takes Blocks out of the blocksForPushing queue and stores them via the supervisor:

private val blockPushingThread = new Thread() { override def run() { keepPushingBlocks() } }
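
keepPushingBlocks roughly does the following (paraphrased, logging and shutdown draining omitted): it polls blocksForPushing and hands each Block to the listener, whose onPushBlock (implemented in ReceiverSupervisorImpl) ends up calling pushArrayBuffer and thus pushAndReportBlock.

// Paraphrased sketch of BlockGenerator.keepPushingBlocks (simplified):
private def keepPushingBlocks(): Unit = {
  while (areBlocksBeingGenerated) {           // true until the generator is stopped
    // poll with a small timeout so the loop can notice when generation stops
    Option(blocksForPushing.poll(10, TimeUnit.MILLISECONDS)) match {
      case Some(block) => listener.onPushBlock(block.id, block.buffer)  // -> supervisor stores & reports
      case None =>                            // nothing queued yet, try again
    }
  }
  // after stopping, any remaining queued blocks are drained and pushed as well
}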

supervisor stores the data block

The block is stored first and then reported upward:

#pushAndReportBlock
val blockStoreResult = receivedBlockHandler.storeBlock(blockId, receivedBlock)
logDebug(s"Pushed block $blockId in ${(System.currentTimeMillis - time)} ms")
val numRecords = blockStoreResult.numRecords
val blockInfo = ReceivedBlockInfo(streamId, numRecords, metadataOption, blockStoreResult)
trackerEndpoint.askWithRetry[Boolean](AddBlock(blockInfo))

Storing the block is the job of the corresponding receivedBlockHandler. When WAL is enabled (spark.streaming.receiver.writeAheadLog.enable is true) it is a WriteAheadLogBasedBlockHandler (with WAL enabled, data can be recovered from the log after the application crashes); otherwise it is a BlockManagerBasedBlockHandler.

private val receivedBlockHandler: ReceivedBlockHandler = {
    if (WriteAheadLogUtils.enableReceiverLog(env.conf)) {
      if (checkpointDirOption.isEmpty) {
        throw new SparkException(
          "Cannot enable receiver write-ahead log without checkpoint directory set. " +
            "Please use streamingContext.checkpoint() to set the checkpoint directory. " +
            "See documentation for more details.")
      }
      new WriteAheadLogBasedBlockHandler(env.blockManager, env.serializerManager, receiver.streamId,
        receiver.storageLevel, env.conf, hadoopConf, checkpointDirOption.get)
    } else {
      new BlockManagerBasedBlockHandler(env.blockManager, receiver.storageLevel)
    }
  }
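
So taking the WAL path requires both the configuration flag and a checkpoint directory; a minimal sketch (the path is a placeholder):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("wal-demo")
  .set("spark.streaming.receiver.writeAheadLog.enable", "true")
val ssc = new StreamingContext(conf, Seconds(2))
// Without this, the WriteAheadLogBasedBlockHandler above cannot be created.
ssc.checkpoint("hdfs:///path/to/checkpoint")   // placeholder path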

Part of the storeBlock method:

case ArrayBufferBlock(arrayBuffer) =>
    numRecords = Some(arrayBuffer.size.toLong)
    blockManager.putIterator(blockId, arrayBuffer.iterator, storageLevel, tellMaster = true)
case IteratorBlock(iterator) =>
    val countIterator = new CountingIterator(iterator)
    val putResult = blockManager.putIterator(blockId, countIterator, storageLevel, tellMaster = true)
    numRecords = countIterator.count
    putResult
case ByteBufferBlock(byteBuffer) =>
    blockManager.putBytes(blockId, new ChunkedByteBuffer(byteBuffer.duplicate()), storageLevel, tellMaster = true)

Both handlers use the blockManager to store the block in memory or on disk; for the storage details, see the separate BlockManager analysis.

Notifying the ReceiverTracker

After the block is stored, a ReceivedBlockInfo instance is created containing information about the block: the streamId (one InputDStream corresponds to one Receiver, and one Receiver corresponds to one streamId), the number of records in the block, the storeResult, and so on.

The receivedBlockInfo is then sent to the ReceiverTracker in an AddBlock message. The ReceiverTracker handles the message as follows:

 case AddBlock(receivedBlockInfo) =>
        if (WriteAheadLogUtils.isBatchingEnabled(ssc.conf, isDriver = true)) {
          walBatchingThreadPool.execute(new Runnable {
            override def run(): Unit = Utils.tryLogNonFatalError {
              if (active) {
                context.reply(addBlock(receivedBlockInfo))
              } else {
                throw new IllegalStateException("ReceiverTracker RpcEndpoint shut down.")
              }
            }
          })
        } else {
          context.reply(addBlock(receivedBlockInfo))
        }

In either branch, addBlock(receivedBlockInfo) is called:

private def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = {
    receivedBlockTracker.addBlock(receivedBlockInfo)
  }

The ReceiverTracker has a member dedicated to managing blocks, receivedBlockTracker, and block info is added via addBlock(receivedBlockInfo):

def addBlock(receivedBlockInfo: ReceivedBlockInfo): Boolean = {
    try {
      val writeResult = writeToLog(BlockAdditionEvent(receivedBlockInfo))
      if (writeResult) {
        synchronized {
          getReceivedBlockQueue(receivedBlockInfo.streamId) += receivedBlockInfo
        }
        logDebug(s"Stream ${receivedBlockInfo.streamId} received " +
          s"block ${receivedBlockInfo.blockStoreResult.blockId}")
      } else {
        logDebug(s"Failed to acknowledge stream ${receivedBlockInfo.streamId} receiving " +
          s"block ${receivedBlockInfo.blockStoreResult.blockId} in the Write Ahead Log.")
      }
      writeResult
    } catch {
      case NonFatal(e) =>
        logError(s"Error adding block $receivedBlockInfo", e)
        false
    }
  }

If WAL is enabled, the block info is first written to the write-ahead log. In either case the block info is then added to streamIdToUnallocatedBlockQueues, a mutable.HashMap[Int, ReceivedBlockQueue] whose key is the unique id of an InputDStream and whose value is the queue of blocks that have been stored but not yet allocated. Later, when blocks are allocated to a batch, this structure is consulted to obtain the unconsumed blocks for each InputDStream (a paraphrased sketch follows).
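
For completeness, when the JobGenerator later generates a batch it calls ReceivedBlockTracker.allocateBlocksToBatch, which drains these per-stream queues; roughly (paraphrased, error handling omitted):

// Paraphrased sketch of ReceivedBlockTracker.allocateBlocksToBatch (simplified):
def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {
  if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) {
    // take every block queued so far, for every registered stream
    val streamIdToBlocks = streamIds.map { id =>
      (id, getReceivedBlockQueue(id).dequeueAll(_ => true))
    }.toMap
    val allocatedBlocks = AllocatedBlocks(streamIdToBlocks)
    if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {
      timeToAllocatedBlocks.put(batchTime, allocatedBlocks)
      lastAllocatedBatchTime = batchTime
    }
  }
}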