SparkStreaming(17):updateStateByKey運算元,保留上一次計算結果
阿新 • • 發佈:2018-12-16
1.實現功能
如果SparkStreaming程式斷掉,重新啟動,可以讀取斷掉之前的結果。通過,使用SparkStreaming的HA:checkpoints。
2.程式碼
package _0809kafka //import com.beifeng.util.SparkUtil import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream} import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.{SparkConf, SparkContext} /** * * 之前做的計算當中,當前批次的計算值不會累加到下一個批次 * * 當前批次的值計算完之後,存到外部儲存系統中 * 下一個批次計算完值之後,在取出上一個批次計算出來的值, * 做相加,更新會原位置上 * * checkpoint會保留上一個程式的ssc的狀態和UpdateStateByKey的結果 * 但是構造ssc的時候,必須按照規矩寫,否則就讀不到UpdateStateByKey上一次的結果 */ object UpdateStateByKeyAPI_1020HA { def main(args: Array[String]) { //使用checkpoint來儲存批次的資料 //1、建立sparkConf val sparkConf: SparkConf = new SparkConf() .setAppName("UpdateStateByKeyAPI") .setMaster("local[2]") //2、建立sparkContext val sc = new SparkContext(sparkConf) // val path = s"file:///E:\\workspace\\SparkPro\\checkpoint\\streaming_05" val path = s"file:///E:\\Tools\\WorkspaceforMyeclipse\\scalaProjectMaven\\streaming_07" def creatingFunc():StreamingContext ={ val ssc = new StreamingContext(sc,Seconds(10)) ssc.checkpoint(path) val socketDStream: ReceiverInputDStream[String] = ssc.socketTextStream("bigdata.ibeifeng.com",9999) //api updateStateByKey val resultDStream: DStream[(String, Long)] = socketDStream.mapPartitions(iter =>{ //對於當前批次的值做資料轉換 iter.flatMap(_.split(" ")) .filter(_.nonEmpty) .map(word => (word,1)) }) //對於當前批次的值,做累加(aggr聚合)操作 .reduceByKey(_ + _) //對於value的操作,相同key怎麼處理對應的value .updateStateByKey((seq: Seq[Int],state: Option[Long])=>{ //當前批次的相同key的value的聚合值 val sum = seq.sum val preState= state.getOrElse(0L) /** * if(sum + preState > 1000){ * Some(sum + preState) * }else{ * //清空當前key的value值 * None * } */ Some(sum + preState) }) resultDStream.foreachRDD((rdd,time) =>{ println(s"----------------當前時間為:${time}----------------") //比如說:某些key不列印,某些值過於小也可以不列印,或者列印排序後的前5 rdd.filter(t =>{ t._2 > 100 }).foreach(println) }) ssc } val ssc = StreamingContext.getActiveOrCreate(path,creatingFunc) ssc.start() ssc.awaitTermination() } }
3.測試
(1)開啟nc nc -lt 9999
(2)執行程式
(3)結果:
----------------當前時間為:1540004570000 ms----------------
(hadoop,212)
(ccs,159)
----------------當前時間為:1540004580000 ms----------------
[Stage 9:=================================================> (5 + 1) / 6]
(hadoop,360)
(ccs,270)