1. 程式人生 > >Flink流計算程式設計--Flink中allowedLateness詳細介紹及思考

Flink流計算程式設計--Flink中allowedLateness詳細介紹及思考

1、簡介

Flink中藉助watermark以及window和trigger來處理基於event time的亂序問題,那麼如何處理“late element”呢?

也許有人會問,out-of-order element與late element有什麼區別?不都是一回事麼?

答案是一回事,都是為了處理亂序問題而產生的概念。要說區別,可以總結如下:

1、通過watermark機制來處理out-of-order的問題,屬於第一層防護,屬於全域性性的防護,通常說的亂序問題的解決辦法,就是指這類;
2、通過視窗上的allowedLateness機制來處理out-of-order的問題,屬於第二層防護,屬於特定window operator
的防護,late element的問題就是指這類。

下面我們重點介紹allowedLateness。

2、allowedLateness介紹

預設情況下,當watermark通過end-of-window之後,再有之前的資料到達時,這些資料會被刪除。

為了避免有些遲到的資料被刪除,因此產生了allowedLateness的概念。

簡單來講,allowedLateness就是針對event time而言,對於watermark超過end-of-window之後,還允許有一段時間(也是以event time來衡量)來等待之前的資料到達,以便再次處理這些資料。

下圖是其API:

這裡寫圖片描述

預設情況下,如果不指定allowedLateness,其值是0,即對於watermark超過end-of-window之後,還有此window的資料到達時,這些資料被刪除掉了。

注意:對於trigger是預設的EventTimeTrigger的情況下,allowedLateness會再次觸發視窗的計算,而之前觸發的資料,會buffer起來,直到watermark超過end-of-window + allowedLateness()的時間,視窗的資料及元資料資訊才會被刪除。再次計算就是DataFlow模型中的Accumulating的情況。

同時,對於sessionWindow的情況,當late element在allowedLateness範圍之內到達時,可能會引起視窗的merge,這樣,之前視窗的資料會在新視窗中累加計算,這就是DataFlow模型中的AccumulatingAndRetracting的情況。

3、allowedLateness的例子

3.1、TumblingEventTime視窗

這裡watermark允許3秒的亂序,allowedLateness允許資料遲到5秒。

import java.text.SimpleDateFormat

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

/**
  * windowedStream.allowedLateness() test,this is also called window accumulating.
  * allowedLateness will trigger window again when 'late element' arrived to the window 
  */
object TumblingWindowAccumulatingTest {

  def  main(args : Array[String]) : Unit = {

    if (args.length != 2) {
      System.err.println("USAGE:\nSocketTextStreamWordCount <hostname> <port>")
      return
    }

    val hostName = args(0)
    val port = args(1).toInt


    val env = StreamExecutionEnvironment.getExecutionEnvironment //獲取流處理執行環境
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) //設定Event Time作為時間屬性
    //env.setBufferTimeout(10)
    //env.enableCheckpointing(5000,CheckpointingMode.EXACTLY_ONCE)

    val input = env.socketTextStream(hostName,port) //socket接收資料

    val inputMap = input.map(f=> {
      val arr = f.split("\\W+")
      val code = arr(0)
      val time = arr(1).toLong
      (code,time)
    })

    /**
      * 允許3秒的亂序
      */
    val watermarkDS = inputMap.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[(String,Long)] {

      var currentMaxTimestamp = 0L
      val maxOutOfOrderness = 3000L

      val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")

      override def getCurrentWatermark: Watermark = {
        new Watermark(currentMaxTimestamp - maxOutOfOrderness)
      }

      override def extractTimestamp(t: (String,Long), l: Long): Long = {
        val timestamp = t._2
        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp)
        timestamp
      }
    })

    /**
      * 對於此視窗而言,允許5秒的遲到資料,即第一次觸發是在watermark > end-of-window時
      * 第二次(或多次)觸發的條件是watermark < end-of-window + allowedLateness時間內,這個視窗有late資料到達
      */
    val accumulatorWindow = watermarkDS
      .keyBy(_._1)
      .window(TumblingEventTimeWindows.of(Time.seconds(10)))
      .allowedLateness(Time.seconds(5))
      .apply(new AccumulatingWindowFunction)
      .name("window accumulate test")
      .setParallelism(2)

    accumulatorWindow.print()


    env.execute()

  }
}

AccumulatingWindowFunction:

主要是計算視窗內的元素個數以及累計個數

import java.text.SimpleDateFormat

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.function.RichWindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector


class AccumulatingWindowFunction extends RichWindowFunction[(String, Long),(String,String,String,Int, String, Int),String,TimeWindow]{

  val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")

