1. 程式人生 > >Spark Streaming(二十七)DStream的轉換、輸出、快取持久化、檢查點

Spark Streaming(二十七)DStream的轉換、輸出、快取持久化、檢查點

定義

所謂DStream的轉換其實就是對間隔時間內DStream資料流的RDD進行轉換操作並返回去一個新的DStream

DStream轉換

其實DStream轉換語法跟RDD的轉換語法非常類似,但DStream有它自己的一些特殊的語法,如updateStateByKey()、transform()、以及各種Window語法。

轉換 意思
map(func) 將DStream上的每個RDD通過func函式操作並返回一個新的DStream
flatMap(func) 和map類似,但是每個輸入的元素可以對映成0個或者多個項
filter(func) 過濾出DStream上符合要求的RDD並返回新的DStream
repartiton(numPartitions) 對DStream上的RDD進行重新分割槽,提高並行度
union(otherDStream) 將兩個DStream合併到一起
count() 返回DStream中RDD中元素的總個數,返回只包含一個Long型別的DStream
reduce(func) 通過func函式聚合DStream中每個RDD中的每個元素,返回新的DStream,該新的DStream中只有一個元素,就是聚合以後的而結果
countByValue() 計算DStream上RDD內元素出現的頻次,並返回新的DStream[(K,Long)],K是RDD元素的型別,Long是元素出現的次數
reduceByKey(func,[numPartition]) 聚合DStream上(K,V)型別的RDD裡元素的,根據K統計V的個數,返回新的DStream,新的DStream裡的RDD元素型別也為(K,V),K為鍵,V為K對應值的個數
join(otherDStream,[numPartition]) 將兩個型別為(K,V)和(K,W)的DStream進行join連線,返回一個型別為(K,(V,W))的新的DStream
cogroup(otherDStream,[numPartition]) 對兩個(K,V)和(K,W)型別的DStream上呼叫該函式的時候,返回(K,(Seq [V],Seq [W]))元組的新DStream。
transform(func) 通過轉換函式,將DStream上的每個RDD轉換成另一種RDD,這種函式的操作基本單位為RDD,所以這個函式中的操作語法就是RDD的操作語法。轉換後返回新的DStream

UpdateStateByKey操作

updateStateByKey就是隨著時間的流逝,在SparkStreaming中的可以對每一個Key通過checkPoint來維護state狀態,通過更新函式對每一個Key的狀態進行更新,在更新的時候,對於每一個批次的資料而言,SparkStreaming都會通過updateStateByKey這個函式更新已存在Key對應的State,但是如果通過更新函式對State更新以後返回的是None,那麼此刻的Key對應的State就會被刪除,State可以是任意型別的資料結構。

package com.lyz.streaming.transformation

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object UpdateStateByKey {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("Streamingtest")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

    val streaming = new StreamingContext(conf, Seconds(5))

    /**
      *
      * 在HDFS上儲存state的檢查點,必須設定,否則會報錯,因為每次更新state的時候
      * 都需要去檢查點獲取上一次更新後的state
      */
    streaming.checkpoint("hdfs://xxxxx:8020/spark/checkpoint")

    val stream: InputDStream[(LongWritable, Text)] = streaming.fileStream[LongWritable, Text, TextInputFormat]("hdfs://192.168.101.187:8020/spark/data")
    val mapStream: DStream[(String, Int)] = stream.map(x => (x._2.toString, 1))

    /**
      * 定義更新函式體
      * value:就是新一批次的資料經過邏輯處理得到的結果
      * res:新一批次的資料進來之前的狀態值
      */

    def func(value: Seq[Int], res: Option[Int]): Some[Int] = {
      val sum: Int = value.sum
      val per: Int = res.getOrElse(0)
      Some(sum + per)
    }

    //呼叫更新函式,傳入更新函式體
    val res: DStream[(String, Int)] = mapStream.updateStateByKey(func)
    res.foreachRDD(rdd => {
      rdd.collect().foreach(println(_))
    })

    streaming.start()
    streaming.awaitTermination()

  }
}

Transform轉換操作

transform操作允許DStream上間隔時間內裡的RDD轉換成另個RDD。它的作用就是應用Dstream為公開的RDD操作函式。例如DStream上一個新的批次RDD與已存在的RDD進行join連線查詢的API是未公開的,所以是不能這麼使用的,但是利用transform函式就可以使用DStream未公開API

package com.lyz.streaming.transformation

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, InputDStream}

object TransformTest {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("Streamingtest")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val streaming = new StreamingContext(conf, Seconds(5))

    /**
      *
      * 在HDFS上儲存state的檢查點,必須設定,否則會報錯,因為每次更新state的時候
      * 都需要去檢查點獲取上一次更新後的state
      */
    streaming.checkpoint("hdfs://xxxx:8020/spark/checkpoint")
    val stream: InputDStream[(LongWritable, Text)] = streaming.fileStream[LongWritable, Text, TextInputFormat]("hdfs://192.168.101.187:8020/spark/data")
    val mapStream: DStream[(String, Int)] = stream.map(x => (x._2.toString, 1))

    //外部的RDD
    val rdd: RDD[(String, Int)] = streaming.sparkContext.makeRDD(Array(("aaaa", 1)))

    /**
      * 呼叫DStream的transform函式,將DStream裡的RDD與外部RDD進行操作,返回新的DStream
      * transform強大的原因就是能夠使用DStream為公開的一些RDD操作函式
      */
    val res: DStream[(String, (Int, Int))] = mapStream.transform(rdd1 => {
      rdd1.join(rdd)
    })

    res.foreachRDD(rdd => {
      rdd.collect().foreach(println(_))
    })

    streaming.start()
    streaming.awaitTermination()
  }
}

Window視窗操作

SparkStreaming提供了基於視窗的操作,你可以在滑動視窗內對資料進行轉換。這種視窗操作就是在比SparkStreaming批處理的時間間隔更長的時間間隔內整合多個批處理的結果。
Window視窗操作需要兩個引數,一個引數是視窗的時長(最近的幾個批次),另一個是視窗的滑動步長(多久執行一次計算)。需要特別注意的是這兩個引數一定要是批處理間隔時間的整數倍。例如我們有一個以10秒為間隔的批處理,我們想要沒十秒計算一次30秒內的資料,那麼我們就可以把時長視窗設定成30秒,視窗的滑動步長為10秒。

package com.lyz.streaming.transformation

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, InputDStream}

object WindowTest {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("Streamingtest")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

    val streaming = new StreamingContext(conf, Seconds(5))

    val stream: InputDStream[(LongWritable, Text)] = streaming.fileStream[LongWritable, Text, TextInputFormat]("hdfs://192.168.101.187:8020/spark/data")
    val mapStream: DStream[(String, Int)] = stream.map(x => (x._2.toString, 1))
    /**
      * 每十秒計算一次前三十秒的資料,其實也就是前6個批次的資料
      */
    val res: DStream[(String, Int)] = mapStream.reduceByKeyAndWindow((w: Int, w1: Int) => w + w1, Seconds(30), Seconds(10))
  }
}

比較常用的Window視窗函式

函式 解釋
window(windowLength,slideInterval) 建立滑動視窗,並返回一個新的DStream然後自定義函式處理這個滑動視窗的資料
reduceByKeyWindow(func,windowLength.slideInterval) 對每個滑動視窗執行reduceByKey的操作
reduceByKeyAndWindow(func,invFunc,windowLength,slideInterval,[numParition]) 這是一個性能更好的reduceByKeyAndWindow函式,
countByWindow(windowLength,slideInterval) 對每一個滑動視窗執行count操作,返回每個窗口裡的元素個數
countByValueAndWindow(windowLength,slideInterval,[numPatition]) 在每個滑動視窗內計算reduceByValue操作,統計每個鍵對應的值在視窗內出現的頻次

DStream的操作結果輸出

操作DStream的輸出結果可以推送到外部資料庫中或者外的檔案系統中。

函式 解釋
print() 在Driver端列印DStream上的前十個元素
saveAsTextFiles(prefix,[suffix]) 儲存DStream內容到文字檔案中,每個批次產生的結果的名字為字首為prefix,字尾為[.suffix],所以全稱為"prefix-TIME_IN_MS[.suffix]"
saveAsObjectFiles(prefix,[suffix]) 儲存DStream內容為Sequence檔案,這檔案是Java物件序列化後的,每個批次產生的結果的名字為字首為prefix,字尾為[.suffix],所以全稱為"prefix-TIME_IN_MS[.suffix]"
saveAsHadoopFiles(prefix,[suffix]) 儲存DStream內容到Hadoop檔案系統中,每個批次產生的結果的名字為字首為prefix,字尾為[.suffix],所以全稱為"prefix-TIME_IN_MS[.suffix]"
foreachRDD(func) 迴圈DStream上的RDD,並且將函式應用到每個RDD上,並且這個函式可以將RDD推送到外部系統中,注意這個函式時在Driver端執行的。它內部的RDD的操作是在每個Woker分割槽上執行的

foreachRDD應用設計

foreachRDD幾首一個函式,通過這個函式我可以把RDD儲存到外部系統中,既然要儲存到外部系統就需要與外部系統建立連線,我們都知道RDD的資料是散落在各個woker上的,處理RDD的函式也是在各個worker上進行的,我們需要將資料儲存到外部資料,那麼我們就必須在每個worker上都存在一個連線,因為連線是不能被序列化的,所以這個連線肯定是不能由driver傳送到worker上的,而是在worker上創建出來。那麼怎麼樣才能在worker上建立連線呢?那就是在rdd.foreach程式碼塊裡建立連線,而不是在stream.foreachRDD程式碼塊裡建立。

 res.foreachRDD(rdd => {
      rdd.foreach(r => {
        //1、建立連線
        //2、儲存資料
        //3、關閉連線
      })
    })

我們都知道遍歷RDD的時候其實就是在遍歷每一條記錄,如上程式碼所示我們為每一條記錄都建立了連線,顯然這樣是非常影響系統性能的,那麼我們怎麼辦呢?既然資料是在各個worker的分割槽上的,那麼我們可以為每個分割槽建立一個連線。

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    val connection = createNewConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    connection.close()
  }
}

DataFrame和SQL操作DStream

在處理DStream資料流的時候,我們可以利用DataFrame和SQL操作DStream。利用DataFrame和SQL處理DStream的前提是必須利用初始化Streaming的SparkConf來建立SparkSession。例子如下

package com.lyz.streaming.transformation

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, InputDStream}

object DateFrameAndSqlTest {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("Streamingtest")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val streaming = new StreamingContext(conf, Seconds(5))

    /**
      *
      * 在HDFS上儲存state的檢查點,必須設定,否則會報錯,因為每次更新state的時候
      * 都需要去檢查點獲取上一次更新後的state
      */
    streaming.checkpoint("hdfs://192.168.101.187:8020/spark/checkpoint")
    val stream: InputDStream[(LongWritable, Text)] = streaming.fileStream[LongWritable, Text, TextInputFormat]("hdfs://192.168.101.187:8020/spark/data")
    val mapStream: DStream[(String, Int)] = stream.map(x => (x._2.toString, 1))

    mapStream.foreachRDD(rdd => {
      //利用SparkConf來初始化SparkSession。
      val sparkSession: SparkSession = SparkSession.builder().config(conf).getOrCreate()

      //匯入隱式轉換來將RDD
      import sparkSession.implicits._
      //將RDD轉換成DF
      val df: DataFrame = rdd.toDF("word", "count")
      df.createOrReplaceTempView("compute")
      val computeDF: DataFrame = sparkSession.sql("select * from compute")
      computeDF.show()
    })
    streaming.start()
    streaming.awaitTermination()
  }
}

DStream快取和持久化

