1. 程式人生 > >Flink 的Window 操作(基於flink 1.3描述)

Flink 的Window 操作(基於flink 1.3描述)

Window是無限資料流處理的核心,Window將一個無限的stream拆分成有限大小的”buckets”桶,我們可以在這些桶上做計算操作。本文主要聚焦於在Flink中如何進行視窗操作,以及程式設計師如何從window提供的功能中獲得最大的收益。
  視窗化的Flink程式的一般結構如下,第一個程式碼段中是分組的流,而第二段是非分組的流。正如我們所見,唯一的區別是分組的stream呼叫keyBy(…)window(…),而非分組的stream中window()換成了windowAll(…),這些也將貫穿都這一頁的其他部分中。

Keyed Windows
stream.keyBy(...)          <-  keyed versus non-keyed windows
       .window(...)         <-  required: "assigner"
      [.trigger(...)]       <-  optional: "trigger" (else default trigger)
      [.evictor(...)]       <-  optional: "evictor" (else no evictor)
      [.allowedLateness()]  <-  optional, else zero
       .reduce/fold/apply() <-  required: "function"
Non-Keyed Windows
stream.windowAll(...)      <-  required: "assigner"
      [.trigger(...)]       <-  optional: "trigger" (else default trigger)
      [.evictor(...)]       <-  optional: "evictor" (else no evictor)
      [.allowedLateness()]  <-  optional, else zero
       .reduce/fold/apply() <-  required: "function"

在上面的例子中,方括號[]內的命令是可選的,這表明Flink允許你根據最符合你的要求來定義自己的window邏輯。

Window 的生命週期

簡單地說,當一個屬於window的元素到達之後這個window就建立了,而噹噹前時間(事件或者處理時間)為window的建立時間跟使用者指定的延遲時間相加時,視窗將被徹底清除。Flink 確保了只清除基於時間的window,其他型別的window不清除,例如:全域性window(詳情:https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html#window-assigners

) 。例如:對於一個每5分鐘建立無覆蓋的(即 翻滾視窗)視窗,允許一個1分鐘的時延的視窗策略,Flink將會在12:00到12:05這段時間內第一個元素到達時建立視窗,當水印通過12:06時,移除這個視窗。
  此外,每個 Window 都有一個Trigger(觸發器,詳情請見:https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html#triggers) 和一個附屬於 Window 的函式(例如: WindowFunction, ReduceFunctionFoldFunction),詳情請見:https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html#window-functions 。函式裡包含了應用於視窗(Window)內容的計算,而Trigger(觸發器)則指定了函式在什麼條件下可被應用(函式何時被觸發),一個觸發策略可以是 "當視窗中的元素個數超過4個時" 或者 "當水印達到視窗的邊界時"。觸發器還可以決定在視窗建立和刪除之間的任意時刻清除視窗的內容,本例中的清除僅指清除視窗的內容而不是視窗的元資料,也就是說新的資料還是可以被新增到當前的window中。
  除了上面的提到之外,你還可以指定一個驅逐者(Evictor,詳情請見:https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html#evictors ), Evictor
將在觸發器觸發之後或者在函式被應用之前或者之後,清楚視窗中的元素。
  接下來我們將更深入的去了解上述的部件,我們從上述片段的主要部分開始(如:Keyed vs Non-Keyed Windows, Window Assigner, 及 Window Function),然後是可選部分。

分組和非分組Windows (Keyed vs Non-Keyed Windows)

首先,第一件事是指定你的資料流是分組的還是未分組的,這個必須在定義 window 之前指定好。使用 keyBy(...) 會將你的無限資料流拆分成邏輯分組的資料流,如果 keyBy(...) 函式不被呼叫的話,你的資料流將不是分組的。
  在分組資料流中,任何正在傳入的事件的屬性都可以被當做key(更多詳情請見:https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/api_concepts.html#specifying-keys ),分組資料流將你的window計算通過多工併發執行,以為每一個邏輯分組流在執行中與其他的邏輯分組流是獨立地進行的。
  在非分組資料流中,你的原始資料流並不會拆分成多個邏輯流並且所有的window邏輯將在一個任務中執行,併發度為1。

視窗分配器(Window Assingers)

指定完你的資料流是分組的還是非分組的之後,接下來你需要定義一個視窗分配器(window assigner),視窗分配器定義了元素如何分配到視窗中,這是通過在分組資料流中呼叫window(...)或者非分組資料流中呼叫windowAll(...)時你選擇的視窗分配器(WindowAssigner)來指定的。WindowAssigner是負責將每一個到來的元素分配給一個或者多個視窗(window),Flink 提供了一些常用的預定義視窗分配器,即:滾動視窗、滑動視窗、會話視窗和全域性視窗。你也可以通過繼承WindowAssigner類來自定義自己的視窗。所有的內建視窗分配器(除了全域性視窗 global window)都是通過時間來分配元素到視窗中的,這個時間要麼是處理的時間,要麼是事件發生的時間。請看一下我們的 event time (https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/event_time.html )部分來了解更多處理時間和事件時間的區別及時間戳(timestamp)和水印(watermark)是如何產生的。
  接下來我們將展示Flink的預定義視窗分配器是如何工作的,以及它們在DataStream程式中是如何使用的。接下來我們將展示Flink的預定義視窗分配器是如何工作的,以及它們在DataStream程式中是如何使用的。下圖中展示了每個分配器是如何工作的,紫色圓圈代表著資料流中的一個元素,這些元素是通過一些key進行分割槽(在本例中是 user1,user2,user3), X軸顯示的是時間進度。

滾動視窗

滾動視窗分配器將每個元素分配的一個指定視窗大小的視窗中,滾動視窗有一個固定的大小,並且不會出現重疊。例如:如果你指定了一個5分鐘大小的滾動視窗,當前視窗將被評估並將按下圖說明每5分鐘建立一個新的視窗。
![滾動視窗][https://ci.apache.org/projects/flink/flink-docs-release-1.3/fig/tumbling-windows.svg ]
下面的程式碼片段展示瞭如何使用滾動視窗。

Java 程式碼
DataStream<T> input = ...;

滾動事件時間視窗( tumbling event-time windows )

input
    .keyBy(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    .<windowed transformation>(<window function>); 

滾動處理時間視窗(tumbling processing-time windows)

input
    .keyBy(<key selector>)
    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    .<windowed transformation>(<window function>);

每日偏移8小時的滾動事件時間視窗(daily tumbling event-time windows offset by -8 hours. )

input
    .keyBy(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
    .<windowed transformation>(<window function>);
Scala 程式碼:
val input:DataStream[T] =  

滾動事件時間視窗(tumbling event-time windows)

input
    .keyBy(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.seconds(5)))
    .<windowed transformation>(<window function>)

滾動處理時間視窗(tumbling processing-time windows)

input
    .keyBy(<key selector>)
    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    .<windowed transformation>(<window function>)

每日偏移8小時的滾動事件時間視窗(daily tumbling event-time windows offset by -8 hours. )

input
    .keyBy(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
    .<windowed transformation>(<window function>)

時間間隔可以通過Time.milliseconds(x)Time.seconds(x)Time.minutes(x)等其中的一個來指定。
在上面最後的例子中,滾動視窗分配器還接受了一個可選的偏移引數,可以用來改變視窗的排列。例如,沒有偏移的話按小時的滾動視窗將按時間紀元來對齊,也就是說你將一個如: 1:00:00.000~1:59:59.999,2:00:00.000~2:59:59.999等,如果你想改變一下,你可以指定一個偏移,如果你指定了一個15分鐘的偏移,你將得到1:15:00.000~2:14:59.999,2:15:00.000~3:14:59.999等。時間偏移一個很大的用處是用來調準非0時區的視窗,例如:在中國你需要指定一個8小時的時間偏移。

滑動視窗(Sliding Windows)

滑動視窗分配器將元素分配到固定長度的視窗中,與滾動視窗類似,視窗的大小由視窗大小引數來配置,另一個視窗滑動引數控制滑動視窗開始的頻率。因此,滑動視窗如果滑動引數小於滾動引數的話,視窗是可以重疊的,在這種情況下元素會被分配到多個視窗中。
  例如,你有10分鐘的視窗和5分鐘的滑動,那麼每個視窗中5分鐘的窗口裡包含著上個10分鐘產生的資料,如下圖所示:
![][https://ci.apache.org/projects/flink/flink-docs-release-1.3/fig/sliding-windows.svg]
下面的程式碼片段中展示瞭如何使用滑動視窗:

Java 程式碼:
DataStream<T> input = ...;

滑動事件時間視窗

input
    .keyBy(<key selector>)
    .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
    .<windowed transformation>(<window function>);

滑動處理時間視窗

input
    .keyBy(<key selector>)
    .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
    .<windowed transformation>(<window function>);

//偏移8小時的滑動處理時間視窗(sliding processing-time windows offset by -8 hours)

input
    .keyBy(<key selector>)
    .window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8)))
    .<windowed transformation>(<window function>);

Scala 程式碼:

val input: DataStream[T] = ...

// 滑動事件時間視窗(sliding event-time windows)

input
    .keyBy(<key selector>)
    .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5)))
    .<windowed transformation>(<window function>)

//滑動處理時間視窗(sliding processing-time windows)

input
    .keyBy(<key selector>)
    .window(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
    .<windowed transformation>(<window function>)

// 偏移8小時的滑動處理時間視窗(sliding processing-time windows offset by -8 hours)

input
    .keyBy(<key selector>)
    .window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8)))
    .<windowed transformation>(<window function>)

時間間隔可以通過Time.milliseconds(x),Time.seconds(x),Time.minutes(x)等來指定。
  正如上述例子所示,滑動視窗分配器也有一個可選的偏移引數來改變視窗的對齊。例如,沒有偏移引數,按小時的視窗,有30分鐘的滑動,將根據時間紀元來對齊,也就是說你將得到如下的視窗1:00:00.001:59:59.999,1:30:00.0002:29:59.999等。而如果你想改變視窗的對齊,你可以給定一個偏移,如果給定一個15分鐘的偏移,你將得到如下的視窗:1:15:00.000~2:14.59.999, 1:45:00.000~2:44:59.999等。時間偏移一個很大的用處是用來調準非0時區的視窗,例如:在中國你需要指定一個8小時的時間偏移。

會話視窗(Session Windows)

session視窗分配器通過session活動來對元素進行分組,session視窗跟滾動視窗和滑動視窗相比,不會有重疊和固定的開始時間和結束時間的情況。相反,當它在一個固定的時間週期內不再收到元素,即非活動間隔產生,那個這個視窗就會關閉。一個session視窗通過一個session間隔來配置,這個session間隔定義了非活躍週期的長度。當這個非活躍週期產生,那麼當前的session將關閉並且後續的元素將被分配到新的session視窗中去。

下面的程式碼片段中展示瞭如何使用session視窗
Java程式碼:

DataStream<T> input = ...;

// 事件時間會話視窗(event-time session windows)

input
    .keyBy(<key selector>)
    .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
    .<windowed transformation>(<window function>);

// 處理時間會話視窗(processing-time session windows)

input
    .keyBy(<key selector>)
    .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
    .<windowed transformation>(<window function>);

Scala程式碼:

val input: DataStream[T] = ...

// 事件時間會話視窗(event-time session windows)

input
    .keyBy(<key selector>)
    .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
    .<windowed transformation>(<window function>)

// 處理時間會話視窗(processing-time session windows)

input
    .keyBy(<key selector>)
    .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
    .<windowed transformation>(<window function>)

時間間隔可以通過Time.milliseconds(x),Time.seconds(x),Time.minutes(x)等來指定。
注意: 因為session看視窗沒有一個固定的開始和結束,他們的評估與滑動視窗和滾動視窗不同。在內部,session操作為每一個到達的元素建立一個新的視窗,併合並間隔時間小於指定非活動間隔的視窗。為了進行合併,session視窗的操作需要指定一個合併觸發器(Trigger)和一個合併視窗函式(Window Function),如:ReduceFunction或者WindowFunction(FoldFunction不能合併)。

全域性視窗(Global Windows)

