1. 程式人生 > >大數據筆記(三十一)——SparkStreaming詳細介紹

大數據筆記(三十一)——SparkStreaming詳細介紹

rgs 啟動 gui sele big 虛擬 ring foreach 單詞


Spark Streaming: Spark用於處理流式數據的模塊,類似Storm

核心:DStream(離散流),就是一個RDD
============================================
一、Spark Streaming基礎
1、什麽是Spark Streaming?
(*)Spark Streaming makes it easy to build scalable fault-tolerant streaming applications.
(*)常見的流式處理框架
(1)Apache Storm
(2)Spark Streaming
(3)JStorm:阿裏巴巴
(4)Flink:可以很好的管理內存

(*)離線計算和流式計算各自的特點
典型代表 數據的采集 數據源(結果)
離線計算: MR、Spark Core Sqoop 批量操作
流式計算: Storm等等 Flume(Kafka) 實時性

(*)典型的流式計算的框架:參考Hadoop的課件:P91

2、簡介Spark Streaming內部結構
技術分享圖片

3、演示Demo:NetworkWordCount 處理的是流式數據
(*)工具:netcat
(*)文檔:http://spark.apache.org/docs/latest/streaming-programming-guide.html#a-quick-example
(*)步驟:啟動兩個窗口
第一個窗口中:

bin/run-example streaming.NetworkWordCount bigdata11 9999

第二個窗口中:啟動消息服務器(先啟動)

nc -l -p 9999


註意:如果要演示成功,保證虛擬機的CPU的核數至少2以上
技術分享圖片

運行:

技術分享圖片

4、開發自己的NetworkWordCount程序

技術分享圖片

 1 package main.scala.demo
 2 
 3 import org.apache.spark.SparkConf
 4 import org.apache.spark.storage.StorageLevel
 5 import org.apache.spark.streaming.{Seconds, StreamingContext}
 6 
 7 /**
 8   * Created by YOGA on 2018/2/27.
 9   */
