
Spark修煉之道 (Advanced) — Spark from Beginner to Mastery, Section 13: Spark Streaming — Spark SQL, DataFrame and Spark Streaming

Main contents

  1. Spark SQL, DataFrame and Spark Streaming

1. Spark SQL, DataFrame and Spark Streaming

The example below (from the Spark examples: SqlNetworkWordCount) uses foreachRDD to convert each RDD of a DStream into a DataFrame, register it as a temporary table, and run a SQL word count on every batch.

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Time, Seconds, StreamingContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.storage.StorageLevel

object SqlNetworkWordCount {
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println("Usage: SqlNetworkWordCount <hostname> <port>")
      System.exit(1)
    }

    // Helper from the Spark examples package that sets sensible log levels;
    // it can be omitted when running outside the examples project
    StreamingExamples.setStreamingLogLevels()

    // Create the context with a 2 second batch size
    val sparkConf = new SparkConf().setAppName("SqlNetworkWordCount").setMaster("local[4]")
    val ssc = new StreamingContext(sparkConf, Seconds(2))

    // Create a socket stream on the target ip:port and count the words in the
    // input stream of \n delimited text (e.g. generated by 'nc').
    // Skipping replication in the storage level is acceptable only when running
    // locally; in a distributed setting replication is necessary for fault tolerance.
    // The socket serves as the data source
    val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
    // words DStream
    val words = lines.flatMap(_.split(" "))

    // Convert the RDDs of the words DStream to DataFrames and run a SQL query.
    // foreachRDD iterates over the RDDs of the DStream, one per batch
    words.foreachRDD((rdd: RDD[String], time: Time) => {
      // Get the singleton instance of SQLContext
      val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
      import sqlContext.implicits._

      // Convert RDD[String] to RDD[case class] to DataFrame
      val wordsDataFrame = rdd.map(w => Record(w)).toDF()

      // Register as a table
      wordsDataFrame.registerTempTable("words")

      // Do a word count on the table using SQL and print it
      val wordCountsDataFrame =
        sqlContext.sql("select word, count(*) as total from words group by word")
      println(s"========= $time =========")
      wordCountsDataFrame.show()
    })

    ssc.start()
    ssc.awaitTermination()
  }
}

/** Case class for converting an RDD to a DataFrame */
case class Record(word: String)

/** Lazily instantiated singleton instance of SQLContext */
object SQLContextSingleton {

  @transient private var instance: SQLContext = _

  def getInstance(sparkContext: SparkContext): SQLContext = {
    if (instance == null) {
      instance = new SQLContext(sparkContext)
    }
    instance
  }
}
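For comparison, the same per-batch aggregation can be written with the DataFrame API instead of a SQL string. Below is a minimal sketch of an alternative foreachRDD body; it assumes the Record case class and SQLContextSingleton defined above, and renames the generated count column to total only so the output matches the SQL version.

words.foreachRDD((rdd: RDD[String], time: Time) => {
  val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
  import sqlContext.implicits._

  // Same conversion as before: RDD[String] -> RDD[Record] -> DataFrame
  val wordsDataFrame = rdd.map(w => Record(w)).toDF()

  // groupBy/count expresses "group by word"; withColumnRenamed matches the "total" alias
  val wordCountsDataFrame = wordsDataFrame
    .groupBy("word")
    .count()
    .withColumnRenamed("count", "total")

  println(s"========= $time =========")
  wordCountsDataFrame.show()
})

Both forms run through the same Catalyst optimizer and produce equivalent plans, so the choice is largely a matter of taste.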

After starting the program, run the following command in another terminal:

# nc -lk 9999
Spark is a fast and general cluster computing system for Big Data
Spark is a fast and general cluster computing system for Big Data
Spark is a fast and general cluster computing system for Big Data
Spark is a fast and general cluster computing system for Big Data
Spark is a fast and general cluster computing system for Big Data
Spark is a fast and general cluster computing system for Big Data
Spark is a fast and general cluster computing system for Big Data

Processing results (each 2-second batch is aggregated independently, so batches that receive no new input print empty tables):


========= 1448783840000 ms =========
+---------+-----+
|     word|total|
+---------+-----+
|    Spark|   12|
|   system|   12|
|  general|   12|
|     fast|   12|
|      and|   12|
|computing|   12|
|        a|   12|
|       is|   12|
|      for|   12|
|      Big|   12|
|  cluster|   12|
|     Data|   12|
+---------+-----+

========= 1448783842000 ms =========
+----+-----+
|word|total|
+----+-----+
+----+-----+

========= 1448783844000 ms =========
+----+-----+
|word|total|
+----+-----+
+----+-----+
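The listing above targets the Spark 1.x API. On Spark 2.x and later, SQLContext and registerTempTable are superseded by SparkSession and createOrReplaceTempView, and SparkSession.builder.getOrCreate() plays the role of the hand-rolled SQLContextSingleton. A minimal sketch of the same foreachRDD body on the newer API, assuming the same Record case class as above:

import org.apache.spark.sql.SparkSession

words.foreachRDD { (rdd: RDD[String], time: Time) =>
  // getOrCreate reuses one session per JVM, replacing SQLContextSingleton
  val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()
  import spark.implicits._

  val wordsDataFrame = rdd.map(w => Record(w)).toDF()

  // createOrReplaceTempView replaces the deprecated registerTempTable
  wordsDataFrame.createOrReplaceTempView("words")

  val wordCountsDataFrame =
    spark.sql("select word, count(*) as total from words group by word")
  println(s"========= $time =========")
  wordCountsDataFrame.show()
}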