Spark Streaming整合Spark SQL之wordcount案例

Spark Streaming整合Spark SQL之wordcount案例

完整原始碼地址:  https://github.com/apache/spark/blob/v2.3.2/examples/src/main/scala/org/apache/spark/examples/streaming/SqlNetworkWordCount.scala

案例原始碼:

package cn.ysjh

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext, Time}

object SparkStreamingSql {

  // Original hard-coded socket source; kept as defaults so existing usage
  // (running with no arguments) behaves exactly as before.
  private val DefaultHost = "192.168.220.134"
  private val DefaultPort = 6789

  /**
   * Entry point. Reads lines from a TCP socket, splits them into words, and
   * for every 5-second micro-batch runs a SQL word count over the batch.
   *
   * Usage: SparkStreamingSql [host] [port]
   * Both arguments are optional; omitted values fall back to the defaults above.
   */
  def main(args: Array[String]): Unit = {

    // Generalized: allow the socket source to be overridden from the command line.
    val host = if (args.length >= 1) args(0) else DefaultHost
    val port = if (args.length >= 2) args(1).toInt else DefaultPort

    val conf: SparkConf = new SparkConf().setAppName("SparkStreamingSql").setMaster("local[2]")

    // 5-second batch interval: each foreachRDD invocation below sees one batch.
    val streaming: StreamingContext = new StreamingContext(conf, Seconds(5))

    val lines = streaming.socketTextStream(host, port)
    val words = lines.flatMap(_.split(" "))

    // Convert each batch RDD of the words DStream to a DataFrame and query it with SQL.
    words.foreachRDD { (rdd: RDD[String], time: Time) =>
      // Obtain the lazily-created singleton SparkSession (built from the RDD's SparkConf).
      val spark = SparkSessionSingleton.getInstance(rdd.sparkContext.getConf)
      import spark.implicits._

      // RDD[String] -> RDD[Record] -> DataFrame (Record supplies the schema).
      val wordsDataFrame = rdd.map(w => Record(w)).toDF()

      // Register the batch as a temporary view so it can be queried with SQL.
      wordsDataFrame.createOrReplaceTempView("words")

      // Word count for this batch via SQL, then print it with the batch time.
      val wordCountsDataFrame =
        spark.sql("select word, count(*) as total from words group by word")
      println(s"========= $time =========")
      wordCountsDataFrame.show()
    }

    streaming.start()
    streaming.awaitTermination()
  }

  // Case class giving the RDD rows a named column ("word") when converting to a DataFrame.
  case class Record(word: String)

  // Lazily instantiated singleton SparkSession.
  // @transient keeps the driver-side reference out of serialized closures.
  object SparkSessionSingleton {

    @transient private var instance: SparkSession = _

    // Returns the shared SparkSession, creating it on first use from the given conf.
    def getInstance(sparkConf: SparkConf): SparkSession = {
      if (instance == null) {
        instance = SparkSession
          .builder
          .config(sparkConf)
          .getOrCreate()
      }
      instance
    }

  }

}

可以看出將Spark Streaming中接收到的資料建立成表,然後使用Spark SQL來進行一系列的操作,在實際生產中使用的非常多

執行截圖:

這裡仍然使用netcat來產生socket資料進行測試