1. 程式人生 > >Flink流計算程式設計--watermark(水位線)簡介

Flink流計算程式設計--watermark(水位線)簡介

1、watermark的概念

watermark是一種衡量Event Time進展的機制,它是資料本身的一個隱藏屬性。通常基於Event Time的資料,自身都包含一個timestamp,例如1472693399700(2016-09-01 09:29:59.700),而這條資料的watermark時間則可能是:

watermark(1472693399700) = 1472693396700(2016-09-01 09:29:56.700)

這條資料的watermark時間是什麼含義呢?即:timestamp小於1472693396700(2016-09-01 09:29:56.700)的資料,都已經到達了。

這裡寫圖片描述
圖中藍色虛線和實線代表著watermark的時間。

2、watermark有什麼用?

watermark是用於處理亂序事件的,而正確的處理亂序事件,通常用watermark機制結合window來實現。

我們知道,流處理從事件產生,到流經source,再到operator,中間是有一個過程和時間的。雖然大部分情況下,流到operator的資料都是按照事件產生的時間順序來的,但是也不排除由於網路、背壓等原因,導致亂序的產生(out-of-order或者說late element)。

但是對於late element,我們又不能無限期的等下去,必須要有個機制來保證一個特定的時間後,必須觸發window去進行計算了。這個特別的機制,就是watermark。

3、watermark如何分配?

通常,在接收到source的資料後,應該立刻生成watermark;但是,也可以在source後,應用簡單的map或者filter操作,然後再生成watermark。

生成watermark的方式主要有2大類:

(1):With Periodic Watermarks
(2):With Punctuated Watermarks

第一種可以定義一個最大允許亂序的時間,這種情況應用較多。
我們主要來圍繞Periodic Watermarks來說明,下面是生成periodic watermark的方法:

/**
 * This generator generates watermarks assuming that elements come out of order to a certain degree only.
 * The latest elements for a certain timestamp t will arrive at most n milliseconds after the earliest
 * elements for timestamp t.
 */
class BoundedOutOfOrdernessGenerator extends AssignerWithPeriodicWatermarks[MyEvent] { val maxOutOfOrderness = 3500L; // 3.5 seconds var currentMaxTimestamp: Long; override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = { val timestamp = element.getCreationTime() currentMaxTimestamp = max(timestamp, currentMaxTimestamp) timestamp; } override def getCurrentWatermark(): Watermark = { // return the watermark as current highest timestamp minus the out-of-orderness bound new Watermark(currentMaxTimestamp - maxOutOfOrderness); } }

程式中有一個extractTimestamp方法,就是根據資料本身的Event time來獲取;還有一個getCurrentWatermar方法,是用currentMaxTimestamp - maxOutOfOrderness來獲取的。

這裡的概念有點抽象,下面我們結合資料,在window中來實際演示下每個element的timestamp和watermark是多少,以及何時觸發window。

4、生成並跟蹤watermark程式碼

4.1、程式說明
我們從socket接收資料,然後經過map後立刻抽取timetamp並生成watermark,之後應用window來看看watermark和event time如何變化,才導致window被觸發的。
4.2、程式碼如下

import java.text.SimpleDateFormat

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector


object WatermarkTest {

  def main(args: Array[String]): Unit = {
    if (args.length != 2) {
      System.err.println("USAGE:\nSocketWatermarkTest <hostname> <port>")
      return
    }

    val hostName = args(0)
    val port = args(1).toInt

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val input = env.socketTextStream(hostName,port)

    val inputMap = input.map(f=> {
      val arr = f.split("\\W+")
      val code = arr(0)
      val time = arr(1).toLong
      (code,time)
    })

    val watermark = inputMap.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[(String,Long)] {

      var currentMaxTimestamp = 0L
      val maxOutOfOrderness = 10000L//最大允許的亂序時間是10s

      var a : Watermark = null

      val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")

      override def getCurrentWatermark: Watermark = {
        a = new Watermark(currentMaxTimestamp - maxOutOfOrderness)
        a
      }

      override def extractTimestamp(t: (String,Long), l: Long): Long = {
        val timestamp = t._2
        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp)
        println("timestamp:" + t._1 +","+ t._2 + "|" +format.format(t._2) +","+  currentMaxTimestamp + "|"+ format.format(currentMaxTimestamp) + ","+ a.toString)
        timestamp
      }
    })

    val window = watermark
    .keyBy(_._1)
    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .apply(new WindowFunctionTest)

    window.print()

    env.execute()
  }

  class WindowFunctionTest extends WindowFunction[(String,Long),(String, Int,String,String,String,String),String,TimeWindow]{

    override def apply(key: String, window: TimeWindow, input: Iterable[(String, Long)], out: Collector[(String, Int,String,String,String,String)]): Unit = {
      val list = input.toList.sortBy(_._2)
      val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")
      out.collect(key,input.size,format.format(list.head._2),format.format(list.last._2),format.format(window.getStart),format.format(window.getEnd))
    }

  }


}

4.3、程式詳解
(1)接收socket資料
(2)將每行資料按照字元分隔,每行map成一個tuple型別(code,time)
(3)抽取timestamp生成watermark。並列印(code,time,格式化的time,currentMaxTimestamp,currentMaxTimestamp的格式化時間,watermark時間)。
(4)event time每隔3秒觸發一次視窗,輸出(code,視窗內元素個數,視窗內最早元素的時間,視窗內最晚元素的時間,視窗自身開始時間,視窗自身結束時間)

注意:new AssignerWithPeriodicWatermarks[(String,Long)中有抽取timestamp和生成watermark2個方法,在執行時,它是先抽取timestamp,後生成watermark,因此我們在這裡print的watermark時間,其實是上一條的watermark時間,我們到資料輸出時再解釋。

這裡寫圖片描述
生成的Job Graph

5、通過資料跟蹤watermark的時間

我們重點看看watermark與timestamp的時間,並通過資料來看看window的觸發時機。

首先,我們開啟socket,輸入第一條資料:

000001,1461756862000

輸出的out檔案如下:

timestamp:000001,1461756862000|2016-04-27 19:34:22.000,1461756862000|2016-04-27 19:34:22.000,Watermark @ -10000

這裡,看下watermark的值,-10000,即0-10000得到的。這就說明程式先執行timestamp,後執行watermark。所以,每條記錄打印出的watermark,都應該是上一條的watermark。為了觀察方便,我彙總了輸出如下:
這裡寫圖片描述

此時,wartermark的時間按照邏輯,已經落後於currentMaxTimestamp10秒了。我們繼續輸入:

這裡寫圖片描述
此時,輸出內容如下:
這裡寫圖片描述
我們再次彙總,見下表:
這裡寫圖片描述

我們繼續輸入,這時我們再次輸入:
這裡寫圖片描述
輸出如下:
這裡寫圖片描述
彙總如下:
這裡寫圖片描述

到這裡,window仍然沒有被觸發,此時watermark的時間已經等於了第一條資料的Event Time了。那麼window到底什麼時候被觸發呢?我們再次輸入:
這裡寫圖片描述
輸出:
這裡寫圖片描述
彙總:
這裡寫圖片描述

OK,window仍然沒有觸發,此時,我們的資料已經發到2016-04-27 19:34:33.000了,最早的資料已經過去了11秒了,還沒有開始計算。那是不是要等到13(10+3)秒過去了,才開始觸發window呢?答案是否定的。

我們再次增加1秒,輸入:
這裡寫圖片描述
輸出:
這裡寫圖片描述
彙總:
這裡寫圖片描述

到這裡,我們做一個說明:
window的觸發機制,是先按照自然時間將window劃分,如果window大小是3秒,那麼1分鐘內會把window劃分為如下的形式:

[00:00:00,00:00:03)
[00:00:03,00:00:06)
...
[00:00:57,00:01:00)

如果window大小是10秒,則window會被分為如下的形式:

[00:00:00,00:00:10)
[00:00:10,00:00:20)
...
[00:00:50,00:01:00)

window的設定無關資料本身,而是系統定義好了的。

輸入的資料中,根據自身的Event Time,將資料劃分到不同的window中,如果window中有資料,則當watermark時間>=Event Time時,就符合了window觸發的條件了,最終決定window觸發,還是由資料本身的Event Time所屬的window中的window_end_time決定。

上面的測試中,最後一條資料到達後,其水位線已經升至19:34:24秒,正好是最早的一條記錄所在window的window_end_time,所以window就被觸發了。

為了驗證window的觸發機制,我們繼續輸入資料:
這裡寫圖片描述
輸出:
這裡寫圖片描述
彙總:
這裡寫圖片描述

此時,watermark時間雖然已經達到了第二條資料的時間,但是由於其沒有達到第二條資料所在window的結束時間,所以window並沒有被觸發。那麼,第二條資料所在的window時間是:

[19:34:24,19:34:27)

也就是說,我們必須輸入一個19:34:27秒的資料,第二條資料所在的window才會被觸發。我們繼續輸入:
這裡寫圖片描述
輸出:
這裡寫圖片描述
彙總:
這裡寫圖片描述

此時,我們已經看到,window的觸發要符合以下幾個條件:

1、watermark時間 >= window_end_time
2、在[window_start_time,window_end_time)中有資料存在

同時滿足了以上2個條件,window才會觸發。

而且,這裡要強調一點,watermark是一個全域性的值,不是某一個key下的值,所以即使不是同一個key的資料,其warmark也會增加,例如:
輸入:

000002,1461756879000

輸出:

timestamp:000002,1461756879000|2016-04-27 19:34:39.000,1461756879000|2016-04-27 19:34:39.000,Watermark @ 1461756867000

我們看到,currentMaxTimestamp也增加了。

6、watermark+window處理亂序

我們上面的測試,資料都是按照時間順序遞增的,現在,我們輸入一些亂序的(late)資料,看看watermark結合window機制,是如何處理亂序的。
輸入:
這裡寫圖片描述
輸出:
這裡寫圖片描述
彙總:
這裡寫圖片描述
可以看到,雖然我們輸入了一個19:34:31的資料,但是currentMaxTimestamp和watermark都沒變。此時,按照我們上面提到的公式:

1、watermark時間 >= window_end_time
2、在[window_start_time,window_end_time)中有資料存在

watermark時間(19:34:29) < window_end_time(19:34:33),因此不能觸發window。

那如果我們再次輸入一條19:34:43的資料,此時watermark時間會升高到19:34:33,這時的window一定就會觸發了,我們試一試:
輸入:
這裡寫圖片描述
輸出:
這裡寫圖片描述

這裡,我麼看到,視窗中有2個數據,19:34:31和19:34:32的,但是沒有19:34:33的資料,原因是視窗是一個前閉後開的區間,19:34:33的資料是屬於[19:34:33,19:34:36)的視窗的。

上邊的結果,已經表明,對於out-of-order的資料,Flink可以通過watermark機制結合window的操作,來處理一定範圍內的亂序資料。那麼對於“遲到”太多的資料,Flink是怎麼處理的呢?

7、late element的處理

我們輸入一個亂序很多的(其實只要Event Time < watermark時間)資料來測試下:
輸入:
這裡寫圖片描述
輸出:
這裡寫圖片描述

我們看到,我們輸入的資料是19:34:32的,而當前watermark時間已經來到了19:34:33,Event Time < watermark時間,所以來一條就觸發一個window。

8、總結

8.1、Flink如何處理亂序?

watermark+window機制

window中可以對input進行按照Event Time排序,使得完全按照Event Time發生的順序去處理資料,以達到處理亂序資料的目的。

8.2、Flink何時觸發window?

1、Event Time < watermark時間(對於late element太多的資料而言)

或者

1、watermark時間 >= window_end_time(對於out-of-order以及正常的資料而言)
2、在[window_start_time,window_end_time)中有資料存在

8.3、Flink應該如何設定最大亂序時間?

這個要結合自己的業務以及資料情況去設定。如果maxOutOfOrderness設定的太小,而自身資料傳送時由於網路等原因導致亂序或者late太多,那麼最終的結果就是會有很多單條的資料在window中被觸發,資料的正確性影響太大。

最後,我們通過一張圖來看看watermark、Event Time和window的關係:
這裡寫圖片描述

9、參考