1. 程式人生 > >HDFS之DataNode啟動過程分析

HDFS之DataNode啟動過程分析

建立DataNode的入口DataNode.main(String[] args),主要的處理邏輯在方法【DataNode.createDataNode(String[] args, Configuration conf, SecureResources resources)】中,分別初始化DataNode和啟動DataNode的守護執行緒兩大步。
一.【DataNode.instantiateDataNode(String[] args,Configuration conf, SecureResources resources) 】初始化DataNode:
1.讀取"dfs.data.dir"配置引數,放入dataDirs:String[]中;
2.【DataNode.makeInstance(String[] dataDirs,Configuration conf, SecureResources resources)】
  2.1)【FileSystem.getLocal(Configuration conf)】初始化一個LocalFileSystem物件,即本地檔案系統物件;
  2.2)遍歷dataDirs:
      2.2.1)呼叫【DiskChecker.checkDir(LocalFileSystem localFS, Path dir, FsPermission expected)】:
        1)【DiskChecker.mkdirsWithExistsAndPermissionCheck(LocalFileSystem localFS,Path dir,FsPermission expected)】:建立${dfs.data.dir}目錄並賦上許可權;
        2)檢查此目錄是否有讀寫許可權以及是否為目錄,否則丟擲異常;
     2.2.2)對每個路徑初始化File物件,並存入dirs:ArrayList<File>佇列中;
  2.3)【DataNode.DataNode(Configuration conf, AbstractList<File> dataDirs, SecureResources resources)】,初始化DataNode物件;內部呼叫【DataNode.startDataNode(Configuration conf,AbstractList<File> dataDirs, SecureResources resources)】方法,具體實現步驟如下:
   2.3.1)獲取NameNode的訪問地址,並初始化為InetSocketAddress物件:nameNodeAddr;
        1)讀取配置檔案中的dfs.namenode.servicerpc-address引數的值;若有值則返回;否則進入第2步;
        2)讀取配置檔案中的dfs.namenode.rpc-address引數的值;若有值則返回;否則進入第3步;
        3)讀取配置檔案中的fs.default.name引數的值並返回;
   2.3.2)讀取配置檔案中的引數“dfs.socket.timeout”,“dfs.datanode.socket.write.timeout”,“dfs.datanode.transferTo.allowed”,“dfs.write.packet.size”等配置資訊;
   2.3.3)獲取DataNode的資料塊流的讀寫的地址,配置檔案中的引數dfs.datanode.address值或'dfs.datanode.bindAddress':'dfs.datanode.port'值
   2.3.4)【new DataStorage()】初始化DataNode.storage變數,主要用於管理資料目錄的類,完成格式化,升級,回滾等功能;
   2.3.5)讀取配置"slave.host.name“;若不為空,則賦值給DataNode.machineName:String;若為空,則讀取"dfs.datanode.dns.interface" 和 "dfs.datanode.dns.nameserver",此兩個配置預設均為default,通過【DNS.getDefaultHost(String strInterface, String nameserver)】生成DataNode.machineName變數的值;
   2.3.6)通過DataNode.machineName和第2.3.3步的port值 初始化DataNode.dnRegistration:DatanodeRegistration物件;
   2.3.7)呼叫RPC.waitForProxy(...)方法,通過nameNodeAddr網路地址獲取與NameNode通訊的客戶端DataNode.namenode;
   2.3.8)【DataNode.handshake()】通過RPC方式呼叫【NameNode.versionRequest()】,從NameNode獲取namespaceid,ctime資訊然後封裝成NamespaceInfo物件返回;在NameNode端呼叫【FSNamesystem.getNamespaceInfo()】
   2.3.9)【DataStorage.recoverTransitionRead(NamespaceInfo nsInfo, Collection<File> dataDirs, StartupOption startOpt)】
     2.3.9.1)遍歷形參dataDirs(在第2.2.2步中生成):
           1)根據集合中的File物件初始化StorageDirectory物件並存入Storage.storageDirs:List<StorageDirectory>中;
           2)【StorageDirectory.analyzeStorage(StartupOption startOpt)】檢查所有${dfs.data.dir}目錄下檔案的一致性,大致思路如下:
             2.1)檢測${dfs.data.dir}資料夾是否存在,是否有寫許可權;
             2.2)根據VERSION,previous,previous.tmp,finalized.tmp,lastcheckpoint.tmp檔案的存在與否來判斷檔案的狀態;
     2.3.9.2)遍歷Storage.storageDirs集合,呼叫【DataStorage.doTransition(StorageDirectory sd,NamespaceInfo nsInfo, StartupOption startOpt)】:
           1)如果啟動引數為“-rollback”,則呼叫【DataStorage.doRollback(StorageDirectory sd,NamespaceInfo nsInfo)】進行回滾,大致思路:刪掉current檔案,並將previous檔名改為current;
           2)讀取VERSION檔案的資訊;
     2.3.9.3)更新所有${dfs.data.dir}目錄下寫VERSION檔案資訊;
  2.3.10)【DatanodeRegistration.setStorageInfo(DataStorage storage)】將DataNode.storage變數設定到DatanodeRegistration.storageInfo和DatanodeRegistration.storageID中,此時storageID為"",在向NameNode註冊的時候分配;
  2.3.11)【FSDataset.FSDataset(DataStorage storage, Configuration conf)】生成FSDatasetInterface介面的實現類FSDataset,並賦值給DataNode.data變數,FSDataset類是所有資料塊讀寫的實際操作類
     2.3.11.1)讀取"dfs.datanode.numblocks"引數值,表示每個block塊的最大值;
     2.3.11.2)初始化FSVolume[]陣列,其中陣列大小為Storage.storageDirs集合中StorageDirectory的個數,即list.size()的值;
     2.3.11.3)遍歷Storage.storageDirs集合,呼叫【FSDataset.FSVolume.FSVolume(File currentDir, Configuration conf)】初始化FSVolume物件,並存入陣列FSVolume[]中;其中currentDir='${dfs.data.dir}/current'的File物件;FSVolume的成員初始化如下:
          1)【FSDataset.FSDir.FSDir(File dir)】初始化FSVolume.dataDir:FSDir,通過這個FSDir的初始化過程可以生成以${dfs.data.dir}/current為根目錄的樹形結構,結構中每個節點的資料成員如下:
              FSDir.dir 為當前檔案或資料夾的File物件;
              FSDir.children表示當前目錄下面存在的資料夾的File物件集合;
              FSDir.numBlocks表示當前目錄下面block檔案的個數;   
          2)FSVolume.currentDir是current檔案的File物件;
          3)FSVolume.detachDir是對'${dfs.data.dir}/detach'檔案生成的File物件;
          4)若存在detach資料夾,則呼叫【FSVolume.recoverDetachedBlocks(File dataDir, File dir)】,若detach裡面的檔案在current中不存在,則將此檔案恢復到current下面;若存在,則刪除detach下面的此檔案;
          5)若存在'${dfs.data.dir}/tmp'檔案,則刪除;
          6)若存在'${dfs.data.dir}/blocksBeingWritten'檔案,並且引數"dfs.durable.sync"為true;則呼叫【FSVolume.recoverBlocksBeingWritten(File bbw)】, 將blocksBeingWritten檔案下面的內容放入FSDataset.volumeMap和FSDataset.ongoingCreates中;若引數"dfs.durable.sync"為false,則直接刪除blocksBeingWritten檔案;
          7)建立detach,tmp,blocksBeingWritten檔案;
          8)初始化FSVolume.usage:DF 和FSVolume.dfsUsage:DU;DF和DU是用於在linux上執行shell命令"df"和"du"檢查磁碟空間等資訊(df命令可以獲取硬碟佔用了多少空間,還剩下多少空間;du是面向檔案的命令,檢視目錄大小),在NameNode.sendHeartbeat()的時候會用到;
     2.3.11.4)【FSVolumeSet(FSVolume[] volumes)】根據上一步中生成的FSVolume[]陣列初始化FSVolumeSet物件;並將此FSVolumeSet物件賦值給FSDataset.volumes變數;
     2.3.11.5)【FSVolumeSet.getVolumeMap(HashMap<Block,DatanodeBlockInfo> volumeMap)】生成HashMap<Block,DatanodeBlockInfo> volumeMap,將所有dfs.data.dir/current目錄下的所有block塊存入FSDataset.volumeMap中;{##註釋:current中block由blk_blockId和blk_blockId_GENSTAMP.meta兩個檔案組成##}
         1)遍歷FSVolume[]陣列,呼叫【FSVolume.getVolumeMap(HashMap<Block,DatanodeBlockInfo> volumeMap)】;內部遞迴呼叫【FSVolume.dataDir.getVolumeMap(HashMap<Block,DatanodeBlockInfo> volumeMap, FSVolume volume)】,以${dfs.data.dir}/current根節點開始遍歷整個目錄樹,將所有block塊存入FSDataset.volumeMap中;生成Block的邏輯如下:
          1.1)獲取generationStamp,此值是檔名blk_blockId_GENSTAMP.meta中GENSTAMP的值;
          1.2)根據blockId,blk_blockId檔案的長度,第1.1步中的generationStamp生成Block物件;
          1.3)根據FSVolume和blk_blockId檔案的File物件生成DatanodeBlockInfo物件;
          1.4)以Block物件為Key,DatanodeBlockInfo物件為Value,存入volumeMap中;
     2.3.11.6)啟動FSDataset.AsyncBlockReport執行緒,生成FSDataset.scan:遍歷FSVolume[]陣列,將所有的blk_blockId檔案存入AsyncBlockReport.scan:HashMap<Block, File>中;將此scan賦值給FSDataset.scan;
     2.3.11.7)以'${dfs.data.dir}/current'生成的File佇列初始化FSDatasetAsyncDiskService物件,並賦值給FSDataset.asyncDiskService變數;         
   2.3.12)初始化執行緒DataNode.dataXceiverServer:DataXceiverServer;
   2.3.13)初始化DataNode.ipcServer:Server,此Server註冊的是BlockTokenSecretManager類;
   2.3.13)初始化DataNode.blockScanner:DataBlockScanner物件;
   2.3.14)初始化執行緒池DataNode.readaheadPool:ReadaheadPool;
   2.3.15)初始化DataNode.infoServer:HttpServer服務,並開啟此HTTP服務;註冊Servlet包括:StreamFile,FileChecksumServlets.GetServlet,DataBlockScanner.Servlet;
  至此DataNode的初始化工作結束。
