1. 程式人生 > >Flink原始碼系列——獲取JobGraph的過程

Flink原始碼系列——獲取JobGraph的過程

接《Flink原始碼系列——獲取StreamGraph的過程》獲取到StreamGraph後,繼續分析,如果通過獲取到的StreamGraph來轉化為JobGraph。轉化邏輯在StreamingJobGraphGenerator這個類中,入口是createJobGraph(StreamGraph)方法。先是初始化了一個StreamingJobGraphGenerator的例項,StreamingJobGraphGenerator建構函式是私有的,只能通過這裡進行例項構造,建構函式中就是做了一些基本的初始化的工作,並初始化了一個JobGraph例項,然後呼叫內部的私有方法createJobGraph()。

public static JobGraph createJobGraph(StreamGraph streamGraph) {
   return new StreamingJobGraphGenerator(streamGraph).createJobGraph();
}

createJobGraph()方法就是jobGraph進行配置的主要邏輯,如下:

private JobGraph createJobGraph() {
   /** 設定排程模式,採用的EAGER模式,既所有節點都是立即啟動的 */
   jobGraph.setScheduleMode(ScheduleMode.EAGER);
   /** 第一步 */
   /** 
    * 1.1
    * 廣度優先遍歷StreamGraph,並且為每個SteamNode生成雜湊值, 這裡的雜湊值產生演算法,可以保證如果提交的拓撲沒有改變,則每次生成的雜湊值都是一樣的。
    * 一個StreamNode的ID對應一個雜湊值。
    */
   Map<Integer, byte[]> hashes = defaultStreamGraphHasher.traverseStreamGraphAndGenerateHashes(streamGraph);
   /**
    * 1.2
    * 為向後相容性生成遺留版本雜湊
    * 目前好像就是根據使用者對每個StreamNode設定的hash值,生產StreamNode對應的hash值
    */
   List<Map<Integer, byte[]>> legacyHashes = new ArrayList<>(legacyStreamGraphHashers.size());
   for (StreamGraphHasher hasher : legacyStreamGraphHashers) {
      legacyHashes.add(hasher.traverseStreamGraphAndGenerateHashes(streamGraph));
   }
   /** 相連線的操作符的雜湊值對 */
   Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes = new HashMap<>();
   /** 
    * 第二步
    * 最重要的函式,生成JobVertex,JobEdge等,並儘可能地將多個節點chain在一起 
    */
   setChaining(hashes, legacyHashes, chainedOperatorHashes);
   /**
    * 第三步
    * 將每個JobVertex的入邊集合也序列化到該JobVertex的StreamConfig中
    * (出邊集合已經在setChaining的時候寫入了)
    */
   setPhysicalEdges();
   /**
    * 第四步
    * 據group name,為每個 JobVertex 指定所屬的 SlotSharingGroup,
    * 以及針對 Iteration的頭尾設定  CoLocationGroup
    */
   setSlotSharing();
   /** 
    * 第五步
    * 配置checkpoint 
    */
   configureCheckpointing();
   /** 將快取檔案的配置新增到configuration中 */
   for (Tuple2<String, DistributedCache.DistributedCacheEntry> e : streamGraph.getEnvironment().getCachedFiles()) {
      DistributedCache.writeFileInfoToConfig(e.f0, e.f1, jobGraph.getJobConfiguration());
   }
   /** 設定ExecutionConfig */
   try {
      jobGraph.setExecutionConfig(streamGraph.getExecutionConfig());
   }
   catch (IOException e) {
      throw new IllegalConfigurationException("Could not serialize the ExecutionConfig." +
            "This indicates that non-serializable types (like custom serializers) were registered");
   }
   /** 返回轉化好的jobGraph */
   return jobGraph;
}

jobGraph的整個產生過程就如上所示,接下來針對其中的主要步驟進行簡單分析。

第一步、為每個節點產生雜湊值

這裡就是根據StreamGraph的配置,給StreamGraph中的每個StreamNode產生一個長度為16的位元組陣列的雜湊值,這個雜湊值是用來後續生成JobGraph中對應的JobVertex的ID。在Flink中,任務存在從checkpoint中進行狀態恢復的場景,而在恢復時,是以JobVertexID為依據的,所有就需要任務在重啟的過程中,對於相同的任務,其各JobVertexID能夠保持不變,而StreamGraph中各個StreamNode的ID,就是其包含的StreamTransformation的ID,而StreamTransformation的ID是在對資料流中的資料進行轉換的過程中,通過一個靜態的累加器生成的,比如有多個數據源時,每個資料來源新增的順序不一致,則有可能導致相同資料處理邏輯的任務,就會對應於不同的ID,所以為了得到確定的ID,在進行JobVertexID的產生時,需要以一種確定的方式來確定其值,要麼是通過使用者為每個ID直接指定對應的一個雜湊值,要麼參考StreamGraph中的一些特徵,為每個JobVertex產生一個確定的ID。

defaultStreamGraphHasher是在StreamingJobGraphGenerator建構函式中初始化的,其對應StreamGraphHasherV2的例項,這個類就是負責給StreamGraph中的每個StreamNode產生一個確定的雜湊值,其具體的實現這裡不做介紹,感興趣的可以看下它的原始碼,邏輯還是很清晰的,這裡主要介紹下其在產生一個StreamNode時,主要考慮的因素。(最好是結合著具體的程式碼看這段邏輯,會更清晰)

如果使用者對節點指定了一個雜湊值,則基於使用者指定的值,產生一個長度為16的位元組陣列; 
如果使用者沒有指定,則根據當前節點所處的位置,產生一個雜湊值,考慮的因素有:
a、在當前StreamNode之前已經處理過的節點的個數,作為當前StreamNode的id,新增到hasher中; 
b、遍歷當前StreamNode輸出的每個StreamEdge,並判斷當前StreamNode與這個StreamEdge的目標StreamNode是否可以進行連結,如果可以,則將目標StreamNode的id也放入hasher中,且這個目標StreamNode的id與當前StreamNode的id取相同的值; 
c、將上述步驟後產生的位元組資料,與當前StreamNode的所有輸入StreamNode對應的位元組資料,進行相應的位操作,最終得到的位元組資料,就是當前StreamNode對應的長度為16的位元組陣列。
另外在StreamingJobGraphGenerator的建構函式中,legacyStreamGraphHashers這個陣列中,預設新增一個StreamGraphHasher的子類實現StreamGraphUserHashHasher。所以在上述的程式碼中,1.2步驟就是執行StreamGraphUserHashHasher這個類的邏輯。這個類的邏輯很簡單,就是判斷使用者是否設定了雜湊值,如果設定了,就為對應的StreamNode產生一個雜湊值陣列。

這裡涉及到兩個使用者設定的雜湊值,StreamingJobGraphGenerator中使用的是StreamTransformation的uid屬性,StreamGraphUserHashHasher使用的是StreamTransformation的userProvidedNodeHash屬性。這兩個屬性解析如下:

uid —— 這個欄位是使用者設定的,用來在任務重啟時,保障JobVertexID一致,一般是從之前的任務日誌中,找出對應的值而設定的; 
userProvidedNodeHash —— 這個欄位也是使用者設定的,設定的使用者自己產生的雜湊值。
第二步、設定執行鏈

執行鏈的設定,就是從資料來源StreamNode,依次遍歷,如下:

private void setChaining(Map<Integer, byte[]> hashes, List<Map<Integer, byte[]>> legacyHashes, Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes) {
   /** 從源StreamNode進行遍歷 */
   for (Integer sourceNodeId : streamGraph.getSourceIDs()) {
      createChain(sourceNodeId, sourceNodeId, hashes, legacyHashes, 0, chainedOperatorHashes);
   }
}

三個入參:

hashes和legacyHashes就是上面產生的每個StreamNode的ID對應的雜湊位元組陣列。 
chainedOperatorHashes是一個map:
其key是順序連結在一起的StreamNode的起始那個StreamNode的ID,比如source->flatMap這個兩個對應的StreamNode,在這個例子中,key的值就是source對應的id,為1; 
value是一個列表,包含了這個鏈上的所有操作符的雜湊值;
這個列表中的每個元素是一個二元組,這個列表的值就是{[source的主hash,source的備用hash_1],[source的主hash,source的備用hash_2],[flatMap的主hash,flatMap的備用hash_1],…},對於這裡的例子,列表中只有二個元素,為{[source的主hash,null],[flatMap的主hash,null]}
在進行分析createChain方法之前,先看一下兩個StreamNode是否可以連結到一起執行的判斷邏輯,如下:

public static boolean isChainable(StreamEdge edge, StreamGraph streamGraph) {
   /** 獲取StreamEdge的源和目標StreamNode */
   StreamNode upStreamVertex = edge.getSourceVertex();
   StreamNode downStreamVertex = edge.getTargetVertex();
   /** 獲取源和目標StreamNode中的StreamOperator */
   StreamOperator<?> headOperator = upStreamVertex.getOperator();
   StreamOperator<?> outOperator = downStreamVertex.getOperator();
   /**
    * 1、下游節點只有一個輸入
    * 2、下游節點的操作符不為null
    * 3、上游節點的操作符不為null
    * 4、上下游節點在一個槽位共享組內
    * 5、下游節點的連線策略是 ALWAYS
    * 6、上游節點的連線策略是 HEAD 或者 ALWAYS
    * 7、edge 的分割槽函式是 ForwardPartitioner 的例項
    * 8、上下游節點的並行度相等
    * 9、可以進行節點連線操作
    */
   return downStreamVertex.getInEdges().size() == 1
         && outOperator != null
         && headOperator != null
         && upStreamVertex.isSameSlotSharingGroup(downStreamVertex)
         && outOperator.getChainingStrategy() == ChainingStrategy.ALWAYS
         && (headOperator.getChainingStrategy() == ChainingStrategy.HEAD ||
            headOperator.getChainingStrategy() == ChainingStrategy.ALWAYS)
         && (edge.getPartitioner() instanceof ForwardPartitioner)
         && upStreamVertex.getParallelism() == downStreamVertex.getParallelism()
         && streamGraph.isChainingEnabled();
}

只有上述的9個條件都同時滿足時,才能說明兩個StreamEdge的源和目標StreamNode是可以連結在一起執行的。

createChain方法的處理邏輯就是依次遍歷StreamGraph中的所有資料來源的ID,對於這裡案例來說,只有一個數據源,其ID為1。createChain的入參解釋如下:

startNodeId —— StreamNode的鏈的起始node的id,由於從source開始,這裡就是1; 
currentNodeId —— 當前處理的node的id,這裡也是1; 
hashes和legacyHashes —— 這兩個就是前面產生的每個StreamNode對應的雜湊值; 
chainIndex —— 表示當前節點在鏈中的位置,每個鏈都是從0開始編號,當前才處理source,所以為0; 
chainedOperatorHashes —— 用來儲存每個鏈中所有操作符的雜湊值,含義前面已經解釋過,初始值是一個空map。
以上面給定的入參值,來繼續分析構建過程。

private List<StreamEdge> createChain(
      Integer startNodeId,
      Integer currentNodeId,
      Map<Integer, byte[]> hashes,
      List<Map<Integer, byte[]>> legacyHashes,
      int chainIndex,
      Map<Integer, List<Tuple2<byte[], byte[]>>> chainedOperatorHashes) {

   /** builtVertices這個集合是用來存放已經構建好的StreamNode的id */
   if (!builtVertices.contains(startNodeId)) {

      /**
       * 過渡用的出邊集合, 用來生成最終的 JobEdge,
       * 注意:存在某些StreamNode會連線到一起,比如source->map->flatMap,
       * 如果這幾個StreamNode連線到一起,則transitiveOutEdges是不包括 chain 內部的邊,既不包含source->map的StreamEdge的 */
      List<StreamEdge> transitiveOutEdges = new ArrayList<StreamEdge>();

      /** 可以與當前節點連結的StreamEdge */
      List<StreamEdge> chainableOutputs = new ArrayList<StreamEdge>();
      /** 不可以與當前節點連結的StreamEdge */
      List<StreamEdge> nonChainableOutputs = new ArrayList<StreamEdge>();

      /** 將當前節點的出邊分成 chainable 和 nonChainable 兩類 */
      for (StreamEdge outEdge : streamGraph.getStreamNode(currentNodeId).getOutEdges()) {
         if (isChainable(outEdge, streamGraph)) {
            chainableOutputs.add(outEdge);
         } else {
            nonChainableOutputs.add(outEdge);
         }
      }

      /** 對於每個可連線的StreamEdge,遞迴呼叫其目標StreamNode,startNodeId保持不變,但是chainIndex會加1 */
      for (StreamEdge chainable : chainableOutputs) {
         transitiveOutEdges.addAll(
               createChain(startNodeId, chainable.getTargetId(), hashes, legacyHashes, chainIndex + 1, chainedOperatorHashes));
      }

      /**
       * 對於每個不可連線的StreamEdge,則將對於的StreamEdge就是當前鏈的一個輸出StreamEdge,所以會新增到transitiveOutEdges這個集合中
       * 然後遞迴呼叫其目標節點,注意,startNodeID變成了nonChainable這個StreamEdge的輸出節點id,chainIndex也賦值為0,說明重新開始一條鏈的建立
       */
      for (StreamEdge nonChainable : nonChainableOutputs) {
         transitiveOutEdges.add(nonChainable);
         createChain(nonChainable.getTargetId(), nonChainable.getTargetId(), hashes, legacyHashes, 0, chainedOperatorHashes);
      }

      /** 獲取鏈起始節點對應的操作符雜湊值列表,如果沒有,則是空列表 */
      List<Tuple2<byte[], byte[]>> operatorHashes = chainedOperatorHashes.computeIfAbsent(startNodeId, k -> new ArrayList<>());

      /** 當前 StreamNode 對應的主雜湊值 */
      byte[] primaryHashBytes = hashes.get(currentNodeId);

      /** 遍歷每個備用雜湊值,並與主雜湊值,組成一個二元組,新增到列表中 */
      for (Map<Integer, byte[]> legacyHash : legacyHashes) {
         operatorHashes.add(new Tuple2<>(primaryHashBytes, legacyHash.get(currentNodeId)));
      }

      /** 生成當前節點的顯示名,如:"Keyed Aggregation -> Sink: Unnamed" */
      chainedNames.put(currentNodeId, createChainedName(currentNodeId, chainableOutputs));
      chainedMinResources.put(currentNodeId, createChainedMinResources(currentNodeId, chainableOutputs));
      chainedPreferredResources.put(currentNodeId, createChainedPreferredResources(currentNodeId, chainableOutputs));

      /**
       * 1、如果當前節點是起始節點, 則直接建立 JobVertex 並返回 StreamConfig,
       * 2、否則先建立一個空的 StreamConfig
       *
       * createJobVertex 函式就是根據 StreamNode 建立對應的 JobVertex, 並返回了空的 StreamConfig
       */
      StreamConfig config = currentNodeId.equals(startNodeId)
            ? createJobVertex(startNodeId, hashes, legacyHashes, chainedOperatorHashes)
            : new StreamConfig(new Configuration());

      /**
       * {@link StreamConfig}就是對{@link Configuration}的封裝,
       * 所以通過{@code StreamConfig}設定的配置,最終都是儲存在{@code Configuration}中的。
       */

      /**
       * 設定 JobVertex 的 StreamConfig, 基本上是序列化 StreamNode 中的配置到 StreamConfig 中.
       * 其中包括 序列化器, StreamOperator, Checkpoint 等相關配置
       * 經過這一步操作後,StreamNode的相關配置會通過對{@code StreamNode}的設定介面,將配置儲存在{@code Configuration}中,
       * 而{@code Configuration}是是{@link JobVertex}的屬性,也就是說經過這步操作,相關配置已經被儲存到了{@code JobVertex}中。
       */
      setVertexConfig(currentNodeId, config, chainableOutputs, nonChainableOutputs);

      if (currentNodeId.equals(startNodeId)) {

         /** 如果是chain的起始節點。(不是chain的中間節點,會被標記成 chain start)*/
         config.setChainStart();
         config.setChainIndex(0);
         config.setOperatorName(streamGraph.getStreamNode(currentNodeId).getOperatorName());
         /** 我們也會把物理出邊寫入配置, 部署時會用到 */
         config.setOutEdgesInOrder(transitiveOutEdges);
         config.setOutEdges(streamGraph.getStreamNode(currentNodeId).getOutEdges());

         /** 將當前節點(headOfChain)與所有出邊相連 */
         for (StreamEdge edge : transitiveOutEdges) {
            /** 通過StreamEdge構建出JobEdge,建立 IntermediateDataSet ,用來將JobVertex和JobEdge相連 */
            connect(startNodeId, edge);
         }

         /** 將chain中所有子節點的StreamConfig寫入到 headOfChain 節點的 CHAINED_TASK_CONFIG 配置中 */
         config.setTransitiveChainedTaskConfigs(chainedConfigs.get(startNodeId));

      } else {
         /** 如果是 chain 中的子節點 */

         Map<Integer, StreamConfig> chainedConfs = chainedConfigs.get(startNodeId);

         if (chainedConfs == null) {
            chainedConfigs.put(startNodeId, new HashMap<Integer, StreamConfig>());
         }
         config.setChainIndex(chainIndex);
         StreamNode node = streamGraph.getStreamNode(currentNodeId);
         config.setOperatorName(node.getOperatorName());
         /** 將當前節點的StreamConfig新增到該chain的config集合中 */
         chainedConfigs.get(startNodeId).put(currentNodeId, config);
      }

      config.setOperatorID(new OperatorID(primaryHashBytes));

      /** 如果節點的輸出StreamEdge已經為空,則說明是鏈的結尾 */
      if (chainableOutputs.isEmpty()) {
         config.setChainEnd();
      }
      return transitiveOutEdges;

   } else {
      /** startNodeId 如果已經構建過,則直接返回 */
      return new ArrayList<>();
   }
}