  var state: ValueState[Int] = _

  var count = 0

  override def open(config: Configuration): Unit = {
    state = getRuntimeContext.getState(new ValueStateDescriptor[Int]("AccumulatingWindow Test", classOf[Int], 0))
  }

  override def apply(key: String, window: TimeWindow, input: Iterable[(String, Long)], out: Collector[(String, String, String, Int, String, Int)]): Unit = {
    count = state.value() + input.size

    state.update(count)

    // key,window start time, window end time, window size, system time, total size
    out.collect(key, format.format(window.getStart),format.format(window.getEnd),input.size, format.format(System.currentTimeMillis()),count)
  }
}

測試:

這裡寫圖片描述

輸出結果:

這裡寫圖片描述

解釋:

對於key1而言,1487225040000代表2017-02-16 14:04:00.000,10秒鐘的視窗範圍是[2017-02-16 14:04:00.000,2017-02-16 14:04:10.000),因此當watermark超過2017-02-16 14:04:10.000時,視窗會被觸發,由於watermark是允許3秒亂序,因此當標籤時間是1487225053000的資料到來時,視窗被觸發了。我們看到結果中的第一行就是第一次觸發的視窗,視窗內的元素個數是2,累加個數也是2。

由於此視窗還允許5秒的late,因此在watermark < end-of-window + allowedLateness(2017-02-16 14:04:15.000)之內到達的資料,都會被再次觸發視窗的計算。我們看標籤是key1,1487225045000的資料,此時的watermark是2017-02-16 14:04:12.000(1487225055000 - 3000得來),因此符合觸發條件,立刻觸發視窗的計算。也就是我們看到的第二條資料的結果。此時視窗內的元素個數是3,累加個數卻是5!!!

同理,key1,1487225048000這條資料第三次觸發了視窗的計算。視窗內的個數是4,累加個數是9!!!

當key1,1487225058000的資料到來時,此時的watermark是2017-02-16 14:04:15.000,已經超過了這個視窗的觸發範圍(視窗都是前閉後開的區間),因此,這個視窗再有遲到的資料,將直接被刪除。

我們看到key1,1487225049000這條資料到達時,系統沒有任何的反應,即此資料太晚了,被刪除了。

最後的一條資料是key1,1487225063000,其watermark是2017-02-16 14:04:20.000,已經超過了下一個視窗的觸發時間,因此會觸發下一個視窗的計算,結果就是我們看到的最後一條輸出的資料,視窗內的元素個數是4(分別是1487225050000,1487225053000,1487225055000和1487225058000),但是累加個數是13!!!

思考:

對於TumblingEventTime視窗的累加處理,很好的一點是及時更新了最後的結果,但是也有一個令人無法忽視的問題,即再次觸發的視窗,視窗內的UDF狀態錯了!這就是我們要關注的問題,我們要做的是去重操作。

至於這個去重怎麼做,這個就要看UDF中的狀態具體要算什麼內容了,例如本例中的state只是簡單的sum累計值,此時可以在UDF中新增一個hashMap,map中的key就設計為視窗的起始與結束時間,如果map中已經存在這個key,則state.update時,就不要再加上window.size,而是直接加1即可。這裡不再做具體的演示。

附上一張這個測試程式的DAG圖:

這裡寫圖片描述

可以看到,window操作接到10條資料(輸入),觸發了4次window計算(輸出)。而且Flink1.2中對與DAG也增加了一些reason元資料資訊,例如每個邊的型別(Hash或Rebalance)。

3.2、Session Window的視窗(EventTimeSessionWindows)

修改window的型別為SessionWindow並調整windowFunction,去掉state,程式碼如下:

import java.text.SimpleDateFormat

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.assigners.{EventTimeSessionWindows, TumblingEventTimeWindows}
import org.apache.flink.streaming.api.windowing.time.Time


object SessionWindowAccumulatingAndRetractingTest {

  def  main(args : Array[String]) : Unit = {

    if (args.length != 2) {
      System.err.println("USAGE:\nSocketTextStreamWordCount <hostname> <port>")
      return
    }

    val hostName = args(0)
    val port = args(1).toInt


    val env = StreamExecutionEnvironment.getExecutionEnvironment //獲取流處理執行環境
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) //設定Event Time作為時間屬性
    //env.setBufferTimeout(10)
    //env.enableCheckpointing(5000,CheckpointingMode.EXACTLY_ONCE)

    val input = env.socketTextStream(hostName,port)

    val inputMap = input.map(f=> {
      val arr = f.split("\\W+")
      val code = arr(0)
      val time = arr(1).toLong
      (code,time)
    })