10 object MyNetworkWordCount {
11   def main(args: Array[String]) {
12 //核心:通過StreamingContext對象,去創建一個DStream 13 //DStream從外部接收數據(使用的是Linux上的netcat工具) 14 15 //創建一個SparkConf對象 16 //local[2]:相當於有兩個工作線程,一個接收一個發送 17 val sparkconf = new SparkConf() 18 .setAppName("MyNetworkWordCount") 19 .setMaster("local[2]") 20 21 //創建StreamContext,表示每隔三秒采集一次數據 22 val ssc = new StreamingContext(sparkconf,Seconds(3)) 23 24 //創建DStream,看成一個輸入流 25   //IP,端口,緩存到硬盤 26 27 val lines = ssc.socketTextStream("192.168.153.11",1234,StorageLevel.MEMORY_AND_DISK_SER) 28 29 //執行WordCount 30 val words = lines.flatMap(_.split(" ")) 31 32 //使用transform完成同樣的計數,相當於map操作 33 //val wordPair = words.transform(x=>x.map(x=>(x,1))) 34 //val wordCount = wordPair.reduceByKey(_+_) 35 val wordCount = words.map((_,1)).reduceByKey(_+_) 36 37 /* 38 * 參數一:執行運算 39 * 參數二:窗口的大小 40 * 參數三:創建滑動的距離 41 * 42 * 例子:每9秒鐘,把過去30秒的數據進行wordcount 43 * 註意:第二個參數 第三個參數 必須是采樣頻率的整數倍 44 * */ 45 //val wordCount = words.map((_,1)).reduceByKeyAndWindow((a:Int,b:Int)=>(a+b),Seconds(30),Seconds(9)) 46 //輸出 47 wordCount.print() 48 49 //啟動StreamingContext 50 ssc.start() 51 52 //等待計算完成 53 ssc.awaitTermination() 54 } 55 56 }

二、Spark Streaming進階

bin/spark-shell --master spark://bigdata11:7077
1、類:StreamingContext(類似:Spark Context、SQLContext)
上下文對象

創建的方式:
(1)通過SparkConf來創建

val sparkconf = new SparkConf().setAppName("MyNetworkWordCount").setMaster("local[2]")

//創建StreamingContext,表示每隔3秒采集一次數據
val ssc = new StreamingContext(sparkconf,Seconds(3))    

(2)通過SparkContext對象來創建

import org.apache.spark.streaming.{Seconds, StreamingContext}
val ssc = new StreamingContext(sc,Seconds(3))

說明:
(1)setMaster("local[2]")
(2)當創建StreamingContext對象,內部會創建一個SparkContext對象
(3)當StreamingContext開始執行,不能添加新的任務
(4)同一個時刻上,JVM只能有一個活動的StreamingContext

2、DStream(離散流):把連續的數據流,變成不連續的離散流,表現形式就是RDD
簡單來說:把連續的變成不連續的

技術分享圖片


操作:Transformation和Action
? (*)transform(func)
? 通過RDD-to-RDD函數作用於源DStream中的各個RDD,可以是任意的RDD操作,從而返回一個新的RDD

改寫上面WordCount例子,屏蔽35行

//使用transform完成同樣的計數,相當於map操作
33     val wordPair = words.transform(x=>x.map(x=>(x,1)))
34     val wordCount = wordPair.reduceByKey(_+_)

(*)?updateStateByKey(func)
可以進行累加操作。方法:設置檢查點,定義一個累加功能的函數

 1 package main.scala.demo
 2 
 3 import org.apache.spark.SparkConf
 4 import org.apache.spark.storage.StorageLevel
 5 import org.apache.spark.streaming.{Seconds, StreamingContext}
 6 
 7 /**
 8   * Created by YOGA on 2018/2/28.
 9   */
10 object MyTotalNetworkWordCount {
11   def main(args: Array[String]) {
12     val sparkconf = new SparkConf()
13       .setAppName("MyNetworkWordCount")
14       .setMaster("local[2]")
15 
16     //創建StreamContext,表示每隔三秒采集一次數據
17     val ssc = new StreamingContext(sparkconf,Seconds(3))
18 
19     //註意:如果累計,在執行計算的時候,需要保持之前的狀態信息
20     //設置檢查點
21     ssc.checkpoint("hdfs://192.168.153.11:9000/spark/checkpoint0228")
22 
23     //創建DStream,看成一個輸入流
24     val lines = ssc.socketTextStream("192.168.153.11",1234,StorageLevel.MEMORY_AND_DISK_SER)
25 
26     //執行WordCount
27     val words = lines.flatMap(_.split(" "))
28 
29     //每個單詞記一次數
30     val pairs = words.map((_,1))
31 
32     //定義一個函數,進行累加
33     //參數:1、當前的值 2、之前的值
34     val addFunc = (currentValues:Seq[Int],preValues:Option[Int]) =>{
35       //得到當前的值
36       val currentCount = currentValues.sum
37 
38       //先得到之前的值
39       val preCount = preValues.getOrElse(0)
40 
41       //返回累加結果
42       Some(currentCount + preCount)
43     }
44 
45     //統計每個單詞出現的頻率:累計
46     val totalCount = pairs.updateStateByKey(addFunc)
47     totalCount.print()
48 
49     //啟動任務
50     ssc.start()
51     ssc.awaitTermination()
52 
53   }
54 }


3、窗口操作

技術分享圖片

技術分享圖片

例子:每9秒鐘,把過去30秒的數據進行WordCount
註釋上面的代碼35行,放開下面一行代碼

/*
38     * 參數一:執行運算
39     * 參數二:窗口的大小
40     * 參數三:創建滑動的距離
41     *
42     * 例子:每9秒鐘,把過去30秒的數據進行wordcount
43     * 註意:第二個參數 第三個參數 必須是采樣頻率的整數倍,采樣頻率3s
44     * */
45     val wordCount = words.map((_,1)).reduceByKeyAndWindow((a:Int,b:Int)=>(a+b),Seconds(30),Seconds(9))

4、輸入和輸出
(1)輸入:接收器接收外部數據源的數據
(*)基本數據源:文件流、RDD隊列流、Socket流
(*)高級數據源:Kafka、Flume
文件流:監聽一個目錄,當目錄下的文件發生變化的時候,將變化的數據讀入DStream

package main.scala.demo

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * Created by YOGA on 2018/2/28.
  */
object MyFileDStream {
  def main(args: Array[String]) {
    //創建一個SparkConf對象
    //local[2]:相當於有兩個工作線程,一個接收一個發送
    val sparkconf = new SparkConf()
      .setAppName("MyNetworkWordCount")
      .setMaster("local[2]")

    //創建StreamContext,表示每隔三秒采集一次數據
    val ssc = new StreamingContext(sparkconf,Seconds(3))

      //監聽一個目錄,當目錄下的文件發生變化的時候,將變化的數據讀入DStream
    val lines = ssc.textFileStream("D:\\temp\\aaa")

    lines.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

RDD隊列流queueStream

:定義一個for循環,生成RDD放入隊列

package main.scala.demo

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.collection.mutable
import scala.collection.mutable.Queue
import org.apache.spark.rdd.RDD
/**
  * Created by YOGA on 2018/2/28.
  */
object MyRDDQueueDStream {
  def main(args: Array[String]){
    val sparkconf = new SparkConf()
      .setAppName("MyNetworkWordCount")
      .setMaster("local[2]")

    //創建StreamContext,表示每隔三秒采集一次數據
    val ssc = new StreamingContext(sparkconf,Seconds(3))

    //創建一個隊列,把生成RDD放入隊列
    val rddQueue = new mutable.Queue[RDD[Int]]()
    //初始化
    for(i <- 1 to 3){
      rddQueue += ssc.sparkContext.makeRDD(1 to 10)

      //讓線程睡幾秒
      Thread.sleep(3000)

    }

    //創建一個RDD的DStream
    val inputStream = ssc.queueStream(rddQueue)
    //處理:乘以10
    val result = inputStream.map(x=> (x,x*10))
    result.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

運行:

技術分享圖片


(2)輸出操作

技術分享圖片


5、集成DataFrame和SQL: 使用SparkSQL的方式處理流式數據

把RDD轉換成DataFrame,並生成臨時表,然後就可以進行SQL查詢

 1 package main.scala.demo
 2 
 3 import org.apache.spark.SparkConf
 4 import org.apache.spark.sql.SparkSession
 5 import org.apache.spark.storage.StorageLevel
 6 import org.apache.spark.streaming.{Seconds, StreamingContext}
 7 
 8 /**
 9   * Created by YOGA on 2018/2/28.
10   */
11 object MyNetWorkWordCountBySQL {
12   def main(args: Array[String]) {
13     //核心:通過StreamingContext對象,去創建一個DStream
14     //DStream從外部接收數據(使用的是Linux上的netcat工具)
15 
16     //創建一個SparkConf對象
17     //local[2]:相當於有兩個工作線程,一個接收一個發送
18     val sparkconf = new SparkConf()
19       .setAppName("MyNetworkWordCount")
20       .setMaster("local[2]")
21 
22     //創建StreamContext,表示每隔三秒采集一次數據
23     val ssc = new StreamingContext(sparkconf,Seconds(3))
24 
25     //創建DStream,看成一個輸入流
26     val lines = ssc.socketTextStream("192.168.153.11",1234,StorageLevel.MEMORY_AND_DISK_SER)
27 
28     //得到的所有單詞
29     val words = lines.flatMap(_.split(" "))
30     //val wordPair = words.transform(x=> x.map(x=>(x,1)))
31     //val wordCount = wordPair.reduceByKey(_+_)
32 
33     //使用sparkSQL處理Spark Streaming的數據
34     words.foreachRDD(rdd =>{
35       //使用SparkSession來創建
36       val spark = SparkSession.builder()
37                     .config(rdd.sparkContext.getConf)
38                     .getOrCreate()
39 
40       //需要把RDD轉成一個DataFrame
41       import spark.implicits._
42       val wordCountDF = rdd.toDF("word")
43 
44       //註冊成一個表
45       wordCountDF.createOrReplaceTempView("words")
46 
47       //執行SQL
48       val result = spark.sql("select * from words group by word")
49       result.show()
50 
51       Thread.sleep(5000)
52     })
53 
54 
55     //啟動StreamingContext
56     ssc.start()
57 
58     //等待計算完成
59     ssc.awaitTermination()
60   }
61 }

大數據筆記(三十一)——SparkStreaming詳細介紹