
Spark Streaming Case Studies (3): DStream Window Operations


  1. Window Operation
  2. Getting-Started Examples

1. Window Operation

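Spark Streaming provides windowed computations (Window Operation), as shown in the figure below:
In the figure, the solid red line marks the window's current position and the dashed line its previous position. Each time the window slides, the RDDs that fall inside the window are combined and processed together to produce a windowed DStream. A window operation takes two parameters:
(1) Window length: the duration of the window. In the figure the window length is 3.
(2) Sliding interval: the interval at which the window operation is performed. In the figure the sliding interval is 2.
Both parameters must be integer multiples of the source DStream's batch interval (in the figure the batch interval of the source DStream is 1).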
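To make the two parameters concrete, here is a minimal pure-Scala sketch (no Spark dependency) that models the rule above. It assumes batches are numbered 1, 2, 3, ... with batch k ending at time k × batch interval; the objects `WindowMath` and `WindowMathDemo` are hypothetical helpers for illustration, not part of the Spark API.

```scala
// Hypothetical helper (not a Spark API): models sliding-window bookkeeping.
// Assumption: batches are numbered 1, 2, 3, ... and batch k ends at k * batchMs.
object WindowMath {
  /** Both durations must be whole multiples of the batch interval. */
  def validate(batchMs: Long, windowMs: Long, slideMs: Long): Unit = {
    require(windowMs % batchMs == 0, s"window length $windowMs ms is not a multiple of $batchMs ms")
    require(slideMs % batchMs == 0, s"sliding interval $slideMs ms is not a multiple of $batchMs ms")
  }

  /** Batches covered by the window that ends with batch `endBatch`. */
  def batchesInWindow(batchMs: Long, windowMs: Long, endBatch: Int): Seq[Int] = {
    val batchesPerWindow = (windowMs / batchMs).toInt
    // Near the start of the stream there may be fewer earlier batches than the window holds.
    math.max(1, endBatch - batchesPerWindow + 1) to endBatch
  }

  /** End batch of each window evaluation: one every `slideMs / batchMs` batches. */
  def windowEnds(batchMs: Long, slideMs: Long, firstEnd: Int, lastBatch: Int): Seq[Int] =
    firstEnd to lastBatch by (slideMs / batchMs).toInt
}

object WindowMathDemo extends App {
  // The figure's setting: batch interval 1, window length 3, sliding interval 2.
  WindowMath.validate(batchMs = 1000, windowMs = 3000, slideMs = 2000)
  for (end <- WindowMath.windowEnds(1000, 2000, firstEnd = 3, lastBatch = 5))
    println(s"window ending at batch $end covers batches ${WindowMath.batchesInWindow(1000, 3000, end).mkString(", ")}")
}
```

Running `WindowMathDemo` prints `window ending at batch 3 covers batches 1, 2, 3` and `window ending at batch 5 covers batches 3, 4, 5`: consecutive windows share one batch because the window length (3) exceeds the sliding interval (2) by one batch interval.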

2. Getting-Started Examples

  1. WindowWordCount: using reduceByKeyAndWindow
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._

object WindowWordCount {
  def main(args: Array[String]) {
    // Arguments passed in: localhost 9999 30 10
    // (hostname, port, window length in seconds, sliding interval in seconds)
    if (args.length != 4) {
      System.err.println("Usage: WindowWordCount <hostname> <port> <windowDuration> <slideDuration>")
      System.exit(1)
    }

    val conf = new SparkConf().setAppName("WindowWordCount").setMaster("local[4]")
    val sc = new SparkContext(conf)
    // Reduce log noise. StreamingExamples.setStreamingLogLevels() only exists in the
    // Spark examples module, so SparkContext.setLogLevel (Spark 1.4+) is used instead.
    sc.setLogLevel("WARN")

    // Create the StreamingContext with a 5-second batch interval
    val ssc = new StreamingContext(sc, Seconds(5))
    // Use a socket as the data source
    val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_ONLY_SER)
    val words = lines.flatMap(_.split(" "))

    // Window operation: count the words that fall inside each window
    val wordCounts = words.map(x => (x, 1)).reduceByKeyAndWindow(
      (a: Int, b: Int) => a + b,
      Seconds(args(2).toInt),
      Seconds(args(3).toInt))
    wordCounts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

Start a netcat server with the following command, then type a line of text into it:

root@sparkmaster:~# nc -lk 9999
Spark is a fast and general cluster computing system for Big Data. It provides
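Time: 1448778805000 ms (10 s: first sliding-window output)

Time: 1448778815000 ms (10 s later: second sliding-window output)

Time: 1448778825000 ms (10 s later: third sliding-window output)

Time: 1448778835000 ms (another 10 s later: the line has fallen outside the window length and is no longer counted)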

Time: 1448778845000 ms
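The timeline above follows from the window arithmetic: a line that arrives in the batch ending at time e is counted by every window ending at a time t with t - windowLength < e <= t. The sketch below is a pure-Scala model of that rule (the object name `WindowVisibility` is hypothetical), assuming the same settings as the run: window length 30 s, sliding interval 10 s, and, as an assumption read off the output, the input landing in the batch that ends at 1448778805000 ms.

```scala
// Hypothetical model (not a Spark API) of which window outputs include a record.
object WindowVisibility {
  /** True if a record in the batch ending at `batchEndMs` lies in the window (windowEndMs - windowMs, windowEndMs]. */
  def inWindow(batchEndMs: Long, windowEndMs: Long, windowMs: Long): Boolean =
    windowEndMs - windowMs < batchEndMs && batchEndMs <= windowEndMs

  def main(args: Array[String]): Unit = {
    val windowMs = 30000L           // window length: 30 s (args(2) = 30)
    val slideMs  = 10000L           // sliding interval: 10 s (args(3) = 10)
    val arrival  = 1448778805000L   // assumption: the line arrived in the batch ending here
    // One window output every slideMs, starting at the arrival batch.
    val windowEnds = (0 until 5).map(i => arrival + i * slideMs)
    for (t <- windowEnds)
      println(s"Time: $t ms -> counted = ${inWindow(arrival, t, windowMs)}")
  }
}
```

Running `main` prints `counted = true` for the window outputs at 1448778805000, 1448778815000 and 1448778825000 ms and `counted = false` from 1448778835000 ms onward, matching the annotations above.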


root@sparkmaster:~# nc -lk 9999
Spark is a fast and general cluster computing system for Big Data. It provides
Spark is a fast and general cluster computing system for Big Data. It provides
Spark is a fast and general cluster computing system for Big Data. It provides


Time: 1448779205000 ms


root@sparkmaster:~# nc -lk 9999
Spark is a fast and general cluster computing system for Big Data. It provides
Spark is a fast and general cluster computing system for Big Data. It provides
Spark is a fast and general cluster computing system for Big Data. It provides
Spark is a fast and general cluster computing system for Big Data. It provides


Time: 1448779215000 ms


root@sparkmaster:~# nc -lk 9999
Spark is a fast and general cluster computing system for Big Data. It provides
Spark is a fast and general cluster computing system for Big Data. It provides
Spark is a fast and general cluster computing system for Big Data. It provides
Spark is a fast and general cluster computing system for Big Data. It provides
Spark is a fast and general cluster computing system for Big Data. It provides


Time: 1448779225000 ms

Time: 1448779235000 ms

Time: 1448779245000 ms

Time: 1448779255000 ms

Time: 1448779265000 ms

2. WindowWordCount: using countByWindow

import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._

object WindowWordCount {
  def main(args: Array[String]) {
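    if (args.length != 4) {
      System.err.println("Usage: WindowWordCount <hostname> <port> <windowDuration> <slideDuration>")
      System.exit(1)
    }

    val conf = new SparkConf().setAppName("WindowWordCount").setMaster("local[2]")
    val sc = new SparkContext(conf)

    // Create the StreamingContext with a 5-second batch interval
    val ssc = new StreamingContext(sc, Seconds(5))
    // Set the checkpoint directory to the current directory
    // (countByWindow uses an inverse reduce function, which requires checkpointing)
    ssc.checkpoint(".")

    val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_ONLY_SER)
    val words = lines.flatMap(_.split(" "))

    // Count how many words fall inside each window
    val countByWindow = words.countByWindow(Seconds(args(2).toInt), Seconds(args(3).toInt))
    countByWindow.print()

    ssc.start()
    ssc.awaitTermination()
  }
}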



root@sparkmaster:~# nc -lk 9999
Spark is a fast and general cluster computing system for Big Data


Time: 1448780625000 ms

Time: 1448780635000 ms

Time: 1448780645000 ms

Time: 1448780655000 ms

Time: 1448780665000 ms

Time: 1448780675000 ms

3. WindowWordCount: using reduceByWindow

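    // Same program as above, with reduceByWindow instead of countByWindow: map every word to 1,
    // then sum over the window. Checkpointing is still required because an inverse function (_ - _) is used.
    val reduceByWindow = words.map(x => 1).reduceByWindow(_ + _, _ - _, Seconds(args(2).toInt), Seconds(args(3).toInt))
    reduceByWindow.print()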


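countByWindow itself is implemented in Spark's DStream.scala on top of reduceByWindow, mapping every element to 1L and summing with _ + _ (inverse function: _ - _):

def countByWindow(
      windowDuration: Duration,
      slideDuration: Duration): DStream[Long] = ssc.withScope {
    this.map(_ => 1L).reduceByWindow(_ + _, _ - _, windowDuration, slideDuration)
  }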
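Because countByWindow passes both `_ + _` and `_ - _`, the window count can be updated incrementally instead of recounted. The sketch below is a pure-Scala model of that idea, assuming the per-batch element counts are already known; `CountByWindowModel` is a hypothetical name, and the model ignores the partially filled windows at stream start-up.

```scala
// Hypothetical model (not Spark code): countByWindow over per-batch element counts.
// Batch indices are 1-based "end" positions; slice(end - n, end) selects the n batches ending there.
object CountByWindowModel {
  private def ends(batchCounts: Vector[Long], batchesPerWindow: Int, batchesPerSlide: Int): Vector[Int] = {
    require(batchesPerSlide <= batchesPerWindow, "this model assumes slide <= window")
    require(batchCounts.length >= batchesPerWindow, "need at least one full window")
    (batchesPerWindow to batchCounts.length by batchesPerSlide).toVector
  }

  /** Direct approach: add up every batch count inside each window. */
  def direct(batchCounts: Vector[Long], batchesPerWindow: Int, batchesPerSlide: Int): Vector[Long] =
    ends(batchCounts, batchesPerWindow, batchesPerSlide).map { end =>
      batchCounts.slice(end - batchesPerWindow, end).sum
    }

  /** Incremental approach: previous window + entering batches - leaving batches. */
  def incremental(batchCounts: Vector[Long], batchesPerWindow: Int, batchesPerSlide: Int): Vector[Long] = {
    val windowEnds = ends(batchCounts, batchesPerWindow, batchesPerSlide)
    val first = batchCounts.slice(0, batchesPerWindow).sum
    windowEnds.tail.scanLeft(first) { (prev, end) =>
      val entering = batchCounts.slice(end - batchesPerSlide, end).sum
      val leaving  = batchCounts.slice(end - batchesPerSlide - batchesPerWindow, end - batchesPerWindow).sum
      prev + entering - leaving
    }
  }
}
```

For example, with per-batch counts `Vector(3L, 0L, 5L, 2L, 4L, 1L)`, a 3-batch window and a 1-batch slide, both methods return `Vector(8, 7, 11, 7)`; the incremental one touches only one entering and one leaving batch per step.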


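reduceByWindow (the variant with an inverse function) tags every element with the constant key 1 and delegates to reduceByKeyAndWindow with a single partition:

def reduceByWindow(
      reduceFunc: (T, T) => T,
      invReduceFunc: (T, T) => T,
      windowDuration: Duration,
      slideDuration: Duration
    ): DStream[T] = ssc.withScope {
      this.map(x => (1, x))
          .reduceByKeyAndWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration, 1)
          .map(_._2)
  }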
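Tagging every element with the same key collapses the per-key reduce into a single value per window. The same single-key trick on an ordinary Scala collection looks like this; `SingleKeyReduce` is a hypothetical illustration, not Spark code, and it reduces one window's elements rather than a stream.

```scala
// Hypothetical illustration (not Spark code) of the single-key trick behind reduceByWindow.
object SingleKeyReduce {
  def reduceAll[T](window: Seq[T])(reduceFunc: (T, T) => T): Option[T] =
    window
      .map(x => (1, x))   // tag every element with the constant key 1
      .groupBy(_._1)      // at most one group, keyed by 1
      .map { case (key, pairs) => (key, pairs.map(_._2).reduce(reduceFunc)) } // per-key reduce
      .values             // drop the key again
      .headOption         // an empty window yields no value
}
```

With `Seq(1, 1, 1, 1)` and `_ + _` this returns `Some(4)`, which is exactly what countByWindow computes after mapping each word to 1.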


The reduceByKeyAndWindow overload it delegates to is defined in PairDStreamFunctions.scala:

  /**
   * Return a new DStream by applying incremental `reduceByKey` over a sliding window.
   * The reduced value of over a new window is calculated using the old window's reduced value :
   *  1. reduce the new values that entered the window (e.g., adding new counts)
   *  2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
   * This is more efficient than reduceByKeyAndWindow without "inverse reduce" function.
   * However, it is applicable to only "invertible reduce functions".
   * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
   * @param reduceFunc associative reduce function
   * @param invReduceFunc inverse reduce function
   * @param windowDuration width of the window; must be a multiple of this DStream's
   *                       batching interval
   * @param slideDuration  sliding interval of the window (i.e., the interval after which
   *                       the new DStream will generate RDDs); must be a multiple of this
   *                       DStream's batching interval
   * @param filterFunc     Optional function to filter expired key-value pairs;
   *                       only pairs that satisfy the function are retained
   */
  def reduceByKeyAndWindow(
      reduceFunc: (V, V) => V,
      invReduceFunc: (V, V) => V,
      windowDuration: Duration,
      slideDuration: Duration = self.slideDuration,
      numPartitions: Int = ssc.sc.defaultParallelism,
      filterFunc: ((K, V)) => Boolean = null
    ): DStream[(K, V)] = ssc.withScope {
    reduceByKeyAndWindow(
      reduceFunc, invReduceFunc, windowDuration,
      slideDuration, defaultPartitioner(numPartitions), filterFunc
    )
  }
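The two-step update described in that doc comment can be modeled on plain Scala maps. In this sketch (hypothetical names; not Spark's actual implementation) each batch is a `Map[String, Int]` of word counts: the new window is the old window plus the batches that entered, minus the batches that left, and an optional `filterFunc` keeps only the pairs that satisfy it, for example to drop words whose count has fallen to zero.

```scala
// Hypothetical model (not Spark's implementation) of incremental reduceByKeyAndWindow on word counts.
object IncrementalWindow {
  type Counts = Map[String, Int]

  /** Step 1: reduce the new values that entered the window (here: add counts). */
  def addEntering(window: Counts, entering: Seq[Counts]): Counts =
    entering.foldLeft(window) { (acc, batch) =>
      batch.foldLeft(acc) { case (m, (word, n)) => m.updated(word, m.getOrElse(word, 0) + n) }
    }

  /** Step 2: "inverse reduce" the old values that left the window (here: subtract counts). */
  def subtractLeaving(window: Counts, leaving: Seq[Counts]): Counts =
    leaving.foldLeft(window) { (acc, batch) =>
      batch.foldLeft(acc) { case (m, (word, n)) => m.updated(word, m.getOrElse(word, 0) - n) }
    }

  /** One slide: add entering batches, subtract leaving ones, keep only pairs passing filterFunc. */
  def slide(window: Counts, entering: Seq[Counts], leaving: Seq[Counts],
            filterFunc: ((String, Int)) => Boolean = _ => true): Counts =
    subtractLeaving(addEntering(window, entering), leaving).filter(filterFunc)

  /** Reference result: recompute the window from scratch over all of its batches. */
  def recompute(batchesInWindow: Seq[Counts]): Counts =
    addEntering(Map.empty, batchesInWindow)
}
```

Without a filter, a word that left the window stays in the result with count 0; passing `{ case (_, n) => n > 0 }` as `filterFunc` removes such entries so the result matches a full recomputation.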


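// Additive approach (left side of the figure below): every time the window slides, the counts
// of all batches in the window are added up again to get this window's word counts.
val additiveCounts = words.map(x => (x, 1)).reduceByKeyAndWindow(
  (a: Int, b: Int) => a + b, Seconds(5), Seconds(1))

// Incremental approach (right side of the figure below): start from the previous window's counts,
// add the counts of the batch that just entered, then subtract the counts of the interval that
// left ([t-2, t-1] in the figure). The counts for the seconds shared by the two windows are reused,
// which makes the computation more efficient. Checkpointing must be enabled (ssc.checkpoint(...)).
val incrementalCounts = words.map(x => (x, 1)).reduceByKeyAndWindow(
  (a: Int, b: Int) => a + b, (a: Int, b: Int) => a - b, Seconds(5), Seconds(1))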
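The two approaches can also be written as equations. The notation here is introduced for illustration: B_k is the per-batch partial result (for example, word counts) of the batch ending at time k, w is the window length, s is the sliding interval, ⊕ is the reduce function (`_ + _`), ⊖ is the inverse reduce function (`_ - _`), and W_t is the windowed result at time t.

```latex
% Additive approach: reduce every batch in the window (t - w, t] from scratch
W_t = \bigoplus_{k \in (t-w,\, t]} B_k

% Incremental approach: W_{t-s} covers (t-s-w, t-s]; add the batches in (t-s, t]
% that entered, then inverse-reduce the batches in (t-s-w, t-w] that left
W_t = \Big( W_{t-s} \oplus \bigoplus_{k \in (t-s,\, t]} B_k \Big) \ominus \bigoplus_{k \in (t-s-w,\, t-w]} B_k
```

With w = 5 s and s = 1 s, the additive form reduces five one-second batch results for every output, while the incremental form touches one entering and one leaving batch and reuses the four seconds (t-5, t-1] shared with the previous window.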




