Hadoop 副本放置策略的源碼閱讀和設置
大多數的叫法都是副本放置策略,實質上是HDFS對所有數據的位置放置策略,並非只是針對數據的副本。因此Hadoop的源碼裏有block replicator(configuration)、 BlockPlacementPolicy(具體邏輯源碼)兩種叫法。
主要用途:上傳文件時決定文件在HDFS上存儲的位置(具體到datanode上的具體存儲介質,如具體到存儲在哪塊硬盤);rebalance、datanode退出集群、副本數量更改等導致數據移動的操作中,數據移動的具體位置。
BlockPlacementPolicy
BlockPlacementPolicy 作為虛基類提供了基本的接口,具體的子類重點實現下面 選擇副本 、 驗證副本放置是否滿足要求 、 選擇能夠刪除的副本 三個函數:
/**
* 核心的副本放置策略實現,返回副本放置數量的存儲位置
* **如果有效節點數量不夠(少於副本數),返回盡可能多的節點,而非失敗**
*
* @param srcPath 上傳文件的路徑
* @param numOfReplicas 除下面chosen參數裏已經選擇的datanode,還需要的副本數量
* @param writer 寫數據的機器, null if not in the cluster. 一般用於放置第一個副本以降低網絡通信
* @param chosen 已經選擇的節點
* @param returnChosenNodes 返回結果裏是否包含chosen的datanode
* @param excludedNodes 不選的節點
* @param blocksize 塊大小
* @return 排序好的選擇結果
*/
public abstract DatanodeStorageInfo[] chooseTarget(String srcPath,
int numOfReplicas,
Node writer,
List<DatanodeStorageInfo> chosen,
boolean returnChosenNodes,
Set<Node> excludedNodes,
long blocksize,
BlockStoragePolicy storagePolicy);
/**
* 判斷傳入的放置方式是否符合要求
*/
abstract public BlockPlacementStatus verifyBlockPlacement(
DatanodeInfo[] locs, int numOfReplicas);
/**
* 當副本數量較多時,選擇需要刪除的節點
*/
abstract public List<DatanodeStorageInfo> chooseReplicasToDelete(
Collection<DatanodeStorageInfo> candidates, int expectedNumOfReplicas,
List<StorageType> excessTypes, DatanodeDescriptor addedNode,
DatanodeDescriptor delNodeHint);
Hadoop 提供的 BlockPlacementPolicy 實現
Hadoop提供了BlockPlacementPolicyDefault、BlockPlacementPolicyWithNodeGroup、AvailableSpaceBlockPlacementPolicy三種實現(hadoop 2.7.7)。
其中BlockPlacementPolicyDefault是經典三副本策略的實現:第一個副本盡可能放在寫入數據的節點,第二個副本放在與第一個副本不在同一機架下的節點,第三個副本與第二副本放在同一個機架。
通過改變dfs.block.replicator.classname
能夠選擇具體的實現類,默認值為org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault
。(Hadoop 2.7.7下,貌似不同版本的Hadoop的命名還不一樣,而且2.7.7默認的配置文件裏還沒有,需要在源碼中查)
BlockPlacementPolicyDefault 源碼閱讀
public abstract DatanodeStorageInfo[] chooseTarget(String srcPath,
int numOfReplicas,
Node writer,
List<DatanodeStorageInfo> chosen,
boolean returnChosenNodes,
Set<Node> excludedNodes,
long blocksize,
BlockStoragePolicy storagePolicy);
chooseTarget函數實現了具體的三副本策略。各種特殊情況(如只有1個副本、datanode數量不夠、集群拓撲不滿足要求等)的考慮讓代碼看起來比較復雜,常規情況直接跟著調試代碼走會跳過很多異常處理部分,便於裂解正常流程。
在副本的選擇上用了各種帶chooseTarget函數,註意有幾個函數結果是通過參數傳出而不是返回值。
主要實現思路:
- 各種變量初始化
- 考慮favoredNodes的放置
- 除滿足條件的favoredNodes後的副本放置策略(三副本)
- 結果排序
首先
srcPath沒有被考慮,被直接舍棄:
return chooseTarget(numOfReplicas, writer, chosenNodes, returnChosenNodes,
excludedNodes, blocksize, storagePolicy, flags); // ignore srcPath
因此默認的副本放置策略,在同一文件包含多個block時,每個block的存儲位置獨立考慮,並非存儲在同一datanode。
處理favoredNodes
上傳文件時可以指定favoredNodes(默認為空),首先對favoredNodes所在的節點判斷是否合適。如果滿足條件的節點數還低於副本數,則添加新的副本。
// --------------Choose favored nodes ---------------
// 從favored nodes中選擇,在上傳文件時可以指定
List<DatanodeStorageInfo> results = new ArrayList<>();
boolean avoidStaleNodes = stats != null
&& stats.isAvoidingStaleDataNodesForWrite();
int maxNodesAndReplicas[] = getMaxNodesPerRack(0, numOfReplicas);
numOfReplicas = maxNodesAndReplicas[0];
int maxNodesPerRack = maxNodesAndReplicas[1];
chooseFavouredNodes(src, numOfReplicas, favoredNodes,
favoriteAndExcludedNodes, blocksize, maxNodesPerRack, results,
avoidStaleNodes, storageTypes);
// ---------------如果滿足要求的favored nodes數量不足-----------
if (results.size() < numOfReplicas) {
// Not enough favored nodes, choose other nodes, based on block
// placement policy (HDFS-9393).
numOfReplicas -= results.size();
for (DatanodeStorageInfo storage : results) {
// add localMachine and related nodes to favoriteAndExcludedNodes
addToExcludedNodes(storage.getDatanodeDescriptor(),
favoriteAndExcludedNodes);
}
DatanodeStorageInfo[] remainingTargets =
chooseTarget(src, numOfReplicas, writer,
new ArrayList<DatanodeStorageInfo>(numOfReplicas), false,
favoriteAndExcludedNodes, blocksize, storagePolicy, flags);
for (int i = 0; i < remainingTargets.length; i++) {
results.add(remainingTargets[i]);
}
}
三副本選擇
實現邏輯在 chooseTargetInOrder(…) 函數中
// 第一個副本的選擇
if (numOfResults == 0) {
writer = chooseLocalStorage(writer, excludedNodes, blocksize,
maxNodesPerRack, results, avoidStaleNodes, storageTypes, true)
.getDatanodeDescriptor();
if (--numOfReplicas == 0) {
return writer;
}
}
// 選擇與第一個副本不在同一Rack下的第二個副本
final DatanodeDescriptor dn0 = results.get(0).getDatanodeDescriptor();
if (numOfResults <= 1) {
chooseRemoteRack(1, dn0, excludedNodes, blocksize, maxNodesPerRack,
results, avoidStaleNodes, storageTypes);
if (--numOfReplicas == 0) {
return writer;
}
}
// 第三個副本
if (numOfResults <= 2) {
final DatanodeDescriptor dn1 = results.get(1).getDatanodeDescriptor();
// 第一、二副本在同一Rack下時選第三個副本
// (前面的favoredNodes以及集群條件可能造成這種情況)
if (clusterMap.isOnSameRack(dn0, dn1)) {
chooseRemoteRack(1, dn0, excludedNodes, blocksize, maxNodesPerRack,
results, avoidStaleNodes, storageTypes);
} else if (newBlock){ // 正常情況,第二副本的localRack下選第三副本
chooseLocalRack(dn1, excludedNodes, blocksize, maxNodesPerRack,
results, avoidStaleNodes, storageTypes);
} else { // 其它的以外
chooseLocalRack(writer, excludedNodes, blocksize, maxNodesPerRack,
results, avoidStaleNodes, storageTypes);
}
if (--numOfReplicas == 0) {
return writer;
}
}
// 如果副本數量還沒到0,剩下的副本隨機選擇
chooseRandom(numOfReplicas, NodeBase.ROOT, excludedNodes, blocksize,
maxNodesPerRack, results, avoidStaleNodes, storageTypes);
return writer;
再到具體的選擇
選擇具體的存儲位置被上面包裝到了 chooseRemoteRack 和 chooseLocalRack 兩個函數。
實際調用時只是 chooseRandom 函數,在限定的rack下選擇一個隨機的節點。
源碼閱讀的幾個註意
代碼在直接閱讀時各種跳,但主線思路比較明確。主要帶來閱讀困難的位置:
- 很多函數調用不是通過返回值傳出結果,而是通過參數。
- 註意某些if後的return會直接返回結果,後面的代碼不會被調用。
- 遞歸的形式多次調用同一個函數以選擇多個副本。
- 很多代碼為了避免一些特殊情況,可以暫時略過(如catch裏的異常處理)。
修改HDFS默認的副本放置機制
可以選擇直接復制或繼承BlockPlacementPolicyDefault的實現,或者直接繼承BlockPlacementPolicy類編寫對應的接口具體實現。
將編譯好的jar包放入$HADOOP_PREFIX/share/hadoop/common
下(或者其它的Hadoop jar包路徑)。
改變dfs.block.replicator.classname
為上面的實現類,要帶包的名稱。
RackAwareness 機架感知
Hadoop 並不能自動檢測集群的機架狀態,而是要預先設置機架的狀態,通過腳本或java類將datanode的ip轉換成具體的機架上的位置。
官方文檔介紹了基本思路,雖然實現上介紹得不是太清楚,只要將輸入的ip轉換成”/rackNum”的形式即可。
https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-common/RackAwareness.html
Hadoop 副本放置策略的源碼閱讀和設置