1. 程式人生 > >Flink的Job啟動Driver端(原始碼分析)

Flink的Job啟動Driver端(原始碼分析)

整個Flink的Job啟動是通過在Driver端通過使用者的Envirement的execute()方法將使用者的運算元轉化成StreamGraph

然後得到JobGraph通過遠端RPC將這個JobGraph提交到JobManager對應的介面

JobManager轉化成executionGraph.deploy(),然後生成TDD發給TaskManager,然後整個Job就啟動起來了

這裡來看一下Driver端的實現從使用者的Envirement.execute()方法作為入口

這裡的Envirement分為

RemoteStreamEnvironment

LocalStreamEnvironment

因為local模式比較簡單這裡就不展開了,主要是看下RemoteStreamEnvironment的execute方法

可以看到這裡先獲取到了streamGraph,具體獲取的實現

這裡傳入了一個transformations其中就包含了我們使用者的所有operator

這個地方就是遍歷了使用者端所有的operator生成StreamGraph,遍歷的每一個運算元具體轉化成streamGraph的邏輯

1處會遞迴遍歷input直到input已經transfor,然後拿到了上游的ids

然後將operator加入到了streamGraph中呼叫addNode()方法將operator作為一個node,包含了一些資訊,上下游的型別,並行度,soltGroup

最後遍歷上游的ids,建立邊新增到streamGraph

到這裡streamGraph就建立完成了

回到最開始的地方,建立完streamGraph以後,會將streamGraph傳入executeRemotely(streamGraph, jarFiles)這個方法,這裡就是streamGraph轉化成jobgraph的邏輯

其中建立了一個RestClusterClient

可以看到這裡,通過getJobGraph方法將streamGraph轉換成了jobgraph

然後就submitJob將這個JobGraph提交Jobmanager了

先看一下streamGraph如何轉化成jobgraph的

通過getJobGraph方法然後

這個createJobGraph方法是主要的轉化邏輯

廣度優先遍歷為所有streamGraph的node 即operator生成hash雜湊值,為什麼要生成這個operator的hash?

因為這個hash需要作為每一個operator的唯一標示,標示每一個operator用於cp的恢復,當用戶程式碼沒有修改時,這個hash值是不會改變的

接下來

這裡會將flink中上下游的operator操作根據是否滿足chain條件鏈在一起,在createChian中

這個isChainable()方法就是是否可以chain的判斷條件

1.下游的輸入邊只有一條

2.下游操作operator不為空

3.上游操作operator不為空

4.上游必須有相同的solt組

5.下游chain策略為always

6.上游chain策略為head或上游chain策略為always

7.forwardpartition的邊

8.上下游並行度相同

9.使用者程式碼設定的operator是否可以chian

 

將可以chain的streamnode 鏈在一起以後就可以建立成為jobGraph的jobVertex了

然後通過RestClusterClient會將這個jobGraph往jobmanager的Dispatcher對應的RPC介面上面傳送

整個job的啟動Driver端的任務就結束了

 

總結:

  在Driver端使用者的運算元會被建立成為streamGraph,其中包含了一些邊,角,上下游型別,並行度等一些資訊

  然後將streamGraph通過一些chain條件將可以chain的頂點chain在了一起轉化成了JobGraph

  streamEdge變成了jobEdge,chain在一起的streamnode變成了jobVertex

  最後然後通過RPC將整個jobGraph向jobmanager提