1. 程式人生 > >Flink流計算程式設計--Session Window實戰

Flink流計算程式設計--Session Window實戰

1、session window簡介

Flink從1.1開始支援Session window,它是屬於基於時間的視窗。

這裡以EventTime為例,基於時間的視窗,可以分為3種:TumblingEventTimeWindows,SlidingEventTimeWindows和EventTimeSessionWindows。

對於Tumbling與Sliding視窗,其視窗的時間大小是固定的,例如10秒鐘一個視窗,那麼視窗中開始時間和結束時間一定是一個10秒的間隔,例如從10:00:00到10:00:10。Sliding的視窗大小也是固定的,例如每隔10秒鐘統計過去20秒的資料,那麼它的視窗也是從10:00:00到10:00:20,大小是20秒。

而Session window的視窗大小,則是由資料本身決定。例如,基於同一個key,有如下幾條資料,其自身時間戳如下:

key,10:00:00
key,10:00:03
key,10:00:05
key,10:00:12
key,10:00:15
key,10:00:24
key,10:00:30
key,10:00:42
.....

那麼,假設Session Window的時間gap如果是6秒,那麼,上面的資料會被分成以下幾個視窗(視窗開始時間以及視窗結束時間,視窗內記錄數):

視窗1:(key,10:00:00,10:00:11,3)
視窗2:(key,10:00:12,10:00:21,2)
視窗3:(key,10:00:24,10:00:30,1)
視窗4:(key,10:00
:30,10:00:42,1) ......

可以看到,session window只需要設定一個時間間隔(gap)即可定義一個session window機制。

2、session window視窗分析

下面我們來分析下上邊的資料。
首先,我們設定的時間gap是6秒,那麼,當相鄰的記錄相差>=6秒時,則觸發視窗。
對於第一條記錄與第二條記錄,其時間間隔是3秒,那麼這兩條記錄屬於同一個視窗內,此時並不觸發視窗;第二條與第三條記錄,間隔2秒,也不觸發視窗;第三條與第四條記錄,間隔>=6秒(7秒),此時,視窗被觸發了。
繼續,第四條記錄與第五條記錄間隔3秒,不觸發;第五條與第六條間隔9秒,觸發;
繼續,第六條與第七條間隔6秒,觸發;
繼續,第七條與第八條間隔12秒,觸發。

到此,上邊這些資料被劃分到不同的視窗中,每個視窗的大小也不一樣。

那麼,每個視窗的時間範圍有沒有什麼共性?我們可以按照下面的公式來計算每個視窗的時間範圍:

視窗大小=[第一條資料的時間,第一個與相鄰資料相差大於等於gap的時間+gap)

看似有點難以理解,其實現實的意義就是:視窗內包含的資料是“活躍的”。

例如,使用者點選行為,如果認為30秒間隔使用者沒有操作,則認為是不活躍的。那麼通過session window,定義一個30秒的gap,此時,每個視窗內的資料,都是使用者在活躍期間的資料,超過30秒了沒有任何操作,則認為使用者不活躍,有可能下線。

3、session window在Flink中的實現

上面的介紹有點繁瑣,不夠言簡意賅,那麼我們直接看程式碼。
資料介紹:在程式碼之前,介紹下資料,指數資料,正常情況每隔3秒產生一條,如果達到6秒甚至更多實踐才產生資料,則認為有gap,此時說明指數的交易不夠頻繁,不夠活躍。

程式碼如下:

import java.text.SimpleDateFormat

import org.apache.flink.streaming.api.scala._
import DataTypes.StockIndex
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.function.RichWindowFunction
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
import toptrade.DataStreamOperator.CommonOperator
import toptrade.kafkaInOut.KafkaConsumeToptrade



object SessionWindowTest {

  // *************************************************************************
  // main函式
  // *************************************************************************

  def main(args : Array[String]) : Unit = {

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val source = new KafkaConsumeToptrade()

    val indexString = source.indexDataStream(env).name("Index").setParallelism(4)
    val indexDataStream = new CommonOperator().mapIndexToDataStreamPOJO(indexString).filter(f=>f.lastIndex != 0L && f.totalVolume != 0L).setParallelism(8).name("index filter")

    val watermarkIndex = indexDataStream.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[StockIndex] {

      var currentMaxTimestamp = 0L
      val maxOutOfOrderness = 10000L

      override def getCurrentWatermark: Watermark = {
        new Watermark(currentMaxTimestamp - maxOutOfOrderness)
      }

      override def extractTimestamp(t: StockIndex, l: Long): Long = {
        val timestamp = t.time
        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp)
        timestamp
      }
    })
      .name("index watermark")
      .setParallelism(8)


    val sessionWindow = watermarkIndex
      .keyBy(_.code)
      .window(EventTimeSessionWindows.withGap(Time.seconds(6)))
      .apply(new IndexSessionWindow)
      .setParallelism(8)


    sessionWindow.print().setParallelism(1)

    env.execute()

  }

