1. 程式人生 > >徹底搞清Flink中的Window

徹底搞清Flink中的Window

![flink-window](https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/33d42111a18a44f5bb739a5c75fa51ea~tplv-k3u1fbpfcp-zoom-1.image) # 視窗 在流處理應用中,資料是連續不斷的,因此我們不可能等到所有資料都到了才開始處理。當然我們可以每來一個訊息就處理一次,但是有時我們需要做一些聚合類的處理,例如:在過去的1分鐘內有多少使用者點選了我們的網頁。在這種情況下,我們必須定義一個視窗,用來收集最近一分鐘內的資料,並對這個視窗內的資料進行計算。 Flink 認為 Batch 是 Streaming 的一個特例,所以 Flink 底層引擎是一個流式引擎,在上面實現了流處理和批處理。而視窗(window)就是從 Streaming 到 Batch 的一個橋樑。 - 一個Window代表有限物件的集合。一個視窗有一個最大的時間戳,該時間戳意味著在其代表的某時間點——所有應該進入這個視窗的元素都已經到達 - Window就是用來對一個無限的流設定一個有限的集合,在有界的資料集上進行操作的一種機制。window又可以分為基於時間(Time-based)的window以及基於數量(Count-based)的window。 - Flink DataStream API提供了Time和Count的window,同時增加了基於Session的window。同時,由於某些特殊的需要,DataStream API也提供了定製化的window操作,供使用者自定義window。 ## 視窗的組成 ### 視窗分配器 - assignWindows將某個帶有時間戳timestamp的元素element分配給一個或多個視窗,並返回視窗集合 - getDefaultTrigger 返回跟WindowAssigner關聯的預設觸發器 - getWindowSerializer返回WindowAssigner分配的視窗的序列化器 - 視窗分配器定義如何將資料元分配給視窗。這是通過WindowAssigner 在window(...)(對於被Keys化流)或windowAll()(對於非被Keys化流)呼叫中指定您的選擇來完成的。 - WindowAssigner負責將每個傳入資料元分配給一個或多個視窗。Flink帶有預定義的視窗分配器,用於最常見的用例 即翻滾視窗, 滑動視窗,會話視窗和全域性視窗。 - 您還可以通過擴充套件WindowAssigner類來實現自定義視窗分配器。 - 所有內建視窗分配器(全域性視窗除外)都根據時間為視窗分配資料元,這可以是處理時間或事件時間。 ### State - 狀態,用來儲存視窗內的元素,如果有 AggregateFunction,則儲存的是增量聚合的中間結果。 ### 視窗函式 選擇合適的計算函式,減少開發程式碼量提高系統性能 #### 增量聚合函式(視窗只維護狀態) - ReduceFunction - AggregateFunction - FoldFunction #### 全量聚合函式(視窗維護視窗內的資料) - ProcessWindowFunction - 全量計算 - 支援功能更加靈活 - 支援狀態操作 ### 觸發器 ![image-20210202200655485](https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/160f7ad1ec144709ab27f2e5762e66b9~tplv-k3u1fbpfcp-zoom-1.image) - EventTimeTrigger基於事件時間的觸發器,對應onEventTime - ProcessingTimeTrigger 基於當前系統時間的觸發器,對應onProcessingTime ProcessingTime 有最好的效能和最低的延遲。但在分散式計算環境中ProcessingTime具有不確定性,相同資料流多次執行有可能產生不同的計算結果。 - ContinuousEventTimeTrigger - ContinuousProcessingTimeTrigger - CountTrigger - Trigger確定何時視窗函式準備好處理視窗(由視窗分配器形成)。每個都有預設值。 如果預設觸發器不符合您的需要,您可以使用指定自定義觸發器。WindowAssignerTriggertrigger(...) - 觸發器介面有五種方法可以Trigger對不同的事件做出反應: - onElement()為新增到視窗的每個資料元呼叫該方法。 - onEventTime()在註冊的事件時間計時器觸發時呼叫該方法。 - onProcessingTime()在註冊的處理時間計時器觸發時呼叫該方法。 - 該onMerge()方法與狀態觸發器相關,並且當它們的相應視窗合併時合併兩個觸發器的狀態,例如當使用會話視窗時。 - 最後,該clear()方法在移除相應視窗時執行所需的任何動作。 - 預設觸發器 - 預設觸發器GlobalWindow是NeverTrigger從不觸發的。因此,在使用時必須定義自定義觸發器GlobalWindow。 - 通過使用trigger()您指定觸發器會覆蓋a的預設觸發器WindowAssigner。例如,如果指定a CountTrigger,TumblingEventTimeWindows則不再根據時間進度獲取視窗, 而是僅按計數。現在,如果你想根據時間和數量做出反應,你必須編寫自己的自定義觸發器。 - event-time視窗分配器都有一個EventTimeTrigger作為預設觸發器。該觸發器在watermark通過視窗末尾時出發。 #### 觸發器分類 ##### CountTrigger 一旦視窗中的資料元數量超過給定限制,就會觸發。所以其觸發機制實現在onElement中 ##### ProcessingTimeTrigger 基於處理時間的觸發。 ##### EventTimeTrigger 根據 watermarks 度量的事件時間進度進行觸發。 ##### PurgingTrigger - 另一個觸發器作為引數作為引數並將其轉換為清除觸發器。 - 其作用是在 Trigger 觸發視窗計算之後將視窗的 State 中的資料清除。 - ![image-20210202200710573](https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/9d75e157858649a9aeb5988e730c3b3a~tplv-k3u1fbpfcp-zoom-1.image)前兩條資料先後於20:01和20:02進入視窗,此時 State 中的值更新為3,同時到了Trigger的觸發時間,輸出結果為3。 ![image-20210202200733128](https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/83d962a7b9184ce1949022cd6515893c~tplv-k3u1fbpfcp-zoom-1.image) - 由於 PurgingTrigger 的作用,State 中的資料會被清除。 ![image-20210202200744793](https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/dd3ba120aac44a899846678ce6b0b0bf~tplv-k3u1fbpfcp-zoom-1.image) ##### DeltaTrigger ###### DeltaTrigger 的應用 - 有這樣一個車輛區間測試的需求,車輛每分鐘上報當前位置與車速,每行進10公里,計算區間內最高車速。 ![image-20210202200802480](https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/7ddd3b605bae42c5bff68601d91eeab4~tplv-k3u1fbpfcp-zoom-1.image) #### 觸發器原型 - onElement - onProcessingTime - onEventTime - onMerge - clear #### 說明 - TriggerResult可以是以下之一 - CONTINUE 什麼都不做 - FIRE_AND_PURGE 觸發計算,然後清除視窗中的元素 - FIRE 觸發計算 預設情況下,內建的觸發器只返回 FIRE,不會清除視窗狀態。 - PURGE 清除視窗中的元素 - 所有的事件時間視窗分配器都有一個 EventTimeTrigger 作為預設觸發器。一旦 watermark 到達視窗末尾,這個觸發器就會被觸發。 - 全域性視窗(GlobalWindow)的預設觸發器是永不會被觸發的 NeverTrigger。因此,在使用全域性視窗時,必須自定義一個觸發器。 - 通過使用 trigger() 方法指定觸發器,將會覆蓋視窗分配器的預設觸發器。例如,如果你為 TumblingEventTimeWindows 指定 CountTrigger, 那麼不會再根據時間進度觸發視窗,而只能通過計數。目前為止,如果你希望基於時間以及計數進行觸發,則必須編寫自己的自定義觸發器。 ## 視窗的分類 - 根據視窗是否呼叫keyBy運算元key化,分為被Keys化Windows和非被Keys化Windows; ![flink window圖解](https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/c010b3c1135d4b9882c8b8c1871807e3~tplv-k3u1fbpfcp-zoom-1.image) - 根據視窗的驅動方式,分為時間驅動(Time Window)、資料驅動(Count Window); - 根據視窗的元素分配方式,分為滾動視窗(tumbling windows)、滑動視窗(sliding windows)、會話視窗(session windows)以及全域性視窗(global windows) ### 被Keys化Windows 可以理解為按照原始資料流中的某個key進行分類,擁有同一個key值的資料流將為進入同一個window,多個視窗並行的邏輯流 ```java stream .keyBy(...) <- keyed versus non-keyed windows .window(...) <- required: "assigner" [.trigger(...)] <- optional: "trigger" (else default trigger) [.evictor(...)] <- optional: "evictor" (else no evictor) [.allowedLateness(...)] <- optional: "lateness" (else zero) [.sideOutputLateData(...)] <- optional: "output tag" (else no side output for late data) .reduce/aggregate/fold/apply() <- required: "function" [.getSideOutput(...)] <- optional: "output tag" ``` ### 非被Keys化Windows - 不做分類,每進入一條資料即增加一個視窗,多個視窗並行,每個視窗處理1條資料 - WindowAll 將元素按照某種特性聚集在一起,該函式不支援並行操作,預設的並行度就是1,所以如果使用這個運算元的話需要注意一下效能問題 ```text stream .windowAll(...) <- required: "assigner" [.trigger(...)] <- optional: "trigger" (else default trigger) [.evictor(...)] <- optional: "evictor" (else no evictor) [.allowedLateness(...)] <- optional: "lateness" (else zero) [.sideOutputLateData(...)] <- optional: "output tag" (else no side output for late data) .reduce/aggregate/fold/apply() <- required: "function" [.getSideOutput(...)] <- optional: "output tag" ``` #### 區別 - 對於被Key化的資料流,可以將傳入事件的任何屬性用作鍵(此處有更多詳細資訊)。 - 擁有被Key化的資料流將允許您的視窗計算由多個任務並行執行,因為每個邏輯被Key化的資料流可以獨立於其餘任務進行處理。 引用相同Keys的所有資料元將被髮送到同一個並行任務。 ### Time-Based window(基於時間的視窗) 每一條記錄來了以後會根據時間屬性值採用不同的window assinger 方法分配給一個或者多個視窗,分為滾動視窗(Tumbling windows)和滑動視窗(Sliding windows)。 - EventTime 資料本身攜帶的時間,預設的時間屬性; - ProcessingTime 處理時間; - IngestionTime 資料進入flink程式的時間; #### Tumbling windows(滾動視窗) 滾動視窗下視窗之間不重疊,且視窗長度是固定的。我們可以用TumblingEventTimeWindows和TumblingProcessingTimeWindows建立一個基於Event Time或Processing Time的滾動時間視窗。 ![tumb-window](https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/10625231124848efaabe17095880e65f~tplv-k3u1fbpfcp-zoom-1.image) 下面示例以滾動時間視窗(`TumblingEventTimeWindows`)為例,預設模式是`TimeCharacteristic.ProcessingTime`處理時間 ```java /** The time characteristic that is used if none other is set. */ private static final TimeCharacteristic DEFAULT_TIME_CHARACTERISTIC = TimeCharacteristic.ProcessingTime; ``` 所以如果使用`Event Time`即資料的實際產生時間,需要通過`senv.setStreamTimeCharacteristic`指定 ```java // 指定使用資料的實際時間 senv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); Da