徹底搞清Flink中的Window
阿新 • • 發佈:2021-04-02
![flink-window](https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/33d42111a18a44f5bb739a5c75fa51ea~tplv-k3u1fbpfcp-zoom-1.image)
# 視窗
在流處理應用中,資料是連續不斷的,因此我們不可能等到所有資料都到了才開始處理。當然我們可以每來一個訊息就處理一次,但是有時我們需要做一些聚合類的處理,例如:在過去的1分鐘內有多少使用者點選了我們的網頁。在這種情況下,我們必須定義一個視窗,用來收集最近一分鐘內的資料,並對這個視窗內的資料進行計算。
Flink 認為 Batch 是 Streaming 的一個特例,所以 Flink 底層引擎是一個流式引擎,在上面實現了流處理和批處理。而視窗(window)就是從 Streaming 到 Batch 的一個橋樑。
- 一個Window代表有限物件的集合。一個視窗有一個最大的時間戳,該時間戳意味著在其代表的某時間點——所有應該進入這個視窗的元素都已經到達
- Window就是用來對一個無限的流設定一個有限的集合,在有界的資料集上進行操作的一種機制。window又可以分為基於時間(Time-based)的window以及基於數量(Count-based)的window。
- Flink DataStream API提供了Time和Count的window,同時增加了基於Session的window。同時,由於某些特殊的需要,DataStream API也提供了定製化的window操作,供使用者自定義window。
## 視窗的組成
### 視窗分配器
- assignWindows將某個帶有時間戳timestamp的元素element分配給一個或多個視窗,並返回視窗集合
- getDefaultTrigger 返回跟WindowAssigner關聯的預設觸發器
- getWindowSerializer返回WindowAssigner分配的視窗的序列化器
- 視窗分配器定義如何將資料元分配給視窗。這是通過WindowAssigner 在window(...)(對於被Keys化流)或windowAll()(對於非被Keys化流)呼叫中指定您的選擇來完成的。
- WindowAssigner負責將每個傳入資料元分配給一個或多個視窗。Flink帶有預定義的視窗分配器,用於最常見的用例
即翻滾視窗, 滑動視窗,會話視窗和全域性視窗。
- 您還可以通過擴充套件WindowAssigner類來實現自定義視窗分配器。
- 所有內建視窗分配器(全域性視窗除外)都根據時間為視窗分配資料元,這可以是處理時間或事件時間。
### State
- 狀態,用來儲存視窗內的元素,如果有 AggregateFunction,則儲存的是增量聚合的中間結果。
### 視窗函式
選擇合適的計算函式,減少開發程式碼量提高系統性能
#### 增量聚合函式(視窗只維護狀態)
- ReduceFunction
- AggregateFunction
- FoldFunction
#### 全量聚合函式(視窗維護視窗內的資料)
- ProcessWindowFunction
- 全量計算
- 支援功能更加靈活
- 支援狀態操作
### 觸發器
![image-20210202200655485](https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/160f7ad1ec144709ab27f2e5762e66b9~tplv-k3u1fbpfcp-zoom-1.image)
- EventTimeTrigger基於事件時間的觸發器,對應onEventTime
- ProcessingTimeTrigger
基於當前系統時間的觸發器,對應onProcessingTime
ProcessingTime 有最好的效能和最低的延遲。但在分散式計算環境中ProcessingTime具有不確定性,相同資料流多次執行有可能產生不同的計算結果。
- ContinuousEventTimeTrigger
- ContinuousProcessingTimeTrigger
- CountTrigger
- Trigger確定何時視窗函式準備好處理視窗(由視窗分配器形成)。每個都有預設值。
如果預設觸發器不符合您的需要,您可以使用指定自定義觸發器。WindowAssignerTriggertrigger(...)
- 觸發器介面有五種方法可以Trigger對不同的事件做出反應:
- onElement()為新增到視窗的每個資料元呼叫該方法。
- onEventTime()在註冊的事件時間計時器觸發時呼叫該方法。
- onProcessingTime()在註冊的處理時間計時器觸發時呼叫該方法。
- 該onMerge()方法與狀態觸發器相關,並且當它們的相應視窗合併時合併兩個觸發器的狀態,例如當使用會話視窗時。
- 最後,該clear()方法在移除相應視窗時執行所需的任何動作。
- 預設觸發器
- 預設觸發器GlobalWindow是NeverTrigger從不觸發的。因此,在使用時必須定義自定義觸發器GlobalWindow。
- 通過使用trigger()您指定觸發器會覆蓋a的預設觸發器WindowAssigner。例如,如果指定a CountTrigger,TumblingEventTimeWindows則不再根據時間進度獲取視窗,
而是僅按計數。現在,如果你想根據時間和數量做出反應,你必須編寫自己的自定義觸發器。
- event-time視窗分配器都有一個EventTimeTrigger作為預設觸發器。該觸發器在watermark通過視窗末尾時出發。
#### 觸發器分類
##### CountTrigger
一旦視窗中的資料元數量超過給定限制,就會觸發。所以其觸發機制實現在onElement中
##### ProcessingTimeTrigger
基於處理時間的觸發。
##### EventTimeTrigger
根據 watermarks 度量的事件時間進度進行觸發。
##### PurgingTrigger
- 另一個觸發器作為引數作為引數並將其轉換為清除觸發器。
- 其作用是在 Trigger 觸發視窗計算之後將視窗的 State 中的資料清除。
- ![image-20210202200710573](https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/9d75e157858649a9aeb5988e730c3b3a~tplv-k3u1fbpfcp-zoom-1.image)前兩條資料先後於20:01和20:02進入視窗,此時 State 中的值更新為3,同時到了Trigger的觸發時間,輸出結果為3。
![image-20210202200733128](https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/83d962a7b9184ce1949022cd6515893c~tplv-k3u1fbpfcp-zoom-1.image)
- 由於 PurgingTrigger 的作用,State 中的資料會被清除。
![image-20210202200744793](https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/dd3ba120aac44a899846678ce6b0b0bf~tplv-k3u1fbpfcp-zoom-1.image)
##### DeltaTrigger
###### DeltaTrigger 的應用
- 有這樣一個車輛區間測試的需求,車輛每分鐘上報當前位置與車速,每行進10公里,計算區間內最高車速。
![image-20210202200802480](https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/7ddd3b605bae42c5bff68601d91eeab4~tplv-k3u1fbpfcp-zoom-1.image)
#### 觸發器原型
- onElement
- onProcessingTime
- onEventTime
- onMerge
- clear
#### 說明
- TriggerResult可以是以下之一
- CONTINUE 什麼都不做
- FIRE_AND_PURGE 觸發計算,然後清除視窗中的元素
- FIRE 觸發計算 預設情況下,內建的觸發器只返回 FIRE,不會清除視窗狀態。
- PURGE 清除視窗中的元素
- 所有的事件時間視窗分配器都有一個 EventTimeTrigger 作為預設觸發器。一旦 watermark 到達視窗末尾,這個觸發器就會被觸發。
- 全域性視窗(GlobalWindow)的預設觸發器是永不會被觸發的 NeverTrigger。因此,在使用全域性視窗時,必須自定義一個觸發器。
- 通過使用 trigger() 方法指定觸發器,將會覆蓋視窗分配器的預設觸發器。例如,如果你為 TumblingEventTimeWindows 指定 CountTrigger,
那麼不會再根據時間進度觸發視窗,而只能通過計數。目前為止,如果你希望基於時間以及計數進行觸發,則必須編寫自己的自定義觸發器。
## 視窗的分類
- 根據視窗是否呼叫keyBy運算元key化,分為被Keys化Windows和非被Keys化Windows;
![flink window圖解](https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/c010b3c1135d4b9882c8b8c1871807e3~tplv-k3u1fbpfcp-zoom-1.image)
- 根據視窗的驅動方式,分為時間驅動(Time Window)、資料驅動(Count Window);
- 根據視窗的元素分配方式,分為滾動視窗(tumbling windows)、滑動視窗(sliding windows)、會話視窗(session windows)以及全域性視窗(global windows)
### 被Keys化Windows
可以理解為按照原始資料流中的某個key進行分類,擁有同一個key值的資料流將為進入同一個window,多個視窗並行的邏輯流
```java
stream
.keyBy(...) <- keyed versus non-keyed windows
.window(...) <- required: "assigner"
[.trigger(...)] <- optional: "trigger" (else default trigger)
[.evictor(...)] <- optional: "evictor" (else no evictor)
[.allowedLateness(...)] <- optional: "lateness" (else zero)
[.sideOutputLateData(...)] <- optional: "output tag" (else no side output for late data)
.reduce/aggregate/fold/apply() <- required: "function"
[.getSideOutput(...)] <- optional: "output tag"
```
### 非被Keys化Windows
- 不做分類,每進入一條資料即增加一個視窗,多個視窗並行,每個視窗處理1條資料
- WindowAll 將元素按照某種特性聚集在一起,該函式不支援並行操作,預設的並行度就是1,所以如果使用這個運算元的話需要注意一下效能問題
```text
stream
.windowAll(...) <- required: "assigner"
[.trigger(...)] <- optional: "trigger" (else default trigger)
[.evictor(...)] <- optional: "evictor" (else no evictor)
[.allowedLateness(...)] <- optional: "lateness" (else zero)
[.sideOutputLateData(...)] <- optional: "output tag" (else no side output for late data)
.reduce/aggregate/fold/apply() <- required: "function"
[.getSideOutput(...)] <- optional: "output tag"
```
#### 區別
- 對於被Key化的資料流,可以將傳入事件的任何屬性用作鍵(此處有更多詳細資訊)。
- 擁有被Key化的資料流將允許您的視窗計算由多個任務並行執行,因為每個邏輯被Key化的資料流可以獨立於其餘任務進行處理。
引用相同Keys的所有資料元將被髮送到同一個並行任務。
### Time-Based window(基於時間的視窗)
每一條記錄來了以後會根據時間屬性值採用不同的window assinger 方法分配給一個或者多個視窗,分為滾動視窗(Tumbling windows)和滑動視窗(Sliding windows)。
- EventTime 資料本身攜帶的時間,預設的時間屬性;
- ProcessingTime 處理時間;
- IngestionTime 資料進入flink程式的時間;
#### Tumbling windows(滾動視窗)
滾動視窗下視窗之間不重疊,且視窗長度是固定的。我們可以用TumblingEventTimeWindows和TumblingProcessingTimeWindows建立一個基於Event Time或Processing Time的滾動時間視窗。
![tumb-window](https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/10625231124848efaabe17095880e65f~tplv-k3u1fbpfcp-zoom-1.image)
下面示例以滾動時間視窗(`TumblingEventTimeWindows`)為例,預設模式是`TimeCharacteristic.ProcessingTime`處理時間
```java
/** The time characteristic that is used if none other is set. */
private static final TimeCharacteristic DEFAULT_TIME_CHARACTERISTIC = TimeCharacteristic.ProcessingTime;
```
所以如果使用`Event Time`即資料的實際產生時間,需要通過`senv.setStreamTimeCharacteristic`指定
```java
// 指定使用資料的實際時間
senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
Da