[白話解析] Flink的Watermark機制

0x00 摘要

對於Flink來說,Watermark是個很難繞過去的概念。本文將從整體的思路上來說,運用感性直覺的思考來幫大家梳理Watermark概念。

0x01 問題

關於Watermark,很容易產生幾個問題

  • Flink 流處理應用中,常見的處理需求/應對方案是什麼?
  • Watermark究竟應該翻譯成水印還是水位線?
  • Watermark本質是什麼?
  • Watermark是如何解決問題?

下面我們就來簡要解答這些問題以給大家一個大致概念,在後文中,會再深入描述。

聚合類的處理 Flink可以每來一個訊息就處理一次,但是有時我們需要做一些聚合類的處理,例如:在過去的1分鐘內有多少使用者點選了我們的網頁。所以Flink引入了視窗概念。

視窗 視窗的作用為了週期性的獲取資料。就是把傳入的原始資料流切分成多個buckets,所有計算都在單一的buckets中進行。視窗(window)就是從 Streaming 到 Batch 的一個橋樑。

帶來的問題:聚合類處理帶來了新的問題,比如亂序/延遲。其解決方案就是 Watermark / allowLateNess / sideOutPut 這一組合拳。

Watermark 的作用是防止 資料亂序 / 指定時間內獲取不到全部資料。

allowLateNess 是將視窗關閉時間再延遲一段時間。

sideOutPut 是最後兜底操作,當指定視窗已經徹底關閉後,就會把所有過期延遲資料放到側輸出流,讓使用者決定如何處理。

總結起來就是說

Windows -----> Watermark -----> allowLateNess -----> sideOutPut 
    
用Windows把流資料分塊處理,用Watermark確定什麼時候不再等待更早的資料/觸發視窗進行計算,用allowLateNess 將視窗關閉時間再延遲一段時間。用sideOutPut 最後兜底把資料匯出到其他地方。

問題2. Watermark應該翻譯成水位線

我最初看的一篇文章中把Watermark翻譯成“水印”。我當時比較暈。因為按說名字一定能夠反應事物本質。但是我怎麼也腦補不出這個”水印“的本質。

繼續看文章內容,越來越覺得這個應該翻譯成“水位線”。於是查了查,確實英文有如下翻譯:high-water mark 高水位線(海水或洪水所達到的最高水位)。

後來逐漸看到其他文章中也有翻譯成水位線,我才放心下來,終於不會出現第二個“套接字”這樣神奇的翻譯了

問題3. Watermark本質是什麼

Watermarks是基於已經收集的訊息來估算是否還有訊息未到達,本質上是一個時間戳。時間戳反映的是事件發生的時間,而不是事件處理的時間。

這個從Flink的原始碼就能看出來,唯一有意義的成員變數就是 timestamp。

public final class Watermark extends StreamElement {
  /*The watermark that signifies end-of-event-time. */
  public static final Watermark MAX_WATERMARK = new Watermark(Long.MAX_VALUE);
  /* The timestamp of the watermark in milliseconds. */
  private final long timestamp;
  /* Creates a new watermark with the given timestamp in milliseconds.*/
  public Watermarklong timestamp) {
    this.timestamp = timestamp;
  }
  /*Returns the timestamp associated with this {@link Watermark} in milliseconds.**/
  public long getTimestamp() {
    return timestamp;
  }
}

問題4. Watermark如何解決問題

Watermark是一種告訴Flink一個訊息延遲多少的方式。它定義了什麼時候不再等待更早的資料

可以把Watermarks理解為一個水位線,這個Watermarks在不斷的變化。Watermark實際上作為資料流的一部分隨資料流流動

當Flink中的運算子接收到Watermarks時,它明白早於該時間的訊息已經完全抵達計算引擎,即假設不會再有時間小於水位線的事件到達

這個假設是觸發視窗計算的基礎,只有水位線越過視窗對應的結束時間,窗口才會關閉和進行計算

0x02 背景概念

流處理

流處理,最本質的是在處理資料的時候,接受一條處理一條資料。

批處理,則是累積資料到一定程度在處理。這是他們本質的區別。

在設計上Flink認為資料是流式的,批處理只是流處理的特例。同時對資料分為有界資料和無界資料。

  • 有界資料對應批處理,API對應Dateset。
  • 無界資料對應流處理,API對應DataStream。

亂序(out-of-order)

