
Spark Streaming WordCount Example and Source Code Analysis (Part 1)

1. Code Example

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WordCount {
    def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("WordCount").setMaster("local[2]")

        // Set the batchDuration interval, which controls how often jobs are generated,
        // and create the entry point for Spark Streaming execution
        val ssc = new StreamingContext(conf, Seconds(5)) // 5-second batch interval
        val lines = ssc.socketTextStream(
            "127.0.0.1",
            6666,
            StorageLevel.MEMORY_AND_DISK_SER) // server address, port, storage level for received data
        val words = lines.flatMap(_.split(","))
        val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
        wordCounts.print()

        // The actual scheduling starts here
        ssc.start()
        ssc.awaitTermination()
    }
}
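       For a quick local test, you can start a plain text server with netcat (nc -lk 6666), type comma-separated words into it, and the counts for each 5-second batch are printed to the console.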

2. Source Code Analysis

       Creating the StreamingContext:
  /**
   * Create a StreamingContext by providing the configuration necessary for a new SparkContext.
   * @param conf a org.apache.spark.SparkConf object specifying Spark parameters
   * @param batchDuration the time interval at which streaming data will be divided into batches
   */
  def this(conf: SparkConf, batchDuration: Duration) = {
    this(StreamingContext.createNewSparkContext(conf), null, batchDuration)
  }
       The structure of the StreamingContext class is as follows:
class StreamingContext private[streaming] (
    sc_ : SparkContext,
    cp_ : Checkpoint,
    batchDur_ : Duration
  )
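       Besides the (SparkConf, Duration) constructor used in the example, the primary constructor above can also be reached from an existing SparkContext. A minimal sketch (not from the original article; constructor overloads may differ slightly between Spark versions):
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Reuse an already-created SparkContext; the Checkpoint parameter (cp_) of the
// primary constructor is filled with null, just as in the conf-based constructor.
val sc = new SparkContext(new SparkConf().setAppName("WordCount").setMaster("local[2]"))
val sscFromSc = new StreamingContext(sc, Seconds(5))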
       The data source is obtained via ssc.socketTextStream(…), and socketTextStream() returns a ReceiverInputDStream object:
/**
 * Create a input stream from TCP source hostname:port. Data is received using
 * a TCP socket and the receive bytes is interpreted as UTF8 encoded `\n` delimited
 * lines.
 * @param hostname      Hostname to connect to for receiving data
 * @param port          Port to connect to for receiving data
 * @param storageLevel  Storage level to use for storing the received objects
 *                      (default: StorageLevel.MEMORY_AND_DISK_SER_2)
 */
def socketTextStream(
    hostname: String,
    port: Int,
    storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
  ): ReceiverInputDStream[String] = withNamedScope("socket text stream") {
  socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
}
/**
 * Create a input stream from TCP source hostname:port. Data is received using
 * a TCP socket and the receive bytes it interepreted as object using the given
 * converter.
 * @param hostname      Hostname to connect to for receiving data
 * @param port          Port to connect to for receiving data
 * @param converter     Function to convert the byte stream to objects
 * @param storageLevel  Storage level to use for storing the received objects
 * @tparam T            Type of the objects received (after converting bytes to objects)
 */
def socketStream[T: ClassTag](
    hostname: String,
    port: Int,
    converter: (InputStream) => Iterator[T],
    storageLevel: StorageLevel
  ): ReceiverInputDStream[T] = {
  new SocketInputDStream[T](this, hostname, port, converter, storageLevel)
}
private[streaming]
class SocketInputDStream[T: ClassTag](
    ssc_ : StreamingContext,
    host: String,
    port: Int,
    bytesToObjects: InputStream => Iterator[T],
    storageLevel: StorageLevel
  ) extends ReceiverInputDStream[T](ssc_) {

  def getReceiver(): Receiver[T] = {
    new SocketReceiver(host, port, bytesToObjects, storageLevel)
  }
}
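       Because socketStream() accepts an arbitrary InputStream => Iterator[T] converter, the received bytes do not have to be interpreted as plain text lines. A minimal sketch of a custom converter, reusing the ssc from the example above (the converter and the comma-separated field format are made up for illustration):
import java.io.{BufferedReader, InputStream, InputStreamReader}
import java.nio.charset.StandardCharsets

import org.apache.spark.storage.StorageLevel

// Hypothetical converter: read the TCP byte stream as UTF-8 lines and split each
// line into comma-separated fields, so the DStream carries Array[String] records.
def bytesToFields(in: InputStream): Iterator[Array[String]] = {
  val reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8))
  Iterator.continually(reader.readLine()).takeWhile(_ != null).map(_.split(","))
}

val fields = ssc.socketStream[Array[String]](
  "127.0.0.1", 6666, bytesToFields, StorageLevel.MEMORY_AND_DISK_SER)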
       ReceiverInputDStream is a subclass of InputDStream. Inside InputDStream:
abstract class InputDStream[T: ClassTag] (ssc_ : StreamingContext)
  extends DStream[T](ssc_) {

  private[streaming] var lastValidTime: Time = null

  ssc.graph.addInputStream(this)
  ...
       As we can see, the InputDStream adds itself to ssc.graph when it is instantiated, and ssc.graph is a DStreamGraph.
final private[streaming] class DStreamGraph extends Serializable with Logging {

  private val inputStreams = new ArrayBuffer[InputDStream[_]]()
  private val outputStreams = new ArrayBuffer[DStream[_]]()
       The inputStreams member of DStreamGraph is what holds the registered InputDStreams.
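       One consequence of this registration is that a single StreamingContext can track several sources at once: every input DStream created on the same ssc appends itself to the same DStreamGraph. A small sketch (the second port is made up for illustration):
// Both streams register themselves with the same ssc.graph (a DStreamGraph),
// so DStreamGraph.inputStreams ends up holding two entries.
val ordersLines   = ssc.socketTextStream("127.0.0.1", 6666)
val paymentsLines = ssc.socketTextStream("127.0.0.1", 7777)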
       Next comes a series of DStream operations:
val words = lines.flatMap(_.split(","))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
       flatMap(), map() and the like are transformation operations, and each of them returns a corresponding DStream. Taking map() as an example:
  /** Return a new DStream by applying a function to all elements of this DStream. */
  def map[U: ClassTag](mapFunc: T => U): DStream[U] = ssc.withScope {
    new MappedDStream(this, context.sparkContext.clean(mapFunc))
  }
       Here a MappedDStream is returned, and its first argument is the parent DStream, which shows that a DStream keeps track of its dependencies. print() is an output operation; it creates a ForEachDStream and registers it with the DStreamGraph:
    new ForEachDStream(this,
      context.sparkContext.clean(foreachFunc, false), displayInnerRDDOps).register()
  /**
   * Register this streaming as an output stream. This would ensure that RDDs of this
   * DStream will be generated.
   */
  private[streaming] def register(): DStream[T] = {
    ssc.graph.addOutputStream(this)
    this
  }
       The addOutputStream method simply appends this ForEachDStream to the outputStreams member of DStreamGraph.
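       To see how the dependency chain mentioned above is actually stored, here is roughly what MappedDStream looks like (paraphrased from the Spark source; the exact code may differ slightly between versions): each transformed DStream keeps a reference to its parent and computes its RDD for a given batch time from the parent's RDD.
private[streaming]
class MappedDStream[T: ClassTag, U: ClassTag] (
    parent: DStream[T],
    mapFunc: T => U
  ) extends DStream[U](parent.ssc) {

  // The parent reference is the dependency edge in the DStream graph
  override def dependencies: List[DStream[_]] = List(parent)

  override def slideDuration: Duration = parent.slideDuration

  // For each batch time, derive this DStream's RDD from the parent's RDD
  override def compute(validTime: Time): Option[RDD[U]] = {
    parent.getOrCompute(validTime).map(_.map[U](mapFunc))
  }
}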

       After that, ssc.start() is called. In StreamingContext:
/**
 * Start the execution of the streams.
 *
 * @throws IllegalStateException if the StreamingContext is already stopped.
 */
def start(): Unit = synchronized {
  state match {
    case INITIALIZED =>
      startSite.set(DStream.getCreationSite())
      StreamingContext.ACTIVATION_LOCK.synchronized {
        StreamingContext.assertNoOtherContextIsActive()
        try {
          validate()

          // Start the streaming scheduler in a new thread, so that thread local properties
          // like call sites and job groups can be reset without affecting those of the
          // current thread.
          ThreadUtils.runInNewThread("streaming-start") {
            sparkContext.setCallSite(startSite.get)
            sparkContext.clearJobGroup()
            sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false")
            scheduler.start()
          }
          state = StreamingContextState.ACTIVE
        } catch {
          case NonFatal(e) =>
            logError("Error starting the context, marking it as stopped", e)
            scheduler.stop(false)
            state = StreamingContextState.STOPPED
            throw e
        }
        StreamingContext.setActiveContext(this)
      }
      shutdownHookRef = ShutdownHookManager.addShutdownHook(
        StreamingContext.SHUTDOWN_HOOK_PRIORITY)(stopOnShutdown)
      // Registering Streaming Metrics at the start of the StreamingContext
      assert(env.metricsSystem != null)
      env.metricsSystem.registerSource(streamingSource)
      uiTab.foreach(_.attach())
      logInfo("StreamingContext started")
    case ACTIVE =>
      logWarning("StreamingContext has already been started")
    case STOPPED =>
      throw new IllegalStateException("StreamingContext has already been stopped")
  }
}
       It first checks the state of the current StreamingContext. If the state is ACTIVE, starting another StreamingContext is not allowed, because Spark does not yet support running multiple contexts at the same time. If the state is INITIALIZED, a streaming-start thread is started, which calls scheduler.start(); the scheduler here is defined as follows:
private[streaming] val scheduler = new JobScheduler(this)

       In Spark Streaming, the class responsible for the overall dynamic job scheduling is JobScheduler. Each StreamingContext instance owns a JobScheduler instance, and when ssc.start() begins executing it calls the start() method of that JobScheduler instance.
  def start(): Unit = synchronized {
    if (eventLoop != null) return // scheduler has already been started

    logDebug("Starting JobScheduler")
    eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
      override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)

      override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
    }
    eventLoop.start()

    // attach rate controllers of input streams to receive batch completion updates
    for {
      inputDStream <- ssc.graph.getInputStreams
      rateController <- inputDStream.rateController
    } ssc.addStreamingListener(rateController)

    listenerBus.start(ssc.sparkContext)
    receiverTracker = new ReceiverTracker(ssc)
    inputInfoTracker = new InputInfoTracker(ssc)
    receiverTracker.start()
    jobGenerator.start()
    logInfo("Started JobScheduler")
  }
       JobScheduler is the overall job scheduler for Spark Streaming. In JobScheduler's start() method, an EventLoop[JobSchedulerEvent] is created first to handle the various JobSchedulerEvents.
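       EventLoop itself is a small internal Spark utility: a named thread that drains a queue of events and dispatches each one to onReceive(). A toy, self-contained sketch of the pattern (not the actual Spark class):
import java.util.concurrent.LinkedBlockingDeque

// Toy version of the event-loop pattern used by JobScheduler: post() enqueues an
// event, and a dedicated thread hands each event to onReceive in order.
abstract class ToyEventLoop[E](name: String) {
  private val queue = new LinkedBlockingDeque[E]()
  @volatile private var stopped = false

  private val thread = new Thread(name) {
    setDaemon(true)
    override def run(): Unit =
      try {
        while (!stopped) onReceive(queue.take())
      } catch {
        case _: InterruptedException => // interrupted by stop(), exit quietly
      }
  }

  def start(): Unit = thread.start()
  def stop(): Unit = { stopped = true; thread.interrupt() }
  def post(event: E): Unit = queue.put(event)

  protected def onReceive(event: E): Unit
}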

       JobScheduler has two very important members: JobGenerator and ReceiverTracker. JobScheduler delegates the concrete work of generating each batch's RDD DAG to JobGenerator, and delegates the tracking of data received from the input sources to ReceiverTracker.