
Spark Learning Notes: Spark Streaming Working with Spark SQL

Spark Streaming Working with Spark SQL

Spark Streaming can be used together with Spark Core and Spark SQL, and this interoperability is one of its most powerful features: inside foreachRDD, each micro-batch RDD can be converted into a DataFrame and queried with ordinary SQL.

Example: count, in real time, the search terms that have been searched more than 3 times.

package StreamingDemo

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * Spark Streaming working with Spark SQL
  * Requirement: count, in real time, the search terms searched more than 3 times
  */
object StreamingAndSQLDemo {
  def main(args: Array[String]): Unit = {
    // keep the console readable: only log warnings and above
    Logger.getLogger("org").setLevel(Level.WARN)
    // HDFS user used when writing to the checkpoint directory
    System.setProperty("HADOOP_USER_NAME", "Setsuna")
    val conf = new SparkConf()
      .setAppName(this.getClass.getSimpleName)
      .setMaster("local[2]")
    // micro-batches every 2 seconds
    val ssc = new StreamingContext(conf, Seconds(2))

    // enable checkpointing, which updateStateByKey requires to persist state
    ssc.checkpoint("hdfs://Hadoop01:9000/checkpoint")

    // tokenize the input, map each word to (word, 1), and keep a running total per word
    val resultDStream =
      ssc
        .socketTextStream("Hadoop01", 6666)
        .flatMap(_.split(" "))
        .map((_, 1))
        .updateStateByKey((values: Seq[Int], state: Option[Int]) => {
          // state is the count accumulated so far; values are this batch's increments
          var count = state.getOrElse(0)
          for (value <- values) {
            count += value
          }

          Option(count)
        })

    // publish each batch's word counts to Spark SQL and query them
    resultDStream.foreachRDD(rdd => {
      // get (or lazily create) a SparkSession, reusing the streaming context's conf
      val sparkSession = SparkSession.builder().config(rdd.sparkContext.getConf).getOrCreate()
      // convert the (word, count) pairs into an RDD of Rows
      val rowRDD = rdd.map(x => Row(x._1, x._2))
      // define the schema: word (String), count (Int)
      val schema = StructType(List(
        StructField("word", StringType, true),
        StructField("count", IntegerType, true)
      ))
      // build the DataFrame and register it as a temporary view
      sparkSession.createDataFrame(rowRDD, schema)
        .createOrReplaceTempView("wordcount")
      // query the view and print matches to the console
      sparkSession.sql("select * from wordcount where count > 3").show()
    })
    ssc.start()
    ssc.awaitTermination()
  }
}
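
As a side note, the state-update function passed to updateStateByKey above can be written more compactly. A minimal equivalent sketch (same semantics; updateFunc is just a local name introduced here):

// equivalent update function: fold this batch's increments into the saved state
val updateFunc = (values: Seq[Int], state: Option[Int]) =>
  Some(state.getOrElse(0) + values.sum)
// drop-in usage: .updateStateByKey(updateFunc)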

Testing

Feed data through nc: start a listener on Hadoop01 with nc -lk 6666 and type space-separated search terms.

Output in the console
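
For illustration only: assuming the term spark has been typed more than three times since the stream started, the show() call would print a table roughly like this (hypothetical data):

+-----+-----+
| word|count|
+-----+-----+
|spark|    4|
+-----+-----+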