1. 程式人生 > >Spark Streaming 滑動視窗

Spark Streaming 滑動視窗

Spark Streaming提供了滑動視窗操作的支援,從而讓我們可以對一個滑動視窗內的資料執行計算操作。每次掉落在視窗內的RDD的資料,會被聚合起來執行計算操作,然後生成的RDD,會作為window DStream的一個RDD。

網官圖中所示,就是對每三秒鐘的資料執行一次滑動視窗計算,這3秒內的3個RDD會被聚合起來進行處理,然後過了兩秒鐘,又會對最近三秒內的資料執行滑動視窗計算。所以每個滑動視窗操作,都必須指定兩個引數,視窗長度以及滑動間隔,而且這兩個引數值都必須是batch間隔的整數倍。

Spark Streaming對滑動視窗的支援,是比Storm更加完善和強大的。

 

之前有些朋友問:

spark官網圖片中: 滑動視窗寬度是3個時間單位,滑動時間是2兩個單位,這樣的話中間time3的Dstream不是重複計算了嗎? 

Answer:比如下面這個例子是針對熱搜的應用場景,官方的例子也可能是是針對不同的場景給出了的。如果你不想出現重疊的部分,把滑動間隔由2改成3即可

SparkStreaming對滑動視窗支援的轉換操作:

 示例講解:

1、window(windowLength, slideInterval)

該操作由一個DStream物件呼叫,傳入一個視窗長度引數,一個視窗移動速率引數,然後將當前時刻當前長度視窗中的元素取出形成一個新的DStream。

下面的程式碼以長度為3,移動速率為1擷取源DStream中的元素形成新的DStream。

val windowWords = words.window(Seconds( 3 ), Seconds( 1))

基本上每秒輸入一個字母,然後取出當前時刻3秒這個長度中的所有元素,打印出來。從上面的截圖中可以看到,下一秒時已經看不到a了,再下一秒,已經看不到b和c了。表示a, b, c已經不在當前的視窗中。

2、 countByWindow(windowLength,slideInterval)

返回指定長度視窗中的元素個數。

程式碼如下,統計當前3秒長度的時間視窗的DStream中元素的個數:

val windowWords = words.countByWindow(Seconds( 3 ), Seconds( 1))

3、 reduceByWindow(func, windowLength,slideInterval)

類似於上面的reduce操作,只不過這裡不再是對整個呼叫DStream進行reduce操作,而是在呼叫DStream上首先取視窗函式的元素形成新的DStream,然後在視窗元素形成的DStream上進行reduce。

val windowWords = words.reduceByWindow(_ + "-" + _, Seconds( 3) , Seconds( 1 ))

4、 reduceByKeyAndWindow(func,windowLength, slideInterval, [numTasks])

呼叫該操作的DStream中的元素格式為(k, v),整個操作類似於前面的reduceByKey,只不過對應的資料來源不同,reduceByKeyAndWindow的資料來源是基於該DStream的視窗長度中的所有資料。該操作也有一個可選的併發數引數。

下面程式碼中,將當前長度為3的時間視窗中的所有資料元素根據key進行合併,統計當前3秒中內不同單詞出現的次數。

val windowWords = pairs.reduceByKeyAndWindow((a:Int , b:Int) => (a + b) , Seconds(3 ) , Seconds( 1 ))

5、 reduceByKeyAndWindow(func, invFunc,windowLength, slideInterval, [numTasks])

這個視窗操作和上一個的區別是多傳入一個函式invFunc。前面的func作用和上一個reduceByKeyAndWindow相同,後面的invFunc是用於處理流出rdd的。

在下面這個例子中,如果把3秒的時間視窗當成一個池塘,池塘每一秒都會有魚遊進或者游出,那麼第一個函式表示每由進來一條魚,就在該類魚的數量上累加。而第二個函式是,每由出去一條魚,就將該魚的總數減去一。

val windowWords = pairs.reduceByKeyAndWindow((a: Int, b:Int ) => (a + b) , (a:Int, b: Int) => (a - b) , Seconds( 3 ), Seconds( 1 ))

下面是演示結果,最終的結果是該3秒長度的視窗中歷史上出現過的所有不同單詞個數都為0。

段時間不輸入任何資訊,看一下最終結果

 

6、 countByValueAndWindow(windowLength,slideInterval, [numTasks])

類似於前面的countByValue操作,呼叫該操作的DStream資料格式為(K, v),返回的DStream格式為(K, Long)。統計當前時間視窗中元素值相同的元素的個數。

val windowWords = words.countByValueAndWindow(Seconds( 3 ), Seconds( 1))

示例二:熱點搜尋詞滑動統計,每隔10秒鐘,統計最近60秒鐘的搜尋詞的搜尋頻次,並打印出排名最靠前的3個搜尋詞以及出現次數

Scala版本:

 

packagecom.spark.streaming    import org.apache.spark.streaming.Seconds import org.apache.spark.streaming.StreamingContext  import org.apache.spark.SparkConf   /**  * @author Ganymede  */ object WindowHotWordS {    def main(args: Array[String]): Unit = {      val conf = newSparkConf().setAppName("WindowHotWordS").setMaster("local[2]")        //Scala中,建立的是StreamingContext      val ssc = new StreamingContext(conf,Seconds(5))        val searchLogsDStream =ssc.socketTextStream("spark1", 9999)       val searchWordsDStream =searchLogsDStream.map { searchLog => searchLog.split(" ")(1)}        val searchWordPairDStream = searchWordsDStream.map{ searchWord => (searchWord, 1) }       // reduceByKeyAndWindow      // 第二個引數,是視窗長度,這是是60秒      // 第三個引數,是滑動間隔,這裡是10秒      // 也就是說,每隔10秒鐘,將最近60秒的資料,作為一個視窗,進行內部的RDD的聚合,然後統一對一個RDD進行後續計算     // 而是隻是放在那裡      // 然後,等待我們的滑動間隔到了以後,10秒到了,會將之前60秒的RDD,因為一個batch間隔是5秒,所以之前60秒,就有12個RDD,給聚合起來,然後統一執行reduceByKey操作      // 所以這裡的reduceByKeyAndWindow,是針對每個視窗執行計算的,而不是針對 某個DStream中的RDD     // 每隔10秒鐘,出來之前60秒的收集到的單詞的統計次數      val searchWordCountsDStream =searchWordPairDStream.reduceByKeyAndWindow((v1: Int, v2: Int) => v1 + v2,Seconds(60), Seconds(10))              val finalDStream =searchWordCountsDStream.transform(searchWordCountsRDD => {        val countSearchWordsRDD =searchWordCountsRDD.map(tuple => (tuple._2, tuple._1))        val sortedCountSearchWordsRDD =countSearchWordsRDD.sortByKey(false)       val sortedSearchWordCountsRDD =sortedCountSearchWordsRDD.map(tuple => (tuple._1, tuple._2))        val top3SearchWordCounts =sortedSearchWordCountsRDD.take(3)         for (tuple <-top3SearchWordCounts) {          println("result : " +tuple)        }         searchWordCountsRDD      })       finalDStream.print()        ssc.start()      ssc.awaitTermination()    } }