全域性視窗分配器將所有具有相同key的元素分配到同一個全域性視窗中,這個視窗模式僅適用於使用者還需自定義觸發器的情況。否則,由於全域性視窗沒有一個自然的結尾,無法執行元素的聚合,將不會有計算被執行。
![全域性視窗][https://ci.apache.org/projects/flink/flink-docs-release-1.3/fig/non-windowed.svg]
下面的程式碼片段展示瞭如何使用全域性視窗:
Java 程式碼:

DataStream<T> input = ...;
input
    .keyBy(<key selector>)
    .window(GlobalWindows.create())
    .<windowed transformation>(<window function>);

Scala程式碼:

val input: DataStream[T] = ...
input
    .keyBy(<key selector>)
    .window(GlobalWindows.create())
    .<windowed transformation>(<window function>)

視窗函式(Window Functions)

定義完視窗分配器後,我們還需要為每一個視窗指定我們需要執行的計算,這是視窗的責任,當系統決定一個視窗已經準備好執行之後,這個視窗函式將被用來處理視窗中的每一個元素(可能是分組的)。請參考:https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/windows.html#triggers 來了解當一個視窗準備好之後,Flink是如何決定的。
  window函式可以是ReduceFunction, FoldFunction 或者 WindowFunction 中的一個。前面兩個更高效一些(),因為在每個視窗中增量地對每一個到達的元素執行聚合操作。一個 WindowFunction 可以獲取一個視窗中的所有元素的一個迭代以及哪個元素屬於哪個視窗的額外元資訊。
  有WindowFunction的視窗化操作會比其他的操作效率要差一些,因為Flink內部在呼叫函式之前會將視窗中的所有元素都快取起來。這個可以通過WindowFunctionReduceFunction或者FoldFunction結合使用來獲取視窗中所有元素的增量聚合和WindowFunction接收的額外的視窗元資料,接下來我們將看一看每一種變體的示例。

ReduceFunction

ReduceFunction指定了如何通過兩個輸入的引數進行合併輸出一個同類型的引數的過程,Flink使用ReduceFunction來對視窗中的元素進行增量聚合。
  一個ReduceFunction 可以通過如下的方式來定義和使用:
Java 程式碼:

DataStream<Tuple2<String, Long>> input = ...;
 input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .reduce(new ReduceFunction<Tuple2<String, Long>> {
      public Tuple2<String, Long> reduce(Tuple2<String, Long> v1, Tuple2<String, Long> v2) {
        return new Tuple2<>(v1.f0, v1.f1 + v2.f1);
      }
    });

Scala 程式碼:

val input: DataStream[(String, Long)] = ...
 input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .reduce { (v1, v2) => (v1._1, v1._2 + v2._2) }

上面的例子是將視窗所有元素中元組的第二個屬性進行累加操作。

FoldFunction

FoldFunction 指定了一個輸入元素如何與一個輸出型別的元素合併的過程,這個FoldFunction 會被每一個加入到視窗中的元素和當前的輸出值增量地呼叫,第一個元素是與一個預定義的型別為輸出型別的初始值合併。
  一個FoldFunction可以通過如下的方式定義和呼叫:
Java 程式碼:

DataStream<Tuple2<String, Long>> input = ...;
 input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .fold("", new FoldFunction<Tuple2<String, Long>, String>> {
       public String fold(String acc, Tuple2<String, Long> value) {
         return acc + value.f1;
       }
    });

Scala 程式碼:

 val input: DataStream[(String, Long)] = ...
 input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .fold("") { (acc, v) => acc + v._2 }

上面例子追加所有輸入的長整型到一個空的字串中。
注意 fold()不能應用於回話視窗或者其他可合併的視窗中。

視窗函式 —— 一般用法(WindowFunction - The Generic Case)

一個WindowFunction將獲得一個包含了window中的所有元素迭代(Iterable),並且提供所有視窗函式的最大靈活性。這些帶來了效能的成本和資源的消耗,因為window中的元素無法進行增量迭代,而是快取起來直到window被認為是可以處理時為止。
WindowFunction的使用說明如下:
Java 程式碼:

public interface WindowFunction<IN, OUT, KEY, W extends Window> extends Function, Serializable {
  /**
   // Evaluates the window and outputs none or several elements.
   // @param key The key for which this window is evaluated.
  // @param window The window that is being evaluated.
   // @param input The elements in the window being evaluated.
   // @param out A collector for emitting elements.
  // @throws Exception The function may throw exceptions to fail the program and trigger recovery.
  */
  void apply(KEY key, W window, Iterable<IN> input, Collector<OUT> out) throws Exception;
}

Scala 程式碼:

trait WindowFunction[IN, OUT, KEY, W <: Window] extends Function with Serializable {
  /**
    // Evaluates the window and outputs none or several elements.
    //
    // @param key    The key for which this window is evaluated.
    // @param window The window that is being evaluated.
    // @param input  The elements in the window being evaluated.
    // @param out    A collector for emitting elements.
    // @throws Exception The function may throw exceptions to fail the program and trigger recovery.
    */
  def apply(key: KEY, window: W, input: Iterable[IN], out: Collector[OUT])
}

一個WindowFunction可以按如下方式來定義和使用:
Java 程式碼:

DataStream<Tuple2<String, Long>> input = ...;
 input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .apply(new MyWindowFunction());
/* ... */
public class MyWindowFunction implements WindowFunction<Tuple<String, Long>, String, String, TimeWindow> {
  void apply(String key, TimeWindow window, Iterable<Tuple<String, Long>> input, Collector<String> out) {
    long count = 0;
    for (Tuple<String, Long> in: input) {
      count++;
    }
    out.collect("Window: " + window + "count: " + count);
  }
}

Scala 程式碼:

val input: DataStream[(String, Long)] = ...
input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .apply(new MyWindowFunction())
/* ... */
class MyWindowFunction extends WindowFunction[(String, Long), String, String, TimeWindow] {
  def apply(key: String, window: TimeWindow, input: Iterable[(String, Long)], out: Collector[String]): () = {
    var count = 0L
    for (in <- input) {
      count = count + 1
    }
    out.collect(s"Window $window count: $count")
  }
}

上面的例子展示了統計一個window中元素個數的WindowFunction,此外,還將window的資訊新增到輸出中。
注意:使用WindowFunction來做簡單的聚合操作如計數操作,效能是相當差的。下一章節我們將展示如何將ReduceFunctionWindowFunction結合起來,來獲取增量聚合和新增到WindowFunction中的資訊。

ProcessWindowFunction

在使用WindowFunction的地方你也可以用ProcessWindowFunction,這跟WindowFunction很類似,除了介面允許查詢跟多關於context的資訊,contextwindow評估發生的地方。
下面是ProcessWindowFunction的介面:
Java 程式碼:

public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> implements Function {
    /**
     // Evaluates the window and outputs none or several elements.
     //
     // @param key The key for which this window is evaluated.
     // @param context The context in which the window is being evaluated.
     // @param elements The elements in the window being evaluated.
     // @param out A collector for emitting elements.
     //
     // @throws Exception The function may throw exceptions to fail the program and trigger recovery.
     */
    public abstract void process(
            KEY key,
            Context context,
            Iterable<IN> elements,
            Collector<OUT> out) throws Exception;
    /**
     // The context holding window metadata
     */
    public abstract class Context {
        /**
         // @return The window that is being evaluated.
         */
        public abstract W window();
    }
}

Scala 程式碼:

abstract class ProcessWindowFunction[IN, OUT, KEY, W <: Window] extends Function {
  /**
    // Evaluates the window and outputs none or several elements.
    //
    // @param key      The key for which this window is evaluated.
    // @param context  The context in which the window is being evaluated.
    // @param elements The elements in the window being evaluated.
    // @param out      A collector for emitting elements.
    // @throws Exception The function may throw exceptions to fail the program and trigger recovery.
    */
  @throws[Exception]
  def process(
      key: KEY,
      context: Context,
      elements: Iterable[IN],
      out: Collector[OUT])
  /**
    // The context holding window metadata
    */
  abstract class Context {
    /**
      // @return The window that is being evaluated.
      */
    def window: W
  }
}

ProcessWindowFunction可以通過如下方式呼叫:
Java 程式碼:

DataStream<Tuple2<String, Long>> input = ...;
 input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .process(new MyProcessWindowFunction());`
Scala 程式碼:
`val input: DataStream[(String, Long)] = ...
 input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .process(new MyProcessWindowFunction())

有增量聚合功能的WindowFunction (WindowFunction with Incremental Aggregation)

WindowFunction可以跟ReduceFunction或者FoldFunction結合來增量地對到達window中的元素進行聚合,當window關閉之後,WindowFunction就能提供聚合結果。當獲取到WindowFunction額外的window元資訊後就可以進行增量計算視窗了。
標註:你也可以使用ProcessWindowFunction替換WindowFunction來進行增量視窗聚合。

使用FoldFunction 進行增量視窗聚合(Incremental Window Aggregation with FoldFunction)

下面的例子展示了一個增量的FoldFunction如何跟一個WindowFunction結合,來獲取視窗的事件數,並同時返回視窗的key和視窗的最後時間。
Java 程式碼:

DataStream<SensorReading> input = ...;
input
  .keyBy(<key selector>)
  .timeWindow(<window assigner>)
  .fold(new Tuple3<String, Long, Integer>("",0L, 0), new MyFoldFunction(), new MyWindowFunction())
// Function definitions
private static class MyFoldFunction
    implements FoldFunction<SensorReading, Tuple3<String, Long, Integer> > {
  public Tuple3<String, Long, Integer> fold(Tuple3<String, Long, Integer> acc, SensorReading s) {
      Integer cur = acc.getField(2);
      acc.setField(2, cur + 1);
      return acc;
  }
}
private static class MyWindowFunction
    implements WindowFunction<Tuple3<String, Long, Integer>, Tuple3<String, Long, Integer>, String, TimeWindow> {
  public void apply(String key,
                    TimeWindow window,
                    Iterable<Tuple3<String, Long, Integer>> counts,
                    Collector<Tuple3<String, Long, Integer>> out) {
    Integer count = counts.iterator().next().getField(2);
    out.collect(new Tuple3<String, Long, Integer>(key, window.getEnd(),count));
  }
}

Scala 程式碼:

val input: DataStream[SensorReading] = ...
 input
 .keyBy(<key selector>)
 .timeWindow(<window assigner>)
 .fold (
    ("", 0L, 0),
    (acc: (String, Long, Int), r: SensorReading) => { ("", 0L, acc._3 + 1) },
    ( key: String,
      window: TimeWindow,
      counts: Iterable[(String, Long, Int)],
      out: Collector[(String, Long, Int)] ) =>
      {
        val count = counts.iterator.next()
        out.collect((key, window.getEnd, count._3))
      }
  )

使用ReduceFunction進行增量視窗聚合(Incremental Window Aggregation with ReduceFunction)

下面例子展示了一個增量額ReduceFunction如何跟一個WindowFunction結合,來獲取視窗中最小的事件和視窗的開始時間。
Java 程式碼:

DataStream<SensorReading> input = ...;
input
  .keyBy(<key selector>)
  .timeWindow(<window assigner>)
  .reduce(new MyReduceFunction(), new MyWindowFunction());
// Function definitions
private static class MyReduceFunction implements ReduceFunction<SensorReading> {
  public SensorReading reduce(SensorReading r1, SensorReading r2) {
      return r1.value() > r2.value() ? r2 : r1;
  }
}
private static class MyWindowFunction
    implements WindowFunction<SensorReading, Tuple2<Long, SensorReading>, String, TimeWindow> {
  public void apply(String key,
                    TimeWindow window,
                    Iterable<SensorReading> minReadings,
                    Collector<Tuple2<Long, SensorReading>> out) {
      SensorReading min = minReadings.iterator().next();
      out.collect(new Tuple2<Long, SensorReading>(window.getStart(), min));
  }
}

Scala 程式碼:

val input: DataStream[SensorReading] = ...
 input
  .keyBy(<key selector>)
  .timeWindow(<window assigner>)
  .reduce(
    (r1: SensorReading, r2: SensorReading) => { if (r1.value > r2.value) r2 else r1 },
    ( key: String,
      window: TimeWindow,
      minReadings: Iterable[SensorReading],
      out: Collector[(Long, SensorReading)] ) =>
      {
        val min = minReadings.iterator.next()
        out.collect((window.getStart, min))
      }
  )

觸發器(Triggers)

觸發器決定了一個視窗何時可以被視窗函式處理,每一個視窗分配器都有一個預設的觸發器,如果預設的觸發器不能滿足你的需要,你可以通過呼叫trigger(...)來指定一個自定義的觸發器。觸發器的介面有5個方法來允許觸發器處理不同的事件:
  *onElement()方法,每個元素被新增到視窗時呼叫
  *onEventTime()方法,當一個已註冊的事件時間計時器啟動時呼叫
  *onProcessingTime()方法,當一個已註冊的處理時間計時器啟動時呼叫
  *onMerge()方法,與狀態性觸發器相關,當使用會話視窗時,兩個觸發器對應的視窗合併時,合併兩個觸發器的狀態。
  *最後一個clear()方法執行任何需要清除的相應視窗
上面的方法中有兩個需要注意的地方:
1)第一、三通過返回一個TriggerResult來決定如何操作呼叫他們的事件,這些操作可以是下面操作中的一個;
CONTINUE:什麼也不做
FIRE:觸發計算
PURGE:清除視窗中的資料
FIRE_AND_PURGE:觸發計算並清除視窗中的資料
2)這些函式可以被用來為後續的操作註冊處理時間定時器或者事件時間計時器

