1. 程式人生 > >Flink 原理與實現:Session Window

Flink 原理與實現:Session Window

轉載來源:http://wuchong.me/blog/2016/06/06/flink-internals-session-window/

上一篇文章:Window機制中,我們介紹了視窗的概念和底層實現,以及 Flink 一些內建的視窗,包括滑動視窗、翻滾視窗。本文將深入講解一種較為特殊的視窗:會話視窗(session window)。建議您在閱讀完上一篇文章的基礎上再閱讀本文。

當我們需要分析使用者的一段互動的行為事件時,通常的想法是將使用者的事件流按照“session”來分組。session 是指一段持續活躍的期間,由活躍間隙分隔開。通俗一點說,訊息之間的間隔小於超時閾值(sessionGap)的,則被分配到同一個視窗,間隔大於閾值的,則被分配到不同的視窗。目前開源領域大部分的流計算引擎都有視窗的概念,但是沒有對 session window 的支援,要實現 session window,需要使用者自己去做完大部分事情。而當 Flink 1.1.0 版本正式釋出時,Flink 將會是開源流計算領域第一個內建支援 session window 的引擎。

在 Flink 1.1.0 之前,Flink 也可以通過自定義的window assigner和trigger來實現一個基本能用的session window。release-1.0 版本中提供了一個實現 session window 的 example:SessionWindowing。這個session window範例的實現原理是,基於GlobleWindow這個window assigner,將所有元素都分配到同一個視窗中,然後指定一個自定義的trigger來觸發執行視窗。這個trigger的觸發機制是,對於每個到達的元素都會根據其時間戳(timestamp)註冊一個會話超時的定時器(timestamp+sessionTimeout),並移除上一次註冊的定時器。最新一個元素到達後,如果超過 sessionTimeout 的時間還沒有新元素到達,那麼trigger就會觸發,當前視窗就會是一個session window。處理完視窗後,視窗中的資料會清空,用來快取下一個session window的資料。

但是這種session window的實現是非常弱的,無法應用到實際生產環境中的。因為它無法處理亂序 event time 的訊息。 而在即將到來的 Flink 1.1.0 版本中,Flink 提供了對 session window 的直接支援,使用者可以通過SessionWindows.withGap()來輕鬆地定義 session widnow,而且能夠處理亂序訊息。Flink 對 session window 的支援主要借鑑自 Google 的 DataFlow 。

假設有這麼個場景,使用者點開手機淘寶後會進行一系列的操作(點選、瀏覽、搜尋、購買、切換tab等),這些操作以及對應發生的時間都會發送到伺服器上進行使用者行為分析。那麼使用者的操作行為流的樣例可能會長下面這樣:

通過上圖,我們可以很直觀地觀察到,使用者的行為是一段一段的,每一段內的行為都是連續緊湊的,段內行為的關聯度要遠大於段之間行為的關聯度。我們把每一段使用者行為稱之為“session”,段之間的空檔我們稱之為“session gap”。所以,理所當然地,我們應該按照 session window 對使用者的行為流進行切分,並計算每個session的結果。如下圖所示:

為了定義上述的視窗切分規則,我們可以使用 Flink 提供的 SessionWindows 這個 widnow assigner API。如果你用過 SlidingEventTimeWindowsTumlingProcessingTimeWindows等,你會對這個很熟悉。

 

DataStream input = …

DataStream result = input

.keyBy(<key selector>)