什麼是亂序呢?可以理解為資料到達的順序和其實際產生時間的排序不一致。導致這的原因有很多,比如延遲,訊息積壓,重試等等。

我們知道,流處理從事件產生,到流經source,再到operator,中間是有一個過程和時間的。雖然大部分情況下,流到operator的資料都是按照事件產生的時間順序來的,但是也不排除由於網路、背壓等原因,導致亂序的產生(out-of-order或者說late element)。

比如:

某資料來源中的某些資料由於某種原因(如:網路原因,外部儲存自身原因)會有5秒的延時,
也就是在實際時間的第1秒產生的資料有可能在第5秒中產生的資料之後到來(比如到Window處理節點)。

有1~10個事件。
亂序到達的序列是:2,3,4,5,1,6,3,8,9,10,7

0x03 Flink中的視窗概念

視窗

對於Flink,如果來一條訊息計算一條,這樣是可以的,但是這樣計算是非常頻繁而且消耗資源,如果想做一些統計這是不可能的。所以對於Spark和Flink都產生了視窗計算。

比如 是因為我們想看到過去一分鐘,過去半小時的訪問資料,這時候我們就需要視窗。

Window:Window是處理無界流的關鍵,Windows將流拆分為一個個有限大小的buckets,可以可以在每一個buckets中進行計算。

start_time,end_time:當Window時時間視窗的時候,每個window都會有一個開始時間和結束時間(前開後閉),這個時間是系統時間。

視窗生命週期

簡而言之,只要屬於此視窗的第一個元素到達,就會建立一個視窗,當時間(事件或處理時間)超過其結束時間戳加上使用者指定的允許延遲時,視窗將被完全刪除。

例如:

使用基於事件時間的視窗策略,每5分鐘建立一個不重疊(或翻滾)的視窗並允許延遲1分鐘。
    
假定目前是12:00。

當具有落入該間隔的時間戳的第一個元素到達時,Flink將為12:00到12:05之間的間隔建立一個新視窗,當水位線(watermark)到12:06時間戳時將刪除它。

視窗有如下元件:

Window Assigner:用來決定某個元素被分配到哪個/哪些視窗中去。

Trigger:觸發器。決定了一個視窗何時能夠被計算或清除。觸發策略可能類似於“當視窗中的元素數量大於4”時,或“當水位線通過視窗結束時”。

Evictor:它可以在 觸發器觸發後 & 應用函式之前和/或之後 從視窗中刪除元素。

視窗還擁有函式,比如 ProcessWindowFunction,ReduceFunction,AggregateFunction或FoldFunction。該函式將包含要應用於視窗內容的計算,而觸發器指定視窗被認為準備好應用該函式的條件。

Keyed vs Non-Keyed Windows

在定義視窗之前,要指定的第一件事是流是否需要Keyed,使用keyBy(...)將無界流分成邏輯的keyed stream。 如果未呼叫keyBy(...),則表示流不是keyed stream。

  • 對於Keyed流,可以將傳入事件的任何屬性用作key。 擁有Keyed stream將允許視窗計算由多個任務並行執行,因為每個邏輯Keyed流可以獨立於其餘任務進行處理。 相同Key的所有元素將被髮送到同一個任務。

  • 在Non-Keyed流的情況下,原始流將不會被分成多個邏輯流,並且所有視窗邏輯將由單個任務執行,即並行性為1。

視窗分類

視窗分類可以分成:翻滾視窗(Tumbling Window,無重疊),滾動視窗(Sliding Window,有重疊),和會話視窗,(Session Window,活動間隙)

滾動視窗
滾動視窗分配器將每個元素分配給固定視窗大小的視窗。滾動視窗大小固定的並且不重疊。例如,如果指定大小為5分鐘的滾動視窗,則將執行當前視窗,並且每五分鐘將啟動一個新視窗。

滑動視窗

滑動視窗與滾動視窗的區別就是滑動視窗有重複的計算部分。

滑動視窗分配器將每個元素分配給固定視窗大小的視窗。類似於滾動視窗分配器,視窗的大小由視窗大小引數配置。另外一個視窗滑動引數控制滑動視窗的啟動頻率(how frequently a sliding window is started)。因此,如果滑動大小小於視窗大小,滑動窗可以重疊。在這種情況下,元素被分配到多個視窗。

例如,你可以使用視窗大小為10分鐘的視窗,滑動大小為5分鐘。這樣,每5分鐘會生成一個視窗,包含最後10分鐘內到達的事件。

會話視窗
會話視窗分配器通過活動會話分組元素。與滾動視窗和滑動視窗相比,會話視窗不會重疊,也沒有固定的開始和結束時間。相反,當會話視窗在一段時間內沒有接收到元素時會關閉。

例如,不活動的間隙時。會話視窗分配器配置會話間隙,定義所需的不活動時間長度(defines how long is the required period of inactivity)。當此時間段到期時,當前會話關閉,後續元素被分配到新的會話視窗。

0x04 Flink中的時間概念

Flink在流處理程式支援不同的時間概念。分別為Event Time/Processing Time/Ingestion Time,也就是事件時間、處理時間、提取時間。

從時間序列角度來說,發生的先後順序是:

事件時間(Event Time)----> 提取時間(Ingestion Time)----> 處理時間(Processing Time)
  • Event Time 是事件在現實世界中發生的時間,它通常由事件中的時間戳描述。
  • Ingestion Time 是資料進入Apache Flink流處理系統的時間,也就是Flink讀取資料來源時間。
  • Processing Time 是資料流入到具體某個運算元 (訊息被計算處理) 時候相應的系統時間。也就是Flink程式處理該事件時當前系統時間。

但是我們講解時,會從後往前講解,把最重要的Event Time放在最後。

處理時間

是資料流入到具體某個運算元時候相應的系統時間。

這個系統時間指的是執行相應操作的機器的系統時間。當一個流程式通過處理時間來執行時,所有基於時間的操作(如: 時間視窗)將使用各自操作所在的物理機的系統時間。

ProcessingTime 有最好的效能和最低的延遲。但在分散式計算環境或者非同步環境中,ProcessingTime具有不確定性,相同資料流多次執行有可能產生不同的計算結果。因為它容易受到從記錄到達系統的速度(例如從訊息佇列)到記錄在系統內的operator之間流動的速度的影響(停電,排程或其他)。

提取時間

IngestionTime是資料進入Apache Flink框架的時間,是在Source Operator中設定的。每個記錄將源的當前時間作為時間戳,並且後續基於時間的操作(如時間視窗)引用該時間戳。

提取時間在概念上位於事件時間和處理時間之間。與處理時間相比,它稍早一些。IngestionTime與ProcessingTime相比可以提供更可預測的結果,因為IngestionTime的時間戳比較穩定(在源處只記錄一次),所以同一資料在流經不同視窗操作時將使用相同的時間戳,而對於ProcessingTime同一資料在流經不同視窗運算元會有不同的處理時間戳。

與事件時間相比,提取時間程式無法處理任何無序事件或後期資料,但程式不必指定如何生成水位線。

在內部,提取時間與事件時間非常相似,但具有自動時間戳分配和自動水位線生成功能。

事件時間

事件時間就是事件在真實世界的發生時間,即每個事件在產生它的裝置上發生的時間(當地時間)。比如一個點選事件的時間發生時間,是使用者點選操作所在的手機或電腦的時間。

在進入Apache Flink框架之前EventTime通常要嵌入到記錄中,並且EventTime也可以從記錄中提取出來。在實際的網上購物訂單等業務場景中,大多會使用EventTime來進行資料計算。

基於事件時間處理的強大之處在於即使在亂序事件,延遲事件,歷史資料以及從備份或持久化日誌中的重複資料也能獲得正確的結果。對於事件時間,時間的進度取決於資料,而不是任何時鐘

事件時間程式必須指定如何生成事件時間的Watermarks,這是表示事件時間進度的機制。

現在假設我們正在建立一個排序的資料流。這意味著應用程式處理流中的亂序到達的事件,並生成同樣事件但按時間戳(事件時間)排序的新資料流。

比如:

有1~10個事件。
亂序到達的序列是:1,2,4,5,6,3,8,9,10,7
經過按 事件時間 處理後的序列是:1,2,3,4,5,6,7,8,9,10

為了處理事件時間,Flink需要知道事件的時間戳,這意味著流中的每條資料都需要分配其事件時間戳。這通常通過提取每條資料中的固定欄位來完成時間戳的獲取。

設定時間特性

Flink DataStream 程式的第一部分通常是設定基本時間特性。 該設定定義了資料流源的行為方式(例如:它們是否將分配時間戳),以及像 KeyedStream.timeWindow(Time.seconds(30)) 這樣的視窗操作應該使用上面哪種時間概念。

比如:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);

0x05 Watermark

前文講到了事件時間,這個真實發生的時間是我們業務在實時處理程式中非常關心的。在一個理想的情況下,事件時間處理將產生完全一致和確定的結果,無論事件何時到達或其排序。但是在現實中,訊息不在是按照順序傳送,產生了亂序,這時候該怎麼處理?

Watermark是Apache Flink為了處理EventTime 視窗計算提出的一種機制,本質上也是一種時間戳。watermark是用於處理亂序事件或延遲資料的,這通常用watermark機制結合window來實現(Watermarks用來觸發window視窗計算)。

比如對於late element,我們不能無限期的等下去,必須要有個機制來保證一個特定的時間後,必須觸發window去進行計算了。這個特別的機制,就是watermark。 可以把Watermark看作是一種告訴Flink一個訊息延遲多少的方式。定義了什麼時候不再等待更早的資料。

1. 視窗觸發條件

上面談到了對資料亂序問題的處理機制是watermark+window,那麼window什麼時候該被觸發呢?

基於Event Time的事件處理,Flink預設的事件觸發條件為:

對於out-of-order及正常的資料而言

  • watermark的時間戳 > = window endTime
  • 在 [window_start_time,window_end_time] 中有資料存在。

對於late element太多的資料而言

  • Event Time > watermark的時間戳

WaterMark相當於一個EndLine,一旦Watermarks大於了某個window的end_time,就意味著windows_end_time時間和WaterMark時間相同的視窗開始計算執行了。

就是說,我們根據一定規則,計算出Watermarks,並且設定一些延遲,給遲到的資料一些機會,也就是說正常來講,對於遲到的資料,我只等你一段時間,再不來就沒有機會了。

WaterMark時間可以用Flink系統現實時間,也可以用處理資料所攜帶的Event time。

使用Flink系統現實時間,在並行和多執行緒中需要注意的問題較少,因為都是以現實時間為標準。

如果使用處理資料所攜帶的Event time作為WaterMark時間,需要注意兩點:

  • 因為資料到達並不是循序的,注意儲存一個當前最大時間戳作為WaterMark時間
  • 並行同步問題

2. WaterMark設定方法

標點水位線(Punctuated Watermark)

標點水位線(Punctuated Watermark)通過資料流中某些特殊標記事件來觸發新水位線的生成。這種方式下視窗的觸發與時間無關,而是決定於何時收到標記事件。

在實際的生產中Punctuated方式在TPS很高的場景下會產生大量的Watermark在一定程度上對下游運算元造成壓力,所以只有在實時性要求非常高的場景才會選擇Punctuated的方式進行Watermark的生成。

定期水位線(Periodic Watermark)

週期性的(允許一定時間間隔或者達到一定的記錄條數)產生一個Watermark。水位線提升的時間間隔是由使用者設定的,在兩次水位線提升時隔內會有一部分訊息流入,使用者可以根據這部分資料來計算出新的水位線。

在實際的生產中Periodic的方式必須結合時間和積累條數兩個維度繼續週期性產生Watermark,否則在極端情況下會有很大的延時。

舉個例子,最簡單的水位線演算法就是取目前為止最大的事件時間,然而這種方式比較暴力,對亂序事件的容忍程度比較低,容易出現大量遲到事件。

3. 遲到事件

雖說水位線表明著早於它的事件不應該再出現,但是上如上文所講,接收到水位線以前的的訊息是不可避免的,這就是所謂的遲到事件。實際上遲到事件是亂序事件的特例,和一般亂序事件不同的是它們的亂序程度超出了水位線的預計,導致視窗在它們到達之前已經關閉。

遲到事件出現時視窗已經關閉併產出了計算結果,因此處理的方法有3種:

  • 重新啟用已經關閉的視窗並重新計算以修正結果。
  • 將遲到事件收集起來另外處理。
  • 將遲到事件視為錯誤訊息並丟棄。

Flink 預設的處理方式是第3種直接丟棄,其他兩種方式分別使用Side OutputAllowed Lateness

Side Output機制可以將遲到事件單獨放入一個數據流分支,這會作為 window 計算結果的副產品,以便使用者獲取並對其進行特殊處理。

Allowed Lateness機制允許使用者設定一個允許的最大遲到時長。Flink 會在視窗關閉後一直儲存視窗的狀態直至超過允許遲到時長,這期間的遲到事件不會被丟棄,而是預設會觸發視窗重新計算。因為儲存視窗狀態需要額外記憶體,並且如果視窗計算使用了 ProcessWindowFunction API 還可能使得每個遲到事件觸發一次視窗的全量計算,代價比較大,所以允許遲到時長不宜設得太長,遲到事件也不宜過多,否則應該考慮降低水位線提高的速度或者調整演算法。

這裡總結機制為:

  • 視窗window 的作用是為了週期性的獲取資料。

  • watermark的作用是防止資料出現亂序(經常),事件時間內獲取不到指定的全部資料,而做的一種保險方法。

  • allowLateNess是將視窗關閉時間再延遲一段時間。
  • sideOutPut是最後兜底操作,所有過期延遲資料,指定視窗已經徹底關閉了,就會把資料放到側輸出流。

4. 例項

採用系統時間做Watermark

我們將水位線設定為當前系統時間間-5秒。

override def getCurrentWatermark(): Watermark = {       
    new Watermark(System.currentTimeMillis - 5000) 
}

通常最好保持接收到的最大時間戳,並建立具有最大預期延遲的水位線,而不是從當前系統時間減去

採用Event Time做watermark

例如基於Event Time的資料,自身都包含一個型別為timestamp的欄位,假設叫做rowtime,例如1543903383(2018-12-04 14:03:03),定義一個基於rowtime列,策略為偏移3s的watermark,這條資料的水位線時間戳則是:

1543903383-3000 = 1543900383(2018-12-04 14:03:00)

該條資料的水位線時間含義:timestamp小於1543900383(2018-12-04 14:03:00)的資料,都已經到達了。

class BoundedOutOfOrdernessGenerator extends AssignerWithPeriodicWatermarks[MyEvent] {
    val maxOutOfOrderness = 3000L; // 3 seconds
    var currentMaxTimestamp: Long;
    override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {
        val timestamp = element.getCreationTime()
        currentMaxTimestamp = max(timestamp, currentMaxTimestamp)
        timestamp;
    }
    override def getCurrentWatermark(): Watermark = {
        // return the watermark as current highest timestamp minus the out-of-orderness bound
        new Watermark(currentMaxTimestamp - maxOutOfOrderness);
    }
}

看看如何觸發視窗

我們明白了視窗的觸發機制,這裡我們添加了水位線,到底是個怎麼個情況?我們來看下面

假如我們設定10s的時間視窗(window),那麼0~10s,10~20s都是一個視窗,以0~10s為例,0為start-time,10為end-time。假如有4個數據的event-time分別是8(A),12.5(B),9(C),13.5(D),我們設定Watermarks為當前所有到達資料event-time的最大值減去延遲值3.5秒

當A到達的時候,Watermarks為max{8}-3.5=8-3.5 = 4.5 < 10,不會觸發計算
當B到達的時候,Watermarks為max(12.5,8)-3.5=12.5-3.5 = 9 < 10,不會觸發計算
當C到達的時候,Watermarks為max(12.5,8,9)-3.5=12.5-3.5 = 9 < 10,不會觸發計算
當D到達的時候,Watermarks為max(13.5,12.5,8,9)-3.5=13.5-3.5 = 10 = 10,觸發計算
觸發計算的時候,會將A,C(因為他們都小於10)都計算進去,其中C是遲到的。

max這個很關鍵,就是當前視窗內,所有事件的最大事件

這裡的延遲3.5s是我們假設一個數據到達的時候,比他早3.5s的資料肯定也都到達了,這個是需要根據經驗推算。假設加入D到達以後有到達了一個E,event-time=6,但是由於0~10的時間視窗已經開始計算了,所以E就丟了。

從這裡上面E的丟失說明,水位線也不是萬能的,但是如果根據我們自己的生產經驗+側道輸出等方案,可以做到資料不丟失。

0x06 Flink原始碼

資料結構定義

在Flink DataStream中流動著不同的元素,統稱為StreamElement,StreamElement可以是StreamRecord、Watermark、StreamStatus、LatencyMarker中任何一種型別。

StreamElement

StreamElement是一個抽象類(是Flink 承載訊息的基類),其他四種類型繼承StreamElement。

