Flink Source Code Analysis: Understanding JobGraph Generation, Starting from an Example
This article walks through how the JobGraph is generated; it is produced by streamGraph.getJobGraph(). Before reading on, we recommend the previous article in this series: Flink Source Code Analysis | Understanding StreamGraph Generation, Starting from an Example.
In that article we covered how the StreamGraph is generated; now we continue with the JobGraph.
Let's first look at the conversion diagram from StreamGraph to JobGraph.
Both the StreamGraph and the JobGraph are generated on the client side, so we can observe their generation in an IDE by setting breakpoints.
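For reference, a minimal driver you could debug this way might look like the sketch below. The word-count pipeline and the socket port are placeholders of our own, not taken from the original example; the API calls are the Flink 1.x DataStream API used throughout this series.

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.util.Collector;

public class GraphDebugExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.socketTextStream("localhost", 9999)
                .flatMap((String line, Collector<Tuple2<String, Integer>> out) -> {
                    for (String word : line.split(" ")) {
                        out.collect(Tuple2.of(word, 1));
                    }
                })
                .returns(Types.TUPLE(Types.STRING, Types.INT))
                .keyBy(0)
                .sum(1)
                .print();

        StreamGraph streamGraph = env.getStreamGraph();   // breakpoint here to inspect the StreamGraph
        JobGraph jobGraph = streamGraph.getJobGraph();    // and here to inspect the JobGraph
    }
}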
The JobGraph generation flow
Once the StreamGraph has been generated, the code obtains the JobGraph instance via streamGraph.getJobGraph():
JobGraph jobGraph = streamGraph.getJobGraph();
jobGraph.setAllowQueuedScheduling(true);
getJobGraph() delegates to StreamingJobGraphGenerator.createJobGraph() to build the JobGraph:
public JobGraph getJobGraph(@Nullable JobID jobID) {
    // temporarily forbid checkpointing for iterative jobs
    if (isIterative() && checkpointConfig.isCheckpointingEnabled() && !checkpointConfig.isForceCheckpointing()) {
        throw new UnsupportedOperationException(
            "Checkpointing is currently not supported by default for iterative jobs, as we cannot guarantee exactly once semantics. "
                + "State checkpoints happen normally, but records in-transit during the snapshot will be lost upon failure. "
                + "\nThe user can force enable state checkpoints with the reduced guarantees by calling: env.enableCheckpointing(interval,true)");
    }

    return StreamingJobGraphGenerator.createJobGraph(this, jobID);
}
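As the exception message hints, a user can still force-enable checkpointing for an iterative job and accept the reduced guarantees. A minimal sketch of our own (the 10-second interval is arbitrary; setForceCheckpointing is deprecated but present in Flink 1.x, as its isForceCheckpointing counterpart in the code above suggests):

// sketch: force-enable checkpointing for an iterative job (weaker guarantees)
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(10_000L);                       // checkpoint every 10 seconds (arbitrary)
env.getCheckpointConfig().setForceCheckpointing(true);  // records in transit may be lost on failure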
createJobGraph() instantiates a StreamingJobGraphGenerator, passing in the previously generated StreamGraph and the jobId, and then calls its createJobGraph() method:
public static JobGraph createJobGraph(StreamGraph streamGraph, @Nullable JobID jobID) {
    return new StreamingJobGraphGenerator(streamGraph, jobID).createJobGraph();
}
Let's see what the StreamingJobGraphGenerator constructor does.
private StreamingJobGraphGenerator(StreamGraph streamGraph, @Nullable JobID jobID) {
    this.streamGraph = streamGraph;
    this.defaultStreamGraphHasher = new StreamGraphHasherV2();
    this.legacyStreamGraphHashers = Arrays.asList(new StreamGraphUserHashHasher());

    this.jobVertices = new HashMap<>();
    this.builtVertices = new HashSet<>();
    this.chainedConfigs = new HashMap<>();
    this.vertexConfigs = new HashMap<>();
    this.chainedNames = new HashMap<>();
    this.chainedMinResources = new HashMap<>();
    this.chainedPreferredResources = new HashMap<>();
    this.physicalEdgesInOrder = new ArrayList<>();

    jobGraph = new JobGraph(jobID, streamGraph.getJobName());
}
The constructor creates a series of HashMaps and then a bare JobGraph object; at this point only the jobId and jobName are set, and the remaining information is filled in by createJobGraph().
The HashMaps explained
- jobVertices: id -> JobVertex
- builtVertices: the IDs of the JobVertices that have already been built
- chainedConfigs: chaining information, used at deployment time to build the OperatorChain; startNodeId -> (currentNodeId -> StreamConfig)
- vertexConfigs: the configuration of every node; id -> StreamConfig
- chainedNames: the name of every node; id -> chainedName
- chainedMinResources: the minimum resources of every node; id -> ResourceSpec
- chainedPreferredResources: the preferred resources of every node; id -> ResourceSpec
- physicalEdgesInOrder: the physical edges (excluding edges inside a chain), in creation order
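For reference, the corresponding field declarations in StreamingJobGraphGenerator look roughly like this in the Flink 1.x source (the comments are ours):

private final Map<Integer, JobVertex> jobVertices;                      // id -> JobVertex
private final Collection<Integer> builtVertices;                       // IDs of already-built vertices
private final Map<Integer, Map<Integer, StreamConfig>> chainedConfigs; // startNodeId -> (currentNodeId -> StreamConfig)
private final Map<Integer, StreamConfig> vertexConfigs;                // id -> StreamConfig
private final Map<Integer, String> chainedNames;                       // id -> chainedName
private final Map<Integer, ResourceSpec> chainedMinResources;          // id -> minimum ResourceSpec
private final Map<Integer, ResourceSpec> chainedPreferredResources;    // id -> preferred ResourceSpec
private final List<StreamEdge> physicalEdgesInOrder;                   // physical edges, in creation order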
That is the overall flow of creating a JobGraph. Now let's dig one level deeper into the createJobGraph() method.
Breaking down createJobGraph
private JobGraph createJobGraph() {

    // make sure that all vertices start immediately
    // in streaming mode, the schedule mode starts all vertices at once
    jobGraph.setScheduleMode(ScheduleMode.EAGER);

    // Generate deterministic hashes for the nodes in order to identify them across
    // submission iff they didn't change.
    // Traverse the StreamGraph breadth-first and generate a hash id for every StreamNode,
    // so that as long as the submitted topology is unchanged, the hashes stay the same
    Map<Integer, byte[]> hashes = defaultStreamGraphHasher.traverseStreamGraphAndGenerateHashes(streamGraph);

    // generate legacy version hashes for backwards compatibility
    List<Map<Integer, byte[]>> legacyHashes = new ArrayList<>(legacyStreamGraphHashers.size());
    for (StreamGraphHasher hasher : legacyStreamGraphHashers) {
        legacyHashes.add(hasher.traverseStreamGraphAndGenerateHashes(streamGraph));
    }

    // keeps the mapping between the legacy hashes and the current primary hashes
    Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes = new HashMap<>();

    // the most important call: creates the JobVertices, JobEdges, etc.,
    // chaining as many nodes together as possible
    setChaining(hashes, legacyHashes, chainedOperatorHashes);

    // serialize the incoming edges of each JobVertex into its StreamConfig
    // (the outgoing edges were already written during setChaining)
    setPhysicalEdges();

    // assign each JobVertex to a SlotSharingGroup according to its group name,
    // and set up CoLocationGroups for the heads and tails of iterations
    setSlotSharingAndCoLocation();

    // configure checkpointing
    configureCheckpointing();

    // add the user artifact entries to the JobGraph
    JobGraphGenerator.addUserArtifactEntries(streamGraph.getEnvironment().getCachedFiles(), jobGraph);

    // set the ExecutionConfig last when it has been finalized
    try {
        // take the ExecutionConfig from the StreamGraph and set it on the JobGraph
        jobGraph.setExecutionConfig(streamGraph.getExecutionConfig());
    }
    catch (IOException e) {
        throw new IllegalConfigurationException("Could not serialize the ExecutionConfig."
            + "This indicates that non-serializable types (like custom serializers) were registered");
    }

    return jobGraph;
}
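Of the helper calls above, setPhysicalEdges is small enough to quote: it groups the collected physical edges by their target vertex and writes each group into that vertex's StreamConfig. Roughly, as in the Flink 1.x source (comments ours):

private void setPhysicalEdges() {
    Map<Integer, List<StreamEdge>> physicalInEdgesInOrder = new HashMap<>();

    // group the physical edges by their target vertex
    for (StreamEdge edge : physicalEdgesInOrder) {
        int target = edge.getTargetId();
        List<StreamEdge> inEdges = physicalInEdgesInOrder.computeIfAbsent(target, k -> new ArrayList<>());
        inEdges.add(edge);
    }

    // write each vertex's physical in-edges into its StreamConfig
    for (Map.Entry<Integer, List<StreamEdge>> inEdges : physicalInEdgesInOrder.entrySet()) {
        int vertex = inEdges.getKey();
        List<StreamEdge> edgeList = inEdges.getValue();
        vertexConfigs.get(vertex).setInPhysicalEdges(edgeList);
    }
}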
Now let's look at the schedule mode. ScheduleMode has two values: LAZY_FROM_SOURCES and EAGER.
- LAZY_FROM_SOURCES: start tasks lazily, beginning at the sources
- EAGER: start all tasks immediately
public enum ScheduleMode {

    /** Schedule tasks lazily from the sources. A downstream task is started once its input data is ready. */
    LAZY_FROM_SOURCES,

    /** Schedule all tasks immediately. */
    EAGER;

    /**
     * Returns whether we are allowed to deploy consumers lazily.
     */
    public boolean allowLazyDeployment() {
        return this == LAZY_FROM_SOURCES;
    }
}
The chain-building process
Next, let's take a close look at the setChaining method.
private void setChaining(Map<Integer, byte[]> hashes, List<Map<Integer, byte[]>> legacyHashes, Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes) {
    for (Integer sourceNodeId : streamGraph.getSourceIDs()) {
        createChain(sourceNodeId, sourceNodeId, hashes, legacyHashes, 0, chainedOperatorHashes);
    }
}

private List<StreamEdge> createChain(
        Integer startNodeId,
        Integer currentNodeId,
        Map<Integer, byte[]> hashes,
        List<Map<Integer, byte[]>> legacyHashes,
        int chainIndex,
        Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes) {

    if (!builtVertices.contains(startNodeId)) {

        // the transitive out-edges, used to create the final JobEdges;
        // note that they do not include the edges inside a chain
        List<StreamEdge> transitiveOutEdges = new ArrayList<StreamEdge>();

        List<StreamEdge> chainableOutputs = new ArrayList<StreamEdge>();
        List<StreamEdge> nonChainableOutputs = new ArrayList<StreamEdge>();

        // split the out-edges of the current node into chainable and non-chainable ones
        for (StreamEdge outEdge : streamGraph.getStreamNode(currentNodeId).getOutEdges()) {
            if (isChainable(outEdge, streamGraph)) {
                chainableOutputs.add(outEdge);
            } else {
                nonChainableOutputs.add(outEdge);
            }
        }

        // recursive calls
        for (StreamEdge chainable : chainableOutputs) {
            transitiveOutEdges.addAll(
                    createChain(startNodeId, chainable.getTargetId(), hashes, legacyHashes, chainIndex + 1, chainedOperatorHashes));
        }

        for (StreamEdge nonChainable : nonChainableOutputs) {
            transitiveOutEdges.add(nonChainable);
            createChain(nonChainable.getTargetId(), nonChainable.getTargetId(), hashes, legacyHashes, 0, chainedOperatorHashes);
        }

        List<Tuple2<byte[], byte[]>> operatorHashes =
            chainedOperatorHashes.computeIfAbsent(startNodeId, k -> new ArrayList<>());

        byte[] primaryHashBytes = hashes.get(currentNodeId);

        // record the mapping between the primary hash and each legacy hash
        for (Map<Integer, byte[]> legacyHash : legacyHashes) {
            operatorHashes.add(new Tuple2<>(primaryHashBytes, legacyHash.get(currentNodeId)));
        }

        // generate the display name of the current node, e.g. "Keyed Aggregation -> Sink: Unnamed"
        chainedNames.put(currentNodeId, createChainedName(currentNodeId, chainableOutputs));
        // set the minimum resources of the current node
        chainedMinResources.put(currentNodeId, createChainedMinResources(currentNodeId, chainableOutputs));
        // set the preferred resources of the current node
        chainedPreferredResources.put(currentNodeId, createChainedPreferredResources(currentNodeId, chainableOutputs));

        // if the current node is the start of a chain, create the JobVertex and return its StreamConfig;
        // otherwise create an empty StreamConfig.
        // createJobVertex builds a JobVertex from the StreamNode and returns an empty StreamConfig
        StreamConfig config = currentNodeId.equals(startNodeId)
                ? createJobVertex(startNodeId, hashes, legacyHashes, chainedOperatorHashes)
                : new StreamConfig(new Configuration());

        // fill the StreamConfig of the JobVertex; essentially serializes the configuration of the
        // StreamNode into the StreamConfig, including the serializers, the StreamOperator,
        // checkpoint-related settings, and so on
        setVertexConfig(currentNodeId, config, chainableOutputs, nonChainableOutputs);

        if (currentNodeId.equals(startNodeId)) {
            // the current node is the start of a chain
            // (a node that is not part of any chain is also marked as a chain start)
            config.setChainStart();
            config.setChainIndex(0);
            config.setOperatorName(streamGraph.getStreamNode(currentNodeId).getOperatorName());
            // also write the physical out-edges into the config; they are needed at deployment time
            config.setOutEdgesInOrder(transitiveOutEdges);
            config.setOutEdges(streamGraph.getStreamNode(currentNodeId).getOutEdges());

            // connect the current node (the head of the chain) with all transitive out-edges
            for (StreamEdge edge : transitiveOutEdges) {
                // build a JobEdge from the StreamEdge and create the IntermediateDataSet
                // that links the JobVertex and the JobEdge
                connect(startNodeId, edge);
            }

            // write the StreamConfigs of all the chain's sub-nodes into the
            // CHAINED_TASK_CONFIG entry of the head node
            config.setTransitiveChainedTaskConfigs(chainedConfigs.get(startNodeId));

        } else {
            // the current node is a sub-node of a chain
            Map<Integer, StreamConfig> chainedConfs = chainedConfigs.get(startNodeId);

            if (chainedConfs == null) {
                chainedConfigs.put(startNodeId, new HashMap<Integer, StreamConfig>());
            }
            config.setChainIndex(chainIndex);
            StreamNode node = streamGraph.getStreamNode(currentNodeId);
            config.setOperatorName(node.getOperatorName());
            // add the StreamConfig of the current node to the chain's config collection
            chainedConfigs.get(startNodeId).put(currentNodeId, config);
        }

        config.setOperatorID(new OperatorID(primaryHashBytes));

        if (chainableOutputs.isEmpty()) {
            config.setChainEnd();
        }
        // return the edges that leave the chain
        return transitiveOutEdges;

    } else {
        return new ArrayList<>();
    }
}
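The isChainable check used above decides whether two adjacent nodes can be fused into one chain. For reference, in the Flink 1.x source it looks roughly like this: the downstream node must have exactly one input, both operators must exist, the nodes must share the slot sharing group and the parallelism, the chaining strategies must allow it, the partitioner must be a ForwardPartitioner, and chaining must be enabled on the StreamGraph:

public static boolean isChainable(StreamEdge edge, StreamGraph streamGraph) {
    StreamNode upStreamVertex = streamGraph.getSourceVertex(edge);
    StreamNode downStreamVertex = streamGraph.getTargetVertex(edge);

    StreamOperator<?> headOperator = upStreamVertex.getOperator();
    StreamOperator<?> outOperator = downStreamVertex.getOperator();

    return downStreamVertex.getInEdges().size() == 1
            && outOperator != null
            && headOperator != null
            && upStreamVertex.isSameSlotSharingGroup(downStreamVertex)
            && outOperator.getChainingStrategy() == ChainingStrategy.ALWAYS
            && (headOperator.getChainingStrategy() == ChainingStrategy.HEAD ||
                headOperator.getChainingStrategy() == ChainingStrategy.ALWAYS)
            && (edge.getPartitioner() instanceof ForwardPartitioner)
            && upStreamVertex.getParallelism() == downStreamVertex.getParallelism()
            && streamGraph.isChainingEnabled();
}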
Both the StreamGraph and the JobGraph are generated on the client. What the client sends to the JobManager and the TaskManagers at submission time are the individual JobVertices, and every JobVertex corresponds to a serializable StreamConfig. When a TaskManager runs a task, it deserializes this configuration, which includes the StreamOperator containing the user code.
Here setChaining calls createChain for each source. createChain then recursively visits the downstream nodes and builds the node chains. The flow is as follows:
- createChain inspects the out-edges of the current node, splits them into chainable and nonChainable according to the chainable conditions of operator chains (see the isChainable sketch above), and recurses into both kinds:
// recursive calls
for (StreamEdge chainable : chainableOutputs) {
    transitiveOutEdges.addAll(
            createChain(startNodeId, chainable.getTargetId(), hashes, legacyHashes, chainIndex + 1, chainedOperatorHashes));
}
- Fill operatorHashes, which is used for failure recovery
- Set the resources of the current node
- Serialize the configuration held in the StreamNode into a StreamConfig
- Check whether the current node is a sub-node of a chain:
  - if it is, add its StreamConfig to that chain's config collection
  - if it is not, build a JobVertex and connect it via JobEdges
- Within a node chain, only the headOfChain node gets a JobVertex of its own; all other nodes are written in serialized form into their StreamConfigs and stored in the CHAINED_TASK_CONFIG entry of the headOfChain:
public void setTransitiveChainedTaskConfigs(Map<Integer, StreamConfig> chainedTaskConfigs) {
    try {
        InstantiationUtil.writeObjectToConfig(chainedTaskConfigs, this.config, CHAINED_TASK_CONFIG);
    } catch (IOException e) {
        throw new StreamTaskException("Could not serialize configuration.", e);
    }
}
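The read side also lives in StreamConfig: at runtime the chained configs are deserialized back using the user-code class loader. Roughly, as in the Flink 1.x source:

public Map<Integer, StreamConfig> getTransitiveChainedTaskConfigs(ClassLoader cl) {
    try {
        // deserialize the chained task configs written by setTransitiveChainedTaskConfigs
        Map<Integer, StreamConfig> confs = InstantiationUtil.readObjectFromConfig(this.config, CHAINED_TASK_CONFIG, cl);
        return confs == null ? new HashMap<Integer, StreamConfig>() : confs;
    } catch (Exception e) {
        throw new StreamTaskException("Could not instantiate configuration.", e);
    }
}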
The JobVertex creation process
private StreamConfig createJobVertex(
        Integer streamNodeId,
        Map<Integer, byte[]> hashes,
        List<Map<Integer, byte[]>> legacyHashes,
        Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes) {

    JobVertex jobVertex;
    StreamNode streamNode = streamGraph.getStreamNode(streamNodeId);

    byte[] hash = hashes.get(streamNodeId);

    if (hash == null) {
        throw new IllegalStateException("Cannot find node hash. " +
                "Did you generate them before calling this method?");
    }

    JobVertexID jobVertexId = new JobVertexID(hash);

    List<JobVertexID> legacyJobVertexIds = new ArrayList<>(legacyHashes.size());
    for (Map<Integer, byte[]> legacyHash : legacyHashes) {
        hash = legacyHash.get(streamNodeId);
        if (null != hash) {
            legacyJobVertexIds.add(new JobVertexID(hash));
        }
    }

    List<Tuple2<byte[], byte[]>> chainedOperators = chainedOperatorHashes.get(streamNodeId);
    List<OperatorID> chainedOperatorVertexIds = new ArrayList<>();
    List<OperatorID> userDefinedChainedOperatorVertexIds = new ArrayList<>();
    if (chainedOperators != null) {
        for (Tuple2<byte[], byte[]> chainedOperator : chainedOperators) {
            chainedOperatorVertexIds.add(new OperatorID(chainedOperator.f0));
            userDefinedChainedOperatorVertexIds.add(chainedOperator.f1 != null ? new OperatorID(chainedOperator.f1) : null);
        }
    }

    // if the user function is an InputFormatSourceFunction, create the JobVertex via InputFormatVertex
    if (streamNode.getInputFormat() != null) {
        jobVertex = new InputFormatVertex(
                chainedNames.get(streamNodeId),
                jobVertexId,
                legacyJobVertexIds,
                chainedOperatorVertexIds,
                userDefinedChainedOperatorVertexIds);
        TaskConfig taskConfig = new TaskConfig(jobVertex.getConfiguration());
        taskConfig.setStubWrapper(new UserCodeObjectWrapper<Object>(streamNode.getInputFormat()));
    } else {
        jobVertex = new JobVertex(
                chainedNames.get(streamNodeId),
                jobVertexId,
                legacyJobVertexIds,
                chainedOperatorVertexIds,
                userDefinedChainedOperatorVertexIds);
    }

    jobVertex.setResources(chainedMinResources.get(streamNodeId), chainedPreferredResources.get(streamNodeId));

    jobVertex.setInvokableClass(streamNode.getJobVertexClass());

    // set the parallelism
    int parallelism = streamNode.getParallelism();

    if (parallelism > 0) {
        jobVertex.setParallelism(parallelism);
    } else {
        parallelism = jobVertex.getParallelism();
    }

    // set the maximum parallelism
    jobVertex.setMaxParallelism(streamNode.getMaxParallelism());

    if (LOG.isDebugEnabled()) {
        LOG.debug("Parallelism set: {} for {}", parallelism, streamNodeId);
    }

    // record the mapping from streamNodeId to JobVertex
    jobVertices.put(streamNodeId, jobVertex);
    // mark the streamNodeId as built
    builtVertices.add(streamNodeId);
    // add the JobVertex to the JobGraph
    jobGraph.addVertex(jobVertex);

    return new StreamConfig(jobVertex.getConfiguration());
}
The creation process is essentially the conversion of a StreamNode into a JobVertex. The key part is the JobVertex construction in the middle: if the user function is an InputFormatSourceFunction (i.e. streamNode.getInputFormat() is non-null), the vertex is created as an InputFormatVertex; otherwise the plain JobVertex constructor is used.
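To make the branch concrete, here is a sketch of our own (not from the original post): in the Flink 1.x DataStream API, a source created via env.createInput from a non-file InputFormat is wrapped in an InputFormatSourceFunction, and the StreamGraph records the format on the source node, so createJobVertex later takes the InputFormatVertex branch. MyCustomInputFormat is a hypothetical placeholder.

// assumption: MyCustomInputFormat is a hypothetical InputFormat<String, ?> implementation
InputFormat<String, ?> format = new MyCustomInputFormat();
// non-file formats are wrapped in an InputFormatSourceFunction, and the StreamGraph
// records the format on the source node -> InputFormatVertex branch in createJobVertex
DataStream<String> data = env.createInput(format, Types.STRING);

The plain JobVertex constructor, by contrast, looks like this: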
public JobVertex(String name, JobVertexID primaryId, List<JobVertexID> alternativeIds, List<OperatorID> operatorIds, List<OperatorID> alternativeOperatorIds) {
    Preconditions.checkArgument(operatorIds.size() == alternativeOperatorIds.size());
    this.name = name == null ? DEFAULT_NAME : name;
    this.id = primaryId == null ? new JobVertexID() : primaryId;
    // the alternative IDs of this JobVertex
    this.idAlternatives.addAll(alternativeIds);
    // the IDs of all operators contained in this JobVertex
    this.operatorIDs.addAll(operatorIds);
    // the alternative IDs of all operators contained in this JobVertex
    this.operatorIdsAlternatives.addAll(alternativeOperatorIds);
}
Note: idAlternatives is kept only for backwards compatibility with versions < 1.3 and will be removed in the future; recent versions no longer use it.
Summary
This article analyzed the core source code that turns a StreamGraph into a JobGraph in Flink.
Combining the conversion diagram from the beginning with the code, the transformation boils down to:
- StreamNodes become JobVertices
- StreamEdges become JobEdges
- An IntermediateDataSet is created to connect each JobEdge with its JobVertex
The last step is the key. If you are interested, step through the code in a debugger to see the conversion even more clearly.
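That last step happens inside connect(), which calls JobVertex#connectNewDataSetAsInput on the downstream vertex. Roughly, as in the Flink 1.x source, this is exactly where the IntermediateDataSet appears (comments ours):

public JobEdge connectNewDataSetAsInput(
        JobVertex input,
        DistributionPattern distPattern,
        ResultPartitionType partitionType) {

    // the upstream vertex produces a new IntermediateDataSet ...
    IntermediateDataSet dataSet = input.createAndAddResultDataSet(partitionType);

    // ... and a JobEdge links that data set to this (downstream) vertex
    JobEdge edge = new JobEdge(dataSet, this, distPattern);
    this.inputs.add(edge);
    dataSet.addConsumer(edge);
    return edge;
}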
Related articles in this series
Flink Source Code Analysis | Understanding Flink from an Example: Flink Startup
Flink Source Code Analysis | Starting from an Example: Understanding the Local Task Execution Flow
Flink Source Code Analysis | Starting from an Example: Understanding the Cluster Task Execution Flow
Flink Source Code Analysis | Starting from an Example: Understanding the Flink-on-YARN Task Execution Flow