HDFS怎樣檢測並刪除多余副本塊
前言
在HDFS中,每時每刻都在進行著大量block塊的創建和刪除操作,這些龐大的block塊構建起了這套復雜的分布式系統.普通block的讀寫刪除操作一般人都或多或少了解過一些,可是過量的副本清理機制是否有人知道呢,就是overReplicatedBlock的處理,針對過量的副本塊,HDFS怎麽處理,何時處理,處理的策略機制怎樣,本文就給大家分享HDFS在這方面的知識.
過量副本塊以及發生的場景
過量副本塊的意思通俗解釋就是集群中有A副本3個,滿足標準的3副本策略,可是此時發生了某種場景後,A副本塊突然變為5個了,為了達到副本塊的標準系數3個,系統就會進行多余2塊副本的清除動作,而這個清除動作就是本文所要重點描寫敘述的.過量副本塊的現象是比較好解釋的,那麽問題來了,究竟有哪些潛在的原因或條件會觸發多余副本塊的發生呢(在此指的是HDFS中)?
本人通過對HDFS源代碼的閱讀,總結出一下3點
ReCommission節點又一次上線.這類操作是運維操作引起的.節點下線操作會導致大量此節點的block塊在集群中大量拷貝,一旦此節點取消下線,之前已拷貝的大量塊必定會成為多余的副本塊.
人為又一次設置block replication副本數.還是以A副本舉例,A副本當前滿足標準副本數3個,此時用戶張三通過使用hdfs的API方法setReplication人為設置副本數為1.此時也會早A副本數多余2個的情況,即使說HDFS中的副本標準系數還是3個.
新增加的block塊記錄在系統中被丟失.這種可能相對於前2種case的情況,是內部因素造成.這些新增加的丟失的block塊記錄會在BlockManager進行再次掃描檢測,防止出現過量副本的現象.
OK,以上3種情形就是可能發生過量副本塊的原因.至於這3種情況是怎樣一步步的終於調用到處理多余副本塊的過程在後面的描寫敘述中會再給出,先來看下多余副本塊是怎樣被選出並處理掉的.
OverReplication多余副本塊處理
多余副本塊的處理分為2個子過程:
- 多余副本塊的選出
- 選出的多余副本塊的處理
我們從源代碼中進行一一尋找原因,首先副本塊的選出.
多余副本塊的選擇
進入blockManager的processOverReplicatedBlock方法,非常顯然,方法名已經表明了方法操作的本意了.
/**
* Find how many of the containing nodes are "extra", if any.
* If there are any extras, call chooseExcessReplicates() to
* mark them in the excessReplicateMap.
*/
private void processOverReplicatedBlock(final Block block, final short replication, final DatanodeDescriptor addedNode, DatanodeDescriptor delNodeHint) {
此方法的凝視的意思是找出存在”多余”的節點,假設他們是多余的,調用chooseExcessReplicates並標記他們,增加增加到excessReplicateMap中.以下進行細節的處理
// 節點列表變量的聲明
Collection<DatanodeStorageInfo> nonExcess = new ArrayList<DatanodeStorageInfo>();
// 從corruptReplicas變量中獲取是否存在壞的block所在的節點
Collection<DatanodeDescriptor> corruptNodes = corruptReplicas.getNodes(block);
繼續後面的處理
// 遍歷此過量副本塊所在的節點列表
for(DatanodeStorageInfo storage : blocksMap.getStorages(block, State.NORMAL)) {
final DatanodeDescriptor cur = storage.getDatanodeDescriptor();
...
LightWeightLinkedSet<Block> excessBlocks = excessReplicateMap.get(cur
.getDatanodeUuid());
// 假設在當前過量副本圖對象excessReplicateMap中不存在
if (excessBlocks == null || !excessBlocks.contains(block)) {
//而且所在節點不是已下線或下線中的節點
if (!cur.isDecommissionInProgress() && !cur.isDecommissioned()) {
// 而且這個副本塊不是損壞的副本塊
// exclude corrupt replicas
if (corruptNodes == null || !corruptNodes.contains(cur)) {
// 將此過濾副本塊的一個所在節點增加候選節點列表中
nonExcess.add(storage);
}
}
}
}
所以從這裏看出nonExcess對象事實上是一個候選節點的概念,將block副本塊所在的節點列表進行多種條件的再推斷和剔除.最後就調用到選擇終於過量副本塊節點的方法
chooseExcessReplicates(nonExcess, block, replication,
addedNode, delNodeHint, blockplacement);
進入chooseExcessReplicates方法
// first form a rack to datanodes map and
// 首先會形成機架對datanode節點的映射關系圖
BlockCollection bc = getBlockCollection(b);
final BlockStoragePolicy storagePolicy = storagePolicySuite.getPolicy(bc.getStoragePolicyID());
final List<StorageType> excessTypes = storagePolicy.chooseExcess(
replication, DatanodeStorageInfo.toStorageTypes(nonExcess));
// 初始化機架->節點列表映射圖對象
final Map<String, List<DatanodeStorageInfo>> rackMap
= new HashMap<String, List<DatanodeStorageInfo>>();
// 超過1個副本數的節點列表
final List<DatanodeStorageInfo> moreThanOne = new ArrayList<DatanodeStorageInfo>();
// 恰好1個副本數的節點列表
final List<DatanodeStorageInfo> exactlyOne = new ArrayList<DatanodeStorageInfo>();
為什麽要劃分不同節點列表的選擇呢.由於在這裏設計者做了優先選擇,在相同擁有多余副本塊的節點列表中,優先選擇節點中副本數多於1個的,其次再是副本數恰好有1個的節點.這個設計非常好理解,由於你上面的多余副本數很多其它嘛,我當然要先從多的開始刪.
// 節點劃分成相應2個集合
// split nodes into two sets
// moreThanOne contains nodes on rack with more than one replica
// exactlyOne contains the remaining nodes
replicator.splitNodesWithRack(nonExcess, rackMap, moreThanOne, exactlyOne);
進入劃分方法
public void splitNodesWithRack(
final Iterable<DatanodeStorageInfo> storages,
final Map<String, List<DatanodeStorageInfo>> rackMap,
final List<DatanodeStorageInfo> moreThanOne,
final List<DatanodeStorageInfo> exactlyOne) {
// 遍歷候選節點列表,形成機架->節點列表的相應關系
for(DatanodeStorageInfo s: storages) {
final String rackName = getRack(s.getDatanodeDescriptor());
List<DatanodeStorageInfo> storageList = rackMap.get(rackName);
if (storageList == null) {
storageList = new ArrayList<DatanodeStorageInfo>();
rackMap.put(rackName, storageList);
}
storageList.add(s);
}
以下給出的劃分算法
// split nodes into two sets
for(List<DatanodeStorageInfo> storageList : rackMap.values()) {
if (storageList.size() == 1) {
// exactlyOne contains nodes on rack with only one replica
// 假設機架中相應的節點數量僅僅有1個,則節點上副本數就為1,否則就為多個
exactlyOne.add(storageList.get(0));
} else {
// moreThanOne contains nodes on rack with more than one replica
moreThanOne.addAll(storageList);
}
}
上面劃分的原理應該是與相應的block副本存放策略原理相關,這個我到沒有細致去了解原因,讀者能夠自行閱讀相關BlockPlacementPolicy相關代碼進行了解.於是在這段代碼過後,節點組就被分為了2大類,exactlyOne和moreThanOne.至此chooseExcessReplicates的上半段代碼運行完畢,接下來看下半段代碼的運行過程
// 選擇一個待刪除的節點會偏向delNodeHintStorage的節點
// pick one node to delete that favors the delete hint
// 否在會從節點列表中選出一個可用空間最小
// otherwise pick one with least space from priSet if it is not empty
// otherwise one node with least space from remains
boolean firstOne = true;
final DatanodeStorageInfo delNodeHintStorage
= DatanodeStorageInfo.getDatanodeStorageInfo(nonExcess, delNodeHint);
final DatanodeStorageInfo addedNodeStorage
= DatanodeStorageInfo.getDatanodeStorageInfo(nonExcess, addedNode);
上面這3行凝視傳達出2個意思
- 能夠直接傳入要刪除的節點,假設能夠,則優選選擇傳入的delHint節點
- 在每一個節點的內部列表中,優選會選擇出可用空間最少的,這個也好理解,相同的副本數的節點列表中,當然要選擇可用空間盡可能少的,以便釋放出多的空間.
// 假設眼下過量副本所在節點數大於標準副本數,則進行循環移除
while (nonExcess.size() - replication > 0) {
final DatanodeStorageInfo cur;
// 推斷能否夠使用delNodeHintStorage節點進行取代
if (useDelHint(firstOne, delNodeHintStorage, addedNodeStorage,
moreThanOne, excessTypes)) {
cur = delNodeHintStorage;
} else { // regular excessive replica removal
// 否則進行常規的節點選擇
cur = replicator.chooseReplicaToDelete(bc, b, replication,
moreThanOne, exactlyOne, excessTypes);
}
推斷能否夠使用delNodeHintStorage節點的推斷邏輯這裏就忽略了,主要看一下關鍵的chooseReplicaToDelete方法,這個分支處理才是最經經常使用到的處理方式.
// 選擇的節點要麽是心跳時間最老的或者是可用空間最少的
// Pick the node with the oldest heartbeat or with the least free space,
// if all hearbeats are within the tolerable heartbeat interval
for(DatanodeStorageInfo storage : pickupReplicaSet(first, second)) {
if (!excessTypes.contains(storage.getStorageType())) {
continue;
}
first和second的節點選擇邏輯例如以下,非常的簡單
/**
* Pick up replica node set for deleting replica as over-replicated.
* First set contains replica nodes on rack with more than one
* replica while second set contains remaining replica nodes.
* So pick up first set if not empty. If first is empty, then pick second.
*/
protected Collection<DatanodeStorageInfo> pickupReplicaSet(
Collection<DatanodeStorageInfo> first,
Collection<DatanodeStorageInfo> second) {
return first.isEmpty() ? second : first;
}
在節點列表的每次叠代循環中會進行以下2個指標的對照
// 進行心跳時間的對照
if (lastHeartbeat < oldestHeartbeat) {
oldestHeartbeat = lastHeartbeat;
oldestHeartbeatStorage = storage;
}
// 進行可用空間的對照
if (minSpace > free) {
minSpace = free;
minSpaceStorage = storage;
}
然後進行選擇,優先選擇心跳時間最老的
final DatanodeStorageInfo storage;
if (oldestHeartbeatStorage != null) {
storage = oldestHeartbeatStorage;
} else if (minSpaceStorage != null) {
storage = minSpaceStorage;
} else {
return null;
}
然後進行以下2個操作
// 又一次進行rackMap對象關系的調整
// adjust rackmap, moreThanOne, and exactlyOne
replicator.adjustSetsWithChosenReplica(rackMap, moreThanOne,
exactlyOne, cur);
// 將選出的節點從候選節點列表中移除
nonExcess.remove(cur);
能夠說,到了這裏,多余副本塊所在節點就被選出了.
多余副本塊的處理
多余副本塊的處理就顯得非常easy了,反正目標對象以及所在節點已經找到了,增加到相應的對象中就可以.
// 增加到excessReplicateMap對象中
addToExcessReplicate(cur.getDatanodeDescriptor(), b);
//
// The ‘excessblocks‘ tracks blocks until we get confirmation
// that the datanode has deleted them; the only way we remove them
// is when we get a "removeBlock" message.
//
// The ‘invalidate‘ list is used to inform the datanode the block
// should be deleted. Items are removed from the invalidate list
// upon giving instructions to the namenode.
//
// 將此節點上的b block增加到無效節點中
addToInvalidates(b, cur.getDatanodeDescriptor());
增加到invalidates無效block列表後不久,此block就將被清除.
多余副本塊清除的場景調用
又一次回到之前提到過的多余副本塊的3大場景調用.有人可能會好奇我是怎麽找到這3種使用場景的,通過查看chooseExcessReplicates哪裏被調用就能夠了,例如以下圖所看到的
針對上述的5種調用情況,於是我總結了3種使用場景.以下一一進行對照.
場景1: ReCommission又一次上線過程
在方法processOverReplicatedBlocksOnReCommission中調用了清除過量副本塊的方法
/**
* Stop decommissioning the specified datanode.
* @param node
*/
@VisibleForTesting
public void stopDecommission(DatanodeDescriptor node) {
if (node.isDecommissionInProgress() || node.isDecommissioned()) {
// Update DN stats maintained by HeartbeatManager
hbManager.stopDecommission(node);
// Over-replicated blocks will be detected and processed when
// the dead node comes back and send in its full block report.
if (node.isAlive()) {
blockManager.processOverReplicatedBlocksOnReCommission(node);
}
// Remove from tracking in DecommissionManager
pendingNodes.remove(node);
decomNodeBlocks.remove(node);
} else {
LOG.trace("stopDecommission: Node {} in {}, nothing to do." +
node, node.getAdminState());
}
}
下線操作又一次恢復,必定要停止正在下線的動作,所以會在這種方法中進行調用.
場景2: SetReplication人為設置副本數
人為設置副本數是一個主動因素,調用的直接方法例如以下:
/** Set replication for the blocks. */
public void setReplication(final short oldRepl, final short newRepl,
final String src, final Block... blocks) {
if (newRepl == oldRepl) {
return;
}
// update needReplication priority queues
for(Block b : blocks) {
updateNeededReplications(b, 0, newRepl-oldRepl);
}
// 當設置的新的副本數值比原有的小的時候,須要進行過量副本的清除操作
if (oldRepl > newRepl) {
// old replication > the new one; need to remove copies
LOG.info("Decreasing replication from " + oldRepl + " to " + newRepl
+ " for " + src);
for(Block b : blocks) {
processOverReplicatedBlock(b, newRepl, null, null);
}
} else { // replication factor is increased
LOG.info("Increasing replication from " + oldRepl + " to " + newRepl
+ " for " + src);
}
}
這個API方法是能夠被外面的Client端程序調用觸發的.
場景3: 丟失新增加的block記錄信息
丟失新增加的block信息導致集群中存在多余的副本.官方的解釋是這種:
/*
* Since the BlocksMapGset does not throw the ConcurrentModificationException
* and supports further iteration after modification to list, there is a
* chance of missing the newly added block while iterating. Since every
* addition to blocksMap will check for mis-replication, missing mis-replication
* check for new blocks will not be a problem.
*/
由於存在丟失block信息的可能性,所以會開單獨的線程去又一次檢測是否存在過量副本的現象.
private void processMisReplicatesAsync() throws InterruptedException {
...
while (namesystem.isRunning() && !Thread.currentThread().isInterrupted()) {
int processed = 0;
namesystem.writeLockInterruptibly();
try {
while (processed < numBlocksPerIteration && blocksItr.hasNext()) {
BlockInfoContiguous block = blocksItr.next();
MisReplicationResult res = processMisReplicatedBlock(block);
...
場景4: 其它場景的檢測
其它場景有的時候也會調用到processOverReplicatedBlock的方法,但不是外界的因素導致,而是出於一種慎重性的考慮,比方在addStoredBlock,當新增加的塊被增加到blockMap中時,會再次進行塊的檢測.還有1種是在文件終於寫入完畢的時候,也會調用一次checkReplication此方法,來確認集群中沒有多余的相同塊的情況.這2種情況的調用如上圖所看到的,這裏就不放出詳細的代碼了,可見,HDFS的設計者在細節方面的處理真的是非常用心啊.
HDFS怎樣檢測並刪除多余副本塊