1. 程式人生 > >Spark Streaming 實戰案例(三) DStream Window操作

Spark Streaming 實戰案例(三) DStream Window操作

本節主要內容

  1. Window Operation
  2. 入門案例

1. Window Operation

Spark Streaming提供視窗操作(Window Operation),如下圖所示:
這裡寫圖片描述
上圖中,紅色實線表示視窗當前的滑動位置,虛線表示前一次視窗位置,視窗每滑動一次,落在該視窗中的RDD被一起同時處理,生成一個視窗DStream(windowed DStream),視窗操作需要設定兩個引數:
(1)視窗長度(window length),即視窗的持續時間,上圖中的視窗長度為3
(2)滑動間隔(sliding interval),視窗操作執行的時間間隔,上圖中的滑動間隔為2
這兩個引數必須是原始DStream 批處理間隔(batch interval)的整數倍(上圖中的原始DStream的batch interval為1)

2. 入門案例

  1. WindowWordCount——reduceByKeyAndWindow方法使用
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._

object WindowWordCount {
  def main(args: Array[String]) {
    //傳入的引數為localhost 9999
30 10 if (args.length != 4) { System.err.println("Usage: WindowWorldCount <hostname> <port> <windowDuration> <slideDuration>") System.exit(1) } StreamingExamples.setStreamingLogLevels() val conf = new SparkConf().setAppName("WindowWordCount").setMaster
("local[4]") val sc = new SparkContext(conf) // 建立StreamingContext,batch interval為5秒 val ssc = new StreamingContext(sc, Seconds(5)) //Socket為資料來源 val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_ONLY_SER) val words = lines.flatMap(_.split(" ")) // windows操作,對視窗中的單詞進行計數 val wordCounts = words.map(x => (x , 1)).reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(args(2).toInt), Seconds(args(3).toInt)) wordCounts.print() ssc.start() ssc.awaitTermination() } }

通過下列程式碼啟動netcat server

root@sparkmaster:~# nc -lk 9999
  • 1

再執行WindowWordCount
輸入下列語句

root@sparkmaster:~# nc -lk 9999
Spark is a fast and general cluster computing system for Big Data. It provides
  • 1

觀察執行情況:

-------------------------------------------
Time: 1448778805000 ms(10秒,第一個滑動視窗時間)
-------------------------------------------
(provides,1)
(is,1)
(general,1)
(Big,1)
(fast,1)
(cluster,1)
(Data.,1)
(computing,1)
(Spark,1)
(a,1)
...

-------------------------------------------
Time: 1448778815000 ms(10秒後,第二個滑動視窗時間)
-------------------------------------------
(provides,1)
(is,1)
(general,1)
(Big,1)
(fast,1)
(cluster,1)
(Data.,1)
(computing,1)
(Spark,1)
(a,1)
...

-------------------------------------------
Time: 1448778825000 ms(10秒後,第三個滑動視窗時間)
-------------------------------------------
(provides,1)
(is,1)
(general,1)
(Big,1)
(fast,1)
(cluster,1)
(Data.,1)
(computing,1)
(Spark,1)
(a,1)
...

-------------------------------------------
Time: 1448778835000 ms(再經10秒後,超出window length視窗長度,不在計數範圍內)
-------------------------------------------

-------------------------------------------
Time: 1448778845000 ms
-------------------------------------------

同樣的語句輸入兩次

[email protected]:~# nc -lk 9999
Spark is a fast and general cluster computing system for Big Data. It provides
Spark is a fast and general cluster computing system for Big Data. It provides
Spark is a fast and general cluster computing system for Big Data. It provides

觀察執行結果如下:

Time: 1448779205000 ms
-------------------------------------------
(provides,2)
(is,2)
(general,2)
(Big,2)
(fast,2)
(cluster,2)
(Data.,2)
(computing,2)
(Spark,2)
(a,2)
...

再輸入一次

[email protected]:~# nc -lk 9999
Spark is a fast and general cluster computing system for Big Data. It provides
Spark is a fast and general cluster computing system for Big Data. It provides
Spark is a fast and general cluster computing system for Big Data. It provides
Spark is a fast and general cluster computing system for Big Data. It provides

計算結果如下:


-------------------------------------------
Time: 1448779215000 ms
-------------------------------------------
(provides,3)
(is,3)
(general,3)
(Big,3)
(fast,3)
(cluster,3)
(Data.,3)
(computing,3)
(Spark,3)
(a,3)
...

再輸入一次

[email protected]:~# nc -lk 9999
Spark is a fast and general cluster computing system for Big Data. It provides
Spark is a fast and general cluster computing system for Big Data. It provides
Spark is a fast and general cluster computing system for Big Data. It provides
Spark is a fast and general cluster computing system for Big Data. It provides
Spark is a fast and general cluster computing system for Big Data. It provides

計算結果如下:


-------------------------------------------
Time: 1448779225000 ms
-------------------------------------------
(provides,4)
(is,4)
(general,4)
(Big,4)
(fast,4)
(cluster,4)
(Data.,4)
(computing,4)
(Spark,4)
(a,4)
...

-------------------------------------------
Time: 1448779235000 ms
-------------------------------------------
(provides,2)
(is,2)
(general,2)
(Big,2)
(fast,2)
(cluster,2)
(Data.,2)
(computing,2)
(Spark,2)
(a,2)
...

-------------------------------------------
Time: 1448779245000 ms
-------------------------------------------
(provides,1)
(is,1)
(general,1)
(Big,1)
(fast,1)
(cluster,1)
(Data.,1)
(computing,1)
(Spark,1)
(a,1)
...

-------------------------------------------
Time: 1448779255000 ms
-------------------------------------------

-------------------------------------------
Time: 1448779265000 ms
-------------------------------------------

2 WindowWordCount——countByWindow方法使用


import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._

object WindowWordCount {
  def main(args: Array[String]) {
    if (args.length != 4) {
      System.err.println("Usage: WindowWorldCount <hostname> <port> <windowDuration> <slideDuration>")
      System.exit(1)
    }
    StreamingExamples.setStreamingLogLevels()

    val conf = new SparkConf().setAppName("WindowWordCount").setMaster("local[2]")
    val sc = new SparkContext(conf)

    // 建立StreamingContext
    val ssc = new StreamingContext(sc, Seconds(5))
    // 定義checkpoint目錄為當前目錄
    ssc.checkpoint(".")


    val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_ONLY_SER)
    val words = lines.flatMap(_.split(" "))

    //countByWindowcountByWindow方法計算基於滑動視窗的DStream中的元素的數量。
    val countByWindow=words.countByWindow(Seconds(args(2).toInt), Seconds(args(3).toInt))

    countByWindow.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

啟動

root@sparkmaster:~# nc -lk 9999

然後執行WindowWordCount
輸入

root@sparkmaster:~# nc -lk 9999
Spark is a fast and general cluster computing system for Big Data

察看執行結果:

-------------------------------------------
Time: 1448780625000 ms
-------------------------------------------
0

-------------------------------------------
Time: 1448780635000 ms
-------------------------------------------
12

-------------------------------------------
Time: 1448780645000 ms
-------------------------------------------
12

-------------------------------------------
Time: 1448780655000 ms
-------------------------------------------
12

-------------------------------------------
Time: 1448780665000 ms
-------------------------------------------
0

-------------------------------------------
Time: 1448780675000 ms
-------------------------------------------
0

3 WindowWordCount——reduceByWindow方法使用

//reduceByWindow方法基於滑動視窗對源DStream中的元素進行聚合操作,返回包含單元素的一個新的DStream。
 val reduceByWindow=words.map(x=>1).reduceByWindow(_+_,_-_Seconds(args(2).toInt), Seconds(args(3).toInt))

上面的例子其實是countByWindow的實現,可以在countByWindow原始碼實現中得到驗證

def countByWindow(
      windowDuration: Duration,
      slideDuration: Duration): DStream[Long] = ssc.withScope {
    this.map(_ => 1L).reduceByWindow(_ + _, _ - _, windowDuration, slideDuration)
  }

而reduceByWindow又是通過reduceByKeyAndWindow方法來實現的,具體程式碼如下

def reduceByWindow(
      reduceFunc: (T, T) => T,
      invReduceFunc: (T, T) => T,
      windowDuration: Duration,
      slideDuration: Duration
    ): DStream[T] = ssc.withScope {
      this.map(x => (1, x))
          .reduceByKeyAndWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration, 1)
          .map(_._2)
  }

與前面的例子中的reduceByKeyAndWindow方法不同的是這裡的reduceByKeyAndWindow方法多了一個invReduceFunc引數,方法完整原始碼如下:

 /**
   * Return a new DStream by applying incremental `reduceByKey` over a sliding window.
   * The reduced value of over a new window is calculated using the old window's reduced value :
   *  1. reduce the new values that entered the window (e.g., adding new counts)
   *
   *  2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
   *
   * This is more efficient than reduceByKeyAndWindow without "inverse reduce" function.
   * However, it is applicable to only "invertible reduce functions".
   * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
   * @param reduceFunc associative reduce function
   * @param invReduceFunc inverse reduce function
   * @param windowDuration width of the window; must be a multiple of this DStream's
   *                       batching interval
   * @param slideDuration  sliding interval of the window (i.e., the interval after which
   *                       the new DStream will generate RDDs); must be a multiple of this
   *                       DStream's batching interval
   * @param filterFunc     Optional function to filter expired key-value pairs;
   *                       only pairs that satisfy the function are retained
   */
  def reduceByKeyAndWindow(
      reduceFunc: (V, V) => V,
      invReduceFunc: (V, V) => V,
      windowDuration: Duration,
      slideDuration: Duration = self.slideDuration,
      numPartitions: Int = ssc.sc.defaultParallelism,
      filterFunc: ((K, V)) => Boolean = null
    ): DStream[(K, V)] = ssc.withScope {
    reduceByKeyAndWindow(
      reduceFunc, invReduceFunc, windowDuration,
      slideDuration, defaultPartitioner(numPartitions), filterFunc
    )
  }

具體來講,下面兩個方法得到的結果是一樣的,只是效率不同,後面的方法方式效率更高:

//以過去5秒鐘為一個輸入視窗,每1秒統計一下WordCount,本方法會將過去5秒鐘的每一秒鐘的WordCount都進行統計
//然後進行疊加,得出這個視窗中的單詞統計。 這種方式被稱為疊加方式,如下圖左邊所示
val wordCounts = words.map(x => (x, 1)).reduceByKeyAndWindow(_ + _, Seconds(5s),seconds(1))

//計算t+4秒這個時刻過去5秒視窗的WordCount,可以將t+3時刻過去5秒的統計量加上[t+3,t+4]的統計量
//再減去[t-2,t-1]的統計量,這種方法可以複用中間三秒的統計量,提高統計的效率。 這種方式被稱為增量方式,如下圖的右邊所示
val wordCounts = words.map(x => (x, 1)).reduceByKeyAndWindow(_ + _, _ - _, Seconds(5s),seconds(1))
  • 1

這裡寫圖片描述

DStream支援的全部Window操作方法如下:
這裡寫圖片描述

相關推薦

Spark Streaming 實戰案例) DStream Window操作

本節主要內容 Window Operation入門案例1. Window Operation Spark Streaming提供視窗操作(Window Operation),如下圖所示: 上圖中,紅色實線表示視窗當前的滑動位置,虛線表示前一次視窗位置,視窗每滑動一次,落在

Spark Streaming筆記整理):DS的transformation與output操作

job watermark number 這樣的 格式 current fix work eat DStream的各種transformation Transformation Meaning map(func) 對DStream中的各個元素進行func函數操作,然後

Spark Streaming程式設計指南

DStreams轉換(Transformation) 和RDD類似,轉換中允許輸入DStream中的資料被修改。DStream支援很多Spark RDD上的轉換。常用的轉換如下。 轉換 含義 map(func) 將源DS

Java多線程編程模式實戰指南):Two-phase Termination模式

增加 row throws mgr 額外 finally join table 還需 停止線程是一個目標簡單而實現卻不那麽簡單的任務。首先,Java沒有提供直接的API用於停止線程。此外,停止線程時還有一些額外的細節需要考慮,如待停止的線程處於阻塞(等待鎖)或者等待狀態(等

Spring Boot實戰筆記)-- Spring常用配置Bean的初始化和銷毀、Profile)

div nbsp troy string 實例化 public ive work 初始 一、Bean的初始化和銷毀   在我們的實際開發的時候,經常會遇到Bean在使用之前或之後做些必要的操作,Spring對Bean的生命周期操作提供了支持。在使用Java配置和註解配置下提

shell腳本案例)利用top命令精確監控cpu使用率

cpu 監控 shell linux 自動化 需求:利用top命令精確監控CPU 準備知識:top使用、基本的awk、dc(默認bash shell不支持小數點運算) 腳本如下 [root@arppining scripts]# cat cpu.sh #!/bin/bash - # t

