SparkStreaming(6):例項-統計到目前為止累積出現的單詞的個數(updateStateByKey)
阿新 • • 發佈:2018-11-08
1.實現功能
現實中,不僅需要統計,當前批次的單詞個數,還需要統計,迄今為止的總的單詞個數。這個就是需要,使用到updateStateByKey運算元。
【參考:http://spark.apache.org/docs/2.1.0/streaming-programming-guide.html】
2.程式碼
package Spark import org.apache.spark.SparkConf import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream} import org.apache.spark.streaming.{Seconds, StreamingContext} /** * 使用spark streaming完成迄今為止所有累計單詞的個數 */ object StatefulWordcount { def main(args: Array[String]): Unit = { val sparkConf=new SparkConf().setAppName("StatefulWordcount").setMaster("local[2]") val ssc=new StreamingContext(sparkConf,Seconds(5)) //如果使用了stateful的運算元,必須要設定checkpoint //在生產過程中,建議把這個放到hdfs上 //自己沒設定,也沒事 ssc.checkpoint(".") val lines: ReceiverInputDStream[String] =ssc.socketTextStream("bigdata.ibeifeng.com",6789) val results: DStream[(String, Int)] =lines.flatMap( _.split(" ")) .map((_,1)) //.reduceByKey(_+_) val state: DStream[(String, Int)] =results.updateStateByKey[Int](updateFunction _) state.print() ssc.start() ssc.awaitTermination() } //from:http://spark.apache.org/docs/2.1.0/streaming-programming-guide.html /** * 把當前的資料去更新已有的資料 * @param CurrentValues 新的的 * @param PreValues 以前的 * @return */ def updateFunction(CurrentValues: Seq[Int], PreValues: Option[Int]): Option[Int] = { val current = CurrentValues.sum //... // add the new values with the previous running count to get the new count val pre=PreValues.getOrElse(0) Some(current+pre) } }
3.測試
(1)啟動nc -lk 6789,輸入測試資料
(2)結果
(fsd,1)
(ewrd,1)
(vsdf,1)
(,1)