.window(SessionWindows.withGap(Time.seconds(<seconds>))

.apply(<window function>) // or reduce() or fold()

這樣,Flink 就會基於元素的時間戳,自動地將元素放到不同的session window中。如果兩個元素的時間戳間隔小於 session gap,則會在同一個session中。如果兩個元素之間的間隔大於session gap,且沒有元素能夠填補上這個gap,那麼它們會被放到不同的session中。

底層實現

為了實現 session window,我們需要擴充套件 Flink 中的視窗機制,使得能夠支援視窗合併。要理解其原因,我們需要先了解視窗的現狀。在上一篇文章中,我們談到了 Flink 中 WindowAssigner 負責將元素分配到哪個/哪些視窗中去,Trigger 決定了一個視窗何時能夠被計算或清除。當元素被分配到視窗之後,這些視窗是固定的不會改變的,而且視窗之間不會相互作用。

對於session window來說,我們需要視窗變得更靈活。基本的思想是這樣的:SessionWindows assigner 會為每個進入的元素分配一個視窗,該視窗以元素的時間戳作為起始點,時間戳加會話超時時間為結束點,也就是該視窗為[timestamp, timestamp+sessionGap)。比如我們現在到了兩個元素,它們被分配到兩個獨立的視窗中,兩個視窗目前不相交,如圖:

當第三個元素進入時,分配到的視窗與現有的兩個視窗發生了疊加,情況變成了這樣:

由於我們支援了視窗的合併,WindowAssigner可以合併這些視窗。它會遍歷現有的視窗,並告訴系統哪些視窗需要合併成新的視窗。Flink 會將這些視窗進行合併,合併的主要內容有兩部分:

  1. 需要合併的視窗的底層狀態的合併(也就是視窗中快取的資料,或者對於聚合視窗來說是一個聚合值)
  2. 需要合併的視窗的Trigger的合併(比如對於EventTime來說,會刪除舊視窗註冊的定時器,並註冊新視窗的定時器)

總之,結果是三個元素現在在同一個視窗中了:

需要注意的是,對於每一個新進入的元素,都會分配一個屬於該元素的視窗,都會檢查併合並現有的視窗。在觸發視窗計算之前,每一次都會檢查該視窗是否可以和其他視窗合併,直到trigger觸發後,會將該視窗從視窗列表中移除。對於 event time 來說,視窗的觸發是要等到大於視窗結束時間的 watermark 到達,當watermark沒有到,視窗會一直快取著。所以基於這種機制,可以做到對亂序訊息的支援。

這裡有一個優化點可以做,因為每一個新進入的元素都會建立屬於該元素的視窗,然後合併。如果新元素連續不斷地進來,並且新元素的視窗一直都是可以和之前的視窗重疊合並的,那麼其實這裡多了很多不必要的建立視窗、合併視窗的操作,我們可以直接將新元素放到那個已存在的視窗,然後擴充套件該視窗的大小,看起來就像和新元素的視窗合併了一樣。

原始碼分析

FLINK-3174 這個JIRA中有對 Flink 如何支援 session window 的詳細說明,以及程式碼更新。建議可以結合該 PR的程式碼來理解本文討論的實現原理。

為了擴充套件 Flink 中的視窗機制,使得能夠支援視窗合併,首先 window assigner 要能合併現有的視窗,Flink 增加了一個新的抽象類 MergingWindowAssigner 繼承自 WindowAssigner,這裡面主要多了一個 mergeWindows 的方法,用來決定哪些視窗是可以合併的。

 

public abstract class MergingWindowAssigner<T, W extends Window> extends WindowAssigner<T, W> {

private static final long serialVersionUID = 1L;

/**

* 決定哪些視窗需要被合併。對於每組需要合併的視窗, 都會呼叫 callback.merge(toBeMerged, mergeResult)

*

* @param windows 現存的視窗集合 The window candidates.

* @param callback 需要被合併的視窗會回撥 callback.merge 方法

*/

public abstract void mergeWindows(Collection<W> windows, MergeCallback<W> callback);

public interface MergeCallback<W> {

/**

* 用來宣告合併視窗的具體動作(合併視窗底層狀態、合併視窗trigger等)。

*

* @param toBeMerged 需要被合併的視窗列表

* @param mergeResult 合併後的視窗

*/

void merge(Collection<W> toBeMerged, W mergeResult);

}

}

所有已經存在的 assigner 都繼承自 WindowAssigner,只有新加入的 session window assigner 繼承自 MergingWindowAssigner,如:ProcessingTimeSessionWindowsEventTimeSessionWindows

另外,Trigger 也需要能支援對合並視窗後的響應,所以 Trigger 添加了一個新的介面 onMerge(W window, OnMergeContext ctx),用來響應發生視窗合併之後對trigger的相關動作,比如根據合併後的視窗註冊新的 event time 定時器。

OK,接下來我們看下最核心的程式碼,也就是對於每個進入的元素的處理,程式碼位於WindowOperator.processElement方法中,如下所示:

 

public void processElement(StreamRecord<IN> element) throws Exception {

Collection<W> elementWindows = windowAssigner.assignWindows(element.getValue(), element.getTimestamp());

final K key = (K) getStateBackend().getCurrentKey();

if (windowAssigner instanceof MergingWindowAssigner) {

// 對於session window 的特殊處理,我們只關注該條件塊內的程式碼

MergingWindowSet<W> mergingWindows = getMergingWindowSet();

for (W window: elementWindows) {

final Tuple1<TriggerResult> mergeTriggerResult = new Tuple1<>(TriggerResult.CONTINUE);

// 加入新視窗, 如果沒有合併發生,那麼actualWindow就是新加入的視窗

// 如果有合併發生, 那麼返回的actualWindow即為合併後的視窗,

// 並且會呼叫 MergeFunction.merge 方法, 這裡方法中的內容主要是更新trigger, 合併舊視窗中的狀態到新視窗中

W actualWindow = mergingWindows.addWindow(window, new MergingWindowSet.MergeFunction<W>() {

@Override

public void merge(W mergeResult,

Collection<W> mergedWindows, W stateWindowResult,

Collection<W> mergedStateWindows) throws Exception {

context.key = key;

context.window = mergeResult;

// 這裡面會根據新視窗的結束時間註冊新的定時器

mergeTriggerResult.f0 = context.onMerge(mergedWindows);

// 刪除舊視窗註冊的定時器

for (W m: mergedWindows) {

context.window = m;

context.clear();

}

// 合併舊視窗(mergedStateWindows)中的狀態到新視窗(stateWindowResult)中

getStateBackend().mergePartitionedStates(stateWindowResult,

mergedStateWindows,

windowSerializer,

(StateDescriptor<? extends MergingState<?,?>, ?>) windowStateDescriptor);

}

});

// 取 actualWindow 對應的用來存狀態的視窗

W stateWindow = mergingWindows.getStateWindow(actualWindow);

// 從狀態後端拿出對應的狀態

AppendingState<IN, ACC> windowState = getPartitionedState(stateWindow, windowSerializer, windowStateDescriptor);

// 將新進入的元素資料加入到新視窗(或者說合並後的視窗)中對應的狀態中

windowState.add(element.getValue());

context.key = key;

context.window = actualWindow;

// 檢查是否需要fire or purge

TriggerResult triggerResult = context.onElement(element);

TriggerResult combinedTriggerResult = TriggerResult.merge(triggerResult, mergeTriggerResult.f0);

// 根據trigger結果決定怎麼處理視窗中的資料

processTriggerResult(combinedTriggerResult, actualWindow);

}

} else {

// 對於普通window assigner的處理, 這裡我們不關注

for (W window: elementWindows) {

AppendingState<IN, ACC> windowState = getPartitionedState(window, windowSerializer,

windowStateDescriptor);

windowState.add(element.getValue());

context.key = key;

context.window = window;

TriggerResult triggerResult = context.onElement(element);

processTriggerResult(triggerResult, window);

}

}

}

其實這段程式碼寫的並不是很clean,並且不是很好理解。在第六行中有用到MergingWindowSet,這個類很重要所以我們先介紹它。這是一個用來跟蹤視窗合併的類。比如我們有A、B、C三個視窗需要合併,合併後的視窗為D視窗。這三個視窗在底層都有對應的狀態集合,為了避免代價高昂的狀態替換(建立新狀態是很昂貴的),我們保持其中一個視窗作為原始的狀態視窗,其他幾個視窗的資料合併到該狀態視窗中去,比如隨機選擇A作為狀態視窗,那麼B和C視窗中的資料需要合併到A視窗中去。這樣就沒有新狀態產生了,但是我們需要額外維護視窗與狀態視窗之間的對映關係(D->A),這就是MergingWindowSet負責的工作。這個對映關係需要在失敗重啟後能夠恢復,所以MergingWindowSet內部也是對該對映關係做了容錯。狀態合併的工作示意圖如下所示:

然後我們來解釋下processElement的程式碼,首先根據window assigner為新進入的元素分配視窗集合。接著進入第一個條件塊,取出當前的MergingWindowSet。對於每個分配到的視窗,我們就會將其加入到MergingWindowSet中(addWindow方法),由MergingWindowSet維護視窗與狀態視窗之間的關係,並在需要視窗合併的時候,合併狀態和trigger。然後根據對映關係,取出結果視窗對應的狀態視窗,根據狀態視窗取出對應的狀態。將新進入的元素資料加入到該狀態中。最後,根據trigger結果來對視窗資料進行處理,對於session window來說,這裡都是不進行任何處理的。真正對視窗處理是由定時器超時後對完成的視窗呼叫processTriggerResult

總結

本文在上一篇文章:Window機制的基礎上,深入講解了 Flink 是如何支援 session window 的,核心的原理是視窗的合併。Flink 對於 session window 的支援很大程度上受到了 Google DataFlow 的啟發,所以也建議閱讀下 DataFlow 的論文。

參考資料