public abstract class StreamElement {
  //判斷是否是Watermark
  public final boolean isWatermark() {
    return getClass() == Watermark.class;
  }
  //判斷是否為StreamStatus
  public final boolean isStreamStatus() {
    return getClass() == StreamStatus.class;
  }
  //判斷是否為StreamRecord
  public final boolean isRecord() {
    return getClass() == StreamRecord.class;
  }
  //判斷是否為LatencyMarker
  public final boolean isLatencyMarker() {
    return getClass() == LatencyMarker.class;
  }
  //轉換為StreamRecord
  public final <E> StreamRecord<E> asRecord() {
    return (StreamRecord<E>) this;
  }
  //轉換為Watermark
  public final Watermark asWatermark() {
    return (Watermark) this;
  }
  //轉換為StreamStatus
  public final StreamStatus asStreamStatus() {
    return (StreamStatus) this;
  }
  //轉換為LatencyMarker
  public final LatencyMarker asLatencyMarker() {
    return (LatencyMarker) this;
  }
}

Watermark

Watermark繼承了StreamElement。Watermark 是和事件一個級別的抽象,其內部包含一個成員變數時間戳timestamp,標識當前資料的時間進度。Watermark實際上作為資料流的一部分隨資料流流動

@PublicEvolving
public final class Watermark extends StreamElement {
  /*The watermark that signifies end-of-event-time. */
  public static final Watermark MAX_WATERMARK = new Watermark(Long.MAX_VALUE);
  /* The timestamp of the watermark in milliseconds. */
  private final long timestamp;
  /* Creates a new watermark with the given timestamp in milliseconds.*/
  public Watermarklong timestamp) {
    this.timestamp = timestamp;
  }
  /*Returns the timestamp associated with this {@link Watermark} in milliseconds.**/
  public long getTimestamp() {
    return timestamp;
  }
}

Flink如何生成&處理Watermark

在實際使用中大多數情況下會選擇週期性生成方式也就是AssignerWithPeriodicWatermarks方式.

//指定為evenTime時間語義
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
//生成watermark的週期
env.getConfig.setAutoWatermarkInterval(watermarkInterval)
//指定方式
dataStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[Element](Time.seconds(allowDealy)) {
   override def extractTimestamp(element: Element): Long = element.dT
  })

BoundedOutOfOrdernessTimestampExtractor 是Flink內建提供的允許亂序最大延時的watermark生成方式,只需要重寫其extractTimestamp方法即可。

assignTimestampsAndWatermarks 可以理解為是一個運算元轉換操作,等同於map/window一樣理解,可以為其設定並行度、名稱,也是一個transformation/operator,

public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks(
        AssignerWithPeriodicWatermarks<T> timestampAndWatermarkAssigner) {

    // match parallelism to input, otherwise dop=1 sources could lead to some strange
    // behaviour: the watermark will creep along very slowly because the elements
    // from the source go to each extraction operator round robin.
    final int inputParallelism = getTransformation().getParallelism();
    final AssignerWithPeriodicWatermarks<T> cleanedAssigner = clean(timestampAndWatermarkAssigner);

    TimestampsAndPeriodicWatermarksOperator<T> operator =
            new TimestampsAndPeriodicWatermarksOperator<>(cleanedAssigner);

    return transform("Timestamps/Watermarks", getTransformation().getOutputType(), operator)
            .setParallelism(inputParallelism);
}

其使用的StreamOperator型別TimestampsAndPeriodicWatermarksOperator,繼承了AbstractUdfStreamOperator,實現了OneInputStreamOperator介面與ProcessingTimeCallback介面,

TimestampsAndPeriodicWatermarksOperator。

/**
 * A stream operator that extracts timestamps from stream elements and
 * generates periodic watermarks.
 *
 * @param <T> The type of the input elements
 */