// *************************************************************************
  // SessionWindow Function
  // *************************************************************************
  class IndexSessionWindow extends RichWindowFunction[StockIndex,(String,String,String,String,String,Int),String,TimeWindow]{

    var state : ValueState[IndexSumTest] = null
    var size = 0

    val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")

    override def open(config : Configuration) : Unit = {

      state = getRuntimeContext.getState(new ValueStateDescriptor[IndexSumTest]("snapshot State", classOf[IndexSumTest], null))
    }

    override def apply(key: String, window: TimeWindow, input: Iterable[StockIndex], out: Collector[(String, String, String, String,String,Int)]): Unit = {
      //init
      if(state.value() == null){
        state.update(IndexSumTest(0))
      }else{
        size = state.value().size
      }

      val list = input.toList.sortBy(_.time)

      val window_start_time = format.format(window.getStart)
      val window_end_time = format.format(window.getEnd)
      val window_size = input.size

      size = size + window_size
      state.update(IndexSumTest(size))


      out.collect((key,window_start_time,window_end_time,format.format(list.head.time),format.format(list.last.time),size))
    }
  }

  // *************************************************************************
  // Case Class
  // *************************************************************************
  case class IndexSumTest(size : Int)

}

session window function的實現,輸出的內容代表:(key,視窗開始時間,視窗結束時間,視窗內最早的一條資料的時間,視窗內最後一條資料時間,同一個key的累計個數)。

4、session window的輸出結果

上面的結果,輸出如下(抽取了其中一小部分):

(990857,2016-09-23 14:31:03.000,2016-09-23 14:31:09.000,2016-09-23 14:31:03.000,2016-09-23 14:31:03.000,5)
(990857,2016-09-23 14:31:49.000,2016-09-23 14:31:55.000,2016-09-23 14:31:49.000,2016-09-23 14:31:49.000,6)
(990857,2016-09-23 14:32:09.000,2016-09-23 14:32:20.000,2016-09-23 14:32:09.000,2016-09-23 14:32:14.000,8)
(990857,2016-09-23 14:32:29.000,2016-09-23 14:32:35.000,2016-09-23 14:32:29.000,2016-09-23 14:32:29.000,9)
(990857,2016-09-23 14:32:39.000,2016-09-23 14:32:45.000,2016-09-23 14:32:39.000,2016-09-23 14:32:39.000,10)
(990857,2016-09-23 14:32:49.000,2016-09-23 14:32:55.000,2016-09-23 14:32:49.000,2016-09-23 14:32:49.000,11)
(990857,2016-09-23 14:33:04.000,2016-09-23 14:33:10.000,2016-09-23 14:33:04.000,2016-09-23 14:33:04.000,12)
(990857,2016-09-23 14:33:14.000,2016-09-23 14:33:20.000,2016-09-23 14:33:14.000,2016-09-23 14:33:14.000,13)
(990857,2016-09-23 14:33:29.000,2016-09-23 14:33:35.000,2016-09-23 14:33:29.000,2016-09-23 14:33:29.000,14)
(990857,2016-09-23 14:33:39.000,2016-09-23 14:33:45.000,2016-09-23 14:33:39.000,2016-09-23 14:33:39.000,15)
(990857,2016-09-23 14:33:49.000,2016-09-23 14:33:55.000,2016-09-23 14:33:49.000,2016-09-23 14:33:49.000,16)
(990857,2016-09-23 14:34:04.000,2016-09-23 14:34:10.000,2016-09-23 14:34:04.000,2016-09-23 14:34:04.000,17)
(990857,2016-09-23 14:34:14.000,2016-09-23 14:34:20.000,2016-09-23 14:34:14.000,2016-09-23 14:34:14.000,18)

我們以第三個視窗為例來說明:
第三個視窗中有2條記錄(8-6),最早的一條記錄時間是:2016-09-23 14:32:09.000,最後的一條記錄時間是:2016-09-23 14:32:14.000。相差5秒,因此這兩條資料沒有達到6秒的間隔,所以這兩條資料一定屬於同一個視窗。下一條資料可以觀察下一個視窗的開始時間:2016-09-23 14:32:29.000,比第三個視窗的最後一條的時間多了15秒,因此才產生了第三個視窗。第三個視窗的結束時間是:2016-09-23 14:32:20.000,正好是視窗內最後一個數據時間+gap的時間。

由此也驗證了我們上邊提高的公式。

不過作為視窗結束時間,在實際中的用處不大,只是gap內部記錄的一個時間戳而已,僅做觸發條件使用。

5、參考