1. 程式人生 > >Flink 中極其重要的 Time 與 Window 詳細解析(深度好文,建議收藏)

Flink 中極其重要的 Time 與 Window 詳細解析(深度好文,建議收藏)

### 前言 Flink 是流式的、實時的 計算引擎 上面一句話就有兩個概念,一個是流式,一個是實時。 **流式**:就是資料來源源不斷的流進來,也就是資料沒有邊界,但是我們計算的時候必須在一個有邊界的範圍內進行,所以這裡面就有一個問題,邊界怎麼確定? 無非就兩種方式,**根據時間段或者資料量進行確定**,根據時間段就是每隔多長時間就劃分一個邊界,根據資料量就是每來多少條資料劃分一個邊界,Flink 中就是這麼劃分邊界的,本文會詳細講解。 **實時**:就是資料傳送過來之後立馬就進行相關的計算,然後將結果輸出。這裡的計算有兩種: - **一種是隻有邊界內的資料進行計算**,這種好理解,比如統計每個使用者最近五分鐘內瀏覽的新聞數量,就可以取最近五分鐘內的所有資料,然後根據每個使用者分組,統計新聞的總數。 - **另一種是邊界內資料與外部資料進行關聯計算**,比如:統計最近五分鐘內瀏覽新聞的使用者都是來自哪些地區,這種就需要將五分鐘內瀏覽新聞的使用者資訊與 hive 中的地區維表進行關聯,然後在進行相關計算。 本篇文章所講的 Flink 的內容就是圍繞以上概念進行詳細剖析的! ## Time與Window ### Time 在Flink中,如果以時間段劃分邊界的話,那麼時間就是一個極其重要的欄位。 Flink中的時間有三種類型,如下圖所示: ![](https://cdn.jsdelivr.net/gh/sunmyuan/cdn/210123_1.png) - **Event Time**:是事件建立的時間。它通常由事件中的時間戳描述,例如採集的日誌資料中,每一條日誌都會記錄自己的生成時間,Flink通過時間戳分配器訪問事件時間戳。 - **Ingestion Time**:是資料進入Flink的時間。 - **Processing Time**:是每一個執行基於時間操作的運算元的本地系統時間,與機器相關,預設的時間屬性就是Processing Time。 例如,一條日誌進入Flink的時間為2021-01-22 10:00:00.123,到達Window的系統時間為2021-01-22 10:00:01.234,日誌的內容如下: 2021-01-06 18:37:15.624 INFO Fail over to rm2 對於業務來說,要統計1min內的故障日誌個數,哪個時間是最有意義的?—— eventTime,因為我們要根據日誌的生成時間進行統計。 ### Window Window,即視窗,我們前面一直提到的邊界就是這裡的Window(視窗)。 官方解釋:**流式計算是一種被設計用於處理無限資料集的資料處理引擎,而無限資料集是指一種不斷增長的本質上無限的資料集,而window是一種切割無限資料為有限塊進行處理的手段**。 所以**Window是無限資料流處理的核心,Window將一個無限的stream拆分成有限大小的”buckets”桶,我們可以在這些桶上做計算操作**。 #### Window型別 本文剛開始提到,劃分視窗就兩種方式: 1. 根據時間進行擷取(time-driven-window),比如每1分鐘統計一次或每10分鐘統計一次。 2. 根據資料進行擷取(data-driven-window),比如每5個數據統計一次或每50個數據統計一次。 ![視窗型別](https://cdn.jsdelivr.net/gh/sunmyuan/cdn/210123_2.png) 對於TimeWindow(根據時間劃分視窗), 可以根據視窗實現原理的不同分成三類:**滾動視窗(Tumbling Window)、滑動視窗(Sliding Window)和會話視窗(Session Window)**。 1. **滾動視窗(Tumbling Windows)** 將資料依據固定的視窗長度對資料進行切片。 特點:**時間對齊,視窗長度固定,沒有重疊**。 滾動視窗分配器將每個元素分配到一個指定視窗大小的視窗中,滾動視窗有一個固定的大小,並且不會出現重疊。 例如:如果你指定了一個5分鐘大小的滾動視窗,視窗的建立如下圖所示: ![滾動視窗](https://cdn.jsdelivr.net/gh/sunmyuan/cdn/210123_3.png) 適用場景:適合做BI統計等(做每個時間段的聚合計算)。 2. **滑動視窗(Sliding Windows)** 滑動視窗是固定視窗的更廣義的一種形式,滑動視窗由固定的視窗長度和滑動間隔組成。 特點:**時間對齊,視窗長度固定,有重疊**。 滑動視窗分配器將元素分配到固定長度的視窗中,與滾動視窗類似,視窗的大小由視窗大小引數來配置,另一個視窗滑動引數控制滑動視窗開始的頻率。因此,滑動視窗如果滑動引數小於視窗大小的話,視窗是可以重疊的,在這種情況下元素會被分配到多個視窗中。 例如,你有10分鐘的視窗和5分鐘的滑動,那麼每個視窗中5分鐘的窗口裡包含著上個10分鐘產生的資料,如下圖所示: ![滑動視窗](https://cdn.jsdelivr.net/gh/sunmyuan/cdn/210123_9.png) 適用場景:對最近一個時間段內的統計(求某介面最近5min的失敗率來決定是否要報警)。 3. **會話視窗(Session Windows)** 由一系列事件組合一個指定時間長度的timeout間隙組成,類似於web應用的session,也就是一段時間沒有接收到新資料就會生成新的視窗。 特點:**時間無對齊**。 session視窗分配器通過session活動來對元素進行分組,session視窗跟滾動視窗和滑動視窗相比,不會有重疊和固定的開始時間和結束時間的情況,相反,**當它在一個固定的時間週期內不再收到元素,即非活動間隔產生,那個這個視窗就會關閉**。一個session視窗通過一個session間隔來配置,這個session間隔定義了非活躍週期的長度,當這個非活躍週期產生,那麼當前的session將關閉並且後續的元素將被分配到新的session視窗中去。 ![會話視窗](https://cdn.jsdelivr.net/gh/sunmyuan/cdn/210123_10.png) ### Window API #### TimeWindow TimeWindow是將指定時間範圍內的所有資料組成一個window,一次對一個window裡面的所有資料進行計算(就是本文開頭說的對一個邊界內的資料進行計算)。 我們以 **紅綠燈路口通過的汽車數量** 為例子: 紅綠燈路口會有汽車通過,一共會有多少汽車通過,無法計算。因為車流源源不斷,計算沒有邊界。 所以我們統計每15秒鐘通過紅路燈的汽車數量,如第一個15秒為2輛,第二個15秒為3輛,第三個15秒為1輛 ... - **tumbling-time-window (無重疊資料)** 我們使用 Linux 中的 nc 命令模擬資料的傳送方 ```shell 1.開啟發送埠,埠號為9999 nc -lk 9999 2.傳送內容(key 代表不同的路口,value 代表每次通過的車輛) 一次傳送一行,傳送的時間間隔代表汽車經過的時間間隔 9,3 9,2 9,7 4,9 2,6 1,5 2,3 5,7 5,4 ``` Flink 進行採集資料並計算: ``` object Window { def main(args: Array[String]): Unit = { //TODO time-window //1.建立執行環境 val env = StreamExecutionEnvironment.getExecutionEnvironment //2.定義資料流來源 val text = env.socketTextStream("localhost", 9999) //3.轉換資料格式,text->CarWc case class CarWc(sensorId: Int, carCnt: Int) val ds1: DataStream[CarWc] = text.map { line => { val tokens = line.split(",") CarWc(tokens(0).trim.toInt, tokens(1).trim.toInt) } } //4.執行統計操作,每個sensorId一個tumbling視窗,視窗的大小為5秒 //也就是說,每5秒鐘統計一次,在這過去的5秒鐘內,各個路口通過紅綠燈汽車的數量。 val ds2: DataStream[CarWc] = ds1 .keyBy("sensorId") .timeWindow(Time.seconds(5)) .sum("carCnt") //5.顯示統計結果 ds2.print() //6.觸發流計算 env.execute(this.getClass.getName) } } ``` 我們傳送的資料並沒有指定時間欄位,所以Flink使用的是預設的 Processing Time,也就是Flink系統處理資料時的時間。 - **sliding-time-window (有重疊資料)** ``` //1.建立執行環境 val env = StreamExecutionEnvironment.getExecutionEnvironment //2.定義資料流來源 val text = env.socketTextStream("localhost", 9999) //3.轉換資料格式,text->CarWc case class CarWc(sensorId: Int, carCnt: Int) val ds1: DataStream[CarWc] = text.map { line => { val tokens = line.split(",") CarWc(tokens(0).trim.toInt, tokens(1).trim.toInt) } } //4.執行統計操作,每個sensorId一個sliding視窗,視窗時間10秒,滑動時間5秒 //也就是說,每5秒鐘統計一次,在這過去的10秒鐘內,各個路口通過紅綠燈汽車的數量。 val ds2: DataStream[CarWc] = ds1 .keyBy("sensorId") .timeWindow(Time.seconds(10), Time.seconds(5)) .sum("carCnt") //5.顯示統計結果 ds2.print() //6.觸發流計算 env.execute(this.getClass.getName) ``` #### CountWindow CountWindow根據視窗中相同key元素的數量來觸發執行,執行時只計算元素數量達到視窗大小的key對應的結果。 **注意:CountWindow的window_size指的是相同Key的元素的個數,不是輸入的所有元素的總數**。 - **tumbling-count-window (無重疊資料)** ``` //1.建立執行環境 val env = StreamExecutionEnvironment.getExecutionEnvironment //2.定義資料流來源 val text = env.socketTextStream("localhost", 9999) //3.轉換資料格式,text->CarWc case class CarWc(sensorId: Int, carCnt: Int) val ds1: DataStream[CarWc] = text.map { (f) => { val tokens = f.split(",") CarWc(tokens(0).trim.toInt, tokens(1).trim.toInt) } } //4.執行統計操作,每個sensorId一個tumbling視窗,視窗的大小為5 //按照key進行收集,對應的key出現的次數達到5次作為一個結果 val ds2: DataStream[CarWc] = ds1 .keyBy("sensorId") .countWindow(5) .sum("carCnt") //5.顯示統計結果 ds2.print() //6.觸發流計算 env.execute(this.getClass.getName) ``` *** - **sliding-count-window (有重疊資料)** 同樣也是視窗長度和滑動視窗的操作:視窗長度是5,滑動長度是3 ``` //1.建立執行環境 val env = StreamExecutionEnvironment.getExecutionEnvironment //2.定義資料流來源 val text = env.socketTextStream("localhost", 9999) //3.轉換資料格式,text->CarWc case class CarWc(sensorId: Int, carCnt: Int) val ds1: DataStream[CarWc] = text.map { (f) => { val tokens = f.split(",") CarWc(tokens(0).trim.toInt, tokens(1).trim.toInt) } } //4.執行統計操作,每個sensorId一個sliding視窗,視窗大小3條資料,視窗滑動為3條資料 //也就是說,每個路口分別統計,收到關於它的3條訊息時統計在最近5條訊息中,各自路口通過的汽車數量 val ds2: DataStream[CarWc] = ds1 .keyBy("sensorId") .countWindow(5, 3) .sum("carCnt") //5.顯示統計結果 ds2.print() //6.觸發流計算 env.execute(this.getClass.getName) ``` *** - **Window 總結** 1. flink支援兩種劃分視窗的方式(time和count) - 如果根據時間劃分視窗,那麼它就是一個time-window - 如果根據資料劃分視窗,那麼它就是一個count-window 2. flink支援視窗的兩個重要屬性(size和interval) - 如果size=interval,那麼就會形成tumbling-window(無重疊資料) - 如果size>interval,那麼就會形成sliding-window(有重疊資料) -