SparkStreaming(9): Example: integrating Spark Streaming with Spark SQL to implement word count
阿新 • Published: 2018-11-08
1. Functionality
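Combine Spark Streaming and Spark SQL to count word frequencies. The key idea is how a DStream and its RDDs work together: the foreachRDD API exposes the RDD of each batch so it can be converted to a DataFrame and queried with SQL.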
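Conceptually, a DStream is a sequence of RDDs, one per batch interval, and foreachRDD passes each of them, together with its batch Time, to a function that runs on the driver, where ordinary RDD and DataFrame code can be used. The toy model below is a plain-Scala sketch of that idea, not Spark's API: Seq stands in for RDD, and Batch, foreachBatch and wordCounts are hypothetical names. The second batch's data is made up for illustration.

```scala
// Toy model, no Spark required: a "stream" is a sequence of batches, each carrying
// its batch time and the records received during that interval.
// Batch, foreachBatch and wordCounts are hypothetical names; Seq stands in for RDD.
final case class Batch(timeMs: Long, records: Seq[String])

object ForeachRddModel {
  // Rough analogue of DStream.foreachRDD: call f once per batch,
  // handing it that batch's data and its time.
  def foreachBatch(stream: Seq[Batch])(f: (Seq[String], Long) => Unit): Unit =
    stream.foreach(b => f(b.records, b.timeMs))

  // Per-batch word count with the same logic as the SQL query:
  // split on spaces, GROUP BY word, COUNT(*).
  def wordCounts(records: Seq[String]): Map[String, Long] =
    records
      .flatMap(_.split(" "))
      .groupBy(identity)
      .map { case (word, occurrences) => word -> occurrences.size.toLong }

  def main(args: Array[String]): Unit = {
    val stream = Seq(
      Batch(1537287530000L, Seq("20180808,ww", "20180808,ww", "20180808,ww")),
      Batch(1537287535000L, Seq("hello spark", "hello")) // hypothetical sample batch
    )
    foreachBatch(stream) { (records, timeMs) =>
      println(s"========= $timeMs ms =========")
      wordCounts(records).foreach { case (word, total) => println(s"$word\t$total") }
    }
  }
}
```

Each batch is counted independently; nothing is carried over from one batch to the next, which is also true of the Spark version.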
2. Code
```scala
package Spark

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}

/**
 * Word-frequency counting with Spark Streaming integrated with Spark SQL
 * https://github.com/apache/spark/blob/v2.1.0/examples/src/main/scala/org/apache/spark/examples/streaming/SqlNetworkWordCount.scala
 */
object SqlNetworkWordCount {

  def main(args: Array[String]): Unit = {
    // local[2]: the socket receiver occupies one thread, so at least one more is needed for processing
    val sparkConf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")

    /**
     * Creating a StreamingContext requires a SparkConf and a batch interval
     */
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    val lines = ssc.socketTextStream("bigdata.ibeifeng.com", 6789)
    val words = lines.flatMap(_.split(" "))

    // Convert RDDs of the words DStream to DataFrame and run SQL query
    words.foreachRDD { (rdd: RDD[String], time: Time) =>
      // Get the singleton instance of SparkSession
      val spark = SparkSessionSingleton.getInstance(rdd.sparkContext.getConf)
      import spark.implicits._

      // Convert RDD[String] to RDD[case class] to DataFrame
      val wordsDataFrame = rdd.map(w => Record(w)).toDF()

      // Creates a temporary view using the DataFrame
      wordsDataFrame.createOrReplaceTempView("words")

      // Do word count on table using SQL and print it
      val wordCountsDataFrame =
        spark.sql("select word, count(*) as total from words group by word")
      println(s"========= $time =========")
      wordCountsDataFrame.show()
    }

    ssc.start()
    ssc.awaitTermination()
  }

  /** Case class for converting RDD to DataFrame */
  case class Record(word: String)

  /** Lazily instantiated singleton instance of SparkSession */
  object SparkSessionSingleton {

    @transient private var instance: SparkSession = _

    def getInstance(sparkConf: SparkConf): SparkSession = {
      if (instance == null) {
        instance = SparkSession
          .builder
          .config(sparkConf)
          .getOrCreate()
      }
      instance
    }
  }
}
```
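The Spark Streaming programming guide gives the reason for SparkSessionSingleton: the SparkSession must be created from the SparkContext that the StreamingContext uses, and lazily, so that it can be recreated after a driver failure. It also means the function passed to foreachRDD, which runs once per batch, only builds the session on the first batch. A minimal plain-Scala sketch of the same lazily instantiated singleton pattern, assuming a hypothetical ExpensiveResource in place of SparkSession:

```scala
// Plain-Scala sketch of the lazily instantiated singleton pattern used by
// SparkSessionSingleton. ExpensiveResource is a hypothetical stand-in for SparkSession.
final class ExpensiveResource(val config: Map[String, String])

object ResourceSingleton {
  @transient private var instance: ExpensiveResource = null
  private var creations = 0 // how many times the resource was actually built

  // Build the resource on the first call only; later calls return the cached instance.
  // synchronized is an addition: it stops two threads from building it at once
  // (the original SparkSessionSingleton does not synchronize).
  def getInstance(config: Map[String, String]): ExpensiveResource = synchronized {
    if (instance == null) {
      instance = new ExpensiveResource(config)
      creations += 1
    }
    instance
  }

  def creationCount: Int = synchronized { creations }
}
```

As with SparkSessionSingleton, later calls ignore their argument and return the first instance, so the configuration passed on the first batch is the one that sticks.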
3. Testing
(1) Start nc and type the input:
```
nc -lk 6789
20180808,ww
20180808,ww
20180808,ww
```
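(2) Result. All three lines arrived within the same 5-second batch, and since none of them contains a space, each whole line "20180808,ww" is treated as one word: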
```
========= 1537287530000 ms =========
+-----------+-----+
|       word|total|
+-----------+-----+
|20180808,ww|    3|
+-----------+-----+
```
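The header line prints the batch Time, whose string form is the epoch time in milliseconds followed by "ms". A small sketch (BatchTimeSketch and its methods are hypothetical names) converts it to a readable UTC timestamp with java.time and checks that it is a multiple of the 5-second batch interval; on a normal (non-checkpointed) start, Spark Streaming's timer appears to align batch times to multiples of the batch duration, and the value above satisfies that.

```scala
import java.time.Instant

// Sketch: interpret the header printed by println(s"========= $time =========").
object BatchTimeSketch {
  val batchIntervalMs = 5000L // matches Seconds(5) in the StreamingContext

  // Convert a batch time in epoch milliseconds to a UTC instant
  def toInstant(batchTimeMs: Long): Instant = Instant.ofEpochMilli(batchTimeMs)

  // True when the batch time is a multiple of the batch interval
  def isAligned(batchTimeMs: Long): Boolean = batchTimeMs % batchIntervalMs == 0

  def main(args: Array[String]): Unit = {
    val t = 1537287530000L // batch time from the result above
    println(toInstant(t))  // prints 2018-09-18T16:18:50Z
    println(isAligned(t))  // prints true
  }
}
```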