Flink 原理與實現:如何生成 JobGraph
轉載來源:http://wuchong.me/blog/2016/05/10/flink-internals-how-to-build-jobgraph/
繼前文Flink 原理與實現:架構和拓撲概覽中介紹了Flink的四層執行圖模型,本文將主要介紹 Flink 是如何將 StreamGraph 轉換成 JobGraph 的。根據使用者用Stream API編寫的程式,構造出一個代表拓撲結構的StreamGraph的。以 WordCount 為例,轉換圖如下圖所示:
StreamGraph 和 JobGraph 都是在 Client 端生成的,也就是說我們可以在 IDE 中通過斷點除錯觀察 StreamGraph 和 JobGraph 的生成過程。
JobGraph 的相關資料結構主要在 org.apache.flink.runtime.jobgraph
包中。構造 JobGraph 的程式碼主要集中在 StreamingJobGraphGenerator
類中,入口函式是 StreamingJobGraphGenerator.createJobGraph()
。我們首先來看下StreamingJobGraphGenerator
的核心原始碼:
public class StreamingJobGraphGenerator { private StreamGraph streamGraph; private JobGraph jobGraph; // id -> JobVertex private Map<Integer, JobVertex> jobVertices; // 已經構建的JobVertex的id集合 private Collection<Integer> builtVertices; // 物理邊集合(排除了chain內部的邊), 按建立順序排序 private List<StreamEdge> physicalEdgesInOrder; // 儲存chain資訊,部署時用來構建 OperatorChain,startNodeId -> (currentNodeId -> StreamConfig) private Map<Integer, Map<Integer, StreamConfig>> chainedConfigs; // 所有節點的配置資訊,id -> StreamConfig private Map<Integer, StreamConfig> vertexConfigs; // 儲存每個節點的名字,id -> chainedName private Map<Integer, String> chainedNames; // 建構函式,入參只有 StreamGraph public StreamingJobGraphGenerator(StreamGraph streamGraph) { this.streamGraph = streamGraph; } // 根據 StreamGraph,生成 JobGraph public JobGraph createJobGraph() { jobGraph = new JobGraph(streamGraph.getJobName()); // streaming 模式下,排程模式是所有節點(vertices)一起啟動 jobGraph.setScheduleMode(ScheduleMode.ALL); // 初始化成員變數 init(); // 廣度優先遍歷 StreamGraph 並且為每個SteamNode生成hash id, // 保證如果提交的拓撲沒有改變,則每次生成的hash都是一樣的 Map<Integer, byte[]> hashes = traverseStreamGraphAndGenerateHashes(); // 最重要的函式,生成JobVertex,JobEdge等,並儘可能地將多個節點chain在一起 setChaining(hashes); // 將每個JobVertex的入邊集合也序列化到該JobVertex的StreamConfig中 // (出邊集合已經在setChaining的時候寫入了) setPhysicalEdges(); // 根據group name,為每個 JobVertex 指定所屬的 SlotSharingGroup // 以及針對 Iteration的頭尾設定 CoLocationGroup setSlotSharing(); // 配置checkpoint configureCheckpointing(); // 配置重啟策略(不重啟,還是固定延遲重啟) configureRestartStrategy(); try { // 將 StreamGraph 的 ExecutionConfig 序列化到 JobGraph 的配置中 InstantiationUtil.writeObjectToConfig(this.streamGraph.getExecutionConfig(), this.jobGraph.getJobConfiguration(), ExecutionConfig.CONFIG_KEY); } catch (IOException e) { throw new RuntimeException("Config object could not be written to Job Configuration: ", e); } return jobGraph; } ... } |
StreamingJobGraphGenerator
的成員變數都是為了輔助生成最終的JobGraph。createJobGraph()
函式的邏輯也很清晰,首先為所有節點生成一個唯一的hash id,如果節點在多次提交中沒有改變(包括併發度、上下游等),那麼這個id就不會改變,這主要用於故障恢復。這裡我們不能用 StreamNode.id
來代替,因為這是一個從1開始的靜態計數變數,同樣的Job可能會得到不一樣的id,如下程式碼示例的兩個job是完全一樣的,但是source的id卻不一樣了。然後就是最關鍵的chaining處理,和生成JobVetex、JobEdge等。之後就是寫入各種配置相關的資訊。
// 範例1:A.id=1 B.id=2 DataStream<String> A = ... DataStream<String> B = ... A.union(B).print(); // 範例2:A.id=2 B.id=1 DataStream<String> B = ... DataStream<String> A = ... A.union(B).print(); |
下面具體分析下關鍵函式 setChaining
的實現:
// 從source開始建立 node chains private void setChaining(Map<Integer, byte[]> hashes) { for (Integer sourceNodeId : streamGraph.getSourceIDs()) { createChain(sourceNodeId, sourceNodeId, hashes); } } // 構建node chains,返回當前節點的物理出邊 // startNodeId != currentNodeId 時,說明currentNode是chain中的子節點 private List<StreamEdge> createChain( Integer startNodeId, Integer currentNodeId, Map<Integer, byte[]> hashes) { if (!builtVertices.contains(startNodeId)) { // 過渡用的出邊集合, 用來生成最終的 JobEdge, 注意不包括 chain 內部的邊 List<StreamEdge> transitiveOutEdges = new ArrayList<StreamEdge>(); List<StreamEdge> chainableOutputs = new ArrayList<StreamEdge>(); List<StreamEdge> nonChainableOutputs = new ArrayList<StreamEdge>(); // 將當前節點的出邊分成 chainable 和 nonChainable 兩類 for (StreamEdge outEdge : streamGraph.getStreamNode(currentNodeId).getOutEdges()) { if (isChainable(outEdge)) { chainableOutputs.add(outEdge); } else { nonChainableOutputs.add(outEdge); } } //==> 遞迴呼叫 for (StreamEdge chainable : chainableOutputs) { transitiveOutEdges.addAll(createChain(startNodeId, chainable.getTargetId(), hashes)); } for (StreamEdge nonChainable : nonChainableOutputs) { transitiveOutEdges.add(nonChainable); createChain(nonChainable.getTargetId(), nonChainable.getTargetId(), hashes); } // 生成當前節點的顯示名,如:"Keyed Aggregation -> Sink: Unnamed" chainedNames.put(currentNodeId, createChainedName(currentNodeId, chainableOutputs)); // 如果當前節點是起始節點, 則直接建立 JobVertex 並返回 StreamConfig, 否則先建立一個空的 StreamConfig // createJobVertex 函式就是根據 StreamNode 建立對應的 JobVertex, 並返回了空的 StreamConfig StreamConfig config = currentNodeId.equals(startNodeId) ? createJobVertex(startNodeId, hashes) : new StreamConfig(new Configuration()); // 設定 JobVertex 的 StreamConfig, 基本上是序列化 StreamNode 中的配置到 StreamConfig 中. // 其中包括 序列化器, StreamOperator, Checkpoint 等相關配置 setVertexConfig(currentNodeId, config, chainableOutputs, nonChainableOutputs); if (currentNodeId.equals(startNodeId)) { // 如果是chain的起始節點。(不是chain中的節點,也會被標記成 chain start) config.setChainStart(); // 我們也會把物理出邊寫入配置, 部署時會用到 config.setOutEdgesInOrder(transitiveOutEdges); config.setOutEdges(streamGraph.getStreamNode(currentNodeId).getOutEdges()); // 將當前節點(headOfChain)與所有出邊相連 for (StreamEdge edge : transitiveOutEdges) { // 通過StreamEdge構建出JobEdge,建立IntermediateDataSet,用來將JobVertex和JobEdge相連 connect(startNodeId, edge); } // 將chain中所有子節點的StreamConfig寫入到 headOfChain 節點的 CHAINED_TASK_CONFIG 配置中 config.setTransitiveChainedTaskConfigs(chainedConfigs.get(startNodeId)); } else { // 如果是 chain 中的子節點 Map<Integer, StreamConfig> chainedConfs = chainedConfigs.get(startNodeId); if (chainedConfs == null) { chainedConfigs.put(startNodeId, new HashMap<Integer, StreamConfig>()); } // 將當前節點的StreamConfig新增到該chain的config集合中 chainedConfigs.get(startNodeId).put(currentNodeId, config); } // 返回連往chain外部的出邊集合 return transitiveOutEdges; } else { return new ArrayList<>(); } } |
每個 JobVertex 都會對應一個可序列化的 StreamConfig, 用來發送給 JobManager 和 TaskManager。最後在 TaskManager 中起 Task 時,需要從這裡面反序列化出所需要的配置資訊, 其中就包括了含有使用者程式碼的StreamOperator。
setChaining
會對source呼叫createChain
方法,該方法會遞迴呼叫下游節點,從而構建出node chains。createChain
會分析當前節點的出邊,根據Operator Chains中的chainable條件,將出邊分成chainalbe和noChainable兩類,並分別遞迴呼叫自身方法。之後會將StreamNode中的配置資訊序列化到StreamConfig中。如果當前不是chain中的子節點,則會構建 JobVertex 和 JobEdge相連。如果是chain中的子節點,則會將StreamConfig新增到該chain的config集合中。一個node chains,除了 headOfChain node會生成對應的 JobVertex,其餘的nodes都是以序列化的形式寫入到StreamConfig中,並儲存到headOfChain的 CHAINED_TASK_CONFIG
配置項中。直到部署時,才會取出並生成對應的ChainOperators,具體過程請見理解 Operator Chains。
總結
本文主要對 Flink 中將 StreamGraph 轉變成 JobGraph 的核心原始碼進行了分析。思想還是很簡單的,StreamNode 轉成 JobVertex,StreamEdge 轉成 JobEdge,JobEdge 和 JobVertex 之間建立 IntermediateDataSet 來連線。關鍵點在於將多個 SteamNode chain 成一個 JobVertex的過程,這部分原始碼比較繞,有興趣的同學可以結合原始碼單步除錯分析。下一章將會介紹 JobGraph 提交到 JobManager 後是如何轉換成分散式化的 ExecutionGraph 的。