1. 程式人生 > >Storm Window機制詳解

Storm Window機制詳解

概念

window 型別

Tumbling Window

按照固定的時間間隔或者Tuple數量劃分視窗。

例子一,按照固定時間滾動,5秒滾一個視窗:

| e1 e2 | e3 e4 e5 e6 | e7 e8 e9 |...
0       5             10         15    -> time
|   w1  |    w2       |     w3   |...

例子二,按照固定Tuple數量滾動,5個Tuple滾一個視窗

| e1 e2 e3 e4 e5 | e6 e7 e8 e9 e10 |...
0              5                10    -> count
|       w1       |        w2       |...

Sliding Window

也可以根據時間間隔或者Tuple數量來劃分視窗,由於視窗長度也可以是時間或者Tuple數量,所以Sliding Window的形式比Tumbling Window多

例子一,視窗長度為10s,滑動間隔為5s

| e1 e2 | e3 e4 e5 e6 | e7 e8 e9 |...
0       5             10         15    -> time
w1----->|
|----------w2---------|
        |-----------w3-----------|

例子二,視窗長度為10s,滑動間隔為5個Tuple

| e1 e2 e3 e4 e5 | e6 e7 e8 e9 e10 | e11 ...
0             5                10       -> count
|-------w1------|
         |----------w2------------|

例子三,視窗長度為10個Tuple,滑動間隔為5個Tuple

| e1 e2 e3 e4 e5 | e6 e7 e8 e9 e10 | e11 e12 e13 e14 e15 | ...
0              5                 10                    15  -> count
|-------w1-------|
|-----------------w2---------------|
                 |-------------------w2-----------------|

例子四,視窗長度為10個Tuplp,滑動間隔為5s

| e1 e2 e3 e4 e5 e6 e7 e8 e9 e10 e11 e12 e13 e14 e15 ...
0                      5             10           15   -> time
|------------w1---------|
        |--------------w2--------------|

當視窗長度和滑動距離相等時,便成了滾動視窗

TriggerPolicy

視窗的觸發策略,用於確定視窗的計算點,以時間或者Tuplt數量為標準

EvictionPolicy

視窗的事件回收策略,用標記的方式確定事件是否屬於本次視窗

Watermark和Lag

Watermark用於標記資料的處理進度,Lag主要是應對資料亂序的情況。

從當前資料中的最新一條資料的時間算起,往前減去Lag,得到一個時間,這個時間成為Watermark,認為Watermark之前的資料都已經到了。

06:00:00的資料有可能在06:00:06之後才到,若Lag=5s,不好意思,進不了視窗,會被 當成超時的資料。

程式碼分析

EvictionPolicy

事件回收策略介面,目前有4種實現,用於將event標記為以下4個狀態:

EXPIRE:失效的事件,會從queue中移除
PROCESS:將在最近的一個window中處理事件
KEEP:將在以後的window中處理些事件
STOP:停止處理些事件之後的event,認為此event之後的event將不再滿足這個策略

介面程式碼如下:

public interface EvictionPolicy<T> { 
    enum Action {
       EXPIRE,PROCESS,KEEP,STOP
    }
    Action evict(Event<T> event);  //對事件進行標記
    void track(Event<T> event);  //對事件進行跟蹤
    void setContext(EvictionContext context);  //設定context
}

分別介紹4種EvictionPolicy的實現類。

CountEvictionPolicy

以event數量做為視窗長度,只會標記兩種狀態:EXPIRE和PROCESS,有track()和evict()兩個主要方法。

track方法如下,用成員變數currentCount記錄已經跟蹤的event資料,但並不包括watermark event。

@Override
public void track(Event<T> event) {
    if (!event.isWatermark()) {
        currentCount.incrementAndGet();
    }
}

evict方法如下,返回一個標記後的Action,成員變數threshold即為視窗長度,當currentCount的值大於threshold時,則表示此事件不在本次視窗處理之內,需要標記為EXPIRE,否則標記為PROCESS。

@Override
public Action evict(Event<T> event) {
    while (true) {
        long curVal = currentCount.get();
        if (curVal > threshold) {
            if (currentCount.compareAndSet(curVal, curVal - 1)) {
                return Action.EXPIRE;
            }
        } else {
            break;
        }
    }
    return Action.PROCESS;
}

WatermarkCountEvictionPolicy

繼承自CountEvictionPolicy,增加referenceTime和processed兩個私有成員變數

referenceTime:即watermark(後面有解釋)
processed:本次視窗中已經被標記為PROCESS狀態的event數量

WatermarkCountEvictionPolicy比CountEvictionPolicy多了一個KEEP狀態,被標記為KEEP狀態的event將在下一個視窗中處理。

track()方法實現為空

evict的實現如下

@Override
public Action evict(Event<T> event) {
    Action action;
    if (event.getTimestamp() <= referenceTime && processed < currentCount.get()) {
        action = super.evict(event);
        if (action == Action.PROCESS) {
            ++processed;
        }
    } else {
        action = Action.KEEP;
    }
    return action;
}

TimeEvictionPolicy

以時間做為視窗的長度,只會標記兩種狀態:EXPIRE和PROCESS,用事件的時間和最新的時間做為標記的依據

track方法實現為空

evicty方法如下,成員變數referenceTime可能是在window中計算而來,或者是系統當前時間,

public Action evict(Event<T> event) {
    long now = referenceTime == null ? System.currentTimeMillis() : referenceTime;
    long diff = now - event.getTimestamp();
    if (diff >= windowLength) {
        return Action.EXPIRE;
    }
    return Action.PROCESS;
}

WatermarkTimeEvictionPolicy

WatermarkTimeEvictionPolicy繼承自TimeEvictionPolicy,增加了成員變數lag,標記的狀態也比TimeEvictionPolicy多了STOP和KEEP

重寫了evict方法

1   public Action evict(Event<T> event) {
2       long diff = referenceTime - event.getTimestamp();
3       if (diff < -lag) {
4           return Action.STOP; 
5       } else if (diff < 0) {
6           return Action.KEEP;
7       } else {
8           return super.evict(event);
9       }
10  }

第3行的判斷可以理解為event的時間比referenceTime大了一個lag以上,在此標記為STOP,後面的scan方法將在此處停止,認為後面的event都不可能在本次視窗事件中處理

第5行的判斷為event的時間比referenceTime大了一個lag以內,標記為KEEP。

TriggerPolicy

window的觸發策略介面,滿足觸發條件時,WindowManager的onTrigger方法得以執行。介面程式碼如下:

public interface TriggerPolicy<T> {
    void track(Event<T> event);  //跟蹤每個event,看是否滿足觸發條件
    void reset();
    void start();
    void shutdown();
}

也有4個實現類

CountTriggerPolicy

track方法如下,很簡單,當跟蹤的event資料大於count時,觸發onTrigger,count為構造方法傳的觸發上限。

public void track(Event<T> event) {
    if (started && !event.isWatermark()) {
        if (currentCount.incrementAndGet() >= count) {
            evictionPolicy.setContext(new DefaultEvictionContext(System.currentTimeMillis()));
            handler.onTrigger();
        }
    }
}

WatermarkCountTriggerPolicy

直接實現TriggerPolicy,而不是繼承CountTriggerPolicy。由於watermark event來觸發onTrigger

track方法如下:

public void track(Event<T> event) {
    if (started && event.isWatermark()) {
        handleWaterMarkEvent(event);
    }
}
private void handleWaterMarkEvent(Event<T> waterMarkEvent) {
    long watermarkTs = waterMarkEvent.getTimestamp();
    List<Long> eventTs = windowManager.getSlidingCountTimestamps(lastProcessedTs, watermarkTs, count);
    for (long ts : eventTs) {
        evictionPolicy.setContext(new DefaultEvictionContext(ts, null, Long.valueOf(count)));
        handler.onTrigger();
        lastProcessedTs = ts;
    }
}

TimeTriggerPolicy

定時來觸發視窗的計算。這個實現中用會啟動一個ScheduledExecutorService定時器,週期性執行內部執行緒newTriggerTask,由newTriggerTask來呼叫onTrigger方法。(程式碼很簡單,就不貼啦)

WatermarkTimeTriggerPolicy

這個類也是直接實現了TriggerPolicy介面,在track方法中判斷事件是否是watermark event,來決定是否觸發視窗計算。
track方法如下:

public void track(Event<T> event) {
    if (started && event.isWatermark()) {
        handleWaterMarkEvent(event);
    }
}

handleWaterMarkEvent方法通過while偱環,windowEndTs是當然計算的視窗的終點,起點就是終點減去視窗長度,本次視窗計算結束,onTrigger如果返回為true,windowEndTs將加上一個slidingIntervalMs(滑動長度)做為下一個視窗的終點。onTrigger如果返回為false,則表示這次計算的視窗中沒有event,將通過getNextAlignedWindowT方法來找到下一個視窗的終點。

private void handleWaterMarkEvent(Event<T> event) {
    long watermarkTs = event.getTimestamp();
    long windowEndTs = nextWindowEndTs;
    LOG.debug("Window end ts {} Watermark ts {}", windowEndTs, watermarkTs);
    while (windowEndTs <= watermarkTs) {
        long currentCount = windowManager.getEventCount(windowEndTs);
        evictionPolicy.setContext(new DefaultEvictionContext(windowEndTs, currentCount));
        if (handler.onTrigger()) {
            windowEndTs += slidingIntervalMs;
        } else {
            /*
             * 如果上次onTrigger沒有event,將通過getNextAlignedWindowTs方法來找到下一個視窗
             */
            long ts = getNextAlignedWindowTs(windowEndTs, watermarkTs);
            LOG.debug("Next aligned window end ts {}", ts);
            if (ts == Long.MAX_VALUE) {
                LOG.debug("No events to process between {} and watermark ts {}", windowEndTs, watermarkTs);
                break;
            }
            windowEndTs = ts;
        }
    }
    nextWindowEndTs = windowEndTs;
}

getNextAlignedWindowT方法通過找到windowEndTs到watermark這段時間裡最早的一個event的時間戳,以nextTs + (slidingIntervalMs - (nextTs % slidingIntervalMs))的方式做時間對齊,找到小於最早時間戳裡,能被滑動間隔整除的最小的一個時間點,做為下次計算的視窗的終點。

private long getNextAlignedWindowTs(long startTs, long endTs) {
    long nextTs = windowManager.getEarliestEventTs(startTs, endTs);
    if (nextTs == Long.MAX_VALUE || (nextTs % slidingIntervalMs == 0)) {
        return nextTs;
    }
    return nextTs + (slidingIntervalMs - (nextTs % slidingIntervalMs));
}

通過EvictionPolicy和TriggerPolicy這兩個介面的組合,形成了前面所講的6個視窗型別。這兩個介面的實現類中,也可以分為帶watermark的類和不帶watermark的類,如果使用者設定了時間欄位,就會以帶watermark的類處理event.

WaterMark的計算

在api中使用以下方法來改變watermark的產生週期,預設值是1000ms

public BaseWindowedBolt withWatermarkInterval(Duration interval)

interval實際被設定到了WaterMarkEventGenerator中,WaterMarkEventGenerator是一個執行緒,每隔interval時間間隔被ScheduledExecutorService執行一次,看以下WaterMarkEventGenerator中的幾個關鍵方法。

track

/**
* Tracks the timestamp of the event in the stream, returns
* true if the event can be considered for processing or
* false if its a late event.
* track在WindowedBoltExecutor的execute方法中被呼叫,以(ts >= lastWaterMarkTs)來判斷事件是否應該被放到queue中
*/

public boolean track(GlobalStreamId stream, long ts) {
    Long currentVal = streamToTs.get(stream);
    if (currentVal == null || ts > currentVal) {
        streamToTs.put(stream, ts);             //更新streamid對應的時間戳
    }
    checkFailures();
    return ts >= lastWaterMarkTs;             
}

computeWaterMarkTs

/**
*計算新的watermark,watermark是所有輸入流中最新的tuple時間戳的最小值(減去延時)
*/
private long computeWaterMarkTs() {
    long ts = 0;
    // only if some data has arrived on each input stream
    if (streamToTs.size() >= inputStreams.size()) {
        ts = Long.MAX_VALUE;
        for (Map.Entry<GlobalStreamId, Long> entry : streamToTs.entrySet()) {
            ts = Math.min(ts, entry.getValue());
        }
    }
    return ts - eventTsLag;
}

Example

(來自於官網)

若基於以下引數和資料:

Window length = 20s, sliding interval = 10s, watermark emit frequency = 1s, max lag = 5s

當前時間 = 09:00:00

Tuples e1(6:00:03), e2(6:00:05), e3(6:00:07), e4(6:00:18), e5(6:00:26), e6(6:00:36)9:00:00 and 9:00:01之前到達

09:00:01產生了新的 watermark, w1 = 6:00:31 ,此時,早於 6:00:31 的event到達時,將被當成超時資料。

通過事件中最早的一個 timestamp (06:00:03)和sliding interval來計算後,將產生三個window,第一個window的終點是06:00:10,如下所示:

  1. 5:59:50 - 06:00:10 : e1, e2, e3
  2. 6:00:00 - 06:00:20 : e1, e2, e3, e4
  3. 6:00:10 - 06:00:30 : e4, e5

由於e6(6:00:36) 比watermark(6:00:31)要晚,所以不在本次觸發中處理。

Tuples e7(8:00:25), e8(8:00:26), e9(8:00:27), e10(8:00:39)9:00:01 and 9:00:02之間到達

09:00:02 產生下一個 watermark, w2 = 08:00:34 ,此時,早於 8:00:34 的event到達時,將被當成超時資料。

將產生以下window:

  1. 6:00:20 - 06:00:40 : e5, e6
  2. 6:00:30 - 06:00:50 : e6
  3. 8:00:10 - 08:00:30 : e7, e8, e9

e10 (8:00:39 )比 watermark 8:00:34 晚,所以不在本次處理

其它

第一個視窗怎麼產生

程序啟動初次觸發視窗計算時,WindowManager的onTrigger()方法會返回false,WatermarkTimeTriggerPolicy中的getNextAlignedWindowTs()方法會被呼叫,從而產生第一個真正的Window。

之後每次觸發視窗計算,會用上一次計算的最後一個視窗的結束時間加上sliding interval得到本次計算的下一個視窗的結束時間。

Guarantees

storm window提供了at-least once的保障,tuple在window中經過 windowLength + slidingInterval後被expire後,然後自動被ack,因此topology.message.timeout.secs 必須遠大於 windowLength + slidingInterval 。如果是基於count觸發的window,需要去結合實際的視窗長度和滑動時間才調整超時時間的大小。