觸發和清除(Fire and Purge)

一旦一個觸發器決定一個視窗已經準備好進行處理,它將觸發並返回FIRE或者FIRE_AND_PURGE。這是視窗操作傳送當前視窗結果的訊號,給定一個擁有一個WindowFunction的視窗那麼所有的元素都將傳送到WindowFunction中(可能之後還會發送到驅逐器(Evitor)中)。有ReduceFunction或者FoldFunctionWindow僅僅傳送他們的急切聚合結果。
  當一個觸發器觸發時,它可以是FIRE或者FIRE_AND_PURGE,如果是FIRE的話,將保持window中的內容,FIRE_AND_PURGE的話,會清除window的內容。預設情況下,預實現的觸發器僅僅是FIRE,不會清除window的狀態。
注意:清除操作僅清除window的內容,並留下潛在的視窗元資訊和完整的觸發器狀態。

視窗分配器預設的觸發器(Default Triggers of WindowAssigners)

預設的觸發器適用於許多種情況,例如:所有的事件時間分配器都有一個EventTimeTrigger作為預設的觸發器,這個觸發器僅在當水印通過視窗的最後時間時觸發。
注意:GlobalWindow預設的觸發器是NeverTrigger,是永遠不會觸發的,因此,如果你使用的是GlobalWindow的話,你需要定義一個自定義觸發器。
注意:通過呼叫trigger(...)來指定一個觸發器你就重寫了WindowAssigner的預設觸發器。例如:如果你為TumblingEventTimeWindows指定了一個CountTrigger,你就不會再通過時間來獲取觸發了,而是通過計數。現在,如果你想通過時間和計數來觸發的話,你需要寫你自己自定義的觸發器。

