1. 程式人生 > >Apache Flink原始碼解析之stream-operator

Apache Flink原始碼解析之stream-operator

前面我們談論了Flink stream中的transformation。你可以將transformation看成編寫Flink程式並構建流式處理程式的必要組成部分(靜態表現形式);而本篇我們將探討transformation在Flink執行時對應的動態表現形式——operator。他們之間的對映關係見下圖:

flink-stream-operator_transformation-operator

具體的探討可以檢視前文:Flink中的一些核心概念

StreamOperator

所有operator的最終基類,operator的分類方式,按照輸入流個數不同分為:

  • 無輸入:StreamSource
  • 單個流輸入:OneInputStreamOperator
  • 兩個流輸入:TwoInputStreamOperator

跟生命週期有關的核心抽象方法:

  • setup : 例項化operator
  • open :該方法會在任何元素被處理之前執行,它的實現通常包含了operator的初始化邏輯
  • close :該方法在所有的元素都進入到operator被處理之後呼叫
  • dispose :該方法在operator生命週期的最後階段執行,主要用於回收資源

StreamOperator及其實現類中還包含了一些狀態恢復與儲存相關的邏輯,但這些不是本文的主題,所有暫時不做探討。

先來看一下整個package的類關係圖:

flink-stream-operator_all-class-diagram

我們整個剖析方式大致也按照以上operator的分類方式以及類的層次結構來。

StreamSource

作為一個流處理DAG的起點,source operator相比其他operator無疑是特別的(從類的繼承關係圖也可以看出來)。

它需要接受SourceFunction的例項。並且我們可以看到,它的chaining strategyHEAD(它表示operator不能有前置operator,但可以作為其他operator的前置operator,下文會談到)。

this.chainingStrategy = ChainingStrategy.HEAD;

StreamSource的實現略顯複雜,因為它涉及到我們前面文章談SourceFunction時談到的SourceFunction.SourceContext

的實現。在這裡提供了三個實現,分別對應我們之前談到的Flink對事件時間的三個分類:

flink-stream-operator_streamsource

  • NonTimestampContext:針對ProcessingTime,該SourceContext將時間戳設定為-1,並且不發射watermark
  • AutomaticWatermarkContext:針對IngestionTime,提供自動的watermark發射機制的SourceContext
  • ManualWatermarkContext:針對EventTime的人工發射watermarkSourceContext

它們之間的對應關係也體現在其run方法的實現中:

        switch (timeCharacteristic) {
            case EventTime:
                ctx = new ManualWatermarkContext<>(this, lockingObject, collector);
                break;
            case IngestionTime:
                ctx = new AutomaticWatermarkContext<>(this, lockingObject, collector,
                        getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval());
                break;
            case ProcessingTime:
                ctx = new NonTimestampContext<>(this, lockingObject, collector);
                break;
            default:
                throw new Exception(String.valueOf(timeCharacteristic));
        }

run方法內部會呼叫SourceFunctionrun方法:

try {
            userFunction.run(ctx);

            // if we get here, then the user function either exited after being done (finite source)
            // or the function was canceled or stopped. For the finite source case, we should emit
            // a final watermark that indicates that we reached the end of event-time
            if (!isCanceledOrStopped()) {
                ctx.emitWatermark(Watermark.MAX_WATERMARK);
            }
        } finally {
            // make sure that the context is closed in any case
            ctx.close();
        }

StreamSource通過一個屬性:canceledOrStopped來控制sourceFunction的停止。

整個StreamSource的執行邏輯由run來表述,通過cancel來控制停止邏輯。

NonTimestampContext

NonTimestampContext會忽略時間戳,因此它的實現裡稍微特別一點的地方在下面的這兩個方法:

public void collectWithTimestamp(T element, long timestamp) {
    // ignore the timestamp
    collect(element);
}

以及

public void emitWatermark(Watermark mark) {
    owner.checkAsyncException();
    // do nothing else
}

第一個方法忽略了時間戳,第二個方法不傳送watermark

ManualWatermarkContext

無需特別說明

AutomaticWatermarkContext

該類是自動傳送watermark的實現,在構造器中接收引數watermarkInterval來指定自動傳送watermark的時間間隔。具體的實現機制是,新建一個獨立的發射執行緒,以指定的時間間隔發射:

            this.scheduleExecutor = Executors.newScheduledThreadPool(1);

            this.watermarkTimer = scheduleExecutor.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    final long currentTime = System.currentTimeMillis();

                    if (currentTime > nextWatermarkTime) {
                        // align the watermarks across all machines. this will ensure that we
                        // don't have watermarks that creep along at different intervals because
                        // the machine clocks are out of sync
                        final long watermarkTime = currentTime - (currentTime % watermarkInterval);

                        synchronized (lockingObjectParam) {
                            if (currentTime > nextWatermarkTime) {
                                outputParam.emitWatermark(new Watermark(watermarkTime));
                                nextWatermarkTime += watermarkInterval;
                            }
                        }
                    }
                }
            }, 0, watermarkInterval, TimeUnit.MILLISECONDS);

除了這種基於時間的以固定頻率發射watermark的機制,在collect方法被呼叫時,也會檢查當前的時間戳,如果達到傳送條件也會觸發emit watermark

而因為該類實現的是自動傳送,在構造器中實現一個定時傳送機制,所以emitWatermark方法也就不需要再實現傳送邏輯(因為已不再需要使用者程式呼叫emitWatermark方法了),而該方法在該類中的主要任務是負責停止自動傳送。停止自動傳送的觸發條件是收到最後一個元素的訊號(將最後一個元素的時間戳設定為Long.MAX_VALUE),emitWatermark收到該標識後,再將其往下游傳遞並關閉定時傳送執行緒。

OneInputStreamOperator

單一輸入流的operator介面,繼承自StreamOperator。提供了兩個介面方法:

  • processElement:處理到達該operator的一個元素
  • processWatermark:處理一個Watermark

TwoInputStreamOperator

支援兩個流作為輸入的operator,同樣繼承自StreamOperator。擴充了多個介面方法:

  • processElement1 : 處理來自第一個輸入的某個元素
  • processElement2 : 處理來自第二個輸入的某個元素
  • processWatermark1 : 處理來自第一個輸入的一個Watermark
  • processWatermark2 : 處理來自第二個輸入的一個Watermark

輔助實現類

Output

Collector的擴充套件,增加了發射WaterMark的功能。該介面主要供operator用於發射元素或者WaterMark

  • emitWatermark : 該發射WaterMark將廣播給下游的所有operator

TimeCharacteristic

Flink在涉及到時間相關的處理時,將時間劃分為三類。而時間型別的定義在Flink中就是用該列舉來表示:

  • ProcessingTime
  • IngestionTime
  • EventTime

這三種時間型別之前我們曾多次提及,這裡不再囉嗦

TimestampedCollector

Output的包裝器實現,它用於給元素設定時間戳

AbstractStreamOperator

該抽象類為實現一個具體的operator提供基本的支援,Flink內建提供的operator全部都直接或間接繼承自AbstractStreamOperator

它內部包含了三大類的屬性:

  • 配置屬性
  • 執行時屬性
  • 鍵值對狀態屬性

大都數方法都是輔助方法,值得一提的是setup方法。從這裡我們可以看到所有operator識別符號的生成方式:

String operatorIdentifier = getClass().getSimpleName() + "_" + config.getVertexID() + "_" + runtimeContext.getIndexOfThisSubtask();

可以看到標識是由”_”間隔的三段拼接而成。三段分別是:類名,vertex id,以及當前subtask的索引。

然後基於此標識,建立了用於儲存狀態的stateBackend

stateBackend = container.createStateBackend(operatorIdentifier, keySerializer);

stateBackenddispose方法中會被關閉。

AbstractStreamOperator並沒有對open/close等生命週期方法提供具體的實現,這些方法的具體實現被後延至後面談到的AbstractUdfStreamOperator中。

AbstractUdfStreamOperator

該類主要針對operator生命週期相關的方法(open/close/dispose)提供了模板實現。而這些實現都統一針對使用者定義的Function的例項(簡稱udf)。

ChainingStrategy

該列舉定義了operatorchain strategy(連結策略)。當一個operator連結到其前置operator時,意味著它們將在同一個執行緒上執行。StreamOperator的預設值是HEAD,這意味著它將沒有前置operator,不過它有可能成為其他operator的前置operator。大部分StreamOperator將該列舉以ALWAYS覆蓋,表示它們將連結到一個前置operator

它的三個列舉值:

  • ALWAYS :上面已經提到過,它允許將當前operator連結到某前置operator,這是提升效能的良好實踐,它能夠提升operator的並行度
  • NEVER :該策略不支援operator被連結到某前置operator也不支援被作為其他operator的前置operator
  • HEAD :該策略表示operator沒有前置operator,不過可以作為其他operatorchain header

內建的Operator實現

StreamCounter

元素累加器,沒有什麼特別的

StreamProject

這裡需要解釋一下,此處的project,並非通常所指的專案的意思,而是投射、投影的意思。你可以將其類比於SQL中的SELECT子句。因此他允許你選擇你需要的fields集合。這通過其構造器的一個欄位索引陣列來指定:

processElement方法中,它依次遍歷所有需要的欄位索引,將元素中需要的欄位提取出來,放入一個用於輸出的outTuple,最後再將其發射出去:

    public void processElement(StreamRecord<IN> element) throws Exception {
        for (int i = 0; i < this.numFields; i++) {
            outTuple.setField(((Tuple) element.getValue()).getField(fields[i]), i);
        }
        output.collect(element.replace(outTuple));
    }

StreamFilter

filter operator,處理邏輯很簡單,根據自定義的FilterFunction方法,對每個元素進行過濾,如果滿足過濾條件,則將該元素emit出去。

StreamMap

map operator,根據傳入的MapFunction,對每個元素應用map操作後將其發射出去。

StreamFlatMap

flatmap operator接收FlatMapFunction函式,有一些特別之處:在其open方法中,它初始化了一個TimestampedCollector,作為傳遞給FlatMapFunctioncollector,該collector是給那些特定的userFunction使用的,並且用於給他們操作的元素設定時間戳。

StreamGroupedFold

分組的fold operatorfold函式的執行依賴於一個初始化值initialValue。因此這裡涉及到狀態儲存。並且狀態是跟具體的分割槽關聯的。因此,在open方法的實現中,需要獲得跟分割槽關聯的ValueState

        ValueStateDescriptor<OUT> stateId = new ValueStateDescriptor<>(STATE_NAME, outTypeSerializer, null);
        values = getPartitionedState(stateId);

processElement方法的實現,涉及到一系列的操作:從ValueState中獲取資料,作為“新”的初始值跟當前元素一起進行fold函式運算,獲得結果後更新ValueState,然後將獲得的結果emit出去。

StreamGroupedReduce

按分組進行reduce操作的operator.

基於特定的狀態名稱:

private static final String STATE_NAME = "_op_state";

構建狀態id

ValueStateDescriptor<IN> stateId = new ValueStateDescriptor<>(STATE_NAME, serializer, null);

然後再獲取狀態值:

values = getPartitionedState(stateId);

以上兩個動作在open方法中實現

processElement方法中,分為兩種情況:

  • 如果之前已存在狀態值,那麼拿當前值跟之前的狀態值做reduce並獲得結果,將結果再次更新到最新狀態並emit出去
  • 如果之前不存在狀態值,那麼直接將當前值更新到狀態中,並將當前值emit出去

StreamSink

sink operator,通常是流處理的最後一個operator。它接收SinkFunction的例項。在processElement中依次呼叫其invoke方法。

小結

本文主要探討了stream transformation的執行時形式operator的大致實現。

微信掃碼關注公眾號:Apache_Flink

apache_flink_weichat

QQ掃碼關注QQ群:Apache Flink學習交流群(123414680)

qrcode_for_apache_flink_qq_group