    val watermarkDS = inputMap.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[(String,Long)] {

      var currentMaxTimestamp = 0L
      val maxOutOfOrderness = 3000L

      val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")

      override def getCurrentWatermark: Watermark = {
        new Watermark(currentMaxTimestamp - maxOutOfOrderness)
      }

      override def extractTimestamp(t: (String,Long), l: Long): Long = {
        val timestamp = t._2
        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp)
        timestamp
      }
    })

    // allow 5 sec for late element after watermark passed the
    val accumulatorWindow = watermarkDS
      .keyBy(_._1)
      .window(EventTimeSessionWindows.withGap(Time.seconds(10)))
      .allowedLateness(Time.seconds(5))
      .apply(new AccumulatingAndRetractingWindowFunction)
      .name("window accumulate test")
      .setParallelism(2)

    accumulatorWindow.print()


    env.execute()

  }
}

AccumulatingAndRetractingWindowFunction:

import java.text.SimpleDateFormat

import org.apache.flink.streaming.api.scala.function.WindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector


class AccumulatingAndRetractingWindowFunction extends WindowFunction[(String, Long),(String,String,String,Int, String),String,TimeWindow]{

  val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")

  override def apply(key: String, window: TimeWindow, input: Iterable[(String, Long)], out: Collector[(String, String, String, Int, String)]): Unit = {

    // key,window start time, window end time, window size, system time
    out.collect(key,format.format(window.getStart),format.format(window.getEnd),input.size,format.format(System.currentTimeMillis()))

  }
}

測試:

這裡寫圖片描述

輸出結果:

這裡寫圖片描述

解釋:

sessionWindow由於存在視窗的merge,所以對於late element,並不是來一條就重新觸發一次視窗。

第2條資料的watermark沒有超過第1條資料的end-of-window時間,因此沒有發生gap。

對於第3條資料key1,1487225063000到達時,其watermark是2017-02-16 14:04:20.000,第2條資料的end-of-window的時間,因此第一個視窗觸發了,即我們看到的第一條輸出記錄,其視窗數量是2(前2條輸入資料)。

第4條資料key1,1487225065000,其watermark是2017-02-16 14:04:22.000,因此和上一條資料沒有gap。

第5條資料key1,1487225057000,其watermar依然是2017-02-16 14:04:22.000,但此資料所在的視窗範圍在[2017-02-16 14:04:17.000,2017-02-16 14:04:27.000),屬於第一個視窗。雖然此時第一個視窗已經觸發,但是第一個視窗的結束時間2017-02-16 14:04:20.000 + allowedLateness(5秒) = 2017-02-16 14:04:25.000,而此時的watermark是2017-02-16 14:04:22.000,因此這條資料不能被刪除,而是重新納入到第一個視窗中,被累加起來。注意,此時視窗發生了合併,但是還未到新的視窗的觸發條件。

第6條資料key1,1487225078000到達時,其watermark提高到2017-02-16 14:04:35.000,比此前1487225065000的視窗結束時間(正好是2017-02-16 14:04:35.000)大,因此符合新視窗的觸發條件。
即我們看到的輸出中第2條記錄。此時的視窗範圍是從[2017-02-16 14:04:00.000,2017-02-16 14:04:35.000),其中late element和之前的視窗發生了merge,因此前一條輸出中的視窗時間變了。新視窗中一共包含了5條資料(前5條資料)。

思考:

這個session window late element的例子,並沒有包含UDF的state的內容。但是這裡同樣不能忽視這個問題,假如你的應用程式中包含UDF的state,那麼也要考慮當late element到達且沒有被刪除的情況下,當發生或者不發生視窗merge的情況下,如何處理state的問題。

對於session window而言,allowedLateness的引入會使得部分late element可以被正確處理,同時也會增加state處理複雜度,這裡主要涉及late element可能導致原視窗的merge或者不發生merge。

4、allowedLateness原始碼

此方法位於WindowedStream中,僅僅是對成員變數allowedLateness賦予初值(但不能指定負數值):

    /**
     * Sets the time by which elements are allowed to be late. Elements that
     * arrive behind the watermark by more than the specified time will be dropped.
     * By default, the allowed lateness is {@code 0L}.
     *
     * <p>Setting an allowed lateness is only valid for event-time windows.
     */
    @PublicEvolving
    public WindowedStream<T, K, W> allowedLateness(Time lateness) {
        final long millis = lateness.toMilliseconds();
        checkArgument(millis >= 0, "The allowed lateness cannot be negative.");

        this.allowedLateness = millis;
        return this;
    }