Spark SQL筆記整理):加載保存功能與Spark SQL函數

code ren maven依賴 append 關聯 dfs 取值 struct nal 加載保存功能 數據加載(json文件、jdbc)與保存(json、jdbc) 測試代碼如下: package cn.xpleaf.bigdata.spark.scala.sql.p1

scala spark-streaming整合kafka spark 2.3 kafka 0.10)

obj required word 錯誤 prope apache rop sta move Maven組件如下: <dependency> <groupId>org.apache.spark</groupId> <

spark RDD常用運算元

- first、take、collect、count、top、takeOrdered、foreach、fold、reduce、countByValue、lookup 演算法解釋 first:返回第一個元素 take:rdd.t

大資料之電話日誌分析callLog案例

一、查詢使用者最近的通話資訊 -------------------------------------------- 1.實現分析 使用ssm可視介面提供查詢串 -- controller連線 hiveserver2 -- 將命令轉化成hsql語句 -- hive繫結hba

.Net Core 在 Linux-Centos上的部署實戰教程

fuse use yun 失敗 重新 nginx配置 reload cat 實戰 綁定域名,利用Nginx反向代理來操作 1.安裝Nginx yun install nginx 安裝成功 2.啟動nginx   service nginx

python實戰演練)購物車程序

%s end src mage imp sdi gin gif 名稱 一.實現功能 作業需求: 用戶入口:1.商品信息存在文件裏2.已購商品,余額記錄。第一次啟動程序時需要記錄工資,第二次啟動程序時談出上次余額3.允許用戶根據商品編號購買商品4.用戶選擇商品後,檢測是否夠,

