1. 程式人生 > >SparkStreaming(6):例項-統計到目前為止累積出現的單詞的個數(updateStateByKey)

SparkStreaming(6):例項-統計到目前為止累積出現的單詞的個數(updateStateByKey)

1.實現功能

現實中,不僅需要統計,當前批次的單詞個數,還需要統計,迄今為止的總的單詞個數。這個就是需要,使用到updateStateByKey運算元。

【參考:http://spark.apache.org/docs/2.1.0/streaming-programming-guide.html

 

2.程式碼

package Spark

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * 使用spark streaming完成迄今為止所有累計單詞的個數
  */
object StatefulWordcount {
  def main(args: Array[String]): Unit = {

    val sparkConf=new SparkConf().setAppName("StatefulWordcount").setMaster("local[2]")

    val ssc=new StreamingContext(sparkConf,Seconds(5))
    //如果使用了stateful的運算元,必須要設定checkpoint
    //在生產過程中,建議把這個放到hdfs上
    //自己沒設定,也沒事
    ssc.checkpoint(".")

    val lines: ReceiverInputDStream[String] =ssc.socketTextStream("bigdata.ibeifeng.com",6789)
    val results: DStream[(String, Int)] =lines.flatMap( _.split(" "))
      .map((_,1))    //.reduceByKey(_+_)

    val state: DStream[(String, Int)] =results.updateStateByKey[Int](updateFunction _)

    state.print()

    ssc.start()
    ssc.awaitTermination()
  }
  //from:http://spark.apache.org/docs/2.1.0/streaming-programming-guide.html
  /**
    * 把當前的資料去更新已有的資料
    * @param CurrentValues 新的的
    * @param PreValues 以前的
    * @return
    */
  def updateFunction(CurrentValues: Seq[Int], PreValues: Option[Int]): Option[Int] = {
    val current = CurrentValues.sum //...  // add the new values with the previous running count to get the new count
    val pre=PreValues.getOrElse(0)
    Some(current+pre)
  }

}

3.測試

(1)啟動nc -lk 6789,輸入測試資料
(2)結果
        (fsd,1)
        (ewrd,1)
        (vsdf,1)
        (,1)