建立過程詳解程式碼中的註解。

如果startNodeId已經被構建完成,則直接返回一個空集合; 
如果還沒有構建,則開始新的構建;
顯示遞迴構建鏈的下游節點,在下游節點都遞迴構建完成後,再構建當前節點; 
如果當前節點是一個鏈的起始節點,則新建一個JobVertex,並將相關配置都通過StreamConfig提供的介面,配置到JobVertex的configuration屬性中; 
如果是鏈的中間節點,則將相關配置新增到其對應的StreamConfig物件中。
在對head節點設定時,會在head節點與每個輸出StreamEdge的目標節點之間建立連線,程式碼如下:

private void connect(Integer headOfChain, StreamEdge edge) {
   /** 將當前edge記錄物理邊界順序集合中 */
   physicalEdgesInOrder.add(edge);

   /** 獲取StreamEdge的輸出節點的id */
   Integer downStreamvertexID = edge.getTargetId();

   /** 通過節點id獲取到要進行連線的上下游JobVertex節點 */
   JobVertex headVertex = jobVertices.get(headOfChain);
   JobVertex downStreamVertex = jobVertices.get(downStreamvertexID);

   /** 獲取下游JobVertex的配置屬性 */
   StreamConfig downStreamConfig = new StreamConfig(downStreamVertex.getConfiguration());

   /** 下游JobVertex的輸入源加1 */
   downStreamConfig.setNumberOfInputs(downStreamConfig.getNumberOfInputs() + 1);

   /** 獲取StreamEdge中的分割槽器 */
   StreamPartitioner<?> partitioner = edge.getPartitioner();
   JobEdge jobEdge;
   /** 根據分割槽器的不同子類,建立相應的JobEdge */
   if (partitioner instanceof ForwardPartitioner) {
      /** 向前傳遞分割槽 */
      jobEdge = downStreamVertex.connectNewDataSetAsInput(
         headVertex,
         DistributionPattern.POINTWISE,
         ResultPartitionType.PIPELINED_BOUNDED);
   } else if (partitioner instanceof RescalePartitioner){
      /** 可擴充套件分割槽 */
      jobEdge = downStreamVertex.connectNewDataSetAsInput(
         headVertex,
         DistributionPattern.POINTWISE,
         ResultPartitionType.PIPELINED_BOUNDED);
   } else {
      /** 其他分割槽 */
      jobEdge = downStreamVertex.connectNewDataSetAsInput(
            headVertex,
            DistributionPattern.ALL_TO_ALL,
            ResultPartitionType.PIPELINED_BOUNDED);
   }
   /** 設定資料傳輸策略,以便在web上顯示 */
   jobEdge.setShipStrategyName(partitioner.toString());

   /** 列印除錯日誌 */
   if (LOG.isDebugEnabled()) {
      LOG.debug("CONNECTED: {} - {} -> {}", partitioner.getClass().getSimpleName(),
            headOfChain, downStreamvertexID);
   }
}

