1. 程式人生 > >[原始碼分析] 從原始碼入手看 Flink Watermark 之傳播過程

[原始碼分析] 從原始碼入手看 Flink Watermark 之傳播過程

[原始碼分析] 從原始碼入手看 Flink Watermark 之傳播過程

0x00 摘要

本文將通過原始碼分析,帶領大家熟悉Flink Watermark 之傳播過程,順便也可以對Flink整體邏輯有一個大致把握。

0x01 總述

從靜態角度講,watermarks是實現流式計算的核心概念;從動態角度說,watermarks貫穿整個流處理程式。所以為了講解watermarks的傳播,需要對flink的很多模組/概念進行了解,涉及幾乎各個階段。我首先會講解相關概念,然後會根據一個例項程式碼從以下幾部分來解釋:程式邏輯/計算圖模型/程式執行。最後是詳細Flink原始碼分析(略冗長,可以選擇性閱讀)。

0x02 相關概念

流計算被抽象成四個問題,what,where,when,how。

window解決的是where,也就是將無界資料劃分成有界資料。

window的資料何時被計算是when?解決這個問題用的方式是watermark和trigger,watermark用來標記視窗的完整性。trigger用來設計視窗資料觸發條件。

1. 亂序處理

亂序問題一般是和event time關聯的, 對於一個流式處理系統的process time來說,是不存在亂序問題的。所以下面介紹的watermark/allowedLateness也只是在event time作為主時間才生效。

Flink中處理亂序依賴的 watermark+window+trigger,屬於全域性性的處理;Flink同時對於window而言,還提供了allowedLateness方法,使得更大限度的允許亂序,屬於區域性性的處理;

即watermark是全域性的,不止針對window計算,而allowedLateness讓某一個特定window函式能自己控制處理延遲資料的策略,allowedLateness是視窗函式的屬性。

2. Watermark(水位線)

watermark是流式系統中主要用於解決流式系統中資料亂序問題的機制,方法是用於標記當前處理到什麼水位的資料了,這意味著再早於這個水位的資料過來會被直接丟棄。這使得引擎可以自動跟蹤資料中的當前事件時間,並嘗試相應地清除舊狀態。

Watermarking表示多長時間以前的資料將不再更新,您可以通過指定事件時間列來定義查詢的Watermarking,並根據事件時間預測資料的延遲時間。也就是說每次視窗滑動之前會進行Watermarking的計算。當一組資料或新接收的資料事件時間小於Watermarking時,則該資料不會更新,在記憶體中就不會維護該組資料的狀態。

換一種說法,閾值內的滯後資料將被聚合,但是晚於閾值到來的資料(其實際時間比watermark小)將被丟棄。

watermark和資料本身一樣作為正常的訊息在流中流動

3. Trigger

Trigger 指明在哪些條件下觸發window計算,基於處理資料時的時間以及事件的特定屬性。一般trigger的實現是當watermark處於某種時間條件下或者視窗資料達到一定條件,視窗的資料開始計算。

每個視窗分配器都會有一個預設的Trigger。如果預設的Trigger不能滿足你的需求,你可以指定一個自定義的trigger()。Flink Trigger介面有如下方法允許trigger對不同的事件做出反應:

* onElement():進入視窗的每個元素都會呼叫該方法。
* onEventTime():事件時間timer觸發的時候被呼叫。
* onProcessingTime():處理時間timer觸發的時候會被呼叫。
* onMerge():有狀態的觸發器相關,並在它們相應的視窗合併時合併兩個觸發器的狀態,例如使用會話視窗。
* clear():該方法主要是執行視窗的刪除操作。

每次trigger,都是要對新增的資料,相關的window進行重新計算,並輸出。輸出有complete, append,update三種輸出模式:

  • Complete mode:Result Table 全量輸出,也就是重新計算過的window結果都輸出。意味著這種模式下,每次讀了新增的input資料,output的時候會把記憶體中resulttable中所有window的結果都輸出一遍。

  • Append mode (default):只有 Result Table 中新增的行才會被輸出,所謂新增是指自上一次 trigger 的時候。因為只是輸出新增的行,所以如果老資料有改動就不適合使用這種模式。 更新的window並不輸出,否則外存裡的key就重了。

  • Update mode:只要更新的 Row 都會被輸出,相當於 Append mode 的加強版。而且是對外存中的相同key進行update,而不是append,需要外存是能kv操作的!只會輸出新增和更新過的window的結果。

從上面能看出來,流式框架對於window的結果資料是存在一個 result table裡的!

4. allowedLateness

Flink中藉助watermark以及window和trigger來處理基於event time的亂序問題,那麼如何處理“late element”呢?

也許還有人會問,out-of-order element與late element有什麼區別?不都是一回事麼?答案是一回事,都是為了處理亂序問題而產生的概念。要說區別,可以總結如下:

  • 通過watermark機制來處理out-of-order的問題,屬於第一層防護,屬於全域性性的防護,通常說的亂序問題的解決辦法,就是指這類;
  • 通過視窗上的allowedLateness機制來處理out-of-order的問題,屬於第二層防護,屬於特定window operator的防護,late element的問題就是指這類。

預設情況下,當watermark通過end-of-window之後,再有之前的資料到達時,這些資料會被刪除。為了避免有些遲到的資料被刪除,因此產生了allowedLateness的概念。

簡單來講,allowedLateness就是針對event time而言,對於watermark超過end-of-window之後,還允許有一段時間(也是以event time來衡量)來等待之前的資料到達,以便再次處理這些資料。

5. 處理訊息過程

  1. windowoperator接到訊息以後,首先存到state,存放的格式為k,v,key的格式是key + window,value是key和window對應的資料。
  2. 註冊一個timer,timer的資料結構為 [key,window,window邊界 - 1],將timer放到集合中去。
  3. 當windowoperator收到watermark以後,取出集合中小於watermark的timer,觸發其window。觸發的過程中將state裡面對應key及window的資料取出來,這裡要經過序列化的過程,傳送給windowfunction計算。
  4. 資料傳送給windowfunction,實現windowfunction的window資料計算邏輯。

比如某視窗有三個資料:[key A, window A, 0], [key A, window A, 4999], [key A, window A, 5000]

對於固定視窗,當第一個watermark (Watermark 5000)到達時候,[key A, window A, 0], [key A, window A, 4999] 會被計算,當第二個watermark (Watermark 9999)到達時候,[key A, window A, 5000]會被計算。

6. 累加(再次)計算

watermark是全域性性的引數,用於管理訊息的亂序,watermark超過window的endtime之後,就會觸發視窗計算。一般情況下,觸發視窗計算之後,視窗就銷燬掉了,後面再來的資料也不會再計算。

因為加入了allowedLateness,所以計算會和之前不同了。window這個allowedLateness屬性,預設為0,如果allowedLateness > 0,那麼在某一個特定watermark到來之前,這個觸發過計算的視窗還會繼續保留,這個保留主要是窗口裡的訊息。

這個特定的watermark是什麼呢? watermark-allowedLateness>=視窗endtime。這個特定watermark來了之後,視窗就要消失了,後面再來屬於這個視窗的訊息,就丟掉了。在 "watermark(=視窗endtime)" ~ “watermark(=endtime+allowedLateness)" 這段時間之間,對應視窗可能會多次計算。那麼要window的endtime+allowedLateness <= watermark的時候,window才會被清掉。

比如window的endtime是5000,allowedLateness=0,那麼如果watermark 5000到來之後,這個window就應該被清除。但是如果allowedLateness = 1000,則需要等water 6000(endtime + allowedLateness)到來之後,這個window才會被清掉。

Flink的allowedLateness可用於TumblingEventTimeWindow、SlidingEventTimeWindow以及EventTimeSessionWindows,這可能使得視窗再次被觸發,相當於對前一次視窗的視窗的修正(累加計算或者累加撤回計算);

注意:對於trigger是預設的EventTimeTrigger的情況下,allowedLateness會再次觸發視窗的計算,而之前觸發的資料,會buffer起來,直到watermark超過end-of-window + allowedLateness的時間,視窗的資料及元資料資訊才會被刪除。再次計算就是DataFlow模型中的Accumulating的情況。

同時,對於sessionWindow的情況,當late element在allowedLateness範圍之內到達時,可能會引起視窗的merge,這樣之前視窗的資料會在新視窗中累加計算,這就是DataFlow模型中的AccumulatingAndRetracting的情況。

7. Watermark傳播

生產任務的pipeline中通常有多個stage,在源頭產生的watermark會在pipeline的多個stage間傳遞。瞭解watermark如何在一個pipeline的多個stage間進行傳遞,可以更好的瞭解watermark對整個pipeline的影響,以及對pipeline結果延時的影響。我們在pipeline的各stage的邊界上對watermark做如下定義:

  • 輸入watermark(An input watermark):捕捉上游各階段資料處理進度。對源頭運算元,input watermark是個特殊的function,對進入的資料產生watermark。對非源頭運算元,input watermark是上游stage中,所有shard/partition/instance產生的最小的watermark
  • 輸出watermark(An output watermark):捕捉本stage的資料進度,實質上指本stage中,所有input watermark的最小值,和本stage中所有非late event的資料的event time。比如,該stage中,被快取起來等待做聚合的資料等。

每個stage內的操作並不是線性遞增的。概念上,每個stage的操作都可以被分為幾個元件(components),每個元件都會影響pipeline的輸出watermark。每個元件的特性與具體的實現方式和包含的運算元相關。理論上,這類運算元會快取資料,直到觸發某個計算。比如快取一部分資料並將其存入狀態(state)中,直到觸發聚合計算,並將計算結果寫入下游stage。

watermark可以是以下項的最小值:

  • 每個source的watermark(Per-source watermark) - 每個傳送資料的stage.
  • 每個外部資料來源的watermark(Per-external input watermark) - pipeline之外的資料來源
  • 每個狀態元件的watermark(Per-state component watermark) - 每種需要寫入的state型別
  • 每個輸出buffer的watermark(Per-output buffer watermark) - 每個接收stage

這種精度的watermark能夠更好的描述系統內部狀態。能夠更簡單的跟蹤資料在系統各個buffer中的流轉狀態,有助於排查資料堵塞問題。

1. 程式結構

Flink程式像常規的程式一樣對資料集合進行轉換操作,每個程式由下面幾部分組成:

  1. 獲取一個執行環境
  2. 載入/建立初始化資料
  3. 指定對於資料的transformations操作
  4. 指定計算的輸出結果(列印或者輸出到檔案)
  5. 觸發程式執行

flink流式計算的核心概念,就是將資料從輸入流一個個傳遞給Operator進行鏈式處理,最後交給輸出流的過程。對資料的每一次處理在邏輯上成為一個operator,並且為了本地化處理的效率起見,operator之間也可以串成一個chain一起處理。

下面這張圖表明瞭flink是如何看待使用者的處理流程的:使用者操作被抽象化為一系列operator。以source開始,以sink結尾,中間的operator做的操作叫做transform,並且可以把幾個操作串在一起執行。

Source ---> Transformation ----> Transformation ----> Sink

以下是一個樣例程式碼,後續的分析會基於此程式碼

DataStream<String> text = env.socketTextStream(hostname, port);

DataStream counts = text
    .filter(new FilterClass())
    .map(new LineSplitter())
    .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessGenerator) 
    .keyBy(0)
    .timeWindow(Time.seconds(10))
    .sum(2)
  
counts.print()  
System.out.println(env.getExecutionPlan());

2. 核心類/介面

在使用者設計程式時候,對應如下核心類/介面:

  • DataStream:描述的是一個具有相同資料型別的資料流,底層是通過具體的Transformation來實現,其負責提供各種對流上的資料進行操作轉換的API介面。
  • Transformation:描述了構建一個DataStream的操作,以及該操作的並行度、輸出資料型別等資訊,並有一個屬性,用來持有StreamOperator的一個具體例項;

上述程式碼邏輯中,對資料流做了如下操作:filter, map, keyBy, assignTimestampsAndWatermarks, timeWindow, sum。每次轉換都生成了一個新的DataStream

比如例項程式碼中的timeWindow最後生成了windowedStream。windowedStream之上執行的apply方法會生成了WindowOperator,初始化時包含了trigger以及allowedLateness的值。然後經過transform轉換,實際上是執行了DataStream中的transform方法,最後生成了SingleOutputStreamOperator。SingleOutputStreamOperator這個類名字有點誤導,實際上它是DataStream的子類

    public <R> SingleOutputStreamOperator<R> apply(WindowFunction<T, R, K, W> function, TypeInformation<R> resultType) {
        KeySelector<T, K> keySel = input.getKeySelector(); //根據keyedStream獲取key
        WindowOperator<K, T, Iterable<T>, R, W> operator;
        operator =  new WindowOperator<>(windowAssigner, ... ,
                            new InternalIterableWindowFunction<>(function),
                          trigger,
                          allowedLateness,
                          legacyWindowOpType);
        return input.transform(opName, resultType, operator);//根據operator name,視窗函式的型別,以及window operator,執行keyedStream.transaform操作
    }

Flink 中的執行圖可以分成四層:StreamGraph ---> JobGraph ---> ExecutionGraph -> 物理執行圖。

  • StreamGraph:是對使用者邏輯的對映,代表程式的拓撲結構,是根據使用者通過 Stream API 編寫的程式碼生成的最初的圖。
  • JobGraph:StreamGraph經過優化後生成了 JobGraph,提交給 JobManager 的資料結構。主要的優化為,將多個符合條件的節點 chain 在一起作為一個節點,這樣可以減少資料在節點之間流動所需要的序列化/反序列化/傳輸消耗。
  • ExecutionGraph:JobManager 根據 JobGraph 生成ExecutionGraph。ExecutionGraph是JobGraph的並行化版本,是排程層最核心的資料結構。
  • 物理執行圖:JobManager 根據 ExecutionGraph 對 Job 進行排程後,在各個TaskManager 上部署 Task 後形成的“圖”,並不是一個具體的資料結構。

我們這裡重點看StreamGraph,其相關重點資料結構是:

  • StreamNode 是用來描述 operator 的邏輯節點,並具有所有相關的屬性,如併發度、入邊和出邊等。
  • StreamEdge 是用來描述兩個 StreamNode(operator) 邏輯的連結邊。

我們可以直接列印 Execution Plan

System.out.println(env.getExecutionPlan());

其內部呼叫 StreamExecutionEnvironment.getExecutionPlan 得到 StreamGraph。

public String getExecutionPlan() {
        return getStreamGraph(DEFAULT_JOB_NAME, false).getStreamingPlanAsJSON();
}

StreamGraph的轉換流是:

* Source --> Filter --> Map --> Timestamps/Watermarks --> Window(SumAggregator) --> Sink

下面是我把 示例程式碼 列印StreamGraph結果整理出來一個靜態架構。可以看出程式碼中的轉換被翻譯成了如下執行Unit(在下面圖中,其執行序列是由上而下)。

*        +-----> Data Source(ID = 1) [ Source Socket Stream ]  
*        |      // env.socketTextStream(hostname, port) 方法中生成了一個 Data Source
*        |      
*        +-----> Operator(ID = 2) [ Filter ]
*        | 
*        |      
*        +-----> Operator(ID = 3) [ Map ]
*        | 
*        |      
*        +-----> Operator(ID = 4) [ Timestamps/Watermarks ]
*        | 
*        |      
*        +-----> Operator(ID = 6) [ Window(SumAggregator) ]
*        |       // 多個Operator被構建成 Operator Chain
*        | 
*        |      
*        +-----> Data Sink(ID = 7) [ Sink : Print to Std. Out ] 
*                // counts.print() 是在資料流最後添加了個 Data Sink,用於承接統計結果   

示例程式碼中,Flink生成StreamGraph的大致處理流程是:

  • 首先處理的Source,生成了SourceStreamNode
  • 處理Filter,生成了FilterStreamNode,並生成StreamEdge連線上游SourceFilter
  • 處理Map,生成了MapStreamNode,並生成StreamEdge連線上游FilterMap
  • 處理assignTimestampsAndWatermarks,生成了Timestamps/WatermarksStreamNode,並生成StreamEdge連線上游MapTimestamps/Watermarks
  • 處理keyBy/timeWindow/sum,生成了WindowStreamNode 以及 Operator Chain,並生成StreamEdge連線上游Timestamps/WatermarksWindow
  • 最後處理Sink,建立SinkStreamNode,並生成StreamEdge與上游Window相連。

0x05. 執行模組生命週期

這裡主要核心類是:

  • Function:使用者通過繼承該介面的不同子類來實現使用者自己的資料處理邏輯。如子類SocketTextStreamFunction實現從指定hostname和port來接收資料,並轉發字串的邏輯;

  • Task: 是Flink中執行的基本單位,代表一個 TaskManager 中所起的並行子任務,執行封裝的 flink 運算元並執行,提供以下服務:消費輸入data、生產 IntermediateResultPartition [ flink關於中間結果的抽象 ]、與 JobManager 互動。

  • StreamTask : 是本地執行的基本單位,由TaskManagers部署執行。包含了多個StreamOperator,封裝了運算元的處理邏輯。

  • StreamOperator:DataStream 上的每一個 Transformation 都對應了一個 StreamOperator,StreamOperator是執行時的具體實現,會決定UDF(User-Defined Funtion)的呼叫方式。

  • StreamSource 是StreamOperator介面的一個具體實現類,其建構函式入參就是SourceFunction的子類,這裡就是SocketTextStreamFunction的例項。

Task 是直接受 TaskManager 管理和排程的,而 Task 又會呼叫 StreamTask(主要是其各種子類),StreamTask 中封裝了運算元(StreamOperator)的處理邏輯。StreamSource是用來開啟整個流的運算元。我們接下來就說說動態邏輯。

我們的示例程式碼中,所有程式邏輯都是執行在StreamTask(主要是其各種子類)中,filter/map對應了StreamOperator;assignTimestampsAndWatermarks用來生成Watermarks,傳遞給下游的.keyBy.timeWindow(WindowOperator)。而keyBy/timeWindow/sum又被構建成OperatorChain。所以我們下面就逐一講解這些概念。

1. Task

Task,它是線上程中執行的Runable物件,每個Task都是由一組Operators Chaining在一起的工作集合,Flink Job的執行過程可看作一張DAG圖,Task是DAG圖上的頂點(Vertex),頂點之間通過資料傳遞方式相互連結構成整個Job的Execution Graph。

Task 是直接受 TaskManager 管理和排程的,Flink最後通過RPC方法提交task,實際會呼叫到TaskExecutor.submitTask方法中。這個方法會建立真正的Task,然後呼叫task.startTaskThread();開始task的執行。而startTaskThread方法,則會執行executingThread.start,從而呼叫Task.run方法。
它的最核心的程式碼如下:

 * public class Task implements Runnable...
 * The Task represents one execution of a parallel subtask on a TaskManager.
 * A Task wraps a Flink operator (which may be a user function) and runs it
 *
 *  -- doRun()
 *        |
 *        +----> 從 NetworkEnvironment 中申請 BufferPool
 *        |      包括 InputGate 的接收 pool 以及 task 的每個 ResultPartition 的輸出 pool
 *        +----> invokable = loadAndInstantiateInvokable(userCodeClassLoader, 
 *        |                  nameOfInvokableClass) 通過反射建立
 *        |      load and instantiate the task's invokable code
 *        |      invokable即為operator物件例項,例如OneInputStreamTask,SourceStreamTask等
 *        |      OneInputStreamTask繼承了StreamTask,這裡實際呼叫的invoke()方法是StreamTask裡的
 *        +----> invokable.invoke()
 *        |      run the invokable, 
 *        |      
 *        |        
 * OneInputStreamTask<IN,OUT> extends StreamTask<OUT,OneInputStreamOperator<IN, OUT>>    

這個nameOfInvokableClass是哪裡生成的呢?其實早在生成StreamGraph的時候,這就已經確定了,見StreamGraph.addOperator方法

        if (operatorObject instanceof StoppableStreamSource) {
            addNode(vertexID, slotSharingGroup, StoppableSourceStreamTask.class, operatorObject, operatorName);
        } else if (operatorObject instanceof StreamSource) {
            addNode(vertexID, slotSharingGroup, SourceStreamTask.class, operatorObject, operatorName);
        } else {
            addNode(vertexID, slotSharingGroup, OneInputStreamTask.class, operatorObject, operatorName);
        }

這裡的OneInputStreamTask.class即為生成的StreamNode的vertexClass。這個值會一直傳遞

StreamGraph --> JobVertex.invokableClass --> ExecutionJobVertex.TaskInformation.invokableClassName --> Task

2. StreamTask

是本地執行的基本單位,由TaskManagers部署執行,Task會呼叫 StreamTask。StreamTask包含了headOperator 和 operatorChain,封裝了運算元的處理邏輯。可以理解為,StreamTask是執行流程框架,OperatorChain(StreamOperator)是負責具體運算元邏輯,嵌入到StreamTask的執行流程框架中

直接從StreamTask的註釋中,能看到StreamTask的生命週期。

其中,每個operator的open()方法都被StreamTaskopenAllOperators()方法呼叫。該方法(指openAllOperators)執行所有的operational的初始化,例如使用定時器服務註冊定時器。單個task可能正在執行多個operator,消耗其前驅的輸出,在這種情況下,該open()方法在最後一個operator中呼叫,這個operator的輸出也是task本身的輸出。這樣做使得當第一個operator開始處理任務的輸入時,它的所有下游operator都準備好接收其輸出。

OperatorChain是在StreamTask的invoke方法中被建立的,在執行的時候,如果一個operator無法被chain起來,那它就只有headOperator,chain裡就沒有其他operator了。

注意: task中的連續operator是從最後到第一個依次open。

以OneInputStreamTask為例,Task的核心執行程式碼即為OneInputStreamTask.invoke方法,它會呼叫StreamTask.invoke方法。

 * The life cycle of the task(StreamTask) is set up as follows:
 * {@code
 *  -- setInitialState -> provides state of all operators in the chain
 *        |   
 *        +----> 重新初始化task的state,並且在如下兩種情況下尤為重要:
 *        |      1. 當任務從故障中恢復並從最後一個成功的checkpoint點重新啟動時
 *        |      2. 從一個儲存點恢復時。
 *  -- invoke()
 *        |
 *        +----> Create basic utils (config, etc) and load the chain of operators
 *        +----> operators.setup() //建立 operatorChain 並設定為 headOperator 的 Output
 *        --------> openAllOperators()
 *        +----> task specific init()
 *        +----> initialize-operator-states()
 *        +----> open-operators() //執行 operatorChain 中所有 operator 的 open 方法
 *        +----> run() //runMailboxLoop()方法將一直執行,直到沒有更多的輸入資料
 *        --------> mailboxProcessor.runMailboxLoop();
 *        --------> StreamTask.processInput()
 *        --------> StreamTask.inputProcessor.processInput()   
 *        --------> 間接呼叫 operator的processElement()和processWatermark()方法
 *        +----> close-operators() //執行 operatorChain 中所有 operator 的 close 方法
 *        +----> dispose-operators()
 *        +----> common cleanup
 *        +----> task specific cleanup()
 * }

3. OneInputStreamTask

OneInputStreamTask是 StreamTask 的實現類之一,具有代表性。我們示例程式碼中基本都是由OneInputStreamTask來做具體執行。

看看OneInputStreamTask 是如何生成的?

 * 生成StreamNode時候
 *
 *  -- StreamGraph.addOperator()
 *        |   
 *        +----> addNode(... OneInputStreamTask.class, operatorObject, operatorName);
 *        |      將 OneInputStreamTask 等 StreamTask 設定到 StreamNode 的節點屬性中
 *     
 *  
 * 在 JobVertex 的節點構造時也會做一次初始化
 *        |      
 *        +----> jobVertex.setInvokableClass(streamNode.getJobVertexClass());   

後續在 TaskDeploymentDescriptor 例項化的時候會獲取 jobVertex 中的屬性。

再看看OneInputStreamTask 的 init() 和run() 分別都做了什麼

 * OneInputStreamTask
 * class OneInputStreamTask<IN,OUT> extends StreamTask<OUT,OneInputStreamOperator<IN, OUT>>  * {@code
 *  -- init方法
 *        |
 *        +----> 獲取運算元對應的輸入序列化器 TypeSerializer
 *        +----> CheckpointedInputGate inputGate = createCheckpointedInputGate();
 *               獲取輸入資料 InputGate[],InputGate 是 flink 網路傳輸的核心抽象之一
 *               其在內部封裝了訊息的接收和記憶體的管理,從 InputGate 可以拿到上游傳送過來的資料
 *        +----> inputProcessor = new StreamOneInputProcessor<>(input,output,operatorChain) 
 *        |      1. StreamInputProcessor,是 StreamTask 內部用來處理 Record 的元件,  
 *        |      裡面封裝了外部 IO 邏輯【記憶體不夠時將 buffer 吐到磁碟上】以及 時間對齊邏輯【Watermark】
 *        |      2. output 是 StreamTaskNetworkOutput, input是StreamTaskNetworkInput
 *        |      這樣就把input, output 他倆聚合進StreamOneInputProcessor
 *        +----> headOperator.getMetricGroup().gauge 
 *        +----> getEnvironment().getMetricGroup().gauge 
 *               設定一些 metrics 及 累加器
 * 
 * 
 *  -- run方法(就是基類StreamTask.run)
 *        +----> StreamTask.runMailboxLoop
 *        |      從 StreamTask.runMailboxLoop 開始,下面是一層層的呼叫關係
 *        -----> StreamTask.processInput()
 *        -----> StreamTask.inputProcessor.processInput()
 *        -----> StreamOneInputProcessor.processInput
 *        -----> input.emitNext(output) 
 *        -----> StreamTaskNetworkInput.emitNext()
 *        |      while(true) {從輸入source讀取一個record, output是 StreamTaskNetworkOutput}
 *        -----> StreamTaskNetworkInput.processElement()  //具體處理record
 *        |      根據StreamElement的不同型別做不同處理
 *        |      if (recordOrMark.isRecord()) output.emitRecord()
 *        ------------> StreamTaskNetworkOutput.emitRecord()  
 *        ----------------> operator.processElement(record)   
 *        |      if (recordOrMark.isWatermark()) statusWatermarkValve.inputWatermark()
 *        |      if (recordOrMark.isLatencyMarker()) output.emitLatencyMarker()
 *        |      if (recordOrMark.isStreamStatus()) statusWatermarkValve.inputStreamStatus()   

4. OperatorChain

flink 中的一個 operator 代表一個最頂級的 api 介面,拿 streaming 來說就是,在 DataStream 上做諸如 map/reduce/keyBy 等操作均會生成一個運算元。

Operator Chain是指在生成JobGraph階段,將Job中的Operators按照一定策略(例如:single output operator可以chain在一起)連結起來並放置在一個Task執行緒中執行。減少了資料傳遞/執行緒切換等環節,降低系統開銷的同時增加了資源利用率和Job效能。

chained operators實際上是從下游往上游去反向一個個建立和setup的。假設chained operators為:StreamGroupedReduce - StreamFilter - StreamSink,而實際初始化順序則相反:StreamSink - StreamFilter - StreamGroupedReduce

 * OperatorChain(
 *          StreamTask<OUT, OP> containingTask,
 *          RecordWriterDelegate<SerializationDelegate<StreamRecord<OUT>>> recordWriterDelegate)
 * {@code
 *  -- collect
 *        |
 *        +----> pushToOperator(StreamRecord<X> record)
 *        +---------> operator.processElement(castRecord); 
 *        //這裡的operator是chainedOperator,即除了headOperator之外,剩餘的operators的chain。
 *        //這個operator.processElement,會迴圈呼叫operator chain所有operator,直到chain end。
 *        //比如 Operator A 對應的 ChainingOutput collect 呼叫了對應的運算元 A 的 processElement 方法,這裡又會呼叫 B 的 ChainingOutput 的 collect 方法,以此類推。這樣便實現了可 chain 運算元的本地處理,最終經由網路輸出 RecordWriterOutput 傳送到下游節點。   

5. StreamOperator

StreamTask會呼叫Operator,所以我們需要看看Operator的生命週期。

邏輯運算元Transformation最後會對應到物理運算元Operator,這個概念對應的就是StreamOperator

StreamOperator是根介面。對於 Streaming 來說所有的運算元都繼承自 StreamOperator。繼承了StreamOperator的擴充套件介面則有OneInputStreamOperator,TwoInputStreamOperator。實現了StreamOperator的抽象類有AbstractStreamOperator以及它的子類AbstractStreamUdfOperator。

其中operator處理輸入的資料(elements)可以是以下之一:input element,watermark和checkpoint barriers。他們中的每一個都有一個特殊的單元來處理。element由processElement()方法處理,watermark由processWatermark()處理,checkpoint barriers由非同步呼叫的snapshotState()方法處理,此方法會觸發一次checkpoint 。

processElement()方法也是UDF的邏輯被呼叫的地方,例如MapFunction裡的map()方法。

 * AbstractUdfStreamOperator, which is the basic class for all operators that execute UDFs.
 * 
 *         // initialization phase
 *         //初始化operator-specific方法,如RuntimeContext和metric collection
 *         OPERATOR::setup 
 *             UDF::setRuntimeContext
 *         //setup的呼叫鏈是invoke(StreamTask) -> constructor(OperatorChain) -> setup
 *         //呼叫setup時,StreamTask已經在各個TaskManager節點上 
 *         //給出一個用來初始state的operator   
 *  
 *         OPERATOR::initializeState
 *         //執行所有operator-specific的初始化  
 *         OPERATOR::open
 *            UDF::open
 *         
 *         // processing phase (called on every element/watermark)
 *         OPERATOR::processElement
 *             UDF::run //給定一個operator可以有一個使用者定義的函式(UDF)
 *         OPERATOR::processWatermark
 *         
 *         // checkpointing phase (called asynchronously on every checkpoint)
 *         OPERATOR::snapshotState
 *                 
 *         // termination phase
 *         OPERATOR::close
 *             UDF::close
 *         OPERATOR::dispose

OneInputStreamOperator與TwoInputStreamOperator介面。這兩個介面非常類似,本質上就是處理流上存在的三種元素StreamRecord,Watermark和LatencyMarker。一個用作單流輸入,一個用作雙流輸入。

6. StreamSource

StreamSource是用來開啟整個流的運算元(繼承AbstractUdfStreamOperator)。StreamSource因為沒有輸入,所以沒有實現InputStreamOperator的介面。比較特殊的是ChainingStrategy初始化為HEAD。

在StreamSource這個類中,在執行時由SourceStreamTask呼叫SourceFunction的run方法來啟動source。

 * class StreamSource<OUT, SRC extends SourceFunction<OUT>>
 *      extends AbstractUdfStreamOperator<OUT, SRC> implements StreamOperator<OUT> 
 * 
 *
 *  -- run()
 *        |   
 *        +----> latencyEmitter = new LatencyMarksEmitter
 *        |      用來產生延遲監控的LatencyMarker
 *        +----> this.ctx = StreamSourceContexts.getSourceContext
 *        |      據時間模式(EventTime/IngestionTime/ProcessingTime)生成相應SourceConext  
 *        |      包含了產生element關聯的timestamp的方法和生成watermark的方法
 *        +----> userFunction.run(ctx);
 *        |      呼叫SourceFunction的run方法來啟動source,進行資料的轉發
 *        
public {
            //讀到資料後,把資料交給collect方法,collect方法負責把資料交到合適的位置(如釋出為br變數,或者交給下個operator,或者通過網路發出去)
    private transient SourceFunction.SourceContext<OUT> ctx;
    private transient volatile boolean canceledOrStopped = false;
    private transient volatile boolean hasSentMaxWatermark = false;
  
    public void run(final Object lockingObject,
            final StreamStatusMaintainer streamStatusMaintainer,
            final Output<StreamRecord<OUT>> collector,
            final OperatorChain<?, ?> operatorChain) throws Exception {
            userFunction.run(ctx);    
  }
}

7. StreamMap

StreamFilter,StreamMap與StreamFlatMap運算元在實現的processElement分別呼叫傳入的FilterFunction,MapFunction, FlatMapFunction的udf將element傳到下游。這裡用StreamMap舉例:

public class StreamMap<IN, OUT>
        extends AbstractUdfStreamOperator<OUT, MapFunction<IN, OUT>>
        implements OneInputStreamOperator<IN, OUT> {

  public StreamMap(MapFunction<IN, OUT> mapper) {
        super(mapper);
        chainingStrategy = ChainingStrategy.ALWAYS;
    }

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        output.collect(element.replace(userFunction.map(element.getValue())));
    }
}

8. WindowOperator

Flink通過水位線分配器(TimestampsAndPeriodicWatermarksOperator和TimestampsAndPunctuatedWatermarksOperator這兩個運算元)向事件流中注入水位線。

我們示例程式碼中,timeWindow()最終對應了WindowStream,視窗運算元WindowOperator是視窗機制的底層實現。assignTimestampsAndWatermarks 則對應了TimestampsAndPeriodicWatermarksOperator運算元,它把產生的Watermark傳遞給了WindowOperator。

元素在streaming dataflow引擎中流動到WindowOperator時,會被分為兩撥,分別是普通事件和水位線。

  • 如果是普通的事件,則會呼叫processElement方法進行處理,在processElement方法中,首先會利用視窗分配器為當前接收到的元素分配視窗,接著會呼叫觸發器的onElement方法進行逐元素觸發。對於時間相關的觸發器,通常會註冊事件時間或者處理時間定時器,這些定時器會被儲存在WindowOperator的處理時間定時器佇列和水位線定時器佇列中,如果觸發的結果是FIRE,則對視窗進行計算。

  • 如果是水位線(事件時間場景),則方法processWatermark將會被呼叫,它將會處理水位線定時器佇列中的定時器。如果時間戳滿足條件,則利用觸發器的onEventTime方法進行處理。

而對於處理時間的場景,WindowOperator將自身實現為一個基於處理時間的觸發器,以觸發trigger方法來消費處理時間定時器佇列中的定時器滿足條件則會呼叫視窗觸發器的onProcessingTime,根據觸發結果判斷是否對視窗進行計算。

 * public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 *  extends AbstractUdfStreamOperator<OUT, InternalWindowFunction<ACC, OUT, K, W>>
 *  implements OneInputStreamOperator<IN, OUT>, Triggerable<K, W> 
 *
 *  -- processElement()
 *        |   
 *        +----> windowAssigner.assignWindows
 *        |      //通過WindowAssigner為element分配一系列windows
 *        +----> windowState.add(element.getValue())
 *        |      //把當前的element加入buffer state 
 *        +----> TriggerResult triggerResult = triggerContext.onElement(element)
 *        |      //觸發onElment,得到triggerResult
 *        +----> Trigger.OnMergeContext.onElement()
 *        +----> trigger.onElement(element.getValue(), element.getTimestamp(), window,...)
 *        +----> EventTimeTriggers.onElement()
 *        |      //如果當前window.maxTimestamp已經小於CurrentWatermark,直接觸發  
 *        |      //否則將window.maxTimestamp註冊到TimeService中,等待觸發   
 *        +----> contents = windowState.get(); emitWindowContents(actualWindow, contents)
 *        |      //對triggerResult做各種處理,如果fire,真正去計算視窗中的elements
   
 *  -- processWatermark()   
 *        -----> 最終進入基類AbstractStreamOperator.processWatermark
 *        -----> AbstractStreamOperator.processWatermark(watermark) 
 *        -----> timeServiceManager.advanceWatermark(mark); 第一步處理watermark
 *        -----> output.emitWatermark(mark) 第二步將watermark傳送到下游
 *        -----> InternalTimeServiceManager.advanceWatermark      

0x06. 處理 Watermark 的簡要流程

最後是處理 Watermark 的簡要流程(OneInputStreamTask為例)

 *  -- OneInputStreamTask.invoke()
 *        |   
 *        +----> StreamTask.init 
 *        |      把StreamTaskNetworkOutput/StreamTaskNetworkInput聚合StreamOneInputProcessor
 *        +----> StreamTask.runMailboxLoop
 *        |      從 StreamTask.runMailboxLoop 開始,下面是一層層的呼叫關係
 *        -----> StreamTask.processInput()
 *        -----> StreamTask.inputProcessor.processInput()
 *        -----> StreamOneInputProcessor.processInput
 *        -----> input.emitNext(output)
 *        -----> StreamTaskNetworkInput.emitNext()
 *        -----> StreamTaskNetworkInput.processElement()
   
   
 *  下面是處理普通 Record  
 *  -- StreamTaskNetworkInput.processElement()  
 *        |   
 *        | 下面都是一層層的呼叫關係
 *        -----> output.emitRecord(recordOrMark.asRecord())
 *        -----> StreamTaskNetworkOutput.emitRecord()
 *        -----> operator.processElement(record)
 *               進入具體運算元 processElement 的處理,比如StreamFlatMap.processElement
 *        -----> StreamFlatMap.processElement(record)
 *        -----> userFunction.flatMap()
    
   
 *  -- 下面是處理 Watermark
 *  -- StreamTaskNetworkInput.processElement()  
 *        |   
 *        | 下面都是一層層的呼叫關係
 *        -----> StatusWatermarkValve.inputWatermark()
 *        -----> StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels()
 *        -----> output.emitWatermark()
 *        -----> StreamTaskNetworkOutput.emitWatermark()
 *        -----> operator.processWatermark(watermark) 
 *        -----> KeyedProcessOperator.processWatermark(watermark) 
 *               具體運算元processWatermark處理,如WindowOperator/KeyedProcessOperator.processWatermark 
 *               最終進入基類AbstractStreamOperator.processWatermark
 *        -----> AbstractStreamOperator.processWatermark(watermark) 
 *        -----> timeServiceManager.advanceWatermark(mark); 第一步處理watermark
 *               output.emitWatermark(mark) 第二步將watermark傳送到下游
 *        -----> InternalTimeServiceManager.advanceWatermark   
 *        -----> 下面看看第一步處理watermark  
 *        -----> InternalTimerServiceImpl.advanceWatermark   
 *               邏輯timer時間小於watermark的都應該被觸發回撥。從eventTimeTimersQueue從小到大取timer,如果小於傳入的water mark,那麼說明這個window需要觸發。注意watermarker是沒有key的,所以當一個watermark來的時候是會觸發所有timer,而timer的key是不一定的,所以這裡一定要設定keyContext,否則就亂了
 *        -----> triggerTarget.onEventTime(timer);
 *               triggerTarget是具體operator物件,open時通過InternalTimeServiceManager.getInternalTimerService傳遞到HeapInternalTimerService  
 *        -----> KeyedProcessOperator.onEeventTime()
 *               呼叫使用者實現的keyedProcessFunction.onTimer去做具體事情。對於window來說也是呼叫onEventTime或者onProcessTime來從key和window對應的狀態中的資料傳送到windowFunction中去計算併發送到下游節點  
 *        -----> invokeUserFunction(TimeDomain.PROCESSING_TIME, timer);
 *        -----> userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector);
 
   
 *  -- DataStream 設定定時傳送Watermark,是加了個chain的TimestampsAndPeriodicWatermarksOperator
 *  -- StreamTaskNetworkInput.processElement()        
 *        -----> TimestampsAndPeriodicWatermarksOperator.processElement
 *               會呼叫AssignerWithPeriodicWatermarks.extractTimestamp提取event time
 *               然後更新StreamRecord的時間
 *        -----> WindowOperator.processElement
 *               在windowAssigner.assignWindows時以element的timestamp作為assign時間

0x07 處理 Watermark 的詳細流程(原始碼分析)

下面程式碼分析略冗長。

我們再看看樣例程式碼

DataStream<String> text = env.socketTextStream(hostname, port);

DataStream counts = text
    .filter(new FilterClass())
    .map(new LineSplitter())
    .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessGenerator) 
    .keyBy(0)
    .timeWindow(Time.seconds(10))
    .sum(2)
  
counts.print()  
System.out.println(env.getExecutionPlan());

1. 程式邏輯 DataStream & Transformation

首先看看邏輯API。

DataStream是資料流概念。A DataStream represents a stream of elements of the same type。

Transformation是一個邏輯API概念。Transformation代表了流的轉換,將一個或多個DataStream轉換為新的DataStream。A Transformation is applied on one or more data streams or data sets and results in one or more output data streams or data sets。

我們認為Transformation就是邏輯運算元,而 Transformation 對應的物理概念是Operators。

DataStream類在內部組合了一個 Transformation類,實際的轉換操作均通過該類完成,描述了這個DataStream是怎麼來的。

針對示例程式碼,"assignTimestampsAndWatermarks","Filter","Map"這幾種,都被轉換為 SingleOutputStreamOperator,繼續由使用者進行邏輯處理。SingleOutputStreamOperator這個類名字有點誤導,實際上它是DataStream的子類

@Public
public class DataStream<T> {
    protected final StreamExecutionEnvironment environment;
    protected final Transformation<T> transformation;  
    
  //assignTimestampsAndWatermarks這個操作實際上也生成了一個SingleOutputStreamOperator運算元
    public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks(
            AssignerWithPeriodicWatermarks<T> timestampAndWatermarkAssigner) {

        final int inputParallelism = getTransformation().getParallelism();
        final AssignerWithPeriodicWatermarks<T> cleanedAssigner = clean(timestampAndWatermarkAssigner);

        TimestampsAndPeriodicWatermarksOperator<T> operator =
                new TimestampsAndPeriodicWatermarksOperator<>(cleanedAssigner);

        return transform("Timestamps/Watermarks", getTransformation().getOutputType(), operator)
                .setParallelism(inputParallelism);
    }

  //Map是一個OneInputStreamOperator運算元。
    public <R> SingleOutputStreamOperator<R> map(MapFunction<T, R> mapper, TypeInformation<R> outputType) {
        return transform("Map", outputType, new StreamMap<>(clean(mapper)));
    }

    @PublicEvolving
    public <R> SingleOutputStreamOperator<R> transform(
            String operatorName,
            TypeInformation<R> outTypeInfo,
            OneInputStreamOperatorFactory<T, R> operatorFactory) {
        return doTransform(operatorName, outTypeInfo, operatorFactory);
    }

    protected <R> SingleOutputStreamOperator<R> doTransform(
            String operatorName,
            TypeInformation<R> outTypeInfo,
            StreamOperatorFactory<R> operatorFactory) {

        // read the output type of the input Transform to coax out errors about MissingTypeInfo
        transformation.getOutputType();

        OneInputTransformation<T, R> resultTransform = new OneInputTransformation<>(
                this.transformation,
                operatorName,
                operatorFactory,
                outTypeInfo,
                environment.getParallelism());

    // SingleOutputStreamOperator 實際上是 DataStream 的子類,名字裡面有Operator容易誤導大家。
        @SuppressWarnings({"unchecked", "rawtypes"})
        SingleOutputStreamOperator<R> returnStream = new SingleOutputStreamOperator(environment, resultTransform);

        //就是把Transformation加到執行環境上去。
        getExecutionEnvironment().addOperator(resultTransform); 
        return returnStream;
    }     
}

針對示例程式碼,絕大多數邏輯運算元都轉換為OneInputTransformation,每個Transformation裡面間接記錄了對應的物理Operator。註冊到Env上。

// OneInputTransformation對應了單輸入的運算元
@Internal
public class OneInputTransformation<IN, OUT> extends PhysicalTransformation<OUT> {
    private final Transformation<IN> input;
    private final StreamOperatorFactory<OUT> operatorFactory; // 這裡間接記錄了本Transformation對應的物理Operator。比如StreamMap。
    private KeySelector<IN, ?> stateKeySelector;
    private TypeInformation<?> stateKeyType;
  
    public OneInputTransformation(
            Transformation<IN> input,
            String name,
            OneInputStreamOperator<IN, OUT> operator, // 比如StreamMap
            TypeInformation<OUT> outputType,
            int parallelism) {
        this(input, name, SimpleOperatorFactory.of(operator), outputType, parallelism);
    }  
}   

回到樣例程式碼,DataStream.keyBy會返回一個KeyedStream。KeyedStream. timeWindow會返回一個WindowedStream。同時內部把各種 Transformation 註冊到了 Env 中。

WindowedStream內部對應WindowedOperator。WindowedStream卻不是Stream的子類! 而是把 KeyedStream 包含在內作為一個成員變數。

// 這個居然不是Stream的子類! 而是把 KeyedStream 包含在內作為一個成員變數。
@Public
public class WindowedStream<T, K, W extends Window> {
    private final KeyedStream<T, K> input; // 這裡包含了DataStream。
    private final WindowAssigner<? super T, W> windowAssigner;
    private Trigger<? super T, ? super W> trigger;
    private Evictor<? super T, ? super W> evictor;
    private long allowedLateness = 0L;

  // reduce, fold等函式也是類似操作。
  private <R> SingleOutputStreamOperator<R> apply(InternalWindowFunction<Iterable<T>, R, K, W> function, TypeInformation<R> resultType, Function originalFunction) {

        final String opName = generateOperatorName(windowAssigner, trigger, evictor, originalFunction, null);
        KeySelector<T, K> keySel = input.getKeySelector();
        WindowOperator<K, T, Iterable<T>, R, W> operator;

        ListStateDescriptor<T> stateDesc = new ListStateDescriptor<>("window-contents",
                input.getType().createSerializer(getExecutionEnvironment().getConfig()));

      // 這裡直接生成了 WindowOperator
            operator =
                new WindowOperator<>(windowAssigner,
                    windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()),
                    keySel,
                    input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()),
                    stateDesc,
                    function,
                    trigger,
                    allowedLateness,
                    lateDataOutputTag);
        }

        return input.transform(opName, resultType, operator);
}

在生成了程式邏輯之後,Env裡面就有了 一系列 transformation(每個transformation裡面記錄了自己對應的物理 operator,比如StreamMap,WindowOperator),這個是後面生成計算圖的基礎。

當呼叫env.execute時,通過StreamGraphGenerator.generate遍歷其中的transformation集合構造出StreamGraph。

2. 生成計算圖

我們這裡重點介紹StreamGraph以及如何生成,JobGraph,ExecutionGraph只是簡介。

StreamGraph代表程式的拓撲結構,是從使用者程式碼直接生成的圖。StreamOperator是具體的物理運算元。

一個很重要的點是,把 SourceStreamTask / OneInputStreamTask 新增到StreamNode上,作為 jobVertexClass,這個是真實計算的部分。

StreamOperator是一個介面。StreamOperator 是 資料流操作符的基礎介面,該介面的具體實現子類中,會有儲存使用者自定義資料處理邏輯的函式的屬性,負責對userFunction的呼叫,以及呼叫時傳入所需引數,比如在StreamSource這個類中,在呼叫SourceFunction的run方法時,會構建一個SourceContext的具體例項,作為入參,用於run方法中,進行資料的轉發;

StreamOperator

PublicEvolving
public interface StreamOperator<OUT> extends CheckpointListener, KeyContext, Disposable, Serializable {
}

AbstractStreamOperator

AbstractStreamOperator抽象類實現了StreamOperator。在AbstractStreamOperator中有一些重要的成員變數,總體來說可以分為幾類,一類是執行時相關的,一類是狀態相關的,一類是配置相關的,一類是時間相關的,還有一類是監控相關的。

@PublicEvolving
public abstract class AbstractStreamOperator<OUT>
        implements StreamOperator<OUT>, SetupableStreamOperator<OUT>, Serializable {
    protected ChainingStrategy chainingStrategy = ChainingStrategy.HEAD;
    private transient StreamTask<?, ?> container;
    protected transient StreamConfig config;
    protected transient Output<StreamRecord<OUT>> output;
    private transient StreamingRuntimeContext runtimeContext;
    public void processWatermark(Watermark mark) throws Exception {
        if (timeServiceManager != null) {
            timeServiceManager.advanceWatermark(mark); //第一步處理watermark
        }
        output.emitWatermark(mark);//第二步,將watermark傳送到下游
    }  
}

AbstractUdfStreamOperator

AbstractUdfStreamOperator抽象類繼承了AbstractStreamOperator,對其部分方法做了增強,多了一個成員變數UserFunction。提供了一些通用功能,比如把context賦給運算元,儲存快照等等。此外還實現了OutputTypeConfigurable介面的setOutputType方法對輸出資料的型別做了設定。

@PublicEvolving
public abstract class AbstractUdfStreamOperator<OUT, F extends Function>
        extends AbstractStreamOperator<OUT>
        implements OutputTypeConfigurable<OUT> {
    protected final F userFunction;/** The user function. */
}

KeyedProcessOperator & WindowOperator。

KeyedStream,WindowedStream分別對應KeyedProcessOperator,WindowOperator。

@Internal
public class WindowOperator<K, IN, ACC, OUT, W extends Window>
    extends AbstractUdfStreamOperator<OUT, InternalWindowFunction<ACC, OUT, K, W>>
    implements OneInputStreamOperator<IN, OUT>, Triggerable<K, W> {
    protected final WindowAssigner<? super IN, W> windowAssigner;
    private final KeySelector<IN, K> keySelector;
    private final Trigger<? super IN, ? super W> trigger;
    private final StateDescriptor<? extends AppendingState<IN, ACC>, ?> windowStateDescriptor;
    protected final TypeSerializer<K> keySerializer;
    protected final TypeSerializer<W> windowSerializer;  
}