我們再來看下windowedStream之上執行的apply方法:

    /**
     * Applies the given window function to each window. The window function is called for each
     * evaluation of the window for each key individually. The output of the window function is
     * interpreted as a regular non-windowed stream.
     *
     * <p>
     * Not that this function requires that all data in the windows is buffered until the window
     * is evaluated, as the function provides no means of incremental aggregation.
     * 
     * @param function The window function.
     * @return The data stream that is the result of applying the window function to the window.
     */
    public <R> SingleOutputStreamOperator<R> apply(WindowFunction<T, R, K, W> function) {
        TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType(
                function, WindowFunction.class, true, true, getInputType(), null, false);//拿到Window的型別(Tumbling、Sliding或Session)以及輸入型別T等資訊

        return apply(function, resultType);
    }

返回apply(function, resultType)方法:

    /**
     * Applies the given window function to each window. The window function is called for each
     * evaluation of the window for each key individually. The output of the window function is
     * interpreted as a regular non-windowed stream.
     *
     * <p>
     * Note that this function requires that all data in the windows is buffered until the window
     * is evaluated, as the function provides no means of incremental aggregation.
     *
     * @param function The window function.
     * @param resultType Type information for the result type of the window function
     * @return The data stream that is the result of applying the window function to the window.
     */
    public <R> SingleOutputStreamOperator<R> apply(WindowFunction<T, R, K, W> function, TypeInformation<R> resultType) {

        //clean the closure
        function = input.getExecutionEnvironment().clean(function);

        String callLocation = Utils.getCallLocationName();
        String udfName = "WindowedStream." + callLocation;

        SingleOutputStreamOperator<R> result = createFastTimeOperatorIfValid(function, resultType, udfName);
        if (result != null) {
            return result;
        }

        LegacyWindowOperatorType legacyWindowOpType = getLegacyWindowType(function);//根據WindowAssigner判斷windowOperator的型別,用於儲存window的狀態
        String opName;
        KeySelector<T, K> keySel = input.getKeySelector(); //根據keyedStream獲取key

        WindowOperator<K, T, Iterable<T>, R, W> operator;

        if (evictor != null) {
            @SuppressWarnings({"unchecked", "rawtypes"})
            TypeSerializer<StreamRecord<T>> streamRecordSerializer =
                    (TypeSerializer<StreamRecord<T>>) new StreamElementSerializer(input.getType().createSerializer(getExecutionEnvironment().getConfig()));

            ListStateDescriptor<StreamRecord<T>> stateDesc =
                    new ListStateDescriptor<>("window-contents", streamRecordSerializer);

            opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + evictor + ", " + udfName + ")";

            operator =
                new EvictingWindowOperator<>(windowAssigner,
                    windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
                    keySel,
                    input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
                    stateDesc,
                    new InternalIterableWindowFunction<>(function),
                    trigger,
                    evictor,
                    allowedLateness);

        } else {
            // 視窗中的ListState,用於在視窗觸發前,buffer視窗中的元素
            ListStateDescriptor<T> stateDesc = new ListStateDescriptor<>("window-contents",
                input.getType().createSerializer(getExecutionEnvironment().getConfig()));

            opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + udfName + ")"; // 拼接唯一的operator名字,記錄標誌

            operator =
                new WindowOperator<>(windowAssigner,
                    windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
                    keySel,
                    input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
                    stateDesc,
                    new InternalIterableWindowFunction<>(function),
                    trigger,
                    allowedLateness,
                    legacyWindowOpType);
        }

        return input.transform(opName, resultType, operator);//根據operator name,視窗函式的型別,以及window operator,執行keyedStream.transaform操作
    }

我們看到,最終是生成了WindowOperator,初始化時包含了trigger以及allowedLateness的值。然後經過transform轉換,實際上是執行了DataStream中的transform方法。

其中,WindowOperator中有一個方法onEventTime,其中,此方法中判斷當element的watermark時間超過視窗的結束時間+allowedLateness時,就會清空window中的快取的所有資料,這裡不再贅述。

5、總結

Flink中處理亂序依賴watermark+window+trigger,屬於全域性性的處理;

同時,對於window而言,還提供了allowedLateness方法,使得更大限度的允許亂序,屬於區域性性的處理;

其中,allowedLateness只針對Event Time有效;

allowedLateness可用於TumblingEventTimeWindow、SlidingEventTimeWindow以及EventTimeSessionWindows,要注意這可能使得視窗再次被觸發,相當於對前一次視窗的視窗的修正(累加計算或者累加撤回計算);

要注意再次觸發視窗時,UDF中的狀態值的處理,要考慮state在計算時的去重問題。

最後要注意的問題,就是sink的問題,由於同一個key的同一個window可能被sink多次,因此sink的資料庫要能夠接收此類資料。