----------------------------------------------------       
二.啟動DataNode的第二大步就是啟動DataNode的守護執行緒【DataNode.runDatanodeDaemon(DataNode dn) 】,在此大步中要完成兩個工作:
1.【DataNode.register()】向NameNode註冊此DataNode:
   1.1)若DataNode.dnRegistration(在一.2.3.6和2.3.10步中建立的)的storageID為"",則建立一個storageID,規則為["DS-" + rand + "-"+ ip + "-" +dnRegistration.getPort() + "- +System.currentTimeMillis()];
   1.2)設定dnRegistration.name等於[machineName(一.2.3.5步生成的)+":"+dnRegistration.getPort()];
   1.3)【NameNode.register(DatanodeRegistration nodeReg)】通過RPC呼叫NameNode的register方法,把DataNode.dnRegistration物件傳遞給NameNode;內部呼叫【FSNamesystem.registerDatanode(DatanodeRegistration nodeReg)】完成DataNode的註冊工作:
       1.3.1)獲取此遠端DataNode的IP地址,通過FSNamesystem.hostsReader中的訪問或禁止列表檢查是否執行此DataNode訪問;
       1.3.2)用傳入的nodeReg物件更新DatanodeRegistration物件的name,infoPort,ipcPort欄位值;(感覺有點多餘了)
       1.3.3)根據dnRegistration.getStorageID()在FSNamesystem.datanodeMap中查詢nodeS:DatanodeDescriptor物件;
       1.3.4)根據dnRegistration.getName()在FSNamesystem.host2DataNodeMap中查詢nodeN:DatanodeDescriptor物件;
       1.3.5)若nodeN != null && nodeN != nodeS,則:
            1)從FSNamesystem.heartbeats中刪除nodeN物件;
            2)從FSNamesystem.blocksMap刪除nodeN物件中的所有block塊(即此DataNode上的所有block塊資訊);
            3)從網路拓撲圖FSNamesystem.clusterMap:NetworkTopology中刪除nodeN節點;
            4)從FSNamesystem.datanodeMap和FSNamesystem.host2DataNodeMap中刪除nodeN節點;
       1.3.6)若nodeS==null,則進入1.3.7至1.3.11步;若nodeS!=null則進入1.3.12至1.3.16步;
       1.3.7)【DatanodeDescriptor(DatanodeID nodeID, NetworkTopology.DEFAULT_RACK, String hostName)】初始化DatanodeDescriptor物件;
       1.3.8)【FSNamesystem.resolveNetworkLocation(DatanodeDescriptor node)】通過FSNamesystem.dnsToSwitchMapping物件(在建立NameNode時初始化)將DataNdoe的hostname轉換成網路路徑並設定給(DatanodeInfo)dnRegistration.location變數;此轉換的目的是便於生成一個DataNode的網路拓撲圖。
       1.3.9)【FSNamesystem.unprotectedAddDatanode(DatanodeDescriptor nodeDescr)】,新增FSNamesystem.datanodeMap和FSNamesystem.host2DataNodeMap內容;
             1)以StorageID為Key,dnRegistration物件為Value,存入FSNamesystem.datanodeMap中;
             2)以hostname為Key,DatanodeDescriptor物件為Value,存入FSNamesystem.host2DataNodeMap中;Host2NodesMap實質的資料結構是HashMap<String, DatanodeDescriptor[]>;
       1.3.10)【NetworkTopology.add(Node node)】將此DataNode節點新增到一個以DataNode的網路位置形成的網路拓撲樹狀圖(用FSNamesystem.clusterMap表示)中;此樹狀結構的葉子節點為DatanodeDescriptor物件,非葉子節點為InnerNode物件,表示轉換器Switch或機架Rack的路由器Router.具體邏輯參考部落格《http://blog.csdn.net/zqhxuyuan/article/details/10198639?reload》;
       1.3.11)將DatanodeDescriptor物件存入FSNamesystem.heartbeats中;作用待分析;
       1.3.12)從網路拓撲圖FSNamesystem.clusterMap:NetworkTopology中刪除nodeS節點;
       1.3.13)用傳入的nodeReg物件更新nodeS物件的name,infoPort,ipcPort欄位值;
       1.3.14)【FSNamesystem.resolveNetworkLocation(DatanodeDescriptor node)】:hostname轉換成網路路徑,見1.3.7步;
       1.3.15)【NetworkTopology.add(Node node)】,參考1.3.9步;
       1.3.16)將DatanodeDescriptor物件存入FSNamesystem.heartbeats中;作用待分析;
  1.4)若'dfs.durable.sync'為true,
       1.4.1)【FSDataset.getBlocksBeingWrittenReport()】獲取blocksBeingWritten目錄下面的所有Block封裝成Block[]陣列;
       1.4.2)【BlockListAsLongs.convertToArrayLongs(Block[] blockArray)】將Block[]陣列轉換成long[]陣列,利於傳輸;;轉換規則如下:
              1)long[]陣列的長度=Block[]陣列長度*3;
              2)long[index*3]=Block[index].BlockId
              3)long[index*3+1]=Block[index].NumBytes  
          4)long[index*3+2]=Block[index].GenerationStamp
       1.4.3)【NameNode.blocksBeingWrittenReport(DatanodeRegistration nodeReg, long[] blocks)】向NameNode報告正在寫入的檔案;
  1.5)【FSDataset.requestAsyncBlockReport()】喚醒FSDataset.AsyncBlockReport執行緒;


