Spark Study Notes: Spark Streaming Working Together with Spark SQL
阿新 • Published: 2019-02-19
Spark Streaming can be used together with Spark Core and Spark SQL, and this integration is one of its most powerful features.
Example: count, in real time, the search terms that have been searched more than 3 times
```scala
package StreamingDemo

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * Spark Streaming combined with Spark SQL.
  * Requirement: count, in real time, the search terms that have been
  * searched more than 3 times.
  */
object StreamingAndSQLDemo {
  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.WARN)
    System.setProperty("HADOOP_USER_NAME", "Setsuna")

    val conf = new SparkConf()
      .setAppName(this.getClass.getSimpleName)
      .setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(2))

    // Enable checkpointing (required by updateStateByKey)
    ssc.checkpoint("hdfs://Hadoop01:9000/checkpoint")

    // Split lines into words, map to (word, 1), and keep a running count per word
    val resultDStream = ssc
      .socketTextStream("Hadoop01", 6666)
      .flatMap(_.split(" "))
      .map((_, 1))
      .updateStateByKey((values: Seq[Int], state: Option[Int]) => {
        var count = state.getOrElse(0)
        for (value <- values) {
          count += value
        }
        Option(count)
      })

    // Put each batch's word counts into a temporary view and query it with SQL
    resultDStream.foreachRDD(rdd => {
      // Create (or reuse) a SparkSession, passing along the RDD's configuration
      val sparkSession = SparkSession.builder()
        .config(rdd.sparkContext.getConf)
        .getOrCreate()

      // Convert the (word, count) RDD into an RDD of Rows
      val rowRDD = rdd.map(x => Row(x._1, x._2))

      // Define the schema
      val schema = StructType(List(
        StructField("word", StringType, true),
        StructField("count", IntegerType, true)
      ))

      // Create the DataFrame and register it as a temporary view
      sparkSession.createDataFrame(rowRDD, schema)
        .createOrReplaceTempView("wordcount")

      // Query for search terms seen more than 3 times and print to the console
      sparkSession.sql("select * from wordcount where count > 3").show()
    })

    ssc.start()
    ssc.awaitTermination()
  }
}
```
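The demo's two core computations, the running count maintained by `updateStateByKey` and the `count > 3` filter expressed in SQL, can be sketched without Spark as plain Scala (the sample words below are made up for illustration):

```scala
object WordCountSketch {
  // Same logic as the update function passed to updateStateByKey above:
  // add this batch's occurrences to the running count carried in `state`.
  def updateCount(values: Seq[Int], state: Option[Int]): Option[Int] =
    Option(state.getOrElse(0) + values.sum)

  def main(args: Array[String]): Unit = {
    // Two simulated micro-batches of (word, 1) pairs (made-up sample data)
    val batches = Seq(
      Seq("spark", "hadoop", "spark").map((_, 1)),
      Seq("spark", "spark", "hive").map((_, 1))
    )

    // Fold each batch through updateCount, keyed by word
    var state = Map.empty[String, Int]
    for (batch <- batches) {
      val grouped = batch.groupBy(_._1).map { case (w, pairs) => w -> pairs.map(_._2) }
      state = state ++ grouped.map { case (w, vs) =>
        w -> updateCount(vs, state.get(w)).get
      }
    }

    // Equivalent of: select * from wordcount where count > 3
    println(state.filter(_._2 > 3)) // prints Map(spark -> 4)
  }
}
```

The update function only ever sees the current batch's values plus the previous state, which is why checkpointing is mandatory: Spark persists the per-key state between batches.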
Testing

Type input data into nc (the job reads from port 6666, so start the listener with `nc -lk 6666` first):

Output in the console: