Flink中Periodic水印和Punctuated水印實現原理(原始碼分析)
在使用者程式碼中,我們設定生成水印和事件時間的方法assignTimestampsAndWatermarks()中這裡有個方法的過載
我們傳入的物件分為兩種
AssignerWithPunctuatedWatermarks(可以理解為每條資料都會產生水印,如果不想產生水印,返回一個null的水印)
AssignerWithPeriodicWatermarks(週期性的生成水印)
來看一下原始碼中是如何實現這兩種水印的
二話不說開啟org.apache.flink.streaming.runtime.operators.TimestampsAndPunctuatedWatermarksOperator.java
這個類的processElement方法
看到原始碼這裡這段邏輯就 非常的清晰了
先通過使用者的程式碼獲取到事件時間,注入到element裡面就直接往下個opeartor傳送了
然後通過使用者程式碼獲取水印,這裡會判斷水印是否為null
不為null的就直接往下游emit 了
現在看一下AssignerWithPeriodicWatermarks如何週期的傳送生成的水印
直接開啟TimestampsAndPeriodicWatermarksOperator.java這個類
這裡先不看processElement()方法,先看open方法
可以看到它將 當前時間其實就是System.currentTimeMillis()+ watermarkInterval水印間隔 註冊作為了一個timer定時器
這樣就知道了,當他過了這個水印間隔時間以後肯定會觸發操作
來看一下這個間隔時間以後觸發了什麼操作
可以看到,他先是獲取了當前的水印時間,然後直接emit出去了????
Periodic模式明明是在接收資料的processElement()傳送水印的
然後又再次註冊了一個 當前時間+間隔的 timer,這樣就無限的觸發下去了
既然他在這裡傳送了水印,來看下他的processElement方法
果然他週期性的傳送水印以後,接收資料的processElement()方法裡面就沒有傳送水印了
只有獲取事件時間的邏