Flink流計算程式設計--watermark(水位線)簡介
1、watermark的概念
watermark是一種衡量Event Time進展的機制,它是資料本身的一個隱藏屬性。通常基於Event Time的資料,自身都包含一個timestamp,例如1472693399700(2016-09-01 09:29:59.700),而這條資料的watermark時間則可能是:
watermark(1472693399700) = 1472693396700(2016-09-01 09:29:56.700)
這條資料的watermark時間是什麼含義呢?即:timestamp小於1472693396700(2016-09-01 09:29:56.700)的資料,都已經到達了。
圖中藍色虛線和實線代表著watermark的時間。
2、watermark有什麼用?
watermark是用於處理亂序事件的,而正確的處理亂序事件,通常用watermark機制結合window來實現。
我們知道,流處理從事件產生,到流經source,再到operator,中間是有一個過程和時間的。雖然大部分情況下,流到operator的資料都是按照事件產生的時間順序來的,但是也不排除由於網路、背壓等原因,導致亂序的產生(out-of-order或者說late element)。
但是對於late element,我們又不能無限期的等下去,必須要有個機制來保證一個特定的時間後,必須觸發window去進行計算了。這個特別的機制,就是watermark。
3、watermark如何分配?
通常,在接收到source的資料後,應該立刻生成watermark;但是,也可以在source後,應用簡單的map或者filter操作,然後再生成watermark。
生成watermark的方式主要有2大類:
(1):With Periodic Watermarks
(2):With Punctuated Watermarks
第一種可以定義一個最大允許亂序的時間,這種情況應用較多。
我們主要來圍繞Periodic Watermarks來說明,下面是生成periodic watermark的方法:
/**
* This generator generates watermarks assuming that elements come out of order to a certain degree only.
* The latest elements for a certain timestamp t will arrive at most n milliseconds after the earliest
* elements for timestamp t.
*/
class BoundedOutOfOrdernessGenerator extends AssignerWithPeriodicWatermarks[MyEvent] {
val maxOutOfOrderness = 3500L; // 3.5 seconds
var currentMaxTimestamp: Long;
override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {
val timestamp = element.getCreationTime()
currentMaxTimestamp = max(timestamp, currentMaxTimestamp)
timestamp;
}
override def getCurrentWatermark(): Watermark = {
// return the watermark as current highest timestamp minus the out-of-orderness bound
new Watermark(currentMaxTimestamp - maxOutOfOrderness);
}
}
程式中有一個extractTimestamp方法,就是根據資料本身的Event time來獲取;還有一個getCurrentWatermar方法,是用currentMaxTimestamp - maxOutOfOrderness來獲取的。
這裡的概念有點抽象,下面我們結合資料,在window中來實際演示下每個element的timestamp和watermark是多少,以及何時觸發window。
4、生成並跟蹤watermark程式碼
4.1、程式說明
我們從socket接收資料,然後經過map後立刻抽取timetamp並生成watermark,之後應用window來看看watermark和event time如何變化,才導致window被觸發的。
4.2、程式碼如下
import java.text.SimpleDateFormat
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
object WatermarkTest {
def main(args: Array[String]): Unit = {
if (args.length != 2) {
System.err.println("USAGE:\nSocketWatermarkTest <hostname> <port>")
return
}
val hostName = args(0)
val port = args(1).toInt
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val input = env.socketTextStream(hostName,port)
val inputMap = input.map(f=> {
val arr = f.split("\\W+")
val code = arr(0)
val time = arr(1).toLong
(code,time)
})
val watermark = inputMap.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[(String,Long)] {
var currentMaxTimestamp = 0L
val maxOutOfOrderness = 10000L//最大允許的亂序時間是10s
var a : Watermark = null
val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")
override def getCurrentWatermark: Watermark = {
a = new Watermark(currentMaxTimestamp - maxOutOfOrderness)
a
}
override def extractTimestamp(t: (String,Long), l: Long): Long = {
val timestamp = t._2
currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp)
println("timestamp:" + t._1 +","+ t._2 + "|" +format.format(t._2) +","+ currentMaxTimestamp + "|"+ format.format(currentMaxTimestamp) + ","+ a.toString)
timestamp
}
})
val window = watermark
.keyBy(_._1)
.window(TumblingEventTimeWindows.of(Time.seconds(3)))
.apply(new WindowFunctionTest)
window.print()
env.execute()
}
class WindowFunctionTest extends WindowFunction[(String,Long),(String, Int,String,String,String,String),String,TimeWindow]{
override def apply(key: String, window: TimeWindow, input: Iterable[(String, Long)], out: Collector[(String, Int,String,String,String,String)]): Unit = {
val list = input.toList.sortBy(_._2)
val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")
out.collect(key,input.size,format.format(list.head._2),format.format(list.last._2),format.format(window.getStart),format.format(window.getEnd))
}
}
}
4.3、程式詳解
(1)接收socket資料
(2)將每行資料按照字元分隔,每行map成一個tuple型別(code,time)
(3)抽取timestamp生成watermark。並列印(code,time,格式化的time,currentMaxTimestamp,currentMaxTimestamp的格式化時間,watermark時間)。
(4)event time每隔3秒觸發一次視窗,輸出(code,視窗內元素個數,視窗內最早元素的時間,視窗內最晚元素的時間,視窗自身開始時間,視窗自身結束時間)
注意:new AssignerWithPeriodicWatermarks[(String,Long)中有抽取timestamp和生成watermark2個方法,在執行時,它是先抽取timestamp,後生成watermark,因此我們在這裡print的watermark時間,其實是上一條的watermark時間,我們到資料輸出時再解釋。
生成的Job Graph
5、通過資料跟蹤watermark的時間
我們重點看看watermark與timestamp的時間,並通過資料來看看window的觸發時機。
首先,我們開啟socket,輸入第一條資料:
000001,1461756862000
輸出的out檔案如下:
timestamp:000001,1461756862000|2016-04-27 19:34:22.000,1461756862000|2016-04-27 19:34:22.000,Watermark @ -10000
這裡,看下watermark的值,-10000,即0-10000得到的。這就說明程式先執行timestamp,後執行watermark。所以,每條記錄打印出的watermark,都應該是上一條的watermark。為了觀察方便,我彙總了輸出如下:
此時,wartermark的時間按照邏輯,已經落後於currentMaxTimestamp10秒了。我們繼續輸入:
此時,輸出內容如下:
我們再次彙總,見下表:
我們繼續輸入,這時我們再次輸入:
輸出如下:
彙總如下:
到這裡,window仍然沒有被觸發,此時watermark的時間已經等於了第一條資料的Event Time了。那麼window到底什麼時候被觸發呢?我們再次輸入:
輸出:
彙總:
OK,window仍然沒有觸發,此時,我們的資料已經發到2016-04-27 19:34:33.000了,最早的資料已經過去了11秒了,還沒有開始計算。那是不是要等到13(10+3)秒過去了,才開始觸發window呢?答案是否定的。
我們再次增加1秒,輸入:
輸出:
彙總:
到這裡,我們做一個說明:
window的觸發機制,是先按照自然時間將window劃分,如果window大小是3秒,那麼1分鐘內會把window劃分為如下的形式:
[00:00:00,00:00:03)
[00:00:03,00:00:06)
...
[00:00:57,00:01:00)
如果window大小是10秒,則window會被分為如下的形式:
[00:00:00,00:00:10)
[00:00:10,00:00:20)
...
[00:00:50,00:01:00)
window的設定無關資料本身,而是系統定義好了的。
輸入的資料中,根據自身的Event Time,將資料劃分到不同的window中,如果window中有資料,則當watermark時間>=Event Time時,就符合了window觸發的條件了,最終決定window觸發,還是由資料本身的Event Time所屬的window中的window_end_time決定。
上面的測試中,最後一條資料到達後,其水位線已經升至19:34:24秒,正好是最早的一條記錄所在window的window_end_time,所以window就被觸發了。
為了驗證window的觸發機制,我們繼續輸入資料:
輸出:
彙總:
此時,watermark時間雖然已經達到了第二條資料的時間,但是由於其沒有達到第二條資料所在window的結束時間,所以window並沒有被觸發。那麼,第二條資料所在的window時間是:
[19:34:24,19:34:27)
也就是說,我們必須輸入一個19:34:27秒的資料,第二條資料所在的window才會被觸發。我們繼續輸入:
輸出:
彙總:
此時,我們已經看到,window的觸發要符合以下幾個條件:
1、watermark時間 >= window_end_time
2、在[window_start_time,window_end_time)中有資料存在
同時滿足了以上2個條件,window才會觸發。
而且,這裡要強調一點,watermark是一個全域性的值,不是某一個key下的值,所以即使不是同一個key的資料,其warmark也會增加,例如:
輸入:
000002,1461756879000
輸出:
timestamp:000002,1461756879000|2016-04-27 19:34:39.000,1461756879000|2016-04-27 19:34:39.000,Watermark @ 1461756867000
我們看到,currentMaxTimestamp也增加了。
6、watermark+window處理亂序
我們上面的測試,資料都是按照時間順序遞增的,現在,我們輸入一些亂序的(late)資料,看看watermark結合window機制,是如何處理亂序的。
輸入:
輸出:
彙總:
可以看到,雖然我們輸入了一個19:34:31的資料,但是currentMaxTimestamp和watermark都沒變。此時,按照我們上面提到的公式:
1、watermark時間 >= window_end_time
2、在[window_start_time,window_end_time)中有資料存在
watermark時間(19:34:29) < window_end_time(19:34:33),因此不能觸發window。
那如果我們再次輸入一條19:34:43的資料,此時watermark時間會升高到19:34:33,這時的window一定就會觸發了,我們試一試:
輸入:
輸出:
這裡,我麼看到,視窗中有2個數據,19:34:31和19:34:32的,但是沒有19:34:33的資料,原因是視窗是一個前閉後開的區間,19:34:33的資料是屬於[19:34:33,19:34:36)的視窗的。
上邊的結果,已經表明,對於out-of-order的資料,Flink可以通過watermark機制結合window的操作,來處理一定範圍內的亂序資料。那麼對於“遲到”太多的資料,Flink是怎麼處理的呢?
7、late element的處理
我們輸入一個亂序很多的(其實只要Event Time < watermark時間)資料來測試下:
輸入:
輸出:
我們看到,我們輸入的資料是19:34:32的,而當前watermark時間已經來到了19:34:33,Event Time < watermark時間,所以來一條就觸發一個window。
8、總結
8.1、Flink如何處理亂序?
watermark+window機制
window中可以對input進行按照Event Time排序,使得完全按照Event Time發生的順序去處理資料,以達到處理亂序資料的目的。
8.2、Flink何時觸發window?
1、Event Time < watermark時間(對於late element太多的資料而言)
或者
1、watermark時間 >= window_end_time(對於out-of-order以及正常的資料而言)
2、在[window_start_time,window_end_time)中有資料存在
8.3、Flink應該如何設定最大亂序時間?
這個要結合自己的業務以及資料情況去設定。如果maxOutOfOrderness設定的太小,而自身資料傳送時由於網路等原因導致亂序或者late太多,那麼最終的結果就是會有很多單條的資料在window中被觸發,資料的正確性影響太大。
最後,我們通過一張圖來看看watermark、Event Time和window的關係: