Author: Zhou Zhihu
WeChat ID: zhouzhihubeyond

Main contents

  1. Spark Streaming caching
  2. Checkpoint
  3. Example

1. Spark Streaming Caching

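As the earlier lessons in this series showed, a DStream is made up of a sequence of RDDs. Like an ordinary RDD, a DStream can persist its data in memory, and it uses the same persist method: calling persist on a DStream persists every RDD that the DStream generates. This is especially useful when a DStream's data is computed more than once or reused repeatedly. Window-based operations such as reduceByWindow and reduceByKeyAndWindow persist their data automatically. The source code of reduceByKeyAndWindow is as follows: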

def reduceByKeyAndWindow(
      reduceFunc: (V, V) => V,
      invReduceFunc: (V, V) => V,
      windowDuration: Duration,
      slideDuration: Duration,
      partitioner: Partitioner,
      filterFunc: ((K, V)) => Boolean
    ): DStream[(K, V)] = ssc.withScope {

    val cleanedReduceFunc = ssc.sc.clean(reduceFunc)
    val cleanedInvReduceFunc = ssc.sc.clean(invReduceFunc)
    val cleanedFilterFunc = if (filterFunc != null) Some(ssc.sc.clean(filterFunc)) else None
    new ReducedWindowedDStream[K, V](
      self, cleanedReduceFunc, cleanedInvReduceFunc, cleanedFilterFunc,
      windowDuration, slideDuration, partitioner
    )
  }

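As the method above shows, it ultimately returns a ReducedWindowedDStream object. In the source code of that class, the primary constructor contains the following two persist calls: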

private[streaming]
class ReducedWindowedDStream[K: ClassTag, V: ClassTag](
    parent: DStream[(K, V)],
    reduceFunc: (V, V) => V,
    invReduceFunc: (V, V) => V,
    filterFunc: Option[((K, V)) => Boolean],
    _windowDuration: Duration,
    _slideDuration: Duration,
    partitioner: Partitioner
  ) extends DStream[(K, V)](parent.ssc) {
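  // (other non-essential code omitted, including the definition of reducedStream,
  //  the per-batch reduceByKey of the parent stream)
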
  // Persist RDDs to memory by default as these RDDs are going to be reused.
  super.persist(StorageLevel.MEMORY_ONLY_SER)
  reducedStream.persist(StorageLevel.MEMORY_ONLY_SER)
}

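The code above shows that developers do not need to call persist manually on DStreams produced by window operations. Spark caches their data in memory automatically. As with ordinary RDDs, a DStream can be persisted at any of the standard storage levels. However, calling persist() on a DStream with no argument uses MEMORY_ONLY_SER (data kept serialized in memory), whereas the default for RDDs is MEMORY_ONLY. The supported storage levels are: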
[Figure: table of the storage levels supported by DStream persist]
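DStreams that are not produced by window or stateful operations must be persisted explicitly. The following is a minimal sketch that assumes a text server on localhost:9999, such as the `nc -lk 9999` server used in the example later. The object name `PersistSketch` and the helper `tokenize` are illustrative only. In the sketch, a DStream of words is reused by two output operations. Persisting it once avoids re-deriving it from the received blocks for each job. The sketch also passes an explicit storage level to `socketTextStream`. That method's default level, `MEMORY_AND_DISK_SER_2`, replicates received blocks to a second node.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Illustrative sketch: persisting a DStream that feeds two output operations.
object PersistSketch {
  // Split a line into non-empty words (hypothetical helper for this sketch).
  def tokenize(line: String): Array[String] = line.split(" ").filter(_.nonEmpty)

  def main(args: Array[String]): Unit = {
    // local[2]: one core is taken by the socket receiver, one is left for processing.
    val conf = new SparkConf().setAppName("PersistSketch").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(1))

    // Store received blocks without replication (the default is MEMORY_AND_DISK_SER_2).
    val lines = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER)
    val words = lines.flatMap(line => tokenize(line))

    // Persist every RDD generated by `words`; words.persist() with no argument
    // would use MEMORY_ONLY_SER, the DStream default.
    words.persist(StorageLevel.MEMORY_ONLY_SER)

    // Two output operations: each runs its own job per batch, and both read the
    // cached words instead of recomputing flatMap over the received blocks.
    words.count().print()
    words.map(w => (w, 1)).reduceByKey(_ + _).print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```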

2. The Checkpoint Mechanism

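From what we have seen of Spark Streaming so far, a Spark Streaming application keeps running until it is stopped manually. In practice, such applications usually run 24/7. Spark Streaming must therefore be resilient to failures unrelated to the application logic, such as system errors or JVM crashes. The checkpoint mechanism is designed for this purpose. It saves enough information to fault-tolerant storage such as HDFS for the application to recover quickly after a failure. Two kinds of data can be checkpointed: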

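(1) Metadata checkpointing
This saves the information that defines the streaming computation to fault-tolerant storage such as HDFS. Metadata checkpointing is used to recover when the node running the driver of the streaming application fails. The metadata includes:
Configuration - the configuration used to create the streaming application
DStream operations - the DStream operations defined in the streaming application
Incomplete batches - batches whose jobs are queued but have not yet completed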

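(2) Data checkpointing
This saves the generated RDDs to reliable external storage. Data checkpointing is necessary for stateful transformations that combine data across multiple batches. The RDDs these transformations generate depend on RDDs from previous batches, so the dependency chain keeps growing over time. Checkpointing periodically saves intermediate RDDs to reliable storage. This cuts the dependency chain, so after a failure the application can recover directly from the checkpointed data instead of recomputing the whole chain.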
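The following toy model, written in plain Scala, illustrates how the dependency chain grows and how checkpointing cuts it. It is not Spark code and does not reflect how Spark implements checkpointing. The names `Lineage`, `Derived`, `Checkpointed` and `LineageModel` are hypothetical. In the model, each batch derives a new state from the previous one. Without checkpoints, the number of steps that recovery would have to replay grows by one per batch. A checkpoint resets that number to zero.

```scala
// Toy model (not Spark code): how long is the chain recovery would replay?
sealed trait Lineage { def depth: Int }
// State that was saved to reliable storage: nothing needs to be recomputed.
case object Checkpointed extends Lineage { val depth = 0 }
// State computed from a parent state: one more step to replay.
final case class Derived(parent: Lineage) extends Lineage {
  def depth: Int = parent.depth + 1
}

object LineageModel {
  // Simulate `batches` stateful steps, checkpointing every `interval` batches
  // (interval <= 0 means never). Returns the lineage depth after each batch.
  def depths(batches: Int, interval: Int): Seq[Int] = {
    var current: Lineage = Checkpointed // initial empty state
    (1 to batches).map { b =>
      current = Derived(current)
      if (interval > 0 && b % interval == 0) current = Checkpointed
      current.depth
    }
  }

  def main(args: Array[String]): Unit = {
    println(LineageModel.depths(6, 0)) // chain keeps growing
    println(LineageModel.depths(6, 3)) // chain is cut every 3 batches
  }
}
```

With `depths(6, 0)`, the chain grows to 6 steps. With `depths(6, 3)`, it never exceeds 2.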

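In short, metadata checkpointing is mainly used to recover from driver failures, while data checkpointing is needed for stateful transformations. An application that uses a stateful transformation, such as updateStateByKey or reduceByKeyAndWindow with an inverse function, must set a checkpoint directory. Otherwise, it fails when the StreamingContext is started.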

Checkpointing is enabled with the following method:

// checkpointDirectory is the directory where checkpoint files are stored
streamingContext.checkpoint(checkpointDirectory)
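The following sketch shows data checkpointing for a stateful transformation: a running word count built with `updateStateByKey`. It assumes the same local socket source on port 9999. The object name, the helper `updateCount` and the checkpoint path `/tmp/spark-checkpoint` are hypothetical. In addition to `StreamingContext.checkpoint`, the sketch calls `DStream.checkpoint(interval)` to control how often the state RDDs are checkpointed. The Spark Streaming programming guide suggests an interval of 5 to 10 sliding intervals.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Illustrative sketch: a stateful word count that needs data checkpointing.
object StatefulWordCountSketch {
  // New state for one word: this batch's occurrences plus the previous total.
  def updateCount(newValues: Seq[Int], state: Option[Int]): Option[Int] =
    Some(newValues.sum + state.getOrElse(0))

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("StatefulWordCountSketch").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(1))
    // Without a checkpoint directory, ssc.start() rejects updateStateByKey.
    ssc.checkpoint("/tmp/spark-checkpoint") // hypothetical path; use HDFS or similar in production

    val pairs = ssc.socketTextStream("localhost", 9999)
      .flatMap(_.split(" "))
      .map(word => (word, 1))

    // Each batch's state RDD depends on the previous one, so its lineage grows
    // until the state is checkpointed.
    val totals = pairs.updateStateByKey[Int](updateCount _)
    // Checkpoint the state every 5 batches (5 x the 1-second batch interval).
    totals.checkpoint(Seconds(5))
    totals.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
```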

3. Example

import java.io.File
import java.nio.charset.Charset

import com.google.common.io.Files

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Time, Seconds, StreamingContext}

/**
 * Counts words in text encoded with UTF8 received from the network every second.
 *
 * Usage: RecoverableNetworkWordCount <hostname> <port> <checkpoint-directory> <output-file>
 *   <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive
 *   data. <checkpoint-directory> directory to HDFS-compatible file system which checkpoint data
 *   <output-file> file to which the word counts will be appended
 *
 * <checkpoint-directory> and <output-file> must be absolute paths
 *
 * To run this on your local machine, you need to first run a Netcat server
 *
 *      `$ nc -lk 9999`
 *
 * and run the example as
 *
 *      `$ ./bin/run-example org.apache.spark.examples.streaming.RecoverableNetworkWordCount \
 *              localhost 9999 ~/checkpoint/ ~/out`
 *
 * If the directory ~/checkpoint/ does not exist (e.g. running for the first time), it will create
 * a new StreamingContext (will print "Creating new context" to the console). Otherwise, if
 * checkpoint data exists in ~/checkpoint/, then it will create StreamingContext from
 * the checkpoint data.
 *
 * Refer to the online documentation for more details.
 */
object RecoverableNetworkWordCount {

  def createContext(ip: String, port: Int, outputPath: String, checkpointDirectory: String)
    : StreamingContext = {


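    // This line runs only when a new context is created (e.g. on the first run). When the
    // application recovers from a checkpoint, createContext is not called, so nothing is printed.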
    println("Creating new context")
    val outputFile = new File(outputPath)
    if (outputFile.exists()) outputFile.delete()
    // local[4]: the socket receiver occupies one core, so local mode needs at least 2 cores
    val sparkConf = new SparkConf().setAppName("RecoverableNetworkWordCount").setMaster("local[4]")
    // Create the context with a 1 second batch size
    val ssc = new StreamingContext(sparkConf, Seconds(1))
    ssc.checkpoint(checkpointDirectory)

    // Use a socket as the data source
    val lines = ssc.socketTextStream(ip, port)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.foreachRDD((rdd: RDD[(String, Int)], time: Time) => {
      val counts = "Counts at time " + time + " " + rdd.collect().mkString("[", ", ", "]")
      println(counts)
      println("Appending to " + outputFile.getAbsolutePath)
      Files.append(counts + "\n", outputFile, Charset.defaultCharset())
    })
    ssc
  }
  // Extractor that converts a String to an Int (None if it is not a valid integer)
  private object IntParam {
    def unapply(str: String): Option[Int] = {
      try {
        Some(str.toInt)
      } catch {
        case e: NumberFormatException => None
      }
    }
  }

  def main(args: Array[String]): Unit = {
    if (args.length != 4) {
      System.err.println("Your arguments were " + args.mkString("[", ", ", "]"))
      System.err.println(
        """
          |Usage: RecoverableNetworkWordCount <hostname> <port> <checkpoint-directory>
          |     <output-file>. <hostname> and <port> describe the TCP server that Spark
          |     Streaming would connect to receive data. <checkpoint-directory> directory to
          |     HDFS-compatible file system which checkpoint data <output-file> file to which the
          |     word counts will be appended
          |
          |In local mode, <master> should be 'local[n]' with n > 1
          |Both <checkpoint-directory> and <output-file> must be absolute paths
        """.stripMargin
      )
      System.exit(1)
    }
    val Array(ip, IntParam(port), checkpointDirectory, outputPath) = args
    // getOrCreate: rebuild the StreamingContext from checkpoint data if it exists,
    // otherwise create a new one by calling createContext
    val ssc = StreamingContext.getOrCreate(checkpointDirectory,
      () => {
        createContext(ip, port, outputPath, checkpointDirectory)
      })
    ssc.start()
    ssc.awaitTermination()
  }
}
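In `main`, the line `val Array(ip, IntParam(port), checkpointDirectory, outputPath) = args` relies on the `IntParam` extractor. Pattern matching calls its `unapply` method, which returns `Some(port)` for a numeric string and `None` otherwise. The snippet below copies the extractor into a standalone program so that the parsing can be tried without Spark. `IntParamDemo`, `parse` and the sample paths are hypothetical. A non-numeric port does not produce a friendly message: the pattern fails and a `scala.MatchError` is thrown.

```scala
// Same extractor as in the example: succeeds only for strings that parse as Int.
object IntParam {
  def unapply(str: String): Option[Int] =
    try Some(str.toInt)
    catch { case _: NumberFormatException => None }
}

object IntParamDemo {
  // Destructure exactly four arguments, converting the second one to Int.
  // Any other shape (wrong count, non-numeric port) throws scala.MatchError.
  def parse(args: Array[String]): (String, Int, String, String) = {
    val Array(ip, IntParam(port), checkpointDir, outputPath) = args
    (ip, port, checkpointDir, outputPath)
  }

  def main(args: Array[String]): Unit = {
    // prints (localhost,9999,/tmp/checkpoint,/tmp/out)
    println(parse(Array("localhost", "9999", "/tmp/checkpoint", "/tmp/out")))
  }
}
```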

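The program arguments are configured as follows:
[Figure: run configuration showing the program arguments]

The running application looks like this:
[Figure: screenshot of the running application]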

On the first run:

// A new StreamingContext is created
Creating new context
15/11/30 07:20:32 WARN MetricsSystem: Using default name DAGScheduler for source because spark.app.id is not set.
15/11/30 07:20:33 WARN SizeEstimator: Failed to check whether UseCompressedOops is set; assuming yes
Counts at time 1448896840000 ms []
Appending to /root/out2
15/11/30 07:20:47 WARN BlockManager: Block input-0-1448896847000 replicated to only 0 peer(s) instead of 1 peers
Counts at time 1448896850000 ms [(Spark,1), (Context,1)]

Stop the program manually, then run it again:

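// This time the metadata is read from the checkpoint directory and the StreamingContext is restored from it ("Creating new context" is not printed)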
Counts at time 1448897070000 ms []
Appending to /root/out2
Counts at time 1448897080000 ms []
Appending to /root/out2
Counts at time 1448897090000 ms []
Appending to /root/out2
15/11/30 07:24:58 WARN BlockManager: Block input-0-1448897098600 replicated to only 0 peer(s) instead of 1 peers
[Stage 8:>                                                          (0 + 0) / 4]Counts at time 1448897100000 ms [(Spark,1), (Context,1)]
Appending to /root/out2