
Spark Learning Notes (14): A Spark Streaming Example of Accumulating Counts Across Batches

1 Source Code
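The example reads lines from a socket, splits them into words, and uses updateStateByKey to keep a running total for each word across batches, instead of counting each 5-second batch in isolation.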

package mystreaming

import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object AccWordcount {


  /*
   * Records arriving in one batch:  (hello,1),(hello,1),(tom,1)
   * After grouping by key:          (hello,Seq(1,1)),(tom,Seq(1))
   * Option[Int] - the accumulated count from previous batches
   * Seq[Int]    - this word's counts in the current batch
   */
  val func = (iter: Iterator[(String, Seq[Int], Option[Int])]) => {
    // Equivalent alternatives:
    // iter.map { case (word, currentCounts, historyCount) => (word, currentCounts.sum + historyCount.getOrElse(0)) }
    // iter.map(t => (t._1, t._2.sum + t._3.getOrElse(0)))
    iter.flatMap { case (word, currentCounts, historyCount) =>
      Some(currentCounts.sum + historyCount.getOrElse(0)).map(total => (word, total))
    }
  }

  def main(args: Array[String]): Unit = {
    // Helper object from an earlier note in this series; it raises the log level
    // so the console is not flooded by Spark Streaming's INFO messages.
    LoggerLevels.setStreamingLogLevels()

    val conf = new SparkConf().setAppName("AccWordCount").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val ssc = new StreamingContext(sc, Seconds(5))

    // updateStateByKey requires a checkpoint directory to persist the accumulated state
    sc.setCheckpointDir("d://checkpoint")

    // Receive data from the socket source; a DStream is a sequence of RDDs, one per batch
    val ds = ssc.socketTextStream("node1", 8888)

    val result = ds.flatMap(_.split(" "))
      .map((_, 1))
      .updateStateByKey(func, new HashPartitioner(sc.defaultParallelism), true)

    result.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
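The overload used above takes an Iterator[(K, Seq[V], Option[S])], so the update function runs once per partition and a partitioner can be supplied. If per-partition control is not needed, the same accumulation can be written with the simpler per-key overload of updateStateByKey. A minimal sketch, assuming the same ds DStream as above (variable names are illustrative, not from the original code):

    // Per-key overload: for each word, Spark passes the values seen in the
    // current batch and the previously accumulated state, and keeps whatever
    // Option this function returns as the new state.
    val resultSimple = ds.flatMap(_.split(" "))
      .map((_, 1))
      .updateStateByKey[Int] { (currentCounts: Seq[Int], previous: Option[Int]) =>
        Some(currentCounts.sum + previous.getOrElse(0))
      }
    resultSimple.print()

This variant also requires the checkpoint directory to be set, since the state RDDs are checkpointed periodically.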

2 Testing

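To test, start a socket source on node1 first, for example with `nc -lk 8888`, then run the program and type words into the nc session. Because the state is carried across batches, the count printed for each word every 5 seconds keeps growing as the same word is typed again, instead of resetting at every batch interval.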