1. 程式人生 > >Spark Streaming啟動&DStreamGraph原始碼分析

Spark Streaming啟動&DStreamGraph原始碼分析

  在github上看到一個十分好的總結:https://github.com/lw-lin/CoolplaySpark, 對Spark Streaming整體的設計思想講的算是個人見過十分好的了,看完之後有種原來如此,看完之後對整體的架構有了較為清晰的認識,不過由於篇幅問題,這個專案文件中是偏總結和思想的灌輸,沒有過於追究一些細節內容,本文以及後續將在此基礎上進行原始碼的閱讀,對細節進行更多的研究,剛好最近專案中產生的一些疑問通過閱讀原始碼也得到了很好的解釋,這種感覺真的是不要太美好的呢呢呢!!!強烈建議先看完這個專案文件中的內容,再看原始碼,有種胸有成竹事半功倍之效。
  這裡先從本人最近專案中遇到的一個場景的簡化程式碼開始進入本次的原始碼探索之旅:

val updateFunc = (values: Seq[Int], state: Option[Int]) => {
      if (checkDelTime()) {
        if (values.isEmpty) {
          None
        } else {
          Some(values.sum)
        }
      } else {
        val currentCount = values.sum
        val previousCount = state.getOrElse(0)
        Some(currentCount + previousCount)
      }
    }
    
val newUpdateFunc = (iterator: Iterator[(String, Seq[Int], Option[Int])]) => {
      iterator.flatMap(t => updateFunc(t._2, t._3).map(s => (t._1, s)))
    }
    
val sparkConf = new SparkConf().setAppName("KafkaReceiverWordCount").setMaster("local[2]")
val ssc = new StreamingContext(sparkConf,Seconds(5))

val kafkaParmas = Map[String,Object](
  "bootstrap.servers"->"hadoop001:9092",
  "key.deserializer"->classOf[StringDeserializer],
  "value.deserializer"->classOf[StringDeserializer],
  "group.id"->"test-consumer-group",
  "auto.offset.reset"->"latest",
  "enable.auto.commit"->(false: java.lang.Boolean)
)

val topics = Array("hello_topic","flume_kafka_streaming_topic")
val kafkaStream = KafkaUtils.createDirectStream(ssc,PreferConsistent,Subscribe[String,String](topics,kafkaParmas))
val fDStream  = kafkaStream.map(x => {
  val outputs = "topic = "+x.topic() + "value = " + x.value()
  println("|"+outputs+"|")
  x.value()
})
val wc = fDStream.flatMap(_.split(" ")).map((_,1)).updateStateByKey(newUpdateFunc, new HashPartitioner(ssc.sparkContext.defaultParallelism), true)
wc.foreachRDD(rdd=>{
    ...
})

ssc.start()
ssc.awaitTermination()

  以上程式碼是從kafka消費資料,然後持續統計所有批次的wordcount例子,這一串程式碼中,只有ssc.start()是發起任務的入口,這一行程式碼前的所有程式碼均可以理解為是對應用啟動後,所需要執行的任務進行配置,其中有應用的配置即sparkconf相關的配置,還有就是後續例項化DAG Graph所需靜態模版的配置(這一部分內容請看開篇提供的spark streaming的相關介紹),即我們在程式碼中寫呼叫的map、filter、flatmap、updateStateByKey等等這些運算元,其實是在配置靜態DAG Graph模版,每個批次的資料進行處理的時候,其實就是依據我們編寫程式碼生成的靜態DAG Graph模版來生成當前批次真正的DAG Graph例項,通過這個例項進行從後向前的追溯計算,來得到結果,以上這段話是較為籠統的介紹,理解起來可能不是十分清晰,我們還是直接進入ssc.start()

中來窺探一番其中的神祕~~
  ssc.start()的原始碼如下:

def start(): Unit = synchronized {
    state match {
      case INITIALIZED =>
        startSite.set(DStream.getCreationSite())
        StreamingContext.ACTIVATION_LOCK.synchronized {
          StreamingContext.assertNoOtherContextIsActive()
          try {
            validate()

            ThreadUtils.runInNewThread("streaming-start") {
              sparkContext.setCallSite(startSite.get)
              sparkContext.clearJobGroup()
              sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false")
              scheduler.start()
            }
            state = StreamingContextState.ACTIVE
          } catch {
            ...
          }
          StreamingContext.setActiveContext(this)
        }
        ...
      case ACTIVE =>
        logWarning("StreamingContext has already been started")
      case STOPPED =>
        throw new IllegalStateException("StreamingContext has already been stopped")
    }
  }

  初次啟動時,state預設值就是INITIALIZED,進入匹配的程式碼塊中,先進行一些校驗工作忽略,SparkStreamingContext中核心部分主要就是JobSchedulerDStreamGraph,以上程式碼部分是啟動了一個叫streaming-start執行緒,這段程式碼後面的{...}部分其實就相當於java中的Runnable中的run方法中的內容,屬於一個程式碼塊,我們直接看runInNewThread的原始碼:

def runInNewThread[T](
      threadName: String,
      isDaemon: Boolean = true)(body: => T): T = {
    @volatile var exception: Option[Throwable] = None
    @volatile var result: T = null.asInstanceOf[T]

    val thread = new Thread(threadName) {
      override def run(): Unit = {
        try {
          result = body
        } catch {
          case NonFatal(e) =>
            exception = Some(e)
        }
      }
    }
    thread.setDaemon(isDaemon)
    thread.start()
    thread.join()
    ...

  很顯然,核心是通過執行緒呼叫了JobSchedulerstart()方法,而SparkContext中的這個JobScheduler的作用見名知意就是用來做任務排程的,是driver端進行任務排程的東廠大總管!

JobScheduler啟動流程分析

  JobScheduler的內部結構其實相對簡單,我們通過程式碼註釋來進行介紹:

----JobScheduler.scala

//所有待處理job的容器
private val jobSets: java.util.Map[Time, JobSet] = new ConcurrentHashMap[Time, JobSet] 
//能夠同時執行job的併發數設定
private val numConcurrentJobs = ssc.conf.getInt("spark.streaming.concurrentJobs", 1)
//執行緒池
private val jobExecutor =
ThreadUtils.newDaemonFixedThreadPool(numConcurrentJobs, "streaming-job-executor")
//負責job生成,核心類,JobScheduler是大總管負責排程,這個jobGenerator就是小太監,負責實施總管釋出的命令。
private val jobGenerator = new JobGenerator(this)
//時鐘
val clock = jobGenerator.clock
//生產消費者模式,進行訊息釋出,提供給UI相關處理資料,不在本次討論範圍內,後續再進行研究
val listenerBus = new StreamingListenerBus()

// These two are created only when scheduler starts.
// eventLoop not being null means the scheduler has been started and not stopped
//資料處理追蹤,在DAG Graph中有所謂的input stream的DStream,這個類是用來記錄這些input stream接收資料的處理情況
var receiverTracker: ReceiverTracker = null
// A tracker to track all the input stream information as well as processed record number
// 這個是追蹤整體輸入資料處理情況的
var inputInfoTracker: InputInfoTracker = null

//進行事件接收和處理
private var eventLoop: EventLoop[JobSchedulerEvent] = null

def start(): Unit = synchronized {
if (eventLoop != null) return // scheduler has already been started

logDebug("Starting JobScheduler")
//例項化eventLoop並啟動進行JobScheduler釋出的事件處理
eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
  override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)

  override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
}
eventLoop.start()

// attach rate controllers of input streams to receive batch completion updates
// 對每個DStream的處理進度進行追蹤的監聽 不在本次研究範圍內,pass
for {
  inputDStream <- ssc.graph.getInputStreams
  rateController <- inputDStream.rateController
} ssc.addStreamingListener(rateController)

listenerBus.start(ssc.sparkContext)
receiverTracker = new ReceiverTracker(ssc)
inputInfoTracker = new InputInfoTracker(ssc)
//
receiverTracker.start()
//啟動jobGenerator
jobGenerator.start()
logInfo("Started JobScheduler")
}
 ...

  這裡關於核心還是大總管派出了小太監JobGenerator去執行自己給他賦予的命令,而命令就是告訴他每隔一個批次就產生一批jobs,我們來具體看JobGenerator的原始碼,關鍵內容看註釋部分:

JobGenerator分析
----JobGenerator

//開篇先宣告五個事件型別,不同事件不同的處理邏輯(廢話。。。)
private[scheduler] sealed trait JobGeneratorEvent
private[scheduler] case class GenerateJobs(time: Time) extends JobGeneratorEvent
private[scheduler] case class ClearMetadata(time: Time) extends JobGeneratorEvent
private[scheduler] case class DoCheckpoint(
    time: Time, clearCheckpointDataLater: Boolean) extends JobGeneratorEvent
private[scheduler] case class ClearCheckpointData(time: Time) extends JobGeneratorEvent

/**
 * This class generates jobs from DStreams as well as drives checkpointing and cleaning
 * up DStream metadata.
 */
//注意這裡JobGenerator的建構函式中入參是有JobScheduler的,小太監辦事是會受到大總管的監督的,
//針對小太監處理的結果,大總管需要作出對應的自己的處理。
private[streaming]
class JobGenerator(jobScheduler: JobScheduler) extends Logging {

  private val ssc = jobScheduler.ssc
  private val conf = ssc.conf
  private val graph = ssc.graph

  val clock = ...

  //核心成員,這是一個定時器,就是通過他JobGenerator會不斷的定期去產生job,時間就是一個批次
  private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
    longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")

  // This is marked lazy so that this is initialized after checkpoint duration has been set
  // in the context and the generator has been started.
  private lazy val shouldCheckpoint = ssc.checkpointDuration != null && ssc.checkpointDir != null

  private lazy val checkpointWriter = if (shouldCheckpoint) {
    new CheckpointWriter(this, ssc.conf, ssc.checkpointDir, ssc.sparkContext.hadoopConfiguration)
  } else {
    null
  }

  // eventLoop is created when generator starts.
  // This not being null means the scheduler has been started and not stopped
  //又見到了熟悉的eventLoop,事件容器
  private var eventLoop: EventLoop[JobGeneratorEvent] = null

  // last batch whose completion,checkpointing and metadata cleanup has been completed
  private var lastProcessedBatch: Time = null

  /** Start generation of jobs */
  def start(): Unit = synchronized {
    if (eventLoop != null) return // generator has already been started

    // Call checkpointWriter here to initialize it before eventLoop uses it to avoid a deadlock.
    // See SPARK-10125
    checkpointWriter

    //例項化eventLoop,實現對應的processEvent和onError方法
    eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") {
      override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event)

      override protected def onError(e: Throwable): Unit = {
        jobScheduler.reportError("Error in job generator", e)
      }
    }
    eventLoop.start()
    
    //如果之前checkpoint過了,再次重啟直接載入上次checkpoint的檔案進行不想恢復,checkpoint看似美好,其實實際使用的時候是會有問題的,
    //如果上次關閉是由於bug導致,而這次我們修改了程式碼邏輯,那麼如果再次啟動還是會用之前的checkpoint的內容,導致其實執行的程式碼還是老的程式碼,
    //想要新程式碼生效必須刪除checkpoint檔案,但是這樣會導致其它的資料消費記錄也GG從而無法從上次失敗開始消費資料,還是會有資料損失。
    if (ssc.isCheckpointPresent) {
      restart()
    } else {
    //初次見面啟動~
      startFirstTime()
    }
  }
  private def startFirstTime() {
    val startTime = new Time(timer.getStartTime())
    //啟動DAG Graph靜態模版配置
    graph.start(startTime - graph.batchDuration)
    //開啟無敵小陀螺,不知疲憊的定時器
    timer.start(startTime.milliseconds)
    logInfo("Started JobGenerator at " + startTime)
  }
...

  以上程式碼為主要的關於JobGenerator的成員和start方法的介紹,主要就是啟動定時器的工作,定時器執行緒啟動後會每個批次提交一個生成Job的時間插入到eventLoop中,然後對事件處理後生成對應jobs,返回生成的jobs插入到JobScheduler中的jobSets中,提交到執行緒池中等待處理。

