
Flink Window Operations


The APIs in this article are based on Flink 1.4 and later.

Configuring Time Characteristics

A window operator on a non-keyed stream runs with a parallelism of 1.

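Processing Time

All operators use the system clock of the machine they run on to decide when to trigger computations. If a job starts at 9:15am and a 1 h interval is configured, the first computation covers 9:15am to 10:00am and the next one covers 10:00am to 11:00am (upper bound excluded).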
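The 9:15 example follows from the fact that window boundaries are aligned to the epoch, not to the job start. The sketch below is a minimal, Flink-free illustration. It assumes the window-start formula used by Flink's `TimeWindow.getWindowStartWithOffset` (`ts - (ts - offset + size) % size`). `WindowAlignment` and its methods are hypothetical names.

```scala
// Hypothetical helper, no Flink dependency: finds the tumbling window that
// contains a timestamp, using the same formula as Flink's window-start computation.
object WindowAlignment {
  val HourMs: Long = 60L * 60 * 1000

  // Start of the tumbling window of length `size` (shifted by `offset`) containing `ts`.
  def windowStart(ts: Long, size: Long, offset: Long = 0L): Long =
    ts - (ts - offset + size) % size

  // Half-open window [start, end) that contains `ts`.
  def windowFor(ts: Long, size: Long, offset: Long = 0L): (Long, Long) = {
    val start = windowStart(ts, size, offset)
    (start, start + size)
  }
}

// A job started at 9:15 (epoch-based, UTC) with 1 h windows and no offset:
val start915 = 9 * WindowAlignment.HourMs + 15 * 60 * 1000L
println(WindowAlignment.windowFor(start915, WindowAlignment.HourMs)) // (32400000,36000000), i.e. [9:00, 10:00)
```

With offset 0, the first records of a job started at 9:15 fall into the window [9:00, 10:00). That is why the first result only covers 9:15 to 10:00. An offset of 15 minutes would shift the grid to [9:15, 10:15).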

Timestamps and watermarks for event-time applications

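Timestamps and watermarks can be generated in two ways: by a SourceFunction, or by a user-defined timestamp assigner and watermark generator. With the first approach, the SourceFunction's run() method emits them through SourceContext.collectWithTimestamp() and SourceContext.emitWatermark(). The second approach is described below: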

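If a timestamp assigner is used, any existing timestamps and watermarks are overwritten.

The DataStream API provides the TimestampAssigner interface to extract timestamps from records. A timestamp assigner is usually applied right after the source function, because most assigners rely on the order of records when they generate watermarks. It can be applied later only if the stream is not redistributed first (for example by keyBy or rebalance), because redistribution scrambles the timestamp order. Even when it is applied later, it must come before the first event-time dependent transformation, such as the first event-time window.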

val readings: DataStream[SensorReading] = env
  .addSource(new SensorSource)
  // assign timestamps and generate watermarks
  .assignTimestampsAndWatermarks(new MyAssigner())

MyAssigner above can be either an AssignerWithPeriodicWatermarks or an AssignerWithPunctuatedWatermarks. Both extend TimestampAssigner. Each new watermark must be larger than the previous one.

ASSIGNER WITH PERIODIC WATERMARKS

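// With the setting below, Flink calls getCurrentWatermark() of an AssignerWithPeriodicWatermarks every 5 s.
// If the method returns a non-null watermark whose timestamp is larger than that of the previous watermark,
// the new watermark is emitted. This check ensures that event time only moves forward: if the method returns
// null or a watermark that is not larger, no new watermark is emitted.
env.getConfig.setAutoWatermarkInterval(5000) // default: 200 ms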

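import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.watermark.Watermark

// The assigner below uses the maximum timestamp seen so far minus 1 minute as the watermark.
// Records up to 1 minute older than that maximum can still enter their windows;
// records later than that must be handled separately.
class PeriodicAssigner 
    extends AssignerWithPeriodicWatermarks[SensorReading] {

  val bound: Long = 60 * 1000     // 1 min in ms
  // the maximum observed timestamp; starting at Long.MinValue + bound
  // keeps maxTs - bound from underflowing before the first record arrives
  var maxTs: Long = Long.MinValue + bound

  override def getCurrentWatermark: Watermark = {
    // generated watermark with 1 min tolerance
    new Watermark(maxTs - bound)
    // Alternative: a watermark based on processing time instead of event time
    // new Watermark(System.currentTimeMillis() - bound)
  }

  override def extractTimestamp(r: SensorReading, previousTS: Long): Long = {
    // update maximum timestamp
    maxTs = maxTs.max(r.timestamp)
    // return record timestamp
    r.timestamp
  }
}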
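Two details of the periodic assigner are easy to get wrong. First, starting maxTs at Long.MinValue makes `maxTs - bound` wrap around to a huge positive watermark. Second, an out-of-order record must not move the watermark backwards. The plain-Scala model below illustrates both under simplifying assumptions. `BoundedWatermarkModel`, `observe`, and `poll` are hypothetical names that stand in for `extractTimestamp` and the periodic `getCurrentWatermark()` call. They are not Flink APIs.

```scala
// Hypothetical, Flink-free model of a periodic watermark assigner with a fixed bound.
final class BoundedWatermarkModel(bound: Long) {
  // Starting at Long.MinValue + bound means maxTs - bound can never underflow.
  private var maxTs: Long = Long.MinValue + bound
  // Last watermark that was actually emitted.
  private var lastEmitted: Long = Long.MinValue

  // Plays the role of extractTimestamp(): remember the largest timestamp seen.
  def observe(ts: Long): Unit = {
    maxTs = math.max(maxTs, ts)
  }

  // Plays the role of the periodic getCurrentWatermark() call, plus the check
  // that only forwards a watermark larger than the previous one.
  def poll(): Option[Long] = {
    val candidate = maxTs - bound
    if (candidate > lastEmitted) {
      lastEmitted = candidate
      Some(candidate)
    } else None
  }
}

val model = new BoundedWatermarkModel(60000L)
model.observe(100000L)
println(model.poll()) // Some(40000)
model.observe(90000L) // out-of-order record: the maximum is unchanged
println(model.poll()) // None
```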

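// A shorthand for the assigner above, using the built-in BoundedOutOfOrdernessTimestampExtractor
val output = stream.assignTimestampsAndWatermarks(
  new BoundedOutOfOrdernessTimestampExtractor[MyEvent](Time.seconds(60)) {
    override def extractTimestamp(e: MyEvent): Long = e.getCreationTime
  })
// If timestamps arrive in ascending order (no out-of-order data),
// the watermark can simply follow the latest record timestamp:
val withTimestampsAndWatermarks = stream
      .assignAscendingTimestamps(_.getCreationTime)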

ASSIGNER WITH PUNCTUATED WATERMARKS

// No setAutoWatermarkInterval is needed here. For every record, Flink first calls extractTimestamp()
// and then checkAndGetNextWatermark() to decide whether to emit a watermark.
class PunctuatedAssigner 
    extends AssignerWithPunctuatedWatermarks[SensorReading] {

  val bound: Long = 60 * 1000 // 1 min in ms

  // called after extractTimestamp()
  override def checkAndGetNextWatermark(
      r: SensorReading, 
      extractedTS: Long): Watermark = {
    if (r.id == "sensor_1") {
      // emit watermark if reading is from sensor_1
      new Watermark(extractedTS - bound)
    } else {
      // do not emit a watermark
      null
    }
  }

  override def extractTimestamp(
      r: SensorReading, 
      previousTS: Long): Long = {
    // assign record timestamp
    r.timestamp
  }
}

Process Functions

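Use a ProcessFunction (PF) when you need access to keyed state and timers. This did not work reliably before v1.4.

Every PF implements the RichFunction interface, so it has open() and close() methods. Its two other important methods are listed below. Flink calls them synchronously, which prevents concurrent access to and modification of state:

  • processElement(v: IN, ctx: Context, out: Collector[OUT]): called for each record. Through the Context it can access the record's timestamp and the task's TimerService. It can also emit records to side outputs.
  • onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[OUT]): a callback invoked when a previously registered timer fires. OnTimerContext is similar to the Context above, and it also returns the time domain (processing or event time) of the firing timer.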

The TimerService and Timers

The TimerService, available from both contexts above, provides four methods:

  • currentProcessingTime(): Long returns the current processing time.

  • currentWatermark(): Long returns the timestamp of the current watermark.

  • registerProcessingTimeTimer(timestamp: Long): Unit registers a processing time timer. The timer will fire when the processing time of the executing machine reaches the provided timestamp.

  • registerEventTimeTimer(timestamp: Long): Unit registers an event time timer. The timer will fire when the watermark is updated to a timestamp that is equal to or larger than the timer's timestamp.

    The register methods are only available on keyed streams. To use timers on a non-keyed stream, you can create a keyed stream by using a KeySelector with a constant dummy key. Note that this will move all data to a single task such that the operator would be effectively executed with a parallelism of 1.

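A key can register multiple timers, but only one timer per timestamp. If key1 registers a timer for 1:00:00 twice, that timer fires only once. Before Flink 1.6 a registered timer cannot be deleted; since 1.6 it can. A PF keeps the timestamps of all its timers in a heap-based priority queue and persists them as function state. Timers are typically used to clear keyed state after a period of inactivity or to implement custom time-based windowing logic.

The one-timer-per-(key, timestamp) rule enables a registration trick called coalescing. Processing-time timers are rounded to full seconds, so a key that receives many records registers at most one timer per second instead of one per record. For event time, the timer is registered at the current watermark + 1, so it fires with the next watermark.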

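// Register
// Processing time: coalesce to full seconds so that a key registers at most one timer per second
val coalescedTime = ((ctx.timerService.currentProcessingTime + timeout) / 1000) * 1000
ctx.timerService.registerProcessingTimeTimer(coalescedTime)
// Event time: coalesce to the next watermark
val coalescedWmTime = ctx.timerService.currentWatermark + 1
ctx.timerService.registerEventTimeTimer(coalescedWmTime)

// Delete (Flink 1.6+)
ctx.timerService.deleteProcessingTimeTimer(timestampOfTimerToStop)
ctx.timerService.deleteEventTimeTimer(timestampOfTimerToStop)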
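The effect of coalescing can be checked without Flink. In the hypothetical model below, a Scala `Set` stands in for the timer queue. This relies on the rule above that there is at most one timer per (key, timestamp). `coalesceToSecond` uses the same round-down-to-a-full-second arithmetic as the coalescing line above. The names and numbers are illustrative assumptions.

```scala
// Hypothetical model: a Set stands in for the timer queue, so registering the
// same (key, timestamp) twice keeps a single timer.
object TimerCoalescing {
  // Round a processing-time deadline down to a full second.
  def coalesceToSecond(now: Long, timeoutMs: Long): Long =
    ((now + timeoutMs) / 1000) * 1000

  // "Register" every (key, timestamp) pair and return the distinct timers that remain.
  def registerAll(registrations: Seq[(String, Long)]): Set[(String, Long)] =
    registrations.toSet
}

// 1000 records for key "a" arriving at processing times 5000 ms .. 5999 ms, 3 s timeout
val arrivals = (5000L until 6000L).map(t => ("a", t))
val raw = TimerCoalescing.registerAll(arrivals.map { case (k, t) => (k, t + 3000L) })
val coalesced = TimerCoalescing.registerAll(
  arrivals.map { case (k, t) => (k, TimerCoalescing.coalesceToSecond(t, 3000L)) })
println(raw.size)  // 1000
println(coalesced) // Set((a,8000))
```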

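Timers are checkpointed. Checkpointing was synchronous in 1.5 and is asynchronous since 1.6, except for the combination of RocksDB backend / with incremental snapshots / with heap-based timers, which was resolved in 1.7. When an application restarts after a failure, timers whose time has already passed fire immediately.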

val warnings = readings
  .keyBy(_.id)
  // apply ProcessFunction to monitor temperatures
  .process(new TempIncreaseAlertFunction)

// =================== //

/** Emits a warning if the temperature of a sensor
  * monotonically increases for 5 seconds (in processing time).
  */
class TempIncreaseAlertFunction
  extends KeyedProcessFunction[String, SensorReading, String] {

  // hold temperature of last sensor reading
  lazy val lastTemp: ValueState[Double] = getRuntimeContext.getState(
      new ValueStateDescriptor[Double]("lastTemp", Types.of[Double]))

  // hold timestamp of currently active timer
  lazy val currentTimer: ValueState[Long] = getRuntimeContext.getState(
      new ValueStateDescriptor[Long]("timer", Types.of[Long]))

  override def processElement(
      r: SensorReading,
      ctx: KeyedProcessFunction[String, SensorReading, String]#Context,
      out: Collector[String]): Unit = {

    // get previous temperature
    val prevTemp = lastTemp.value()
    // update last temperature
    lastTemp.update(r.temperature)

    // Assumes all temperatures are above 0. The prevTemp == 0.0 check covers the initial state,
    // in which prevTemp is 0, so that the first reading is not treated as an increase.
    if (prevTemp == 0.0 || r.temperature < prevTemp) {
      // temperature decreased. Invalidate current timer
      currentTimer.update(0L)
    }
    else if (r.temperature > prevTemp && currentTimer.value() == 0) {
      // temperature increased and we have not set a timer yet.
      // set processing time timer for now + 5 seconds
      val timerTs = ctx.timerService().currentProcessingTime() + 5000
      ctx.timerService().registerProcessingTimeTimer(timerTs)
      // remember current timer
      currentTimer.update(timerTs)
    }
    // If the temperature is unchanged, or it increased but a timer is already registered
    // (currentTimer.value() != 0), do nothing. This avoids extra timers. onTimer() below
    // is called once the timer's processing time, 5 s later, is reached.
  }

  override def onTimer(
      ts: Long, // timestamp of the firing timer
      ctx: KeyedProcessFunction[String, SensorReading, String]#OnTimerContext,
      out: Collector[String]): Unit = {

    // check if firing timer is current timer
    if (ts == currentTimer.value()) {
      out.collect("Temperature of sensor '" + ctx.getCurrentKey +
        "' monotonically increased for 5 seconds.")
      // reset current timer
      currentTimer.update(0)
    }
  }
}

// Another example:
// count the records of each key and emit the key/count pair once the key has received no update for one minute (event time)
val result: DataStream[Tuple2[String, Long]] = stream
  .keyBy(0)
  .process(new CountWithTimeoutFunction())

/**
  * The data type stored in the state
  */
case class CountWithTimestamp(key: String, count: Long, lastModified: Long)

/**
  * The implementation of the ProcessFunction that maintains the count and timeouts
  */
class CountWithTimeoutFunction extends ProcessFunction[(String, String), (String, Long)] {

  /** The state that is maintained by this process function */
  lazy val state: ValueState[CountWithTimestamp] = getRuntimeContext
    .getState(new ValueStateDescriptor[CountWithTimestamp]("myState", classOf[CountWithTimestamp]))


  override def processElement(value: (String, String), ctx: Context, out: Collector[(String, Long)]): Unit = {
    // initialize or retrieve/update the state

    val current: CountWithTimestamp = state.value match {
      case null =>
        CountWithTimestamp(value._1, 1, ctx.timestamp)
      case CountWithTimestamp(key, count, lastModified) =>
        CountWithTimestamp(key, count + 1, ctx.timestamp)
    }

    // write the state back
    state.update(current)

    // schedule the next timer 60 seconds from the current event time
    ctx.timerService.registerEventTimeTimer(current.lastModified + 60000)
  }

  override def onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[(String, Long)]): Unit = {
    state.value match {
      case CountWithTimestamp(key, count, lastModified) if (timestamp == lastModified + 60000) =>
        out.collect((key, count))
      case _ =>
    }
  }
}
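The condition `timestamp == lastModified + 60000` in onTimer() is what discards stale timers. Every record registers a new timer, but only the timer that belongs to the latest update should emit. The sketch below replays that logic for a single key without Flink. It assumes all records arrive before the watermark reaches the first timer. `TimeoutModel` is a hypothetical name.

```scala
// Hypothetical replay of CountWithTimeoutFunction for a single key. Assumes every
// record arrives before the watermark reaches the first registered timer.
object TimeoutModel {
  val TimeoutMs = 60000L

  // For each record (in arrival order) return the timer it registers and
  // whether onTimer() would emit when that timer fires.
  def timers(recordTs: Seq[Long]): Seq[(Long, Boolean)] = {
    val lastModified = recordTs.last               // state after the final update
    recordTs.map { ts =>
      val timer = ts + TimeoutMs                   // registerEventTimeTimer(lastModified + 60000)
      (timer, timer == lastModified + TimeoutMs)   // the check in onTimer()
    }
  }
}

println(TimeoutModel.timers(Seq(0L, 30000L))) // List((60000,false), (90000,true))
```

For records at 0 s and 30 s, the timer at 60 s finds lastModified = 30 s and does not emit. Only the timer at 90 s emits the count.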

Emitting to Side Outputs

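A function can emit multiple side output streams, and they can have different types. Each side output is identified by an OutputTag[X] object.

Functions that support side outputs: ProcessFunction, CoProcessFunction, ProcessWindowFunction, and ProcessAllWindowFunction.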

// define a side output tag
val freezingAlarmOutput: OutputTag[String] =
  new OutputTag[String]("freezing-alarms")

// =================== //

val monitoredReadings: DataStream[SensorReading] = readings
  // monitor stream for readings with freezing temperatures
  .process(new FreezingMonitor)

// retrieve and print the freezing alarms
monitoredReadings
  .getSideOutput(freezingAlarmOutput)
  .print()

// print the main output
monitoredReadings.print()

// =================== //

class FreezingMonitor extends ProcessFunction[SensorReading, SensorReading] {

  override def processElement(
      r: SensorReading,
      ctx: ProcessFunction[SensorReading, SensorReading]#Context,
      out: Collector[SensorReading]): Unit = {
    // emit freezing alarm if temperature is below 32F.
    if (r.temperature < 32.0) {
      ctx.output(freezingAlarmOutput, s"Freezing Alarm for ${r.id}")
    }
    // forward all readings to the regular output
    out.collect(r)
  }
}

The CoProcessFunction

// Result of the code below: only readings of sensor_2 and sensor_7 are forwarded, for 10 seconds and 1 minute respectively
// ingest sensor stream
val sensorData: DataStream[SensorReading] = ...

// filter switches enable forwarding of readings
val filterSwitches: DataStream[(String, Long)] = env
  .fromCollection(Seq(
    ("sensor_2", 10 * 1000L), // forward sensor_2 for 10 seconds
    ("sensor_7", 60 * 1000L)) // forward sensor_7 for 1 minute
  )

val forwardedReadings = sensorData
  .connect(filterSwitches)
  .keyBy(_.id, _._1) // key both streams by sensor id so readings and switches of the same sensor share keyed state
  .process(new ReadingFilter)

// =============== //

class ReadingFilter
  extends CoProcessFunction[SensorReading, (String, Long), SensorReading] {

  // switch to enable forwarding
  lazy val forwardingEnabled: ValueState[Boolean] =
    getRuntimeContext.getState(
      new ValueStateDescriptor[Boolean]("filterSwitch", Types.of[Boolean])
    )

  // hold timestamp of currently active disable timer
  lazy val disableTimer: ValueState[Long] =
    getRuntimeContext.getState(
      new ValueStateDescriptor[Long]("timer", Types.of[Long])
    )

  override def processElement1(
      reading: SensorReading,
      ctx: CoProcessFunction[SensorReading, (String, Long), SensorReading]#Context,
      out: Collector[SensorReading]): Unit = {
    // check if we may forward the reading
    if (forwardingEnabled.value()) {
      out.collect(reading)
    }
  }

  override def processElement2(
      switch: (String, Long),
      ctx: CoProcessFunction[SensorReading, (String, Long), SensorReading]#Context,
      out: Collector[SensorReading]): Unit = {

    // enable reading forwarding
    forwardingEnabled.update(true)
    // set disable forward timer
    val timerTimestamp = ctx.timerService().currentProcessingTime() + switch._2
    ctx.timerService().registerProcessingTimeTimer(timerTimestamp)
    disableTimer.update(timerTimestamp)
  }

  override def onTimer(
      ts: Long,
      ctx: CoProcessFunction[SensorReading, (String, Long), SensorReading]#OnTimerContext,
      out: Collector[SensorReading]): Unit = {

    if (ts == disableTimer.value()) {
      // remove all state. Forward switch will be false by default.
      forwardingEnabled.clear()
      disableTimer.clear()
    }
  }
}

Combining with Joins

For example, you might be joining customer data to financial trades, while keeping state for the customer data. If you care about having complete and deterministic joins in the face of out-of-order events, you can use a timer to evaluate and emit the join for a trade when the watermark for the customer data stream has passed the time of that trade.

Window Operators

Defining Window Operators

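Keyed windows are evaluated in parallel, while non-keyed windows run in a single task. A window operator needs the following two components:

  • Window Assigner: determines how records are grouped into windows. It produces a WindowedStream (keyed) or an AllWindowedStream (non-keyed).
  • Window Function: applied to a WindowedStream or an AllWindowedStream. It processes the records assigned to a window.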

Built-in Window Assigners

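Built-in assigners place records into windows based on their event time or processing time. A window is created only when the first record is assigned to it. Every window has a start timestamp (inclusive) and an end timestamp (exclusive).

Count-based windows: their results are non-deterministic. They also cause problems unless a custom trigger discards incomplete and stale windows.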

// tumbling
val avgTemp = sensorData
.keyBy(_.id)
// group readings in 1 h event-time windows
// start at 00:15:00, 01:15:00, 02:15:00, etc
// for daily windows in China (UTC+8), use an offset of Time.hours(-8) to align windows with local midnight
.window(TumblingEventTimeWindows.of(Time.hours(1), Time.minutes(15))) // TumblingProcessingTimeWindows works the same way
.process(new TemperatureAverager)

// sliding: if the slide is larger than the window size, records that fall between windows are dropped
// create 1h event-time windows every 15 minutes
.window(SlidingEventTimeWindows.of(Time.hours(1), Time.minutes(15)))

// session
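// The assigner first puts each record into its own window, which starts at the record's timestamp and has the session gap as its size.
// It then merges all overlapping windows. A dynamic gap is also available, where custom logic determines the gap length.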
.window(EventTimeSessionWindows.withGap(Time.minutes(15)))
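A sliding window assigns a record to several windows at once. When the slide exceeds the window size, it can assign a record to no window at all. Enumerating the candidate window starts shows both cases. The sketch below is a plain-Scala rendering of the enumeration loop that `SlidingEventTimeWindows` is assumed to use. `SlidingAssignment` is a hypothetical helper, not a Flink class.

```scala
// Hypothetical helper mirroring how a sliding event-time assigner enumerates the
// windows that contain a timestamp (no Flink dependency).
object SlidingAssignment {
  // Start of the slide-sized grid cell that contains ts.
  private def alignedStart(ts: Long, step: Long, offset: Long): Long =
    ts - (ts - offset + step) % step

  // All windows [start, start + size) containing ts, latest start first.
  def windowsFor(ts: Long, size: Long, slide: Long, offset: Long = 0L): List[(Long, Long)] =
    Iterator.iterate(alignedStart(ts, slide, offset))(_ - slide)
      .takeWhile(start => start > ts - size)
      .map(start => (start, start + size))
      .toList
}

val minute = 60 * 1000L
// 1 h windows every 15 min: a record at 01:10 belongs to 4 windows
println(SlidingAssignment.windowsFor(70 * minute, 60 * minute, 15 * minute).size) // 4
// slide (15) > size (10): a record at 12 falls into the gap and gets no window
println(SlidingAssignment.windowsFor(12L, 10L, 15L)) // List()
```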

Applying Functions on Windows

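Incremental aggregation functions (ReduceFunction and AggregateFunction) keep only a single value per window. Each record is aggregated as soon as it arrives, and the running result is stored in the window. The trigger only causes that result to be emitted. When such a function needs window metadata, it can be combined with a ProcessWindowFunction, as shown in the "Incremental Aggregation and ProcessWindowFunction" section below.

Full window functions (ProcessWindowFunction) keep all records of a window and iterate over them when the window is evaluated.

ReduceFunction: the input and output types are the same.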

val minTempPerWindow: DataStream[(String, Double)] = sensorData
    .map(r => (r.id, r.temperature))
    .keyBy(_._1)
    .timeWindow(Time.seconds(15))
    .reduce((r1, r2) => (r1._1, r1._2.min(r2._2)))

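AggregateFunction: more flexible than ReduceFunction, because the input, accumulator, and output types can differ. It is similar to Spark's Aggregator.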

val avgTempPerWindow: DataStream[(String, Double)] = sensorData
  .map(r => (r.id, r.temperature))
  .keyBy(_._1)
  .timeWindow(Time.seconds(15))
  .aggregate(new AvgTempFunction)

// ========= //

class AvgTempFunction
    extends AggregateFunction[(String, Double), (String, Double, Int), (String, Double)] {
  
  override def createAccumulator() = {
    ("", 0.0, 0)
  }

  override def add(in: (String, Double), acc: (String, Double, Int)) = {
    (in._1, in._2 + acc._2, 1 + acc._3)
  }

  override def merge(acc1: (String, Double, Int), acc2: (String, Double, Int)) = {
    (acc1._1, acc1._2 + acc2._2, acc1._3 + acc2._3)
  }
        
  override def getResult(acc: (String, Double, Int)) = {
    (acc._1, acc._2 / acc._3)
  }
}

ProcessWindowFunction

public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> 
    extends AbstractRichFunction {

  // Evaluates the window
  public abstract void process(KEY key, Context ctx, Iterable<IN> vals, Collector<OUT> out) throws Exception;

  // Deletes all custom per-window state when the window is purged. Implement this method if you use windowState() below.
  public void clear(Context ctx) throws Exception {}

  // The context holding window metadata.
  public abstract class Context implements Serializable {
      
    // Returns the metadata of the window
    public abstract W window();

    // Returns the current processing time.
    public abstract long currentProcessingTime();

    // Returns the current event-time watermark.
    public abstract long currentWatermark();

    // State accessor for per-window state.
    public abstract KeyedStateStore windowState();

    // State accessor for per-key global state.
    public abstract KeyedStateStore globalState();

    // Emits a record to the side output identified by the OutputTag.
    public abstract <X> void output(OutputTag<X> outputTag, X value);
  }
}

// Example
// output the lowest and highest temperature reading every 5 seconds
val minMaxTempPerWindow: DataStream[MinMaxTemp] = sensorData
  .keyBy(_.id)
  .timeWindow(Time.seconds(5))
  .process(new HighAndLowTempProcessFunction)

// ========= //

case class MinMaxTemp(id: String, min: Double, max:Double, endTs: Long)

class HighAndLowTempProcessFunction
    extends ProcessWindowFunction[SensorReading, MinMaxTemp, String, TimeWindow] {

  override def process(
      key: String,
      ctx: Context,
      vals: Iterable[SensorReading],
      out: Collector[MinMaxTemp]): Unit = {

    val temps = vals.map(_.temperature)
    val windowEnd = ctx.window.getEnd

    out.collect(MinMaxTemp(key, temps.min, temps.max, windowEnd))
  }
}

Incremental Aggregation and ProcessWindowFunction

When an incremental aggregation needs window metadata or state, combine it with a ProcessWindowFunction as follows:

input
  .keyBy(...)
  .timeWindow(...)
  .reduce(incrAggregator: ReduceFunction[IN],
    function: ProcessWindowFunction[IN, OUT, K, W])

input
  .keyBy(...)
  .timeWindow(...)
  .aggregate(incrAggregator: AggregateFunction[IN, ACC, V],
    windowFunction: ProcessWindowFunction[V, OUT, K, W])

// The code below produces the same result as the previous example, but it does not store all records of each 5-second window
val minMaxTempPerWindow2: DataStream[MinMaxTemp] = sensorData
  .map(r => (r.id, r.temperature, r.temperature))
  .keyBy(_._1)
  .timeWindow(Time.seconds(5))
  .reduce(
    // incrementally compute min and max temperature
    (r1: (String, Double, Double), r2: (String, Double, Double)) => {
      (r1._1, r1._2.min(r2._2), r1._3.max(r2._3))
    },
    // finalize result in ProcessWindowFunction
    new AssignWindowEndProcessFunction()
  )

// ========= //

case class MinMaxTemp(id: String, min: Double, max:Double, endTs: Long)

class AssignWindowEndProcessFunction
  extends ProcessWindowFunction[(String, Double, Double), MinMaxTemp, String, TimeWindow] {

  override def process(
      key: String,
      ctx: Context,
      minMaxIt: Iterable[(String, Double, Double)],
      out: Collector[MinMaxTemp]): Unit = {

    val minMax = minMaxIt.head
    val windowEnd = ctx.window.getEnd
    out.collect(MinMaxTemp(key, minMax._2, minMax._3, windowEnd))
  }
}

Consecutive windowed operations

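The results for time window [0, 5) from the first operation also end up in time window [0, 5) in the subsequent windowed operation. This allows calculating a sum per key and window, and then calculating the top-k elements within the same window in the second operation.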

val input: DataStream[Int] = ...

val resultsPerKey = input
    .keyBy(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    .reduce(new Summer())

val globalResults = resultsPerKey
    .windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
    .process(new TopKWindowFunction())

3> (1,2,2)
3> (1,7,2)
3> (1,12,2)
4> (2,3,1)
4> (2,5,1)
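The [0, 5) results stay in [0, 5) because a window operator emits its results with the window's maximum timestamp, end - 1 (`TimeWindow.maxTimestamp()`). A second tumbling assigner of the same size then maps that timestamp back to the same window. Below is a minimal model under that assumption, using a hypothetical object name:

```scala
// Hypothetical model of two consecutive tumbling windows of the same size (ts >= 0).
object ConsecutiveWindows {
  def windowStart(ts: Long, size: Long): Long = ts - (ts + size) % size

  // Timestamp a window result carries downstream: the window's maxTimestamp (end - 1).
  def resultTimestamp(start: Long, size: Long): Long = start + size - 1
}

val size = 5000L
val firstWindow = ConsecutiveWindows.windowStart(3200L, size)       // 0: the record falls into [0, 5000)
val resultTs = ConsecutiveWindows.resultTimestamp(firstWindow, size) // 4999
println(ConsecutiveWindows.windowStart(resultTs, size))              // 0: the result stays in [0, 5000)
```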

Customizing Window Operators

A window operator with an incremental aggregation function (IAF): record => WindowAssigner => IAF => assigned window => result

A window operator with a full window function (FWF): record => WindowAssigner => assigned window => FWF => result

A window operator with an IAF and an FWF: record => WindowAssigner => IAF => assigned window => FWF => result

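A trigger is a window component that defines when a window is evaluated and its result is emitted. Every record that enters a window is also passed to the trigger, so the trigger can fire based on the assigned elements or on registered timers.

An evictor is an optional component of operators with an FWF. It cannot be used with an IAF. It is called before and/or after the window function and can iterate over the window's elements to remove some of them. Evictors are typically used with GlobalWindows. See the official documentation for details.

In all of the flows above, the window can be any kind of window, for example a tumbling window.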

// define a keyed window operator; for a non-keyed operator, drop keyBy(...) and call windowAll(...) instead of window(...)
stream
       .keyBy(...)               // versus non-keyed windows
       .window(...)              // "assigner"
      [.trigger(...)]            // "trigger" (overrides the default trigger)
      [.evictor(...)]            // "evictor" (else no evictor)
      [.allowedLateness(...)]    // "lateness" (else zero)
      [.sideOutputLateData(...)] // "output tag" (else no side output for late data)
       .reduce/aggregate/fold/apply()      // required: "function"
      [.getSideOutput(...)]      // optional: "output tag"

Window Lifecycle

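A window is created when a WindowAssigner assigns the first element to it. A window consists of the following:

  • Content: the result of an IAF, or the records waiting to be processed by an FWF.
  • Window object (the window itself): the WindowAssigner returns zero, one, or more window objects per element. The window operator groups records by these objects, because they identify the different windows and record when each window can be deleted.
  • Trigger timers: when a timer's time is reached, the timer calls back to evaluate the window or to purge its content. Timers are maintained by the window operator.
  • Custom trigger state: fully controlled by the trigger and independent of the window operator. When a window is deleted, the trigger must remove this state in its clear() method.

The window operator deletes a window when its end time is reached (in event time or processing time, depending on WindowAssigner.isEventTime). This automatically removes the window content and all timers registered for that window.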

Window Assigners

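import java.util.Collections
import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.common.typeutils.TypeSerializer
import org.apache.flink.streaming.api.environment
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner
import org.apache.flink.streaming.api.windowing.triggers.{EventTimeTrigger, Trigger}
import org.apache.flink.streaming.api.windowing.windows.TimeWindow

// a custom assigner for 30 seconds tumbling event-time window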
class ThirtySecondsWindows
    extends WindowAssigner[Object, TimeWindow] {

  val windowSize: Long = 30 * 1000L

  override def assignWindows(
      o: Object,
      ts: Long,
      ctx: WindowAssigner.WindowAssignerContext): java.util.List[TimeWindow] = {

    // rounding down by 30 seconds
    val startTime = ts - (ts % windowSize)
    val endTime = startTime + windowSize
    // emit the corresponding time window; a new window starts every 30 seconds
    Collections.singletonList(new TimeWindow(startTime, endTime))
  }

  override def getDefaultTrigger(
      env: environment.StreamExecutionEnvironment): Trigger[Object, TimeWindow] = {
    EventTimeTrigger.create()
  }

  override def getWindowSerializer(
      executionConfig: ExecutionConfig): TypeSerializer[TimeWindow] = {
    new TimeWindow.Serializer
  }

  override def isEventTime = true
}

There is also MergingWindowAssigner, the interface used by the session windows mentioned earlier. More details follow.

When merging windows, you need to ensure that the state of all merging windows and their triggers is also appropriately merged. The Trigger interface features a callback method that is invoked when windows are merged to merge state that is associated with the windows.

Global Windows: assign all elements to a single global window whose default trigger is the NeverTrigger. Such windows need a custom trigger, and possibly an evictor that selects which elements to remove from the window state. Also note that when a global window is applied to a keyed stream, it keeps state for every key.
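The session assigner described earlier is the standard merging assigner. The sketch below models only its window bookkeeping, in plain Scala with a hypothetical `SessionMerge` object. Each record first gets its own window [ts, ts + gap), and then windows that overlap or touch are merged. Merging the associated window state and trigger state, which is what the paragraph above is about, is not modeled.

```scala
// Hypothetical, Flink-free model of session-window merging.
object SessionMerge {
  // Every record first gets its own window [ts, ts + gap); windows that overlap
  // or touch are then merged into one session, scanning them in start order.
  def sessions(timestamps: Seq[Long], gap: Long): List[(Long, Long)] =
    timestamps.sorted
      .map(ts => (ts, ts + gap))
      .foldLeft(List.empty[(Long, Long)]) {
        case ((curStart, curEnd) :: done, (s, e)) if s <= curEnd =>
          (curStart, math.max(curEnd, e)) :: done // extend the current session
        case (acc, w) =>
          w :: acc                                // start a new session
      }
      .reverse
}

val minute = 60 * 1000L
// records at 0, 10, 40 and 50 minutes with a 15-minute gap form two sessions
println(SessionMerge.sessions(Seq(50, 0, 40, 10).map(_ * minute), 15 * minute)) // List((0,1500000), (2400000,3900000))
```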

Trigger

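Triggers define when a window is evaluated and its results are emitted. A trigger can decide to fire based on progress in time or data-specific conditions. Triggers have access to time properties, timers, and can work with state. A trigger can therefore fire on conditions such as a record carrying a specific value, two records arriving within 5 seconds of each other, or the need to emit early results. Every trigger invocation returns a TriggerResult, which is one of CONTINUE, FIRE, PURGE, or FIRE_AND_PURGE. PURGE clears the window's content, not the window itself.

A trigger must clean up all of its state before the window is deleted, because it cannot do so afterwards. When a trigger is used with a MergingWindowAssigner, it must implement canMerge() and onMerge().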

/** A trigger that fires early. The trigger fires at most every second. */
class OneSecondIntervalTrigger
    extends Trigger[SensorReading, TimeWindow] {

  override def onElement(
      r: SensorReading,
      timestamp: Long,
      window: TimeWindow,
      ctx: Trigger.TriggerContext): TriggerResult = {

    // firstSeen will be false if not set yet. This partitioned state is scoped to the current key and window
    val firstSeen: ValueState[Boolean] = ctx.getPartitionedState(
      new ValueStateDescriptor[Boolean]("firstSeen", createTypeInformation[Boolean]))

    // register initial timer only for first element
    if (!firstSeen.value()) {
      // compute time for next early firing by rounding watermark to second
      val t = ctx.getCurrentWatermark + (1000 - (ctx.getCurrentWatermark % 1000))
      ctx.registerEventTimeTimer(t)
      // register timer for the window end
      ctx.registerEventTimeTimer(window.getEnd)
      firstSeen.update(true)
    }
    // Continue. Do not evaluate per element
    TriggerResult.CONTINUE
  }

  override def onEventTime(
      timestamp: Long,
      window: TimeWindow,
      ctx: Trigger.TriggerContext): TriggerResult = {
    if (timestamp == window.getEnd) {
      // final evaluation; the default trigger is replaced, so the window content must be purged explicitly here
      TriggerResult.FIRE_AND_PURGE
    } else {
      // register next early firing timer
      val t = ctx.getCurrentWatermark + (1000 - (ctx.getCurrentWatermark % 1000))
      if (t < window.getEnd) {
        ctx.registerEventTimeTimer(t)
      }
      // fire trigger to evaluate window
      TriggerResult.FIRE
    }
  }

  override def onProcessingTime(
      timestamp: Long,
      window: TimeWindow,
      ctx: Trigger.TriggerContext): TriggerResult = {
    // Continue. We don't use processing time timers
    TriggerResult.CONTINUE
  }

  override def clear(
      window: TimeWindow,
      ctx: Trigger.TriggerContext): Unit = {

    // clear trigger state. This method is called when a window is purged
    val firstSeen: ValueState[Boolean] = ctx.getPartitionedState(
      new ValueStateDescriptor[Boolean]("firstSeen", createTypeInformation[Boolean]))
    firstSeen.clear()
  }
}
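The expression `wm + (1000 - wm % 1000)` in OneSecondIntervalTrigger always yields the next full second strictly after the watermark. This is what limits early firings to at most one per second. Below is a Flink-free sketch that assumes the watermark advances exactly to each registered timer. `EarlyFiring` is a hypothetical name.

```scala
// Hypothetical model of OneSecondIntervalTrigger's timer arithmetic.
object EarlyFiring {
  // Next full second strictly after the watermark (same expression as in the trigger).
  def nextFiring(watermark: Long): Long = watermark + (1000 - (watermark % 1000))

  // Firing times for a window ending at windowEnd, assuming the watermark advances
  // exactly to each registered timer: early firings every second, then the final one.
  def firings(firstWatermark: Long, windowEnd: Long): List[Long] =
    Iterator.iterate(nextFiring(firstWatermark))(nextFiring)
      .takeWhile(_ < windowEnd)
      .toList :+ windowEnd
}

println(EarlyFiring.firings(1500L, 5000L)) // List(2000, 3000, 4000, 5000)
```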

Joining Streams on Time (v1.5)

input1.join(input2)
  .where(...)       // specify key attributes for input1
  .equalTo(...)     // specify key attributes for input2
  .window(...)      // specify the WindowAssigner
 [.trigger(...)]    // optional: specify a Trigger
 [.evictor(...)]    // optional: specify an Evictor
  .apply(...)       // specify the JoinFunction

When the trigger returns FIRE, the JoinFunction is called. A JoinFunction is called once for each pair of joined events. A CoGroupFunction, by contrast, is called once per window with iterators over the elements of both inputs.

Those elements that do get joined will have as their timestamp the largest timestamp that still lies in the respective window. For example a window with [5, 10) as its boundaries would result in the joined elements having 9 as their timestamp.

Sliding Join

Interval Join

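A joined pair gets the larger of the two elements' timestamps.

A tumbling-window join cuts the stream into fixed slices. Even if the join condition is that two events are at most 1 s apart, the two events may fall into different windows and never be joined. A CoProcessFunction can be used to implement custom join logic instead.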
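A small model makes the boundary problem concrete. The hypothetical `JoinModel` below compares two matching rules. In a tumbling-window join, both events must fall into the same window. In an interval join, the right event must lie within [lower, upper] of the left one, and the pair takes the larger timestamp. The model only mirrors these matching rules, assuming non-negative timestamps and inclusive bounds. It is not Flink's implementation.

```scala
// Hypothetical model of the matching rules of a tumbling-window join and an interval join.
object JoinModel {
  final case class Event(key: String, ts: Long)

  // Window join: same key and same tumbling window of length `size`
  // (the window index ts / size assumes non-negative timestamps).
  def tumblingJoin(left: Seq[Event], right: Seq[Event], size: Long): Seq[(Event, Event)] =
    for {
      l <- left
      r <- right
      if l.key == r.key && l.ts / size == r.ts / size
    } yield (l, r)

  // Interval join: r.ts must lie in [l.ts + lower, l.ts + upper] (both bounds inclusive);
  // the joined pair carries the larger of the two timestamps.
  def intervalJoin(left: Seq[Event], right: Seq[Event], lower: Long, upper: Long): Seq[(Event, Event, Long)] =
    for {
      l <- left
      r <- right
      if l.key == r.key && r.ts >= l.ts + lower && r.ts <= l.ts + upper
    } yield (l, r, math.max(l.ts, r.ts))
}

val left = Seq(JoinModel.Event("k", 4500L))
val right = Seq(JoinModel.Event("k", 5200L))
println(JoinModel.tumblingJoin(left, right, 5000L).size)         // 0: 4500 and 5200 fall into different 5 s windows
println(JoinModel.intervalJoin(left, right, -1000L, 1000L).size) // 1: 700 ms apart, within [-1 s, +1 s]
```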

Handling Late Data

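There are three ways to handle late data:

  • Drop it. This is the default for event-time window operators. For non-window operators, use a process function to check whether a record is late and handle it accordingly.
  • Redirect it to a separate stream.
  • Update previously emitted results.

// Redirect: insert .sideOutputLateData(outputTag) before process(), then call .getSideOutput(outputTag) on the result stream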
// define an output tag for late sensor readings
val lateReadingsOutput: OutputTag[SensorReading] =
  new OutputTag[SensorReading]("late-readings")

val readings: DataStream[SensorReading] = ???

val countPer10Secs: DataStream[(String, Long, Int)] = readings
  .keyBy(_.id)
  .timeWindow(Time.seconds(10))
  // emit late readings to a side output
  .sideOutputLateData(lateReadingsOutput)
  // count readings per window
  .process(new CountFunction())

// retrieve the late events from the side output as a stream
val lateStream: DataStream[SensorReading] = countPer10Secs
  .getSideOutput(lateReadingsOutput)

// Late records can also be handled directly in a ProcessFunction
class LateReadingsFilter 
    extends ProcessFunction[SensorReading, SensorReading] {

  override def processElement(
      r: SensorReading,
      ctx: ProcessFunction[SensorReading, SensorReading]#Context,
      out: Collector[SensorReading]): Unit = {

    // compare record timestamp with current watermark
    if (r.timestamp < ctx.timerService().currentWatermark()) {
      // this is a late reading => redirect it to the side output
      ctx.output(lateReadingsOutput, r)
    } else {
      out.collect(r)
    }
  }
}

Updating Results by Including Late Events

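Set allowedLateness. Then, in the subsequent ProcessWindowFunction, check whether a result is the first one for its window by storing a flag in per-window state.

The logic below could also be implemented with a ProcessFunction.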

val readings: DataStream[SensorReading] = ???

val countPer10Secs: DataStream[(String, Long, Int, String)] = readings
  .keyBy(_.id)
  .timeWindow(Time.seconds(10))
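  // process late readings for 5 additional seconds: the window is not deleted and its state is kept.
  // Unlike delaying the watermark, this can emit several results per window: the window has already
  // fired when its end time was reached, and each late record triggers another firing.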
  .allowedLateness(Time.seconds(5))
  // count readings and update results if late readings arrive
  .process(new UpdatingWindowCountFunction)

// =================== //

/** A counting ProcessWindowFunction that distinguishes between 
  * first results and updates. */
class UpdatingWindowCountFunction
    extends ProcessWindowFunction[SensorReading, (String, Long, Int, String), String, TimeWindow] {

  override def process(
      id: String,
      ctx: Context,
      elements: Iterable[SensorReading],
      out: Collector[(String, Long, Int, String)]): Unit = {

    // count the number of readings
    val cnt = elements.count(_ => true)

    // state to check if this is the first evaluation of the window or not.
    val isUpdate = ctx.windowState.getState(
      new ValueStateDescriptor[Boolean]("isUpdate", Types.of[Boolean]))

    if (!isUpdate.value()) {
      // first evaluation, emit first result
      out.collect((id, ctx.window.getEnd, cnt, "first"))
      isUpdate.update(true)
    } else {
      // not the first evaluation, emit an update
      out.collect((id, ctx.window.getEnd, cnt, "update"))
    }
  }
}
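Whether a record updates a result or is dropped depends on where the current watermark stands. It is compared with the window's maxTimestamp (end - 1) and with its cleanup time (maxTimestamp + allowed lateness). The hypothetical `Lateness` object below encodes this for a tumbling event-time window with the default event-time trigger. It is a simplified model of the window operator's assumed behavior, not its code.

```scala
// Hypothetical model of how allowed lateness decides the fate of a record
// in a tumbling event-time window with the default event-time trigger.
object Lateness {
  sealed trait Outcome
  case object OnTime extends Outcome  // window has not fired yet
  case object Update extends Outcome  // window already fired; the record makes it fire again
  case object Dropped extends Outcome // beyond the allowed lateness (or sent to a late-data side output)

  def classify(recordTs: Long, watermark: Long, windowSize: Long, allowedLateness: Long): Outcome = {
    val start = recordTs - (recordTs + windowSize) % windowSize // tumbling window start (ts >= 0)
    val maxTs = start + windowSize - 1                         // the window's maxTimestamp (end - 1)
    if (watermark < maxTs) OnTime
    else if (watermark < maxTs + allowedLateness) Update
    else Dropped
  }
}

println(Lateness.classify(3000L, 12000L, 10000L, 5000L)) // Update
println(Lateness.classify(3000L, 15000L, 10000L, 5000L)) // Dropped
```

With 10 s windows and 5 s of allowed lateness, a record at 3 s that arrives when the watermark is at 12 s re-fires the [0, 10) window, which is the case UpdatingWindowCountFunction reports as "update". With the watermark at 15 s, the same record is dropped.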

Note

Flink creates one copy of each element per window to which it belongs. Therefore, a sliding window of size 1 day and slide 1 second might not be a good idea.
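The cost is easy to quantify. With window size S and slide s, where S is a multiple of s, every element belongs to S / s windows. Below is a minimal sketch with a hypothetical helper name:

```scala
// Hypothetical helper: how many sliding windows each element is copied into,
// assuming the window size is a multiple of the slide.
object WindowCopies {
  def copiesPerElement(sizeMs: Long, slideMs: Long): Long = sizeMs / slideMs
}

val dayMs = 24L * 60 * 60 * 1000
println(WindowCopies.copiesPerElement(dayMs, 1000L))                    // 86400
println(WindowCopies.copiesPerElement(60 * 60 * 1000L, 15 * 60 * 1000L)) // 4
```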

References
Stream Processing with Apache Flink by Vasiliki Kalavri; Fabian Hueske