內建的和自定義的觸發器(Build-in and Custom Triggers)

Flink有一些內建的觸發器:
  *EventTimeTrigger(前面提到過)觸發是根據由水印衡量的事件時間的進度來的
  *ProcessingTimeTrigger 根據處理時間來觸發
  *CountTrigger 一旦視窗中的元素個數超出了給定的限制就會觸發
  *PurgingTrigger 作為另一個觸發器的引數並將它轉換成一個清除型別
如果你想實現一個自定義的觸發器,你需要檢視一下這個抽象類Trigger(https://github.com/apache/flink/blob/master//flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java ),請注意,這個API還在優化中,後續的Flink版本可能會改變。

驅逐器(Evictors)

Flink的視窗模型允許指定一個除了WindowAssignerTrigger之外的可選引數Evitor,這個可以通過呼叫evitor(...)方法(在這篇文件的開頭展示過)來實現。這個驅逐器(evitor)可以在觸發器觸發之前或者之後,或者視窗函式被應用之前清理視窗中的元素。為了達到這個目的,Evitor介面有兩個方法:

/**
 // Optionally evicts elements. Called before windowing function.
 //
 // @param elements The elements currently in the pane.
 // @param size The current number of elements in the pane.
 // @param window The {@link Window}
 // @param evictorContext The context for the Evictor
 ///
void evictBefore(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);
/**
 // Optionally evicts elements. Called after windowing function.
 //
 // @param elements The elements currently in the pane.
 // @param size The current number of elements in the pane.
 // @param window The {@link Window}
 // @param evictorContext The context for the Evictor
 */
void evictAfter(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext evictorContext);

evitorBefore()方法包含了在window function之前被應用的驅逐邏輯,而evitorAfter()方法包含了在window function之後被應用的驅逐邏輯。在window function應用之前被驅逐的元素將不會再被window function處理。
Flink有三個預實現的驅逐器,他們是:
  CountEvitor:在視窗中保持一個使用者指定數量的元素,並在視窗的開始處丟棄剩餘的其他元素
  DeltaEvitor: 通過一個DeltaFunction和一個閾值,計算視窗快取中最近的一個元素和剩餘的所有元素的delta值,並清除delta值大於或者等於閾值的元素
  TimeEvitor:使用一個interval的毫秒數作為引數,對於一個給定的視窗,它會找出元素中的最大時間戳max_ts,並清除時間戳小於max_tx - interval的元素。
預設情況下:所有預實現的evitor都是在window function前應用它們的邏輯
注意:指定一個Evitor要防止預聚合,因為視窗中的所有元素必須得在計算之前傳遞到驅逐器中
注意:Flink 並不保證視窗中的元素是有序的,所以驅逐器可能從視窗的開始處清除,元素到達的先後不是那麼必要。

允許延遲(Allowed Lateness)

當處理事件時間的window時,可能會出現元素到達晚了,Flink用來與事件時間聯絡的水印已經過了元素所屬的視窗的最後時間。可以檢視事件時間(event time https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/event_time.html )尤其是晚到元素(late elements https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/event_time.html#late-elements )來了解Flink如何處理事件時間的討論。
  預設情況下,當水印已經過了視窗的最後時間時晚到的元素會被丟棄。然而,Flink允許為視窗操作指定一個最大允許時延,允許時延指定了元素可以晚到多長時間,預設情況下是0。水印已經過了視窗最後時間後才來的元素,如果還未到視窗最後時間加時延時間,那麼元素任然新增到視窗中。如果依賴觸發器的使用的話,晚到但是未丟棄的元素可能會導致視窗再次被觸發。
  為了達到這個目的,Flink將保持視窗的狀態直到允許時延的發生,一旦發生,Flink將清除Window,刪除window的狀態,如Window 生命週期章節中所描述的那樣。
預設情況下,允許時延為0,也就是說水印之後到達的元素將被丟棄。
你可以按如下方式來指定一個允許時延:
Java 程式碼:

 DataStream<T> input = ...;
 input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .allowedLateness(<time>)
    .<windowed transformation>(<window function>);

Scala 程式碼:

 val input: DataStream[T] = ...
 input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .allowedLateness(<time>)
    .<windowed transformation>(<window function>)

注意:當使用GlobalWindows分配器時,沒有資料會被認為是延遲的,因為Global Window的最後時間是Long.MAX_VALUE

以側輸出來獲取延遲資料(Getting Late Data as a Site Output)

使用Flink的側輸出(https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/side_output.html )特性,你可以獲得一個已經被丟棄的延遲資料流。
  首先你需要在視窗化的資料流中呼叫sideOutputLateData(OutputTag)指定你需要獲取延遲資料,然後,你就可以在window 操作的結果中獲取到側輸出流了。
程式碼如下:
Java 程式碼:

final OutputTag<T> lateOutputTag = new OutputTag<T>("late-data"){};
DataStream<T> input = ...;
DataStream<T> result = input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .allowedLateness(<time>)
    .sideOutputLateData(lateOutputTag)
    .<windowed transformation>(<window function>);
DataStream<T> lateStream = result.getSideOutput(lateOutputTag);

Scala程式碼:

val lateOutputTag = OutputTag[T]("late-data")
val input: DataStream[T] = ...
val result = input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .allowedLateness(<time>)
    .sideOutputLateData(lateOutputTag)
    .<windowed transformation>(<window function>)
val lateStream = result.getSideOutput(lateOutputTag)

延遲元素考慮(Late elements considerations)

當指定一個允許延遲大於0時,window