1. 程式人生 > >Hadoop 副本放置策略的源碼閱讀和設置

Hadoop 副本放置策略的源碼閱讀和設置

order ner 隨機 如果 related pac pro mach pre

本文通過MetaWeblog自動發布,原文及更新鏈接:https://extendswind.top/posts/technical/hadoop_block_placement_policy

大多數的叫法都是副本放置策略,實質上是HDFS對所有數據的位置放置策略,並非只是針對數據的副本。因此Hadoop的源碼裏有block replicator(configuration)、 BlockPlacementPolicy(具體邏輯源碼)兩種叫法。

主要用途:上傳文件時決定文件在HDFS上存儲的位置(具體到datanode上的具體存儲介質,如具體到存儲在哪塊硬盤);rebalance、datanode退出集群、副本數量更改等導致數據移動的操作中,數據移動的具體位置。

BlockPlacementPolicy

BlockPlacementPolicy 作為虛基類提供了基本的接口,具體的子類重點實現下面 選擇副本驗證副本放置是否滿足要求選擇能夠刪除的副本 三個函數:

 /**
   * 核心的副本放置策略實現,返回副本放置數量的存儲位置
   * **如果有效節點數量不夠(少於副本數),返回盡可能多的節點,而非失敗**
   *
   * @param srcPath 上傳文件的路徑
   * @param numOfReplicas 除下面chosen參數裏已經選擇的datanode,還需要的副本數量
   * @param writer 寫數據的機器, null if not in the cluster. 一般用於放置第一個副本以降低網絡通信
* @param chosen 已經選擇的節點 * @param returnChosenNodes 返回結果裏是否包含chosen的datanode * @param excludedNodes 不選的節點 * @param blocksize 塊大小 * @return 排序好的選擇結果 */ public abstract DatanodeStorageInfo[] chooseTarget(String srcPath, int numOfReplicas, Node writer,
List<DatanodeStorageInfo> chosen, boolean returnChosenNodes, Set<Node> excludedNodes, long blocksize, BlockStoragePolicy storagePolicy); /** * 判斷傳入的放置方式是否符合要求 */ abstract public BlockPlacementStatus verifyBlockPlacement( DatanodeInfo[] locs, int numOfReplicas); /** * 當副本數量較多時,選擇需要刪除的節點 */ abstract public List<DatanodeStorageInfo> chooseReplicasToDelete( Collection<DatanodeStorageInfo> candidates, int expectedNumOfReplicas, List<StorageType> excessTypes, DatanodeDescriptor addedNode, DatanodeDescriptor delNodeHint);

Hadoop 提供的 BlockPlacementPolicy 實現

Hadoop提供了BlockPlacementPolicyDefault、BlockPlacementPolicyWithNodeGroup、AvailableSpaceBlockPlacementPolicy三種實現(hadoop 2.7.7)。

其中BlockPlacementPolicyDefault是經典三副本策略的實現:第一個副本盡可能放在寫入數據的節點,第二個副本放在與第一個副本不在同一機架下的節點,第三個副本與第二副本放在同一個機架。

通過改變dfs.block.replicator.classname 能夠選擇具體的實現類,默認值為org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault。(Hadoop 2.7.7下,貌似不同版本的Hadoop的命名還不一樣,而且2.7.7默認的配置文件裏還沒有,需要在源碼中查)

BlockPlacementPolicyDefault 源碼閱讀

  public abstract DatanodeStorageInfo[] chooseTarget(String srcPath,
                                             int numOfReplicas,
                                             Node writer,
                                             List<DatanodeStorageInfo> chosen,
                                             boolean returnChosenNodes,
                                             Set<Node> excludedNodes,
                                             long blocksize,
                                             BlockStoragePolicy storagePolicy);

chooseTarget函數實現了具體的三副本策略。各種特殊情況(如只有1個副本、datanode數量不夠、集群拓撲不滿足要求等)的考慮讓代碼看起來比較復雜,常規情況直接跟著調試代碼走會跳過很多異常處理部分,便於裂解正常流程。

在副本的選擇上用了各種帶chooseTarget函數,註意有幾個函數結果是通過參數傳出而不是返回值。

主要實現思路:

  1. 各種變量初始化
  2. 考慮favoredNodes的放置
  3. 除滿足條件的favoredNodes後的副本放置策略(三副本)
  4. 結果排序

首先

srcPath沒有被考慮,被直接舍棄:

return chooseTarget(numOfReplicas, writer, chosenNodes, returnChosenNodes,
        excludedNodes, blocksize, storagePolicy, flags); // ignore srcPath

因此默認的副本放置策略,在同一文件包含多個block時,每個block的存儲位置獨立考慮,並非存儲在同一datanode

處理favoredNodes