public class TimestampsAndPeriodicWatermarksOperator<T>
        extends AbstractUdfStreamOperator<T, AssignerWithPeriodicWatermarks<T>>
        implements OneInputStreamOperator<T, T>, ProcessingTimeCallback {

    private static final long serialVersionUID = 1L;
    private transient long watermarkInterval;
    private transient long currentWatermark;

    public TimestampsAndPeriodicWatermarksOperator(AssignerWithPeriodicWatermarks<T> assigner) {
        super(assigner);
        this.chainingStrategy = ChainingStrategy.ALWAYS;
    }

    @Override
    public void open() throws Exception {
        super.open();
        //初始化預設當前watermark
        currentWatermark = Long.MIN_VALUE;
        //生成watermark週期時間配置
        watermarkInterval = getExecutionConfig().getAutoWatermarkInterval();
        //註冊定時其配置
        if (watermarkInterval > 0) {
            long now = getProcessingTimeService().getCurrentProcessingTime();
            //註冊一個watermarkInterval後觸發的定時器,傳入回撥引數是this,也就是會呼叫當前物件的onProcessingTime方法
            getProcessingTimeService().registerTimer(now + watermarkInterval, this);
        }
    }

    @Override
    public void processElement(StreamRecord<T> element) throws Exception {
        //提取當前的事件時間
        final long newTimestamp = userFunction.extractTimestamp(element.getValue(),
                element.hasTimestamp() ? element.getTimestamp() : Long.MIN_VALUE);
        //儲存當前最大的事件時間。
        output.collect(element.replace(element.getValue(), newTimestamp));
    }

    @Override
    public void onProcessingTime(long timestamp) throws Exception {
        //此方法表示的就是定時回撥的方法,將符合要求的watermark傳送出去並且註冊下一個定時器。
        // register next timer
        Watermark newWatermark = userFunction.getCurrentWatermark();
        //當新的watermark大於當前的watermark
        if (newWatermark != null && newWatermark.getTimestamp() > currentWatermark) {
            currentWatermark = newWatermark.getTimestamp();
            //將符合要求的watermark傳送出去
            // emit watermark
            output.emitWatermark(newWatermark);
        }
        //註冊下一次觸發時間
        long now = getProcessingTimeService().getCurrentProcessingTime();
        getProcessingTimeService().registerTimer(now + watermarkInterval, this);
    }

    /**
     * Override the base implementation to completely ignore watermarks propagated from
     * upstream (we rely only on the {@link AssignerWithPeriodicWatermarks} to emit
     * watermarks from here).
     */
    @Override
    public void processWatermark(Watermark mark) throws Exception {
        //用來處理上游傳送過來的watermark,可以認為不做任何處理,下游的watermark只與其上游最近的生成方式相關。
        // if we receive a Long.MAX_VALUE watermark we forward it since it is used
        // to signal the end of input and to not block watermark progress downstream
        if (mark.getTimestamp() == Long.MAX_VALUE && currentWatermark != Long.MAX_VALUE) {
            currentWatermark = Long.MAX_VALUE;
            output.emitWatermark(mark);
        }
    }

    @Override
    public void close() throws Exception {
        super.close();

        // emit a final watermark
        Watermark newWatermark = userFunction.getCurrentWatermark();
        if (newWatermark != null && newWatermark.getTimestamp() > currentWatermark) {
            currentWatermark = newWatermark.getTimestamp();
            // emit watermark
            output.emitWatermark(newWatermark);
        }
    }
}

Flink如何處理遲到資料

這裡我們使用 Side Output機制來說明。Side Output機制可以將遲到事件單獨放入一個數據流分支,這會作為 window 計算結果的副產品,以便使用者獲取並對其進行特殊處理。

生成新的Watermark

Flink會替換StreamRecord 物件中的Timestamp,如果 根據當前事件的Timestamp 生成的Watermark 大於上一次的Watermark,就發出新的Watermark。

具體程式碼在 TimestampsAndPunctuatedWatermarksOperator.processElement。

@Override
public void processElement(StreamRecord<T> element) throws Exception {
    final T value = element.getValue();
    // 呼叫 使用者實現的 extractTimestamp 獲取新的Timestamp
    final long newTimestamp = userFunction.extractTimestamp(value,
            element.hasTimestamp() ? element.getTimestamp() : Long.MIN_VALUE);
    // 用新Timestamp 替換StreamRecord中的舊Timestamp
    output.collect(element.replace(element.getValue(), newTimestamp));
    // 呼叫 使用者實現的 checkAndGetNextWatermark 方法獲取下一個Watermark
    final Watermark nextWatermark = userFunction.checkAndGetNextWatermark(value, newTimestamp);
    // 如果下一個Watermark 大於當前Watermark,就發出新的Watermark
    if (nextWatermark != null && nextWatermark.getTimestamp() > currentWatermark) {
        currentWatermark = nextWatermark.getTimestamp();
        output.emitWatermark(nextWatermark);
    }
}