MySql必知必會實戰練習)資料過濾 MySql必知必會實戰練習二)資料檢索

  在之前的部落格MySql必知必會實戰練習(一)表建立和資料新增中完成了各表的建立和資料新增,MySql必知必會實戰練習(二)資料檢索中介紹了所有的資料檢索操作,下面對資料過濾操作進行總結。 1. where子句操作符   等於: =    不等於: != 或 <>   小於:

Spark採坑系列Spark操作Hive的坑

跟著教學試著用Idea程式設計,實現Spark查詢Hive中的表。結果上來就涼了。 搗鼓好久都不行,在網上查有說將hive-site.xml放到resource目錄就行,還有什麼hadoop針對windows使用者的許可權問題,結果都是扯淡。 其實問題還是處在程式碼上,直接附上程式碼了

Java 由淺入深GUI程式設計實戰練習

一,專案介紹 1.可以檢視年,月,日等功能。能獲取今天的日期,並且能夠通過下拉年,月的列表。 2.當程式執行時,顯示的時間是系統當前時間。 3.可以手動輸入時間,確定後系統跳轉到制定的時間。 4.提供一種點選功能,通過點選實現年份,月份的自增和自減功能。 二,執行介面 三,程式碼詳情

Spark基礎-scala學習

面向物件程式設計之Trait trait基礎知識 將trait作為介面使用 在trait中定義具體方法 在trait中定義具體欄位 在trait中定義抽象欄位 trait高階知識 為例項物件混入trait trait呼叫鏈 在trait中覆蓋抽象方法 混合使用t

微服務架構實戰):Spring boot2.0 + Mybatis + PageHelper實現增刪改查和分頁查詢功能

簡介 該專案主要利用Spring boot2.0 +Mybatis + PageHelper實現增刪改查和分頁查詢功能,快速搭建一套和資料庫互動的專案。 小工具一枚,歡迎使用和Star支援,如使用過程中碰到問題,可以提出Issue,我會盡力完善該Starter 版本基礎

《Python3網絡爬蟲實戰案例崔慶才著)》 中文版PDF下載,附源代碼+視頻教程

圖片 網絡爬蟲 51cto 視頻教程 下載 網絡 png image ref 《Python3網絡爬蟲實戰案例(崔慶才著)》中文版PDF下載,附源代碼+視頻教程,帶目錄資料下載:https://pan.baidu.com/s/1OzxyHQMLOzWFMzjdQ8kEqQ《

Netty實戰手冊

上篇已經講述瞭如何搭建基礎的服務結構,現在瞭解下如何完成與客戶端的通訊模型。 首先,在HandlerService中,處理接收來自客戶端的訊息: @Override public void receive( ChannelHandlerContext _ctx , Object _o

實戰演練

1.羅馬數字與整數之間的轉換 羅馬數字包含以下七種字元:I,V,X,L,C,D和M 字元 數值 I 1 V 5 X 10 L 50 C 100 D 500 M 1000 例如,羅馬數字2寫做 II,即為兩個並排放置的的1、12寫做XII,即為 X + II,27寫做XXVII,即為X