1. 程式人生 > >SparkStreaming(17):updateStateByKey運算元,保留上一次計算結果

SparkStreaming(17):updateStateByKey運算元,保留上一次計算結果

1.實現功能

如果SparkStreaming程式斷掉,重新啟動,可以讀取斷掉之前的結果。通過,使用SparkStreaming的HA:checkpoints。

2.程式碼

package _0809kafka

//import com.beifeng.util.SparkUtil
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}

/**
 *
 * 之前做的計算當中,當前批次的計算值不會累加到下一個批次
 *
 * 當前批次的值計算完之後,存到外部儲存系統中
 * 下一個批次計算完值之後,在取出上一個批次計算出來的值,
 * 做相加,更新會原位置上
 *
 * checkpoint會保留上一個程式的ssc的狀態和UpdateStateByKey的結果
 * 但是構造ssc的時候,必須按照規矩寫,否則就讀不到UpdateStateByKey上一次的結果
 */
object UpdateStateByKeyAPI_1020HA {
  def main(args: Array[String]) {
    //使用checkpoint來儲存批次的資料
    //1、建立sparkConf
    val sparkConf: SparkConf = new SparkConf()
      .setAppName("UpdateStateByKeyAPI")
      .setMaster("local[2]")
    //2、建立sparkContext
    val sc = new SparkContext(sparkConf)

//    val path = s"file:///E:\\workspace\\SparkPro\\checkpoint\\streaming_05"
    val path = s"file:///E:\\Tools\\WorkspaceforMyeclipse\\scalaProjectMaven\\streaming_07"

    def creatingFunc():StreamingContext ={
      val ssc = new StreamingContext(sc,Seconds(10))
      ssc.checkpoint(path)
      val socketDStream: ReceiverInputDStream[String] = ssc.socketTextStream("bigdata.ibeifeng.com",9999)

      //api updateStateByKey
      val resultDStream: DStream[(String, Long)] = socketDStream.mapPartitions(iter =>{
        //對於當前批次的值做資料轉換
        iter.flatMap(_.split(" "))
          .filter(_.nonEmpty)
          .map(word => (word,1))
      })
        //對於當前批次的值,做累加(aggr聚合)操作
        .reduceByKey(_ + _)
        //對於value的操作,相同key怎麼處理對應的value
        .updateStateByKey((seq: Seq[Int],state: Option[Long])=>{
        //當前批次的相同key的value的聚合值
        val sum = seq.sum
        val preState= state.getOrElse(0L)
        /**
         * if(sum + preState > 1000){
         * Some(sum + preState)
         * }else{
         * //清空當前key的value值
         * None
         * }
         */
        Some(sum + preState)
      })

      resultDStream.foreachRDD((rdd,time) =>{
        println(s"----------------當前時間為:${time}----------------")
        //比如說:某些key不列印,某些值過於小也可以不列印,或者列印排序後的前5
        rdd.filter(t =>{
          t._2 > 100
        }).foreach(println)
      })
      ssc
    }

    val ssc = StreamingContext.getActiveOrCreate(path,creatingFunc)


    ssc.start()
    ssc.awaitTermination()


  }
}

3.測試

(1)開啟nc             nc -lt 9999

(2)執行程式

(3)結果:

----------------當前時間為:1540004570000 ms----------------
(hadoop,212)
(ccs,159)
----------------當前時間為:1540004580000 ms----------------
[Stage 9:=================================================>         (5 + 1) / 6]
(hadoop,360)
(ccs,270)

(測試成功~)