1. 程式人生 > >hdfs 機架感知

hdfs 機架感知

stats rop name stat 設計時 const val p地址 ont

一、背景

分布式的集群通常包含非常多的機器,由於受到機架槽位和交換機網口的限制,通常大型的分布式集群都會跨好幾個機架,由多個機架上的機器共同組成一個分布式集群。機架內的機器之間的網絡速度通常都會高於跨機架機器之間的網絡速度,並且機架之間機器的網絡通信通常受到上層交換機間網絡帶寬的限制。

Hadoop在設計時考慮到數據的安全與高效,數據文件默認在HDFS上存放三份,存儲策略為:

第一個block副本放在客戶端所在的數據節點裏(如果客戶端不在集群範圍內,則從整個集群中隨機選擇一個合適的數據節點來存放)。

第二個副本放置在與第一個副本所在節點相同機架內的其它數據節點上

第三個副本放置在不同機架的節點上

這樣如果本地數據損壞,節點可以從同一機架內的相鄰節點拿到數據,速度肯定比從跨機架節點上拿數據要快;
同時,如果整個機架的網絡出現異常,也能保證在其它機架的節點上找到數據。
為了降低整體的帶寬消耗和讀取延時,HDFS會盡量讓讀取程序讀取離它最近的副本。
如果在讀取程序的同一個機架上有一個副本,那麽就讀取該副本。
如果一個HDFS集群跨越多個數據中心,那麽客戶端也將首先讀本地數據中心的副本。
那麽Hadoop是如何確定任意兩個節點是位於同一機架,還是跨機架的呢?答案就是機架感知。

默認情況下,hadoop的機架感知是沒有被啟用的。所有的機器hadoop都默認在同一個默認的機架下,名為 “/default-rack”,這種情況下,任何一臺datanode機器,不管物理上是否屬於同一個機架,都會被認為是在同一個機架下,此時,就很容易出現增添機架間網絡負載的情況。因為此時hadoop集群的HDFS在選機器的時候,是隨機選擇的,也就是說,
很有可能在寫數據時,hadoop將第一塊數據block1寫到了rack1上,然後隨機的選擇下將block2寫入到了rack2下,
此時兩個rack之間產生了數據傳輸的流量,再接下來,在隨機的情況下,又將block3重新又寫回了rack1,此時,兩個rack之間又產生了一次數據流量。


在job處理的數據量非常的大,或者往hadoop推送的數據量非常大的時候,這種情況會造成rack之間的網絡流量成倍的上升,成為性能的瓶頸,
進而影響作業的性能以至於整個集群的服務。

二、配置

默認情況下,namenode啟動時候日誌是這樣的:
INFO org.apache.hadoop.net.NetworkTopology: Adding a new node: /default-rack/ 172.16.145.35:50010
每個IP 對應的機架ID都是 /default-rack ,說明hadoop的機架感知沒有被啟用。
要將hadoop機架感知的功能啟用,配置非常簡單,在 NameNode所在節點的/etc/hadoop/conf下的core-site.xml配置文件中配置一個選項:

<property> 
<name>topology.script.file.name</name> 
<value>/etc/hadoop/conf/RackAware.py</value> 
</property>

  


這個配置選項的value指定為一個可執行程序,通常為一個腳本,該腳本接受一個參數,輸出一個值。
接受的參數通常為某臺datanode機器的ip地址,而輸出的值通常為該ip地址對應的datanode所在的rack,例如”/rack1”。
Namenode啟動時,會判斷該配置選項是否為空,如果非空,則表示已經啟用機架感知的配置,此時namenode會根據配置尋找該腳本,
並在接收到每一個datanode的heartbeat時,將該datanode的ip地址作為參數傳給該腳本運行,並將得到的輸出作為該datanode所屬的機架ID,保存到內存的一個map中.

至於腳本的編寫,就需要將真實的網絡拓樸和機架信息了解清楚後,通過該腳本能夠將機器的ip地址和機器名正確的映射到相應的機架上去。
一個簡單的實現如下:

#!/usr/bin/python
#-*-coding:UTF-8 -*-
import sys

rack = {"NN01":"rack2",
        "NN02":"rack3",
        "DN01":"rack4",
        "DN02":"rack4",
        "DN03":"rack1",
        "DN04":"rack3",
        "DN05":"rack1",
        "DN06":"rack4",
        "DN07":"rack1",
        "DN08":"rack2",
        "DN09":"rack1",
        "DN10":"rack2",
        "172.16.145.32":"rack2",
        "172.16.145.33":"rack3",
        "172.16.145.34":"rack4",
        "172.16.145.35":"rack4",
        "172.16.145.36":"rack1",
        "172.16.145.37":"rack3",
        "172.16.145.38":"rack1",
        "172.16.145.39":"rack4",
        "172.16.145.40":"rack1",
        "172.16.145.41":"rack2",
        "172.16.145.42":"rack1",
        "172.16.145.43":"rack2",
        }

if __name__=="__main__":
    print "/" + rack.get(sys.argv[1],"rack0")

  這樣配置後,namenode啟動時候日誌是這樣的:
INFO org.apache.hadoop.net.NetworkTopology: Adding a new node: /rack4/ 172.16.145.35:50010
說明hadoop的機架感知已經被啟用了。
查看HADOOP機架信息命令: hdfs dfsadmin -printTopology

[[email protected] hadoop-hdfs]$ hdfs  dfsadmin  -printTopology
Rack: /rack1
   172.16.145.36:50010 (DN03)
   172.16.145.38:50010 (DN05)
   172.16.145.40:50010 (DN07)
   172.16.145.42:50010 (DN09)
   172.16.145.44:50010 (DN11)
   172.16.145.54:50010 (DN17)
   172.16.145.56:50010 (DN19)
   172.16.145.58:50010 (DN21)