@Internal
public class KeyedProcessOperator<K, IN, OUT>
        extends AbstractUdfStreamOperator<OUT, KeyedProcessFunction<K, IN, OUT>>
        implements OneInputStreamOperator<IN, OUT>, Triggerable<K, VoidNamespace> {
    private transient TimestampedCollector<OUT> collector;
    private transient ContextImpl context;
    private transient OnTimerContextImpl onTimerContext;
  
    @Override
    public void open() throws Exception {
        super.open();
        collector = new TimestampedCollector<>(output);
        InternalTimerService<VoidNamespace> internalTimerService =
                getInternalTimerService("user-timers", VoidNamespaceSerializer.INSTANCE, this);
        TimerService timerService = new SimpleTimerService(internalTimerService);
        context = new ContextImpl(userFunction, timerService);
        onTimerContext = new OnTimerContextImpl(userFunction, timerService);
    }

    @Override
    public void onEventTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
        collector.setAbsoluteTimestamp(timer.getTimestamp());
        invokeUserFunction(TimeDomain.EVENT_TIME, timer);
    }

    @Override
    public void onProcessingTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
        collector.eraseTimestamp();
        invokeUserFunction(TimeDomain.PROCESSING_TIME, timer);
    }

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        collector.setTimestamp(element);
        context.element = element;
        userFunction.processElement(element.getValue(), context, collector);
        context.element = null;
    }

    private void invokeUserFunction(
            TimeDomain timeDomain,
            InternalTimer<K, VoidNamespace> timer) throws Exception {
        onTimerContext.timeDomain = timeDomain;
        onTimerContext.timer = timer;
        userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector);
        onTimerContext.timeDomain = null;
        onTimerContext.timer = null;
    }  
}

OneInputStreamOperator & TwoInputStreamOperator

承接輸入資料並進行處理的運算元就是OneInputStreamOperator、TwoInputStreamOperator等。 這兩個介面非常類似,本質上就是處理流上存在的三種元素StreamRecord,Watermark和LatencyMarker。一個用作單流輸入,一個用作雙流輸入。除了StreamSource以外的所有Stream運算元都必須實現並且只能實現其中一個介面。

@PublicEvolving
public interface OneInputStreamOperator<IN, OUT> extends StreamOperator<OUT> {
    void processElement(StreamRecord<IN> element) throws Exception;
    void processWatermark(Watermark mark) throws Exception;
    void processLatencyMarker(LatencyMarker latencyMarker) throws Exception;
}

StreamMap & StreamFlatMap

map,filter等常用操作都是OneInputStreamOperator。下面給出StreamMap,StreamFlatMap作為具體例子。

// 用StreamMap裡做個實際運算元的例子@
Internal
public class StreamMap<IN, OUT>
        extends AbstractUdfStreamOperator<OUT, MapFunction<IN, OUT>>
        implements OneInputStreamOperator<IN, OUT> {

    private static final long serialVersionUID = 1L;

    public StreamMap(MapFunction<IN, OUT> mapper) {
        super(mapper);
        chainingStrategy = ChainingStrategy.ALWAYS;
    }

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        output.collect(element.replace(userFunction.map(element.getValue())));
    }
}

// 用StreamFlatMap裡做個實際運算元的例子
@Internal
public class StreamFlatMap<IN, OUT>
        extends AbstractUdfStreamOperator<OUT, FlatMapFunction<IN, OUT>>
        implements OneInputStreamOperator<IN, OUT> {

    private transient TimestampedCollector<OUT> collector;

    public StreamFlatMap(FlatMapFunction<IN, OUT> flatMapper) {
        super(flatMapper);
        chainingStrategy = ChainingStrategy.ALWAYS;
    }

    @Override
    public void open() throws Exception {
        super.open();
        collector = new TimestampedCollector<>(output);
    }

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        collector.setTimestamp(element);
        userFunction.flatMap(element.getValue(), collector);
    }
}

生成StreamGraph

程式執行即env.execute("Java WordCount from SocketTextStream Example")這行程式碼的時候,就會生成StreamGraph。代表程式的拓撲結構,是從使用者程式碼直接生成的圖。

StreamGraph生成函式分析

實際生成StreamGraph的入口是StreamGraphGenerator.generate(env, transformations) 。其中的transformations是一個list,裡面記錄的就是我們在transform方法中放進來的運算元。最終會呼叫 transformXXX 來對具體的Transformation進行轉換。

@Internal
public class StreamGraphGenerator {
    private final List<Transformation<?>> transformations;
    private StreamGraph streamGraph;

    public StreamGraph generate() {
        //注意,StreamGraph的生成是從sink開始的
        streamGraph = new StreamGraph(executionConfig, checkpointConfig, savepointRestoreSettings);

        for (Transformation<?> transformation: transformations) {
            transform(transformation);
        }

        final StreamGraph builtStreamGraph = streamGraph;
        return builtStreamGraph;
    }   

    private Collection<Integer> transform(Transformation<?> transform) {
        //這個方法的核心邏輯就是判斷傳入的steamOperator是哪種型別,並執行相應的操作,詳情見下面那一大堆if-else
        //這裡對操作符的型別進行判斷,並以此呼叫相應的處理邏輯.簡而言之,處理的核心無非是遞迴的將該節點和節點的上游節點加入圖
        Collection<Integer> transformedIds;
        if (transform instanceof OneInputTransformation<?, ?>) {
            transformedIds = transformOneInputTransform((OneInputTransformation<?, ?>) transform);
        } else if (transform instanceof TwoInputTransformation<?, ?, ?>) {
            transformedIds = transformTwoInputTransform((TwoInputTransformation<?, ?, ?>) transform);
    }
        .......
  }

  //因為map,filter等常用操作都是OneInputStreamOperator,我們就來看看StreamGraphGenerator.transformOneInputTransform((OneInputTransformation<?, ?>) transform)方法。
  //該函式首先會對該transform的上游transform進行遞迴轉換,確保上游的都已經完成了轉化。然後通過transform構造出StreamNode,最後與上游的transform進行連線,構造出StreamNode。

    private <IN, OUT> Collection<Integer> transformOneInputTransform(OneInputTransformation<IN, OUT> transform) {
        //就是遞迴處理節點,為當前節點和它的依賴節點建立邊,處理邊之類的,把節點加到圖裡。
        Collection<Integer> inputIds = transform(transform.getInput());
        String slotSharingGroup = determineSlotSharingGroup(transform.getSlotSharingGroup(), inputIds);
    // 這裡新增Operator到streamGraph上。
        streamGraph.addOperator(transform.getId(),
                slotSharingGroup,
                transform.getCoLocationGroupKey(),
                transform.getOperatorFactory(),
                transform.getInputType(),
                transform.getOutputType(),
                transform.getName());

        if (transform.getStateKeySelector() != null) {
            TypeSerializer<?> keySerializer = transform.getStateKeyType().createSerializer(executionConfig);
            streamGraph.setOneInputStateKey(transform.getId(), transform.getStateKeySelector(), keySerializer);
        }

        int parallelism = transform.getParallelism() != ExecutionConfig.PARALLELISM_DEFAULT ?
            transform.getParallelism() : executionConfig.getParallelism();
        streamGraph.setParallelism(transform.getId(), parallelism);
        streamGraph.setMaxParallelism(transform.getId(), transform.getMaxParallelism());

        for (Integer inputId: inputIds) {
            streamGraph.addEdge(inputId, transform.getId(), 0);
        }
        return Collections.singleton(transform.getId());
    }
}
streamGraph.addOperator

在之前的生成圖程式碼中,有streamGraph.add