上傳文件時可以指定favoredNodes(默認為空),首先對favoredNodes所在的節點判斷是否合適。如果滿足條件的節點數還低於副本數,則添加新的副本。

 // --------------Choose favored nodes ---------------
 // 從favored nodes中選擇,在上傳文件時可以指定
 List<DatanodeStorageInfo> results = new ArrayList<>();
 boolean avoidStaleNodes = stats != null
     && stats.isAvoidingStaleDataNodesForWrite();

 int maxNodesAndReplicas[] = getMaxNodesPerRack(0, numOfReplicas);
 numOfReplicas = maxNodesAndReplicas[0];
 int maxNodesPerRack = maxNodesAndReplicas[1];

 chooseFavouredNodes(src, numOfReplicas, favoredNodes,
     favoriteAndExcludedNodes, blocksize, maxNodesPerRack, results,
     avoidStaleNodes, storageTypes);

 // ---------------如果滿足要求的favored nodes數量不足-----------
 if (results.size() < numOfReplicas) {
   // Not enough favored nodes, choose other nodes, based on block
   // placement policy (HDFS-9393).
   numOfReplicas -= results.size();
   for (DatanodeStorageInfo storage : results) {
     // add localMachine and related nodes to favoriteAndExcludedNodes
     addToExcludedNodes(storage.getDatanodeDescriptor(),
         favoriteAndExcludedNodes);
   }
   DatanodeStorageInfo[] remainingTargets =
       chooseTarget(src, numOfReplicas, writer,
           new ArrayList<DatanodeStorageInfo>(numOfReplicas), false,
           favoriteAndExcludedNodes, blocksize, storagePolicy, flags);
   for (int i = 0; i < remainingTargets.length; i++) {
     results.add(remainingTargets[i]);
   }
 }

三副本選擇

實現邏輯在 chooseTargetInOrder(…) 函數中

// 第一個副本的選擇
if (numOfResults == 0) {
  writer = chooseLocalStorage(writer, excludedNodes, blocksize,
      maxNodesPerRack, results, avoidStaleNodes, storageTypes, true)
      .getDatanodeDescriptor();
  if (--numOfReplicas == 0) {
    return writer;
  }
}

// 選擇與第一個副本不在同一Rack下的第二個副本
final DatanodeDescriptor dn0 = results.get(0).getDatanodeDescriptor();
if (numOfResults <= 1) {
  chooseRemoteRack(1, dn0, excludedNodes, blocksize, maxNodesPerRack,
      results, avoidStaleNodes, storageTypes);
  if (--numOfReplicas == 0) {
    return writer;
  }
}

// 第三個副本
if (numOfResults <= 2) {
  final DatanodeDescriptor dn1 = results.get(1).getDatanodeDescriptor();
  // 第一、二副本在同一Rack下時選第三個副本 
  // (前面的favoredNodes以及集群條件可能造成這種情況)
  if (clusterMap.isOnSameRack(dn0, dn1)) {
    chooseRemoteRack(1, dn0, excludedNodes, blocksize, maxNodesPerRack,
        results, avoidStaleNodes, storageTypes);
  } else if (newBlock){ // 正常情況,第二副本的localRack下選第三副本
    chooseLocalRack(dn1, excludedNodes, blocksize, maxNodesPerRack,
        results, avoidStaleNodes, storageTypes);
  } else {  // 其它的以外
    chooseLocalRack(writer, excludedNodes, blocksize, maxNodesPerRack,
        results, avoidStaleNodes, storageTypes);
  }
  if (--numOfReplicas == 0) {
    return writer;
  }
}

// 如果副本數量還沒到0,剩下的副本隨機選擇
chooseRandom(numOfReplicas, NodeBase.ROOT, excludedNodes, blocksize,
    maxNodesPerRack, results, avoidStaleNodes, storageTypes);
return writer;

再到具體的選擇

選擇具體的存儲位置被上面包裝到了 chooseRemoteRack 和 chooseLocalRack 兩個函數。

實際調用時只是 chooseRandom 函數,在限定的rack下選擇一個隨機的節點。

源碼閱讀的幾個註意

代碼在直接閱讀時各種跳,但主線思路比較明確。主要帶來閱讀困難的位置:

  1. 很多函數調用不是通過返回值傳出結果,而是通過參數。
  2. 註意某些if後的return會直接返回結果,後面的代碼不會被調用。
  3. 遞歸的形式多次調用同一個函數以選擇多個副本。
  4. 很多代碼為了避免一些特殊情況,可以暫時略過(如catch裏的異常處理)。

修改HDFS默認的副本放置機制

可以選擇直接復制或繼承BlockPlacementPolicyDefault的實現,或者直接繼承BlockPlacementPolicy類編寫對應的接口具體實現。

將編譯好的jar包放入$HADOOP_PREFIX/share/hadoop/common下(或者其它的Hadoop jar包路徑)。

改變dfs.block.replicator.classname 為上面的實現類,要帶包的名稱。

RackAwareness 機架感知

Hadoop 並不能自動檢測集群的機架狀態,而是要預先設置機架的狀態,通過腳本或java類將datanode的ip轉換成具體的機架上的位置。

官方文檔介紹了基本思路,雖然實現上介紹得不是太清楚,只要將輸入的ip轉換成”/rackNum”的形式即可。

https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/RackAwareness.html

Hadoop 副本放置策略的源碼閱讀和設置