Rack: /rack2
   172.16.145.41:50010 (DN08)
   172.16.145.43:50010 (DN10)
   172.16.145.45:50010 (DN12)
   172.16.145.60:50010 (DN23)
   172.16.145.62:50010 (DN25)

Rack: /rack3
   172.16.145.37:50010 (DN04)
   172.16.145.51:50010 (DN14)
   172.16.145.53:50010 (DN16)
   172.16.145.55:50010 (DN18)
   172.16.145.57:50010 (DN20)

Rack: /rack4
   172.16.145.34:50010 (DN01)
   172.16.145.35:50010 (DN02)
   172.16.145.39:50010 (DN06)
   172.16.145.50:50010 (DN13)
   172.16.145.52:50010 (DN15)
   172.16.145.59:50010 (DN22)
   172.16.145.61:50010 (DN24)

hdfs 三個副本的這種存放策略減少了機架間的數據傳輸,提高了寫操作的效率。機架的錯誤遠遠比節點的錯誤少,所以這種策略不會影響到數據的可靠性和可用性。與此同時,因為數據塊只存放在兩個不同的機架上,所以此策略減少了讀取數據時需要的網絡傳輸總帶寬。在這種策略下,副本並不是均勻的分布在不同的機架上:三分之一的副本在一個節點上,三分之二的副本在一個機架上,其它副本均勻分布在剩下的機架中,這種策略在不損害數據可靠性和讀取性能的情況下改進了寫的性能。

三、網絡拓撲機器之間的距離

這裏基於一個網絡拓撲案例,介紹在復雜的網絡拓撲中hadoop集群每臺機器之間的距離

技術分享

有了機架感知,NameNode就可以畫出上圖所示的datanode網絡拓撲圖。D1,R1都是交換機,最底層是datanode。則H1的rackid=/D1/R1/H1,H1的parent是R1,R1的是D1。這些rackid信息可以通過topology.script.file.name配置。有了這些rackid信息就可以計算出任意兩臺datanode之間的距離。

distance(/D1/R1/H1,/D1/R1/H1)=0  相同的datanode
distance(/D1/R1/H1,/D1/R1/H2)=2  同一rack下的不同datanode
distance(/D1/R1/H1,/D1/R1/H4)=4  同一IDC下的不同datanode
distance(/D1/R1/H1,/D2/R3/H7)=6  不同IDC下的datanode

四、如何判斷是否是合適的數據節點

上面說到如果客戶端是數據節點,則會把正在寫入的數據的一個副本保存在這個客戶端的數據節點上。我們把它看做是本地節點。但是如果這個客戶端上的數據節點空間不足或者是當前負載過重,則應該從該數據節點所在的機架中選擇一個合適的數據節點作為此時這個數據塊的本地節點。另外,如果客戶端上沒有一個數據節點的話,則從整個集群中隨機選擇一個合適的數據節點作為此時這個數據塊的本地節點。那麽,如何判定一個數據節點合不合適呢,通過查看源碼知道它是通過isGoodTarget方法來確定的:

private boolean isGoodTarget(DatanodeStorageInfo storage,
                               long blockSize, int maxTargetPerRack,
                               boolean considerLoad,
                               List<DatanodeStorageInfo> results,
                               boolean avoidStaleNodes,
                               StorageType storageType) {
    if (storage.getStorageType() != storageType) {
      logNodeIsNotChosen(storage,
          "storage types do not match, where the expected storage type is "
              + storageType);
      return false;
    }
    if (storage.getState() == State.READ_ONLY_SHARED) {
      logNodeIsNotChosen(storage, "storage is read-only");
      return false;
    }
    DatanodeDescriptor node = storage.getDatanodeDescriptor();
    // check if the node is (being) decommissioned   //判斷節點是否退役(不可用)
    if (node.isDecommissionInProgress() || node.isDecommissioned()) {
      logNodeIsNotChosen(storage, "the node is (being) decommissioned ");
      return false;
    }

    if (avoidStaleNodes) {
      if (node.isStale(this.staleInterval)) {
        logNodeIsNotChosen(storage, "the node is stale ");
        return false;
      }
    }
    
    final long requiredSize = blockSize * HdfsConstants.MIN_BLOCKS_FOR_WRITE;
    final long scheduledSize = blockSize * node.getBlocksScheduled();
	//節點磁盤剩余空間夠不夠
    if (requiredSize > storage.getRemaining() - scheduledSize) {
      logNodeIsNotChosen(storage, "the node does not have enough space ");
      return false;
    }

    // check the communication traffic of the target machine
    if (considerLoad) {
      final double maxLoad = 2.0 * stats.getInServiceXceiverAverage();
      final int nodeLoad = node.getXceiverCount();
	  //節點當前的負載情況
      if (nodeLoad > maxLoad) {
        logNodeIsNotChosen(storage,
            "the node is too busy (load:"+nodeLoad+" > "+maxLoad+") ");
        return false;
      }
    }
      
    // check if the target rack has chosen too many nodes
    String rackname = node.getNetworkLocation();
    int counter=1;
    for(DatanodeStorageInfo resultStorage : results) {
      if (rackname.equals(
          resultStorage.getDatanodeDescriptor().getNetworkLocation())) {
        counter++;
      }
    }
	// 該節點所在的機架被選擇存放當前數據塊副本的數據節點過多
    if (counter>maxTargetPerRack) {
      logNodeIsNotChosen(storage, "the rack has too many chosen nodes ");
      return false;
    }
    return true;
  }

  

hdfs 機架感知