
Developing a reduceByKeyAndWindow Example in Spark Streaming

package SparkStreamingTest.Scala

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * Created by TG.
  * Every 2 seconds, report the top 3 search terms from the last 5 seconds,
  * together with how many times each appeared.
  */
object ReduceByKeyAndWindowDemo {
  def main(args: Array[String]): Unit = {
    // Keep Spark's own logging quiet
    Logger.getLogger("org").setLevel(Level.WARN)

    val conf = new SparkConf().setAppName(this.getClass.getSimpleName).setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(1))

    // DStream checkpointing requires a checkpoint directory; without it,
    // ssc.start() fails. The path here is only an example.
    ssc.checkpoint("checkpoint")

    // socketTextStream defaults to StorageLevel.MEMORY_AND_DISK_SER_2
    val linesDStream = ssc.socketTextStream("master", 6666)
    // linesDStream.persist() // optionally cache, e.g. with StorageLevel.MEMORY_ONLY_SER
    linesDStream.checkpoint(Seconds(10))

    linesDStream.flatMap(_.split(" "))
      .map((_, 1))
      // Every 2 seconds, sum each word's counts over the last 5 seconds
      .reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(5), Seconds(2))
      .transform(rdd => {
        // Swap (word, count) to (count, word), sort descending, swap back, take the top 3
        val result: Array[(String, Int)] =
          rdd.map(x => (x._2, x._1)).sortByKey(false).map(x => (x._2, x._1)).take(3)
        // result is an Array, not an RDD, so turn it back into an RDD here
        val resultRDD = ssc.sparkContext.parallelize(result)
        // Note: transform must return an RDD, so return resultRDD
        resultRDD
      })
      .map(x => x._1 + " appeared " + x._2 + " times")
      .print()

    ssc.start()
    ssc.awaitTermination()
  }
}
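To try the example, start a socket source on the host the code expects (the source reads from host "master", port 6666), for instance with nc -lk 6666, then launch the application and type space-separated search terms into the netcat session. Every 2 seconds the driver prints the top 3 terms seen in the last 5 seconds.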
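The swap-sort-swap inside transform exists only because sortByKey sorts by the tuple's key. The same top-3 logic can be written more directly with RDD.sortBy, which sorts by an arbitrary key function. A minimal sketch of an equivalent transform, assuming the windowed (word, count) pairs from the example above:

      .transform(rdd => {
        // Sort by the count field directly instead of swapping key and value
        val top3 = rdd.sortBy(_._2, ascending = false).take(3)
        ssc.sparkContext.parallelize(top3)
      })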
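reduceByKeyAndWindow also has an incremental overload that takes an inverse reduce function: instead of re-reducing the whole window on every slide, Spark adds the counts from batches entering the window and subtracts the counts from batches leaving it, which is cheaper for long windows. It is this overload that actually requires DStream checkpointing to be enabled. A sketch under the same setup as the example (same linesDStream, window length, and slide interval):

    val windowedCounts = linesDStream.flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKeyAndWindow(
        (a: Int, b: Int) => a + b, // reduce: add counts from batches entering the window
        (a: Int, b: Int) => a - b, // inverse reduce: subtract counts from batches leaving it
        Seconds(5),                // window length
        Seconds(2)                 // slide interval
      )

One caveat with the inverse form: keys whose count falls to zero remain in the state; the overload that additionally accepts a filter function can be used to drop them.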