Giraph 原始碼分析(五)—— 載入資料+同步總結
作者|白松
關於Giraph 共有九個章節,本文第五個章節。
環境:在單機上(機器名:giraphx)啟動了2個workers。
輸入:SSSP資料夾,裡面有1.txt和2.txt兩個檔案。
1、在Worker向Master彙報健康狀況後,就開始等待Master建立InputSplit。
方法:每個Worker通過檢某個Znode節點是否存在,同時在此Znode上設定Watcher。若不存在,就通過BSPEvent的waitForever()方法釋放當前執行緒的鎖,陷入等待狀態。一直等到master建立該znode。此步驟位於BSPServiceWorker類中的startSuperStep方法中,等待程式碼如下:
2、Master呼叫createInputSplits()方法建立InputSplit。
在generateInputSplits()方法中,根據使用者設定的VertexInputFormat獲得InputSplits。程式碼如下:
其中minSplitCountHint為建立split的最小數目,其值如下:
minSplitCountHint = Workers數目 * NUM_INPUT_THREADS
NUM_INPUT_THREADS表示 每個Input split loading的執行緒數目,預設值為1 。 經查證,在TextVertexValueInputFormat抽象類中的getSplits()方法中的minSplitCountHint引數被忽略。使用者輸入的VertexInputFormat繼承TextVertexValueInputFormat抽象類。
如果得到的splits.size小於minSplitCountHint,那麼有些worker就沒被用上。
得到split資訊後,要把這些資訊寫到Zookeeper上,以便其他workers訪問。上面得到的split資訊如下:
[hdfs://giraphx:9000/user/root/SSSP/1.txt:0+66, hdfs://giraphx:9000/user/root/SSSP/2.txt:0+46]
遍歷splits List,為每個split建立一個Znode,值為split的資訊。如為split-0建立Znode,值為:hdfs://giraphx:9000/user/root/SSSP/1.txt:0+66
/_hadoopBsp/job_201404102333_0013/_vertexInputSplitDir/0
為split-1建立znode(如下),值為:hdfs://giraphx:9000/user/root/SSSP/2.txt:0+46
/_hadoopBsp/job_201404102333_0013/_vertexInputSplitDir/1
最後建立znode: /_hadoopBsp/job_201404102333_0013/_vertexInputSplitsAllReady 表示所有splits都建立好了。
3、Master根據splits建立Partitions。首先確定partition的數目。
BSPServiceMaster中的MasterGraphPartitioner<I.V,E,M>物件預設為HashMasterPartitioner。它的createInitialPartitionOwners()方法如下:
上面程式碼中是在工具類PartitionUtils計算Partition的數目,計算公式如下:
partitionCount=PARTITION_COUNT_MULTIPLIER * availableWorkerInfos.size() * availableWorkerInfos.size() ,其中PARTITION_COUNT_MULTIPLIER表示Multiplier for the current workers squared,預設值為1 。
可見,partitionCount值為4(122)。建立的partitionOwnerList資訊如下:
[(id=0,cur=Worker(hostname=giraphx, MRtaskID=1, port=30001),prev=null,ckpt_file=null),
(id=1,cur=Worker(hostname=giraphx, MRtaskID=2, port=30002),prev=null,ckpt_file=null),
(id=2,cur=Worker(hostname=giraphx, MRtaskID=1, port=30001),prev=null,ckpt_file=null),
(id=3,cur=Worker(hostname=giraphx, MRtaskID=2, port=30002),prev=null,ckpt_file=null)]
4、Master建立Znode:/_hadoopBsp/job_201404102333_0013/_applicationAttemptsDir/0/_superstepDir/-1/_partitionExchangeDir,用於後面的exchange partition。
5、Master最後在assignPartitionOwners()方法中
把masterinfo,chosenWorkerInfoList,partitionOwners等資訊寫入Znode中(作為Znode的data),該Znode的路徑為: /_hadoopBsp/job_201404102333_0013/_applicationAttemptsDir/0/_superstepDir/-1/_addressesAndPartitions 。
Master呼叫barrierOnWorkerList()方法開始等待各個Worker完成資料載入。呼叫關係如下:
barrierOnWorkerList中建立znode,path=/_hadoopBsp/job_201404102333_0013/_vertexInputSplitDoneDir 。然後檢查該znode的子節點數目是否等於workers的數目,若不等於,則執行緒陷入等待狀態。後面某個worker完成資料載入後,會建立子node(如 /_hadoopBsp/job_201404102333_0013/_vertexInputSplitDoneDir/giraphx_1)來啟用該執行緒繼續判斷。
6、當Master建立第5步的znode後,會啟用worker。
每個worker從znode上讀出data,data包含masterInfo,WorkerInfoList和partitionOwnerList,然後各個worker開始載入資料。
把partitionOwnerList複製給BSPServiceWorker類中的workerGraphPartitioner(預設為HashWorkerPartitioner型別)物件的partitionOwnerList變數,後續每個頂點把根據vertexID通過workerGraphPartitioner物件獲取其對應的partitionOwner。
每個Worker從znode: /_hadoopBsp/job_201404102333_0013/_vertexInputSplitDir獲取子節點,得到inputSplitPathList,內容如下:
[/_hadoopBsp/job_201404102333_0013/_vertexInputSplitDir/1,
/_hadoopBsp/job_201404102333_0013/_vertexInputSplitDir/0]
然後每個Worker建立N個InputsCallable執行緒讀取資料。N=Min(NUM_INPUT_THREADS,maxInputSplitThread),其中NUM_INPUT_THREADS預設值為1,maxInputSplitThread=(InputSplitSize-1/maxWorkers +1
那麼,預設每個worker就是建立一個執行緒來載入資料。
在InputSplitsHandler類中的reserveInputSplit()方法中,每個worker都是遍歷inputSplitPathList,通過建立znode來保留(標識要處理)的split。程式碼及註釋如下:
當用reserveInputSplit()方法獲取某個znode後,loadSplitsCallable類的loadInputSplit方法就開始通過該znode獲取其HDFS的路徑資訊,然後讀入資料、重分佈資料。
VertexInputSplitsCallable類的readInputSplit()方法如下:
7、每個worker載入完資料後,呼叫waitForOtherWorkers()方法等待其他workers都處理完split。
策略如下,每個worker在/_hadoopBsp/job_201404102333_0013/_vertexInputSplitDoneDir目錄下建立子節點,後面追加自己的worker資訊,如worker1、worker2建立的子節點分別如下:
/_hadoopBsp/job_201404102333_0013/_vertexInputSplitDoneDir/giraphx_1
/_hadoopBsp/job_201404102333_0013/_vertexInputSplitDoneDir/giraphx_2
建立完後,然後等待master建立/_hadoopBsp/job_201404102333_0013/_vertexInputSplitsAllDone。
8、從第5步驟可知,若master發現/_hadoopBsp/job_201404102333_0013/_vertexInputSplitDoneDir下的子節點數目等於workers的總數目,就會在coordinateInputSplits()方法中建立
_hadoopBsp/job_201404102333_0013/_vertexInputSplitsAllDone,告訴每個worker,所有的worker都處理完了split。
9、最後就是就行全域性同步。
master建立znode,path=/_hadoopBsp/job_201404102333_0013/_applicationAttemptsDir/0/_superstepDir/-1/_workerFinishedDir ,然後再呼叫barrierOnWorkerList方法檢查該znode的子節點數目是否等於workers的數目,若不等於,則執行緒陷入等待狀態。等待worker建立子節點來啟用該執行緒繼續判斷。
每個worker獲取自身的Partition Stats,進入finishSuperStep方法中,等待所有的Request都被處理完;把自身的Aggregator資訊傳送給master;建立子節點,如/_hadoopBsp/job_201404102333_0013/_applicationAttemptsDir/0/_superstepDir/-1/_workerFinishedDir/giraphx_1,data為該worker的partitionStatsList和workerSentMessages統計量;
最後呼叫waitForOtherWorkers()方法等待master建立/_hadoopBsp/job_201404102333_0013/_applicationAttemptsDir/0/_superstepDir/-1/_superstepFinished 節點。
master發現/_hadoopBsp/job_201404102333_0013/_applicationAttemptsDir/0/_superstepDir/-1/_workerFinishedDir的子節點數目等於workers數目後,根據/_hadoopBsp/job_201404102333_0013/_applicationAttemptsDir/0/_superstepDir/-1/_workerFinishedDir子節點上的data收集每個worker傳送的aggregator資訊,彙總為globalStats。
Master若發現全域性資訊中(1)所有頂點都voteHalt且沒有訊息傳遞,或(2)達到最大迭代次數 時,設定 globalStats.setHaltComputation(true)。告訴works結束迭代。
master建立/_hadoopBsp/job_201404102333_0013/_applicationAttemptsDir/0/_superstepDir/-1/_superstepFinished 節點,data為globalStats。告訴所有workers當前超級步結束。
每個Worker檢測到master建立/_hadoopBsp/job_201404102333_0013/_applicationAttemptsDir/0/_superstepDir/-1/_superstepFinished 節點後,讀出該znode的資料,即全域性的統計資訊。然後決定是否繼續下一次迭代。
10、同步之後開始下一個超級步。
11、master和workers同步過程總結。
(1)master建立znode A,然後檢測A的子節點數目是否等於workers數目,不等於就陷入等待。某個worker建立一個子節點後,就會喚醒master進行檢測一次。
(2)每個worker進行自己的工作,完成後,建立A的子節點A1。然後等待master建立znode B。
(3)若master檢測到A的子節點數目等於workers的數目時,建立Znode B
(4)master建立B 節點後,會啟用各個worker。同步結束,各個worker就可以開始下一個超步。
本質是通過znode B來進行