2.啟動DataNode執行緒,此執行緒的主要工作如下:
 2.1)啟動DataXceiverServer執行緒;此執行緒用於監聽其他DataNode或DFSClient客戶端的請求,用於傳送和接受block資料;
 2.2)啟動ipcServer服務;用於處理DFSClient的RPC呼叫請求;
 2.3)迴圈執行【DataNode.offerService()】;DataNode提供服務,定時傳送心跳給NameNode,響應NameNode返回的命令並執行。具體工作如下:
   2.3.1)間隔DataNode.heartBeatInterval時間向NameNode傳送心跳訊息;【NameNode.sendHeartbeat(DatanodeRegistration nodeReg, long capacity, long dfsUsed, long remaining, int xmitsInProgress, int xceiverCount) 】,其中capacity,dfsUsed,remaining是呼叫DF和DU中的方法得到的;
   2.3.2)【DataNode.processCommand(DatanodeCommand[] cmds)】在DataNode上根據第2.3.1步中NameNode對心跳資訊的返回結果執行相關操作;
   2.3.3)檢測是否有新的block到達,即DataNode.receivedBlockList佇列的大小大於0;若新block達到,則呼叫【NameNode.blockReceived(DatanodeRegistration nodeReg, Block[] blocks, String[] delHints)】,在此方法中將Block對應的DataNode資訊放入對應的BlockInfo.Object[]中,即完善存放此Block的DataNode資訊(NameNode記憶體中目錄樹結構的葉子節點INode物件上BlockInfo的成員變數Object[];
   2.3.4)間隔DataNode.blockReportInterval時間向NameNode傳送block報告;    
      2.3.4.1)【FSDataset.retrieveAsyncBlockReport()】內部呼叫FSDataset.reconcileRoughBlockScan(HashMap<Block,File> seenOnDisk) 其中seenOnDisk= FSDataset.scan;找到需要報告的block塊;
      2.3.4.2)【BlockListAsLongs.convertToArrayLongs(Block[] blockArray)】將Block[]陣列轉換成long[]陣列;轉換規則如下:
              1)long[]陣列的長度=Block[]陣列長度*3;
              2)long[index*3]=Block[index].BlockId
              3)long[index*3+1]=Block[index].NumBytes  
          4)long[index*3+2]=Block[index].GenerationStamp
      2.3.4.2)【NameNode.blockReport(DatanodeRegistration nodeReg, long[] blocks)】,NameNode內部呼叫【FSNamesystem.processReport(DatanodeID nodeID,
 BlockListAsLongs newReport)】,具體實現如下:
          2.3.4.2.1)【FSNamesystem.getDatanode(DatanodeID nodeID) 】(DatanodeRegistration為DatanodeID的子類);根據DatanodeRegistration物件的StorageID值在FSNamesystem.datanodeMap中查詢DatanodeDescriptor物件:node;
          2.3.4.2.2)【node.reportDiff(BlocksMap blocksMap, BlockListAsLongs newReport, Collection<Block> toAdd, Collection<Block> toRemove,Collection<Block> toInvalidate)】
                 1)遍歷newReport即Block[]陣列轉換成的long[]陣列;
                   1.1)根據傳來的long[]生成Block物件block,在FSNamesystem.blocksMap中根據block查詢BlockInfo物件storedBlock;若storedBlock==null,則以long[index*3],long[index*3+1],generationStamp=1為引數初始化Block物件,然後在FSNamesystem.blocksMap中查詢storedBlock物件;
                   1.2)若storedBlock==null,則將Block物件放入toInvalidate集合中;繼續遍歷下一組long[];
                   1.3)【BlockInfo.findDatanode(DatanodeDescriptor dn)】查詢當前DatanodeDescriptor物件是否在blockinfo(第a.步查詢到的)的Object[]陣列的Object[index*3]中;若不在:
                     1.3.1)若storedBlock.getNumBytes()等於新生成的block.getNumBytes(),則將storedBlock放入toAdd集合中;
                     1.3.2)若不相等,則將新生成的block物件放入toAdd集合中;
                     1.3.3)繼續遍歷下一組long[];
                   1.4)【DatanodeDescriptor.moveBlockToHead(BlockInfo b)】:若當前的DatanodeDescriptor物件已經存入blockinfo的Object[]陣列Object[index*3]中;則將該blockinfo調整為此DataNode上由BlockInfo構成的雙向連結串列的頭節點;並將DatanodeDescriptor.blockList指向此blockinfo,用於指向連結串列的頭節點;繼續遍歷下一組long[];
                 2)遍歷此DataNode上的雙向連結串列,依次往下遍歷讀取BlockInfo物件storedBlock,若storedBlock.inode==null或storedBlock.inode.isUnderConstruction()=false,則將此storedBlock物件放入toRemove集合中;
          2.3.4.2.3)遍歷toRemove集合,呼叫【FSNamesystem.removeStoredBlock(Block block,DatanodeDescriptor node)】從FSNamesystem.blocksMap中刪除此block
          2.3.4.2.4)遍歷toAdd集合,呼叫【FSNamesystem.addStoredBlock(Block block,DatanodeDescriptor node, DatanodeDescriptor delNodeHint)】
                 1)從BlocksMap.getStoredBlock(Block b)中獲取BlockInfo類的物件storedBlock;
                 2)【node.addBlock(BlockInfo b)】將BlockInfo放入此DataNode的雙向連結串列的頭節點中,具體實現如下:
                     2.1)將DatanodeDescriptor物件存入BlockInfo物件的Object[lastindex*3]中;將Object[lastindex*3+1]和Object[lastindex*3+2]設定成null;
                     2.2)將此BlockInfo物件的Object[lastindex*3+2]設定為DatanodeDescriptor.blockList,在第一次加入此DataNode的雙向連結串列時blockList=null;
                     2.3)將此BlockInfo物件賦值給DatanodeDescriptor.blockList;
                 3)餘下的工作是對正在建立的檔案的處理,未分析;
          2.3.4.2.5)遍歷toInvalidate集合,將toInvalidate集合內容放入FSNamesystem.recentInvalidateSets中;
          2.3.4.2.6)設定DatanodeDescriptor.firstBlockReport=false;  
   2.3.5)【DataNode.processCommand(DatanodeCommand cmd)】在DataNode上根據第2.3.4步中NameNode對block報告的返回結果執行相關操作;