Flink Window Operations
Contents
- Configuring Time Characteristics
- Process Functions
- Window Operators
- Applying Functions on Windows
- Consecutive windowed operations
- Customizing Window Operators
- Joining Streams on Time(v1.5)
- Handling Late Data
- notice
The APIs in this article are based on Flink 1.4 and later.
Configuring Time Characteristics
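A window operator on a non-keyed stream runs with a parallelism of 1.
Processing Time
All operators use the system clock of the executing machine to decide when to trigger a computation. If a job starts at 9:15 am with a 1-hour interval, the first computation covers 9:15 am to 10:00 am and the next one covers 10:00 am to 11:00 am (end boundary exclusive).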
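The alignment described above can be sketched in plain Scala without any Flink dependency. The object and method names are hypothetical and the arithmetic is an illustration of epoch-aligned windows, not Flink's own code.

```scala
// Sketch: epoch-aligned tumbling windows (all values in milliseconds).
// WindowAlignment, windowStart and windowOf are illustrative names.
object WindowAlignment {
  // Start of the window of length `size` that contains `ts`,
  // optionally shifted by `offset`.
  def windowStart(ts: Long, size: Long, offset: Long = 0L): Long =
    ts - Math.floorMod(ts - offset, size)

  // Half-open window [start, end) that contains `ts`.
  def windowOf(ts: Long, size: Long, offset: Long = 0L): (Long, Long) = {
    val start = windowStart(ts, size, offset)
    (start, start + size)
  }
}
```

For a record at 9:15 am and a 1-hour window, `windowOf` yields the window from 9:00 am to 10:00 am, which is why the first computation of a job started at 9:15 am only contains data from 9:15 am onward.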
Timestamps and watermarks for event-time applications
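Timestamps and watermarks can be produced either by a SourceFunction or by a user-defined timestamp assigner and watermark generator. In the first case, the run method of the SourceFunction calls SourceContext.collectWithTimestamp() and emitWatermark(). The second case is described below:
If a timestamp assigner is used, all existing timestamps and watermarks are overwritten.
The DataStream API provides the TimestampAssigner interface to extract timestamps from records. A timestamp assigner is usually called right after the source function, because timestamps become harder to handle once the stream has been repartitioned (for example by keyBy or rebalance). Assignment can be deferred as long as no repartitioning happens in between, but it must take place before the first event-time dependent transformation, such as the first event-time window.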
val readings: DataStream[SensorReading] = env
.addSource(new SensorSource)
// assign timestamps and generate watermarks
.assignTimestampsAndWatermarks(new MyAssigner())
MyAssigner above can be an AssignerWithPeriodicWatermarks or an AssignerWithPunctuatedWatermarks; both extend TimestampAssigner. Every new watermark (WM) must be greater than the previous one.
ASSIGNER WITH PERIODIC WATERMARKS
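// With the setting below, getCurrentWatermark() of an AssignerWithPeriodicWatermarks is called every 5 s. If the method returns a non-null value whose timestamp is greater than that of the previous WM, the new WM is emitted. This check ensures that event time keeps increasing; otherwise no new WM is produced.
env.getConfig.setAutoWatermarkInterval(5000) // default is 200 ms
// The assigner below emits a WM equal to the maximum timestamp seen so far minus 1 min. Records up to one minute behind the maximum still make it into their window; records that are later than that need separate handling.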
class PeriodicAssigner
extends AssignerWithPeriodicWatermarks[SensorReading] {
val bound: Long = 60 * 1000 // 1 min in ms
// start at MinValue + bound so that maxTs - bound cannot underflow
var maxTs: Long = Long.MinValue + bound // the maximum observed timestamp
override def getCurrentWatermark: Watermark = {
// generated watermark with 1 min tolerance
new Watermark(maxTs - bound)
// alternative: derive the WM from processing time instead of observed timestamps
// new Watermark(System.currentTimeMillis() - bound)
}
override def extractTimestamp(r: SensorReading, previousTS: Long): Long = {
// update maximum timestamp
maxTs = maxTs.max(r.timestamp)
// return record timestamp
r.timestamp
}
}
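// A simplified version of the assigner above
val output = stream.assignTimestampsAndWatermarks(
new BoundedOutOfOrdernessTimestampExtractor[MyEvent](Time.seconds(60)) {
override def extractTimestamp(e: MyEvent): Long = e.getCreationTime
})
// If record timestamps are never out of order, the method below uses the current record timestamp as the WM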
val withTimestampsAndWatermarks = stream
.assignAscendingTimestamps(_.getCreationTime)
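To make the bounded-out-of-orderness rule concrete, the following plain Scala sketch computes the watermark value such an assigner would return if it were asked after every record. It has no Flink dependency; the object name and the per-record evaluation are assumptions for illustration, since Flink only asks for a watermark once per auto-watermark interval.

```scala
// Sketch: watermark = max timestamp seen so far - bound (milliseconds).
// BoundedWatermarkSketch is an illustrative name, not a Flink class.
object BoundedWatermarkSketch {
  // Watermark value after each record of `timestamps`, in arrival order.
  def watermarks(timestamps: Seq[Long], bound: Long): Seq[Long] =
    timestamps
      // running maximum; start at MinValue + bound to avoid underflow
      .scanLeft(Long.MinValue + bound)((maxTs, ts) => maxTs.max(ts))
      .tail
      .map(_ - bound)
}
```

An out-of-order record (a timestamp smaller than the running maximum) leaves the watermark unchanged, so the watermark never moves backwards.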
ASSIGNER WITH PUNCTUATED WATERMARKS
// This variant does not need setAutoWatermarkInterval. For every record, a PunctuatedAssigner first calls extractTimestamp and then checkAndGetNextWatermark to decide whether to emit a WM
class PunctuatedAssigner
extends AssignerWithPunctuatedWatermarks[SensorReading] {
val bound: Long = 60 * 1000 // 1 min in ms
// called after extractTimestamp
override def checkAndGetNextWatermark(
r: SensorReading,
extractedTS: Long): Watermark = {
if (r.id == "sensor_1") {
// emit watermark if reading is from sensor_1
new Watermark(extractedTS - bound)
} else {
// do not emit a watermark
null
}
}
override def extractTimestamp(
r: SensorReading,
previousTS: Long): Long = {
// assign record timestamp
r.timestamp
}
}
Process Functions
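Use a ProcessFunction (PF) when you need access to keyed state and timers (there were issues before v1.4).
All PFs implement the RichFunction interface, so they have open() and close() methods. The two other important methods (executed synchronously to prevent concurrent access to and modification of state) are:
- processElement(v: IN, ctx: Context, out: Collector[OUT]): called for every record. Through the Context it can access the record's timestamp and the TimerService, and it can also emit records to side outputs.
- onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[OUT]): a callback invoked when a previously registered timer fires. OnTimerContext is similar to the Context above and additionally returns the time domain of the firing timer (processing time or event time).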
The TimerService and Timers
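The TimerService, available through the context of both methods, offers four methods:
- currentProcessingTime(): Long returns the current processing time.
- currentWatermark(): Long returns the timestamp of the current watermark.
- registerProcessingTimeTimer(timestamp: Long): Unit registers a processing time timer. The timer will fire when the processing time of the executing machine reaches the provided timestamp.
- registerEventTimeTimer(timestamp: Long): Unit registers an event time timer. The timer will fire when the watermark reaches the provided timestamp.
The register methods above only work on keyed streams. To use timers on a non-keyed stream, you can create a keyed stream by using a KeySelector with a constant dummy key. Note that this will move all data to a single task such that the operator would be effectively executed with a parallelism of 1.
Multiple timers can be registered per key, but only one per timestamp. Before Flink 1.6 a registered timer cannot be deleted; from 1.6 on it can. The PF keeps the timestamps of all timers in a priority queue on the heap and persists them as function state. Timers are typically used to clear keyed state after some time or to implement custom time-based windowing logic. There is at most one timer per key and timestamp: if a timer for key1 at 1:00:00 is registered more than once, it fires only once. This allows a registration trick called timer coalescing: round the timer timestamp to full seconds so that a high record rate does not create many timers per second. For event time, coalesce to the current WM + 1.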
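// register
// coalesce to full seconds so that timers are not registered for every record
val coalescedProcTime = ((ctx.timestamp + timeout) / 1000) * 1000
ctx.timerService.registerProcessingTimeTimer(coalescedProcTime)
// coalescing for event time: fire with the next watermark
val coalescedEventTime = ctx.timerService.currentWatermark + 1
ctx.timerService.registerEventTimeTimer(coalescedEventTime)
// delete (supported from Flink 1.6)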
ctx.timerService.deleteProcessingTimeTimer(timestampOfTimerToStop)
ctx.timerService.deleteEventTimeTimer(timestampOfTimerToStop)
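The effect of coalescing can be checked with a small plain Scala sketch. It has no Flink dependency and the names are hypothetical; the set stands in for the rule that a key holds at most one timer per timestamp.

```scala
// Sketch: timer coalescing to full seconds (milliseconds throughout).
// TimerCoalescing and its methods are illustrative names.
object TimerCoalescing {
  // Round a timestamp down to the start of its second.
  def coalesceToSecond(ts: Long): Long = (ts / 1000) * 1000

  // Distinct timers that remain for one key when every record registers
  // a timer at (record timestamp + timeout), coalesced to seconds.
  def distinctTimers(recordTs: Seq[Long], timeout: Long): Set[Long] =
    recordTs.map(ts => coalesceToSecond(ts + timeout)).toSet
}
```

Four records at 1000, 1250, 1999 and 2001 ms with a 5 s timeout leave only two timers (6000 and 7000) instead of four.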
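Timers are checkpointed (synchronously in 1.5; asynchronously from 1.6, except for the combination of the RocksDB backend with incremental snapshots and heap-based timers, which was resolved in 1.7). When a job restarts after a failure, timers that have already expired fire immediately.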
val warnings = readings
.keyBy(_.id)
// apply ProcessFunction to monitor temperatures
.process(new TempIncreaseAlertFunction)
// =================== //
/** Emits a warning if the temperature of a sensor
* monotonically increases for 5 seconds (in processing time).
*/
class TempIncreaseAlertFunction
extends KeyedProcessFunction[String, SensorReading, String] {
// hold temperature of last sensor reading
lazy val lastTemp: ValueState[Double] = getRuntimeContext.getState(
new ValueStateDescriptor[Double]("lastTemp", Types.of[Double]))
// hold timestamp of currently active timer
lazy val currentTimer: ValueState[Long] = getRuntimeContext.getState(
new ValueStateDescriptor[Long]("timer", Types.of[Long]))
override def processElement(
r: SensorReading,
ctx: KeyedProcessFunction[String, SensorReading, String]#Context,
out: Collector[String]): Unit = {
// get previous temperature
val prevTemp = lastTemp.value()
// update last temperature
lastTemp.update(r.temperature)
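// Assumes all temperatures are above 0. prevTemp == 0.0 covers the uninitialized state (the Double state starts at 0.0), so the first reading is not treated as an increase.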
if (prevTemp == 0.0 || r.temperature < prevTemp) {
// temperature decreased. Invalidate current timer
currentTimer.update(0L)
}
else if (r.temperature > prevTemp && currentTimer.value() == 0) {
// temperature increased and we have not set a timer yet.
// set processing time timer for now + 5 seconds
val timerTs = ctx.timerService().currentProcessingTime() + 5000
ctx.timerService().registerProcessingTimeTimer(timerTs)
// remember current timer
currentTimer.update(timerTs)
}
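// If the temperature is unchanged, or it increased while a timer is already registered (currentTimer.value() != 0), do nothing. This avoids registering extra timers. onTimer below is called once 5 s of processing time have passed.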
}
override def onTimer(
ts: Long, // timestamp of the firing timer
ctx: KeyedProcessFunction[String, SensorReading, String]#OnTimerContext,
out: Collector[String]): Unit = {
// check if firing timer is current timer
if (ts == currentTimer.value()) {
out.collect("Temperature of sensor '" + ctx.getCurrentKey +
"' monotonically increased for 5 seconds.")
// reset current timer
currentTimer.update(0)
}
}
}
// Another example.
// Count the records per key and emit the key/count pair once no further record for that key arrives within one minute (event time)
val result: DataStream[Tuple2[String, Long]] = stream
.keyBy(0)
.process(new CountWithTimeoutFunction())
/**
* The data type stored in the state
*/
case class CountWithTimestamp(key: String, count: Long, lastModified: Long)
/**
* The implementation of the ProcessFunction that maintains the count and timeouts
*/
class CountWithTimeoutFunction extends ProcessFunction[(String, String), (String, Long)] {
/** The state that is maintained by this process function */
lazy val state: ValueState[CountWithTimestamp] = getRuntimeContext
.getState(new ValueStateDescriptor[CountWithTimestamp]("myState", classOf[CountWithTimestamp]))
override def processElement(value: (String, String), ctx: Context, out: Collector[(String, Long)]): Unit = {
// initialize or retrieve/update the state
val current: CountWithTimestamp = state.value match {
case null =>
CountWithTimestamp(value._1, 1, ctx.timestamp)
case CountWithTimestamp(key, count, lastModified) =>
CountWithTimestamp(key, count + 1, ctx.timestamp)
}
// write the state back
state.update(current)
// schedule the next timer 60 seconds from the current event time
ctx.timerService.registerEventTimeTimer(current.lastModified + 60000)
}
override def onTimer(timestamp: Long, ctx: OnTimerContext, out: Collector[(String, Long)]): Unit = {
state.value match {
case CountWithTimestamp(key, count, lastModified) if (timestamp == lastModified + 60000) =>
out.collect((key, count))
case _ =>
}
}
}
Emitting to Side Outputs
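Side outputs let a function emit several streams, possibly of different types. Each side output is identified by an OutputTag[X] object.
The functions that can use this feature are ProcessFunction, CoProcessFunction, ProcessWindowFunction, and ProcessAllWindowFunction.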
// define a side output tag
val freezingAlarmOutput: OutputTag[String] =
new OutputTag[String]("freezing-alarms")
// =================== //
val monitoredReadings: DataStream[SensorReading] = readings
// monitor stream for readings with freezing temperatures
.process(new FreezingMonitor)
// retrieve and print the freezing alarms
monitoredReadings
.getSideOutput(freezingAlarmOutput)
.print()
// print the main output
monitoredReadings.print()
// =================== //
class FreezingMonitor extends ProcessFunction[SensorReading, SensorReading] {
override def processElement(
r: SensorReading,
ctx: ProcessFunction[SensorReading, SensorReading]#Context,
out: Collector[SensorReading]): Unit = {
// emit freezing alarm if temperature is below 32F.
if (r.temperature < 32.0) {
ctx.output(freezingAlarmOutput, s"Freezing Alarm for ${r.id}")
}
// forward all readings to the regular output
out.collect(r)
}
}
The CoProcessFunction
// With the code below, only readings of sensor_2 and sensor_7 are forwarded, for 10 s and 1 min respectively
// ingest sensor stream
val sensorData: DataStream[SensorReading] = ...
// filter switches enable forwarding of readings
val filterSwitches: DataStream[(String, Long)] = env
.fromCollection(Seq(
("sensor_2", 10 * 1000L), // forward sensor_2 for 10 seconds
("sensor_7", 60 * 1000L)) // forward sensor_7 for 1 minute
)
val forwardedReadings = sensorData
.connect(filterSwitches)
.keyBy(_.id, _._1) // key both streams on the sensor id so that matching records meet in the same task
.process(new ReadingFilter)
// =============== //
class ReadingFilter
extends CoProcessFunction[SensorReading, (String, Long), SensorReading] {
// switch to enable forwarding
lazy val forwardingEnabled: ValueState[Boolean] =
getRuntimeContext.getState(
new ValueStateDescriptor[Boolean]("filterSwitch", Types.of[Boolean])
)
// hold timestamp of currently active disable timer
lazy val disableTimer: ValueState[Long] =
getRuntimeContext.getState(
new ValueStateDescriptor[Long]("timer", Types.of[Long])
)
override def processElement1(
reading: SensorReading,
ctx: CoProcessFunction[SensorReading, (String, Long), SensorReading]#Context,
out: Collector[SensorReading]): Unit = {
// check if we may forward the reading
if (forwardingEnabled.value()) {
out.collect(reading)
}
}
override def processElement2(
switch: (String, Long),
ctx: CoProcessFunction[SensorReading, (String, Long), SensorReading]#Context,
out: Collector[SensorReading]): Unit = {
// enable reading forwarding
forwardingEnabled.update(true)
// set disable forward timer
val timerTimestamp = ctx.timerService().currentProcessingTime() + switch._2
ctx.timerService().registerProcessingTimeTimer(timerTimestamp)
disableTimer.update(timerTimestamp)
}
override def onTimer(
ts: Long,
ctx: CoProcessFunction[SensorReading, (String, Long), SensorReading]#OnTimerContext,
out: Collector[SensorReading]): Unit = {
if (ts == disableTimer.value()) {
// remove all state. Forward switch will be false by default.
forwardingEnabled.clear()
disableTimer.clear()
}
}
}
Combining with joins
For example, you might be joining customer data to financial trades, while keeping state for the customer data. If you care about having complete and deterministic joins in the face of out-of-order events, you can use a timer to evaluate and emit the join for a trade when the watermark for the customer data stream has passed the time of that trade.
Window Operators
Defining Window Operators
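Keyed windows can be evaluated in parallel; non-keyed windows are processed by a single task. A window operator needs the following two components:
- Window Assigner: determines how records are grouped into windows. It produces a WindowedStream (keyed) or an AllWindowedStream (non-keyed).
- Window Function: applied to a WindowedStream or AllWindowedStream; it processes the records assigned to a window.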
Built-in Window Assigners
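Built-in assigners assign records to windows based on their event time or on processing time. A window is created only when the first record is assigned to it. A window has a start timestamp and an (exclusive) end timestamp.
Count-based windows: results are non-deterministic. In addition, without a custom Trigger that discards incomplete and stale windows, they cause problems.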
// tumbling
val avgTemp = sensorData
.keyBy(_.id)
// group readings in 1h event-time windows
// start at 00:15:00, 01:15:00, 02:15:00, etc
// for China (UTC+8), use an offset of Time.hours(-8) to align windows with local time
.window(TumblingEventTimeWindows.of(Time.hours(1), Time.minutes(15))) // TumblingProcessingTimeWindows works the same way
.process(new TemperatureAverager)
// sliding: if the slide is larger than the window size, some records are dropped
// create 1h event-time windows every 15 minutes
.window(SlidingEventTimeWindows.of(Time.hours(1), Time.minutes(15)))
// session
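// the assigner first maps every record to its own window, which starts at the record's timestamp and has the session gap as its size, and then merges all overlapping windows. A dynamic gap is also available, where custom logic determines the gap length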
.window(EventTimeSessionWindows.withGap(Time.minutes(15)))
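The session assignment described above (one window per record, then merge) can be sketched in plain Scala. This is not Flink's implementation; the object name is hypothetical, and treating windows that merely touch as overlapping is an assumption of this sketch.

```scala
// Sketch: build session windows by merging per-record windows.
// SessionMerge is an illustrative name; timestamps are in milliseconds.
object SessionMerge {
  // Every timestamp opens the window [ts, ts + gap); windows that
  // overlap (or touch, by assumption) are merged into one session.
  def sessions(timestamps: Seq[Long], gap: Long): List[(Long, Long)] =
    timestamps.sorted.foldLeft(List.empty[(Long, Long)]) {
      // record falls into the most recent session: extend it
      case ((start, end) :: rest, ts) if ts <= end =>
        (start, math.max(end, ts + gap)) :: rest
      // otherwise the record starts a new session
      case (acc, ts) =>
        (ts, ts + gap) :: acc
    }.reverse
}
```

With a gap of 10, the timestamps 0, 5 and 12 chain into one session [0, 22), while 30 starts a new session [30, 40).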
Applying Functions on Windows
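Incremental aggregation functions (ReduceFunction and AggregateFunction) keep only a single value per window. They are applied as soon as a record arrives and store the intermediate result in the window; the trigger only causes the result to be emitted. When such a function needs window metadata, it can be combined with a ProcessWindowFunction (see the section Incremental Aggregation and ProcessWindowFunction).
Full window functions (ProcessWindowFunction) keep all records of the window and iterate over them when the window is evaluated.
ReduceFunction: input and output types are the same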
val minTempPerWindow: DataStream[(String, Double)] = sensorData
.map(r => (r.id, r.temperature))
.keyBy(_._1)
.timeWindow(Time.seconds(15))
.reduce((r1, r2) => (r1._1, r1._2.min(r2._2)))
AggregateFunction: more flexible than ReduceFunction, similar to Spark's Aggregator
val avgTempPerWindow: DataStream[(String, Double)] = sensorData
.map(r => (r.id, r.temperature))
.keyBy(_._1)
.timeWindow(Time.seconds(15))
.aggregate(new AvgTempFunction)
// ========= //
class AvgTempFunction
extends AggregateFunction[(String, Double), (String, Double, Int), (String, Double)] {
override def createAccumulator() = {
("", 0.0, 0)
}
override def add(in: (String, Double), acc: (String, Double, Int)) = {
(in._1, in._2 + acc._2, 1 + acc._3)
}
override def merge(acc1: (String, Double, Int), acc2: (String, Double, Int)) = {
(acc1._1, acc1._2 + acc2._2, acc1._3 + acc2._3)
}
override def getResult(acc: (String, Double, Int)) = {
(acc._1, acc._2 / acc._3)
}
}
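The four methods of the accumulator contract can be exercised without Flink. The plain Scala sketch below uses hypothetical names and a simplified (sum, count) accumulator; it illustrates why merging two partial accumulators gives the same result as adding all values to one.

```scala
// Sketch: the createAccumulator / add / merge / getResult contract
// for an average. AvgAccumulator is an illustrative name.
object AvgAccumulator {
  type Acc = (Double, Int) // (sum, count)

  def createAccumulator(): Acc = (0.0, 0)
  def add(value: Double, acc: Acc): Acc = (acc._1 + value, acc._2 + 1)
  def merge(a: Acc, b: Acc): Acc = (a._1 + b._1, a._2 + b._2)
  def getResult(acc: Acc): Double = acc._1 / acc._2

  // Incrementally fold values into a fresh accumulator.
  def aggregate(values: Seq[Double]): Acc =
    values.foldLeft(createAccumulator())((acc, v) => add(v, acc))
}
```

Only the accumulator is kept per window, so memory use does not grow with the number of records in the window.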
ProcessWindowFunction
public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window>
extends AbstractRichFunction {
// Evaluates the window
public abstract void process(KEY key, Context ctx, Iterable<IN> vals, Collector<OUT> out) throws Exception;
// Deletes all custom per-window state when the window is purged. Must be overridden if windowState() below is used.
public void clear(Context ctx) throws Exception {}
// The context holding window metadata.
public abstract class Context implements Serializable {
// Returns the metadata of the window
public abstract W window();
// Returns the current processing time.
public abstract long currentProcessingTime();
// Returns the current event-time watermark.
public abstract long currentWatermark();
// State accessor for per-window state.
public abstract KeyedStateStore windowState();
// State accessor for per-key global state.
public abstract KeyedStateStore globalState();
// Emits a record to the side output identified by the OutputTag.
public abstract <X> void output(OutputTag<X> outputTag, X value);
}
}
// Example
// output the lowest and highest temperature reading every 5 seconds
val minMaxTempPerWindow: DataStream[MinMaxTemp] = sensorData
.keyBy(_.id)
.timeWindow(Time.seconds(5))
.process(new HighAndLowTempProcessFunction)
// ========= //
case class MinMaxTemp(id: String, min: Double, max:Double, endTs: Long)
class HighAndLowTempProcessFunction
extends ProcessWindowFunction[SensorReading, MinMaxTemp, String, TimeWindow] {
override def process(
key: String,
ctx: Context,
vals: Iterable[SensorReading],
out: Collector[MinMaxTemp]): Unit = {
val temps = vals.map(_.temperature)
val windowEnd = ctx.window.getEnd
out.collect(MinMaxTemp(key, temps.min, temps.max, windowEnd))
}
}
Incremental Aggregation and ProcessWindowFunction
When an incremental aggregation also needs window metadata or state, combine it with a ProcessWindowFunction as follows:
input
.keyBy(...)
.timeWindow(...)
.reduce(incrAggregator: ReduceFunction[IN],
function: ProcessWindowFunction[IN, OUT, K, W])
input
.keyBy(...)
.timeWindow(...)
.aggregate(incrAggregator: AggregateFunction[IN, ACC, V],
windowFunction: ProcessWindowFunction[V, OUT, K, W])
// The code below produces the same result as the previous example without buffering all records of each 5-second window
val minMaxTempPerWindow2: DataStream[MinMaxTemp] = sensorData
.map(r => (r.id, r.temperature, r.temperature))
.keyBy(_._1)
.timeWindow(Time.seconds(5))
.reduce(
// incrementally compute min and max temperature
(r1: (String, Double, Double), r2: (String, Double, Double)) => {
(r1._1, r1._2.min(r2._2), r1._3.max(r2._3))
},
// finalize result in ProcessWindowFunction
new AssignWindowEndProcessFunction()
)
// ========= //
case class MinMaxTemp(id: String, min: Double, max:Double, endTs: Long)
class AssignWindowEndProcessFunction
extends ProcessWindowFunction[(String, Double, Double), MinMaxTemp, String, TimeWindow] {
override def process(
key: String,
ctx: Context,
minMaxIt: Iterable[(String, Double, Double)],
out: Collector[MinMaxTemp]): Unit = {
val minMax = minMaxIt.head
val windowEnd = ctx.window.getEnd
out.collect(MinMaxTemp(key, minMax._2, minMax._3, windowEnd))
}
}
Consecutive windowed operations
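Results of the time window [0, 5) from the first operation also end up in the time window [0, 5) of the subsequent windowed operation. This makes it possible to compute a sum per key in the first operation and then compute the top-k elements within the same window in a second operation.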
val input: DataStream[Int] = ...
val resultsPerKey = input
.keyBy(<key selector>)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.reduce(new Summer())
val globalResults = resultsPerKey
.windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
.process(new TopKWindowFunction())
3> (1,2,2)
3> (1,7,2)
3> (1,12,2)
4> (2,3,1)
4> (2,5,1)
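Why consecutive windows line up can be sketched in plain Scala: a window result carries the largest timestamp that still lies inside the window (end - 1), which maps back to the same window when the same assigner is applied again. The names are hypothetical and nothing here depends on Flink.

```scala
// Sketch: timestamp of a window result and where it lands downstream.
// ConsecutiveWindows is an illustrative name; values in milliseconds.
object ConsecutiveWindows {
  // Half-open tumbling window [start, end) that contains `ts`.
  def windowOf(ts: Long, size: Long): (Long, Long) = {
    val start = ts - Math.floorMod(ts, size)
    (start, start + size)
  }

  // Timestamp attached to the result of a window: end - 1.
  def resultTimestamp(window: (Long, Long)): Long = window._2 - 1
}
```

A record at 3200 ms falls into [0, 5000); its window result is stamped 4999, which the second 5-second window assigns to [0, 5000) again.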
Customizing Window Operators
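A window operator with an incremental aggregation function (IAF): record => WindowAssigner => IAF => its window => result
A window operator with a full window function (FWF): record => WindowAssigner => its window => FWF => result
A window operator with IAF and FWF: record => WindowAssigner => IAF => its window => FWF => result
A trigger is a window component that defines when the window is evaluated and its result emitted. Every record assigned to a window is also passed to the trigger, so evaluation can be triggered based on the assigned elements or on registered timers.
An evictor is an optional component for FWFs; it cannot be combined with incremental pre-aggregation. It is called before or after the window function and can iterate over the elements in the window. It is typically used with GlobalWindows. See the official documentation for details.
Everything in the figure above may take place within a single larger window, such as a tumbling window.
// define a keyed window operator; for a non-keyed one, drop keyBy and use windowAll instead of window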
stream
.keyBy(...) // versus non-keyed windows
.window(...) // "assigner"
[.trigger(...)] // "trigger" (overrides the default trigger)
[.evictor(...)] // "evictor" (else no evictor)
[.allowedLateness(...)] // "lateness" (else zero)
[.sideOutputLateData(...)] // "output tag" (else no side output for late data)
.reduce/aggregate/fold/apply() // required: "function"
[.getSideOutput(...)] // optional: "output tag"
Window Lifecycle
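A window is created when a WindowAssigner assigns the first element to it. A window consists of the following:
- Content: the result of the IAF, or the records buffered for the FWF.
- Object (the window itself): the WindowAssigner returns zero, one, or several window objects. The window operator groups records by these objects, because they distinguish the windows and record when each window can be deleted.
- Trigger timers: used to call back at a point in time to evaluate the window or clear its content. Timers are maintained by the window operator.
- Custom trigger state: fully controlled by the trigger and not maintained by the window operator. The trigger must clear this state in its clear() method when the window is deleted.
The window operator deletes a window when its end time is reached (event time or processing time, depending on WindowAssigner.isEventTime). This automatically removes the window content and all timers registered for the window.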
Window Assigners
// a custom assigner for 30 seconds tumbling event-time window
class ThirtySecondsWindows
extends WindowAssigner[Object, TimeWindow] {
val windowSize: Long = 30 * 1000L
override def assignWindows(
o: Object,
ts: Long,
ctx: WindowAssigner.WindowAssignerContext): java.util.List[TimeWindow] = {
// rounding down by 30 seconds
val startTime = ts - (ts % windowSize)
val endTime = startTime + windowSize
// emitting the corresponding time window: a new window every 30 seconds
Collections.singletonList(new TimeWindow(startTime, endTime))
}
override def getDefaultTrigger(
env: environment.StreamExecutionEnvironment): Trigger[Object, TimeWindow] = {
EventTimeTrigger.create()
}
override def getWindowSerializer(
executionConfig: ExecutionConfig): TypeSerializer[TimeWindow] = {
new TimeWindow.Serializer
}
override def isEventTime = true
}
There is also MergingWindowAssigner, the interface used by the session windows mentioned earlier; more details follow below.
When merging windows, you need to ensure that the state of all merging windows and their triggers is also appropriately merged. The Trigger interface features a callback method that is invoked when windows are merged to merge state that is associated to the windows.
Global Windows: all elements are assigned to the same global window, and the default trigger is NeverTrigger. Such a window needs a custom trigger, possibly with an evictor that selects which elements to remove from the window state. When applied to a KeyedStream, note that the window leaves state behind for every key.
Trigger
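Triggers define when a window is evaluated and its results are emitted. A trigger can decide to fire based on progress in time or data-specific conditions. Triggers have access to time properties, timers, and can work with state. A trigger can therefore fire on conditions such as a specific value in a record or two records occurring within 5 seconds of each other, or fire early to emit preliminary results. Every time a trigger is called it produces a TriggerResult, one of CONTINUE, FIRE, PURGE, or FIRE_AND_PURGE. PURGE clears the content of the window, not the window itself.
All trigger state must be cleared before the window is deleted; it cannot be cleaned up afterwards. A trigger used with a MergingWindowAssigner must implement canMerge() and onMerge().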
/** A trigger that fires early. The trigger fires at most every second. */
class OneSecondIntervalTrigger
extends Trigger[SensorReading, TimeWindow] {
override def onElement(
r: SensorReading,
timestamp: Long,
window: TimeWindow,
ctx: Trigger.TriggerContext): TriggerResult = {
// firstSeen will be false if not set yet. This partitioned state is scoped to the current key and window
val firstSeen: ValueState[Boolean] = ctx.getPartitionedState(
new ValueStateDescriptor[Boolean]("firstSeen", createTypeInformation[Boolean]))
// register initial timer only for first element
if (!firstSeen.value()) {
// compute time for next early firing by rounding watermark to second
val t = ctx.getCurrentWatermark + (1000 - (ctx.getCurrentWatermark % 1000))
ctx.registerEventTimeTimer(t)
// register timer for the window end
ctx.registerEventTimeTimer(window.getEnd)
firstSeen.update(true)
}
// Continue. Do not evaluate per element
TriggerResult.CONTINUE
}
override def onEventTime(
timestamp: Long,
window: TimeWindow,
ctx: Trigger.TriggerContext): TriggerResult = {
if (timestamp == window.getEnd) {
// the default trigger is overridden, so the window content must be purged explicitly here
TriggerResult.FIRE_AND_PURGE
} else {
// register next early firing timer
val t = ctx.getCurrentWatermark + (1000 - (ctx.getCurrentWatermark % 1000))
if (t < window.getEnd) {
ctx.registerEventTimeTimer(t)
}
// fire trigger to evaluate window
TriggerResult.FIRE
}
}
override def onProcessingTime(
timestamp: Long,
window: TimeWindow,
ctx: Trigger.TriggerContext): TriggerResult = {
// Continue. We don't use processing time timers
TriggerResult.CONTINUE
}
override def clear(
window: TimeWindow,
ctx: Trigger.TriggerContext): Unit = {
// clear trigger state. This method is called when a window is purged
val firstSeen: ValueState[Boolean] = ctx.getPartitionedState(
new ValueStateDescriptor[Boolean]("firstSeen", createTypeInformation[Boolean]))
firstSeen.clear()
}
}
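The rounding used for the early-firing timers can be checked in isolation. The plain Scala sketch below has no Flink dependency; the names are hypothetical, and it assumes a non-negative watermark that equals the timer timestamp whenever a timer fires.

```scala
// Sketch: early-firing schedule of a trigger that fires at most once
// per second. EarlyFiring is an illustrative name; values in ms.
object EarlyFiring {
  // Next full second strictly after the current watermark.
  def nextFiring(watermark: Long): Long =
    watermark + (1000 - (watermark % 1000))

  // Early firings before the window end, starting from the watermark
  // seen with the first element (assumes watermark == timer timestamp
  // at each firing).
  def earlyFirings(firstWatermark: Long, windowEnd: Long): List[Long] =
    Iterator.iterate(nextFiring(firstWatermark))(nextFiring)
      .takeWhile(_ < windowEnd)
      .toList
}
```

For a first watermark of 1500 and a window ending at 5000, the early firings happen at 2000, 3000 and 4000; the final firing at the window end is handled separately with FIRE_AND_PURGE.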
Joining Streams on Time(v1.5)
input1.join(input2)
.where(...) // specify key attributes for input1
.equalTo(...) // specify key attributes for input2
.window(...) // specify the WindowAssigner
[.trigger(...)] // optional: specify a Trigger
[.evictor(...)] // optional: specify an Evictor
.apply(...) // specify the JoinFunction
When the trigger fires, the JoinFunction is called. A JoinFunction is invoked for every pair of matching events, whereas a CoGroupFunction is invoked once per window with iterators over the elements of both inputs.
Those elements that do get joined will have as their timestamp the largest timestamp that still lies in the respective window. For example a window with [5, 10) as its boundaries would result in the joined elements having 9 as their timestamp.
Sliding Join
Interval Join
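After an interval join, the emitted record carries the larger of the two elements' timestamps.
With a tumbling-window join, windows cut the stream at fixed boundaries, so even if the join condition is that two events are at most 1 s apart, they can still fall into different windows and will not be joined. A CoProcessFunction can be used to implement custom join logic instead.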
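The difference between the two join conditions can be sketched with plain Scala predicates. The names are hypothetical, nothing depends on Flink, and the interval bounds are treated as inclusive, which is an assumption of this sketch.

```scala
// Sketch: tumbling-window join vs. interval join conditions.
// JoinSemantics is an illustrative name; timestamps in milliseconds.
object JoinSemantics {
  // Two events meet in a tumbling-window join only if they fall into
  // the same window.
  def sameTumblingWindow(a: Long, b: Long, size: Long): Boolean =
    Math.floorDiv(a, size) == Math.floorDiv(b, size)

  // Interval join: b must lie in [a + lower, a + upper].
  def withinInterval(a: Long, b: Long, lower: Long, upper: Long): Boolean =
    b >= a + lower && b <= a + upper

  // Timestamp of an interval-join result: the larger of the two.
  def joinedTimestamp(a: Long, b: Long): Long = math.max(a, b)
}
```

Events at 4500 and 5500 are 1 s apart, yet with 5-second tumbling windows they land in [0, 5000) and [5000, 10000) and are not joined, while an interval of plus or minus 1 s matches them.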
Handling Late Data
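There are three ways to handle late data: drop it (the default of event-time window operators; in a non-window operator, use a process function to check whether a record is late and handle it accordingly), redirect it to another stream, or update the earlier result.
// redirect: add .sideOutputLateData(OutputTag) before process, then call .getSideOutput(OutputTag) on the result stream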
// define an output tag for late sensor readings
val lateReadingsOutput: OutputTag[SensorReading] =
new OutputTag[SensorReading]("late-readings")
val readings: DataStream[SensorReading] = ???
val countPer10Secs: DataStream[(String, Long, Int)] = readings
.keyBy(_.id)
.timeWindow(Time.seconds(10))
// emit late readings to a side output
.sideOutputLateData(lateReadingsOutput)
// count readings per window
.process(new CountFunction())
// retrieve the late events from the side output as a stream
val lateStream: DataStream[SensorReading] = countPer10Secs
.getSideOutput(lateReadingsOutput)
// Late records can also be handled directly in a ProcessFunction
class LateReadingsFilter
extends ProcessFunction[SensorReading, SensorReading] {
override def processElement(
r: SensorReading,
ctx: ProcessFunction[SensorReading, SensorReading]#Context,
out: Collector[SensorReading]): Unit = {
// compare record timestamp with current watermark
if (r.timestamp < ctx.timerService().currentWatermark()) {
// this is a late reading => redirect it to the side output
ctx.output(lateReadingsOutput, r)
} else {
out.collect(r)
}
}
}
Updating Results by Including Late Events
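Use allowedLateness and, in the subsequent ProcessWindowFunction, check whether the result is the first one for the window (a flag kept in per-window state).
The same can also be implemented with a ProcessFunction.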
val readings: DataStream[SensorReading] = ???
val countPer10Secs: DataStream[(String, Long, Int, String)] = readings
.keyBy(_.id)
.timeWindow(Time.seconds(10))
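// process late readings for 5 additional seconds: the window is not deleted and its state is kept. Unlike delaying the watermark, this emits several results, because the window already fired when its end time passed and every late record triggers another firing.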
.allowedLateness(Time.seconds(5))
// count readings and update results if late readings arrive
.process(new UpdatingWindowCountFunction)
// =================== //
/** A counting ProcessWindowFunction that distinguishes between
* first results and updates. */
class UpdatingWindowCountFunction
extends ProcessWindowFunction[SensorReading, (String, Long, Int, String), String, TimeWindow] {
override def process(
id: String,
ctx: Context,
elements: Iterable[SensorReading],
out: Collector[(String, Long, Int, String)]): Unit = {
// count the number of readings
val cnt = elements.count(_ => true)
// state to check if this is the first evaluation of the window or not.
val isUpdate = ctx.windowState.getState(
new ValueStateDescriptor[Boolean]("isUpdate", Types.of[Boolean]))
if (!isUpdate.value()) {
// first evaluation, emit first result
out.collect((id, ctx.window.getEnd, cnt, "first"))
isUpdate.update(true)
} else {
// not the first evaluation, emit an update
out.collect((id, ctx.window.getEnd, cnt, "update"))
}
}
}
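How a record is treated under allowedLateness can be sketched as a pure function of the window end, the allowed lateness and the current watermark. This plain Scala sketch is an approximation of the behaviour described above, not Flink's code; the names and the three labels are hypothetical.

```scala
// Sketch: fate of a record that arrives for a window [start, end),
// given the current watermark. AllowedLateness is an illustrative name.
object AllowedLateness {
  // Largest timestamp that still belongs to the window.
  def maxTimestamp(windowEnd: Long): Long = windowEnd - 1

  // The window (and its state) is gone once the watermark has passed
  // maxTimestamp + allowedLateness.
  def isWindowExpired(windowEnd: Long, lateness: Long, watermark: Long): Boolean =
    maxTimestamp(windowEnd) + lateness <= watermark

  // "first": window has not fired yet; "update": window already fired
  // but is kept for late data; "dropped": window was deleted.
  def classify(windowEnd: Long, lateness: Long, watermark: Long): String =
    if (isWindowExpired(windowEnd, lateness, watermark)) "dropped"
    else if (maxTimestamp(windowEnd) <= watermark) "update"
    else "first"
}
```

For a window ending at 10 s with 5 s of allowed lateness, a record arriving while the watermark is at 12 s triggers an update, and one arriving once the watermark has reached 14999 ms is dropped (or sent to the late-data side output, if one is configured).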
notice
Flink creates one copy of each element per window to which it belongs. Therefore, a sliding window of size 1 day and slide 1 second might not be a good idea.
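A quick back-of-the-envelope calculation shows the cost. The plain Scala sketch below uses hypothetical names and assumes the window size is a multiple of the slide.

```scala
// Sketch: number of sliding windows (and thus copies) per element.
// SlidingWindowCopies is an illustrative name; values in milliseconds.
object SlidingWindowCopies {
  def windowsPerElement(sizeMs: Long, slideMs: Long): Long = {
    require(slideMs > 0 && sizeMs % slideMs == 0,
      "sketch assumes size is a positive multiple of slide")
    sizeMs / slideMs
  }
}
```

A 1-day window sliding by 1 second puts every element into 86400 windows, compared with 4 for a 1-hour window sliding by 15 minutes.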
References
Stream Processing with Apache Flink by Vasiliki Kalavri; Fabian Hueske