其中JobEdge是通過下游JobVertex的connectNewDataSetAsInput方法來建立的,在建立JobEdge的前,會先用上游JobVertex建立一個IntermediateDataSet例項,用來作為上游JobVertex的結果輸出,然後作為JobEdge的輸入,構建JobEdge例項,具體實現如下:

public JobEdge connectNewDataSetAsInput(
      JobVertex input,
      DistributionPattern distPattern,
      ResultPartitionType partitionType) {
   /** 建立輸入JobVertex的輸出資料集合 */
   IntermediateDataSet dataSet = input.createAndAddResultDataSet(partitionType);
   /** 構建JobEdge例項 */
   JobEdge edge = new JobEdge(dataSet, this, distPattern);
   /** 將JobEdge例項,作為當前JobVertex的輸入 */
   this.inputs.add(edge);
   /** 設定中間結果集合dataSet的消費者是上面建立的JobEdge */
   dataSet.addConsumer(edge);
   return edge;
}

通過上述的構建過程,就可以實現上下游JobVertex的連線,上游JobVertex ——> 中間結果集合IntermediateDataSet ——> JobEdge ——> 下游JobVertex。其中IntermediateDataSet和JobEdge是用來建立上下游JobVertex之間連線的配置;一個IntermediateDataSet有一個訊息producer,可以有多個訊息消費者JobEdge;一個JobEdge則有一個數據源IntermediateDataSet,一個目標JobVertex;一個JobVertex可以產生多個輸出IntermediateDataSet,也可以接受來自多個JobEdge的資料。

通過上述的構建過程,對於這裡的例子,source -> flatMap 組成一個鏈,構建成一個JobVertex,reduce -> sink 組成一個鏈,構建成一個JobVertex。

JobGraph是在StreamGraph的基礎之上,對StreamNode進行了關聯合並的操作,比如對於source -> flatMap -> reduce -> sink 這樣一個數據處理鏈,當source和flatMap滿足連結的條件時,可以可以將兩個操作符的操作放到一個執行緒並行執行,這樣可以減少網路中的資料傳輸,由於在source和flatMap之間的傳輸的資料也不用序列化和反序列化,所以也提高了程式的執行效率。
--------------------- 
作者:混混fly 
來源:CSDN 
原文:https://blog.csdn.net/qq_21653785/article/details/79510140 
版權宣告:本文為博主原創文章,轉載請附上博文連結!