與RDD快取和持久化類似,我們在程式中可以把程式中將要被多次計算的DStream快取到記憶體中,也就是對相同資料進行多次計算。呼叫DStream的persist方法來實現DStream的快取。對於視窗函式的操作這些底層已經對資料進行快取了,因此視窗函式生成的DStream已經儲存在了記憶體中,開發人員無需呼叫persist方法。

對於通過網路傳輸的的資料流,預設的持久化是將資料複製到兩個節點上進行備份實現容錯

注意:與RDD不同,DStream的預設持久化級別是將資料持久化儲存在記憶體中。

DStream的檢查點

由於流式處理程式需要全天不間斷的執行,執行期間有很大機率會出現程式故障,為了應用程式的故障容錯性,Streaming設定了檢查點功能,將足夠的資訊儲存到檢查點中,以便對系統故障進行恢復。Streaming可以對兩種資料進行checkPoint。

  • 元資料檢查點:將定義流式計算資訊儲存到檢查點上(HDFS目錄),用於恢復應用程式的驅動程式的故障。其中應用程式的元資料資訊包含建立流式應用的配置、定義流式應用程式的操作集、部分未完成的批次
  • 資料檢查點:將生成的RDD儲存到檢查點上 。在應用程式中跨多個批次的RDD的具有有狀態的轉換中,這種檢查點必須設定,例如視窗操作和有狀態更新。因為在多個批次組合生成一個RDD的時候,結果RDD會依賴先前批次的RDD,這樣如果批次都的話,就會形成很多的依賴關係,設定RDD檢查點就是為了切斷依賴關係。

合適設定檢查點

有狀態操作的轉換,如果在應用程式中只用了例如updateStateByKey侯喆是reduceByKeyAndWindow,就必須要設定RDD的檢查點。

恢復故障的應用驅動程式 ,應用程式的元資料檢查點是儲存應用程式當前進度資訊的,所以驅動程式故障恢復就是恢復當前應用程式執行的進度。

如何配置檢查點

  • 配置資料檢查點,在執行DStream有狀態的操作的時候,必須設定檢查點,設定檢查點只需要呼叫StreamingContext.checkPoint(“hdfs://hadoop001:8020/sparkStreaming/checkPoint”)並傳入HDFS上的目錄,這樣就可以將上次執行的結果RDD儲存到該目錄下,供下個批次與該資料進行整合。
  • 應用元資料資訊檢查點。在應用程式第一次啟動的時候,會建立SparkContext,並呼叫start和awaitTermination方法開始流失資料的處理,當應用程式啟動失敗的時候,驅動程式會從檢查點上恢復SparkContext。
package com.lyz.streaming.checkpoint

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, InputDStream}

object MetaStoreCheckPointTest {
  def main(args: Array[String]): Unit = {
    //設定檢查點目錄
    val checkPointDir = "hdfs://192.168.101.187:8020/spark/checkpoint"
    //從檢查點上得到StreamingContext,如果檢查點上沒有StreamingContext那麼就建立一個新的StreamingContext。
    val streaming: StreamingContext = StreamingContext.getOrCreate(checkPointDir, () => createContext(checkPointDir))
    streaming.start()
    streaming.awaitTermination()

  }
  
  def createContext(checkPointDir: String): StreamingContext = {
    //建立SparkConf
    val conf = new SparkConf().setMaster("local[2]").setAppName("Streamingtest")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    //建立StreamingContext
    val streaming = new StreamingContext(conf, Seconds(5))

    /**
      *
      * 在HDFS上儲存state的檢查點,必須設定,否則會報錯,因為每次更新state的時候
      * 都需要去檢查點獲取上一次更新後的state
      */

    streaming.checkpoint(checkPointDir)
    val stream: InputDStream[(LongWritable, Text)] = streaming.fileStream[LongWritable, Text, TextInputFormat]("hdfs://192.168.101.187:8020/spark/data")
    val mapStream: DStream[(String, Int)] = stream.map(x => (x._2.toString, 1))
    val res: DStream[(String, Int)] = mapStream.reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(30), Seconds(10))
    res.foreachRDD(_.foreach(println(_)))
    streaming
  }
}