DStreamGraph解析

  回到上面jobGenerator啟動方法中,初次啟動既然呼叫了graph.start(...)那麼我們繼續看DStreamGraph中關於start()的原始碼:

def start(time: Time) {
    this.synchronized {
      if (zeroTime != null) {
        throw new Exception("DStream graph computation already started")
      }
      zeroTime = time
      startTime = time
      outputStreams.foreach(_.initialize(zeroTime))
      outputStreams.foreach(_.remember(rememberDuration))
      outputStreams.foreach(_.validateAtStart)
      inputStreams.par.foreach(_.start())
    }
  }

  很顯然這裡是關於DStreamGraph中關於outputStreaminputStream的配置,注意由於DStream也是一種模版,所以這裡既是對DStream模版內容的配置,主要配置的就是快取時間之類的內容。
程式碼到這裡,已經基本介紹了關於SparkStreamingContext中的兩巨頭之JobScheduler啟動的流程,但是另外一個重點關於DStreamGraph的描述只是它的作用和功能等,但是它又是什麼時候產生的呢?接下來對此進行解釋,我們先繼續回到SparkStreamingContext的原始碼,其中關於DStreamGraph的宣告為:

private[streaming] val graph: DStreamGraph = {
    if (isCheckpointPresent) {
      cp_.graph.setContext(this)
      cp_.graph.restoreCheckpointData()
      cp_.graph
    } else {
      require(batchDur_ != null, "Batch duration for StreamingContext cannot be null")
      val newGraph = new DStreamGraph()
      newGraph.setBatchDuration(batchDur_)
      newGraph
    }
  }

  這裡只是在建立SparkStreamingContext的時候順帶建立的一個空的DStreamGraph物件,然後設定了一下它的批處理時間(直接從上次恢復的不看)。
  文章最開頭說的,關於我們平時寫的sparkstreaming的程式碼中的各種transformation以及action操作,只是在編寫靜態DAG Graph的模版,那麼這個模版是如何隨著我們編寫的程式碼來生成的呢?這裡我們需要先看DStreamGraph這個類的成員的基本情況:

final private[streaming] class DStreamGraph extends Serializable with Logging {

  private val inputStreams = new ArrayBuffer[InputDStream[_]]()
  private val outputStreams = new ArrayBuffer[DStream[_]]()

  var rememberDuration: Duration = null
  var checkpointInProgress = false

  var zeroTime: Time = null
  var startTime: Time = null
  var batchDuration: Duration = null

  def start(time: Time) {
    this.synchronized {
      if (zeroTime != null) {
        throw new Exception("DStream graph computation already started")
      }
      zeroTime = time
      startTime = time
      outputStreams.foreach(_.initialize(zeroTime))
      outputStreams.foreach(_.remember(rememberDuration))
      outputStreams.foreach(_.validateAtStart)
      inputStreams.par.foreach(_.start())
    }
  }
...

  這個類的構造其實很簡單,核心就是inputStreamsoutputStreams這兩個陣列,這兩兄弟是構造DStreamGraph模版的主力軍,不過一開始在SparkStreamingContext建立DStreamGraph的時候很顯然這些內容都是空的,而我們編寫的各種mapfilterprintupdateStateByKey等等運算元,就是來進行這兩兄弟填充的,那麼是如何填充的呢?這裡我們先看看幾個常見運算元的原始碼:

map運算元: val c = rdda.map(…)

def map[U: ClassTag](mapFunc: T => U): DStream[U] = ssc.withScope {
    new MappedDStream(this, context.sparkContext.clean(mapFunc))
  }

得到的是一個MappedDStream物件,繼續看這個物件的程式碼:

class MappedDStream[T: ClassTag, U: ClassTag] (
    parent: DStream[T],
    mapFunc: T => U
  ) extends DStream[U](parent.ssc) {

  override def dependencies: List[DStream[_]] = List(parent)

  override def slideDuration: Duration = parent.slideDuration

  override def compute(validTime: Time): Option[RDD[U]] = {
    parent.getOrCompute(validTime).map(_.map[U](mapFunc))
  }
}

  這個類繼承自DStream,我們先注意看MappedDStream中的dependencies,這裡的parent就是宣告MappedDStream時的對應DStream,這裡就是程式碼rdda.map(...)rdda的型別,也即每次呼叫map運算元的時候,生成的新的DStream均會在自己的dependencies自己的老父親是誰,類似的其它所有的運算元,都是這種結構,只要由一個DStream生成一個新的DStream,均會記錄之間的關係,這樣下來我們編寫的處理邏輯所有運算元之間就形成了一條龍了,從資料來源到最後的foreachRDD這種運算元。
  不過以上說的DStream之間記錄老父親互相之間建立聯絡的方式,只是在DStream之間,並沒有看到和DStreamGraph有半毛錢關係。和DStreamGraph建立關係其實是在最後呼叫action型別運算元的時候,例如上述的foreachRDD運算元,我們來看看它和普通的transformation有什麼區別呢,方法的原始碼如下:

def foreachRDD(foreachFunc: RDD[T] => Unit): Unit = ssc.withScope {
    val cleanedF = context.sparkContext.clean(foreachFunc, false)
    this.foreachRDD((r: RDD[T], t: Time) => cleanedF(r))
  }
def foreachRDD(foreachFunc: (RDD[T], Time) => Unit): Unit = ssc.withScope {
    new ForEachDStream(this, context.sparkContext.clean(foreachFunc, false)).register()
  }

  可以看到,這裡得到的是一個ForEachDStream類,然而仔細發現它還有一個register()小尾巴,我們看看它偷偷做了什麼:

private[streaming] def register(): DStream[T] = {
    ssc.graph.addOutputStream(this)
    this
  }
def addOutputStream(outputStream: DStream[_]) {
    this.synchronized {
      outputStream.setGraph(this)
      outputStreams += outputStream
    }
  }

  這裡就發現原來這裡出現了DStreamGraph的身影!這裡呼叫graph.addOutputStream(this)將我們進行foreachRDD得到的DStream插入到了outputStream陣列中,如果再看其它action類運算元和transformation類運算元,會發現他們的邏輯和這裡都一樣,進行到這裡就會發現只要呼叫了action的運算元,都會被作為outputStream加入到DStreamGraph中。
  以上DStreamGraph中的兩兄弟outputStream已經知道如何得到資料的了,那麼另外一個inputStream又是如何得到填充的呢?讓我們先看看對應的DStreamGraph中給inputStream插入資料的方法為:

def addInputStream(inputStream: InputDStream[_]) {
    this.synchronized {
      inputStream.setGraph(this)
      inputStreams += inputStream
    }
  }

  再通過Find usage讓我們看看哪裡會呼叫這個方法,發現只有在一個InputDStream的類中有呼叫,而且是在這個類建立時自動呼叫的,程式碼如下:

ssc.graph.addInputStream(this)

  而這個InputDStream的實現類主要是:DirectKafkaInputDStreamFileInputDStreamSocketInputDStream等,很顯然這些都是資料來源的DStream,所以就是說在剛開始宣告這些類的時候就會自動被加入到DStreamGraph中的inputStream
至此,我們需要注意到其實inputStreamoutputStream分別就是我們處理邏輯的資料入口和最終輸出的出口,DStreamGraph中記錄了入口和出口,而且之前已經說過每個DStream之間通過dependencies已經建立過了聯絡,所以知道了開頭和結尾,中間的關係又有了,那麼至此一張無形的關係網模版已經悄然形成!!!
  以上就是關於DStreamGraph在應用啟動時的建立過程了,再加上說明的JobScheduler的邏輯,我們的關於專案啟動的主體流程已經搞定~

個人收穫

  看完這一部分原始碼,體會最深的是內部的各種生產者消費者模式的使用對事件的處理,DStreamGraph的設計也很巧妙,各種類的封裝抽象很到位,不過巧妙帶來的問題就是作為原始碼閱讀者我看起來有點費勁,到現在還沒有找到InputStream是在什麼時候設定的快取的???至於對具體優化和編寫sparkstreaming應用時的一些感悟,貌似並沒有任何關係。。。算是提示自己的思想吧,訓練思維!