處理遲到資料

首先,判斷是否是遲到資料。

@Override
public void processElement(StreamRecord<IN> element) throws Exception {
            for (W window: elementWindows) {
                // drop if the window is already late
                // 如果視窗已經遲到了,則處理下一條資料
                if (isWindowLate(window)) {
                    continue;
                }   
            }
    ......
}

/**
 Returns {@code true} if the watermark is after the end timestamp plus the allowed lateness of the given window.
 */
protected boolean isWindowLate(W window) {
    // 當前機制是 事件時間 && 視窗元素的最大時間戳 + 允許遲到時間 <= 當前水位線 的時候為true(即當前視窗元素遲到了)
    return (windowAssigner.isEventTime() && (cleanupTime(window) <= internalTimerService.currentWatermark()));
}

/**
 * Returns the cleanup time for a window, which is
 * {@code window.maxTimestamp + allowedLateness}. In
 * case this leads to a value greater than {@link Long#MAX_VALUE}
 * then a cleanup time of {@link Long#MAX_VALUE} is
 * returned.
 *
 * @param window the window whose cleanup time we are computing.
 */
private long cleanupTime(W window) {
    if (windowAssigner.isEventTime()) {
        long cleanupTime = window.maxTimestamp() + allowedLateness;
    //返回視窗的 cleanup 時間 : 視窗元素的最大時間戳 + 允許延遲的時間
        return cleanupTime >= window.maxTimestamp() ? cleanupTime : Long.MAX_VALUE;
    } else {
        return window.maxTimestamp();
    }
}

其次,處理遲到資料的具體程式碼在WindowOperator.processElement 方法的最後一段。這裡就是旁路輸出。

@Override
public void processElement(StreamRecord<IN> element) throws Exception {
    
    ......
    // 其他操作
    ......
    
    // side output input event if element not handled by any window  late arriving tag has been set
    // 如果沒有window處理過這條資料,isSkippedElement = true,如果上面判斷為遲到資料,isSkippedElement = false
    // windowAssigner is event time and current timestamp + allowed lateness no less than element timestamp
    if (isSkippedElement && isElementLate(element)) {
      if (lateDataOutputTag != null){
          //旁路輸出
          //這就是我們之前提到的,Flink 的 Side Output 機制可以將遲到事件單獨放入一個數據流分支,這會作為 window 計算結果的副產品,以便使用者獲取並對其進行特殊處理。
        sideOutput(element);
      } else {
        this.numLateRecordsDropped.inc();
      }
    }
}

/**
 * Decide if a record is currently late, based on current watermark and allowed lateness.
 * 當前機制是 事件時間 && (元素時間戳 + 允許延遲的時間) <= 當前水位線
 * @param element The element to check
 * @return The element for which should be considered when sideoutputs
 */
protected boolean isElementLate(StreamRecord<IN> element){
    return (windowAssigner.isEventTime()) &&
        (element.getTimestamp() + allowedLateness <= internalTimerService.currentWatermark());
}

/**
 * Write skipped late arriving element to SideOutput.
 * // 把資料輸出到旁路,供使用者決定如何處理。
 * @param element skipped late arriving element to side output
 */
protected void sideOutput(StreamRecord<IN> element){
    output.collect(lateDataOutputTag, element);
}

0x06 參考

Flink實時性、容錯機制、視窗等介紹

徹底明白Flink系統學習11:【Flink1.7】事件時間、處理時間、提取時間有什麼區別

徹底明白Flink系統學習10:【Flink1.7】視窗生命週期、Keyed和非Keyed及分配器詮釋

Flink 輕鬆理解Watermark

https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/event_time.html#event-time--processing-time--ingestion-time

http://smartsi.club/flink-stream-event-time-and-processing-time.html

Flink Event Time和WaterMark結合優勢分析

Flink WaterMark(水位線)分散式執行理解

初學Flink,對Watermarks的一些理解和感悟

淺談WaterMark

Flink WaterMark例項

Apache Flink 漫談系列(03) - Watermark

Flink 的Event Time

Flink流計算程式設計--watermark(水位線)簡介

Flink Watermark 機制淺析(透徹)

Flink Time和Watermark的理解

【原始碼解析】Flink 是如何處理遲到資料

flink之延遲資料處理watermark allowedLateness() sideOutputLateData()

Flink中Watermark定時生成原始碼分