1. 程式人生 > >Giraph 原始碼分析(五)—— 載入資料+同步總結

Giraph 原始碼分析(五)—— 載入資料+同步總結

作者|白松

關於Giraph 共有九個章節,本文第五個章節。

環境:在單機上(機器名:giraphx)啟動了2個workers。

輸入:SSSP資料夾,裡面有1.txt和2.txt兩個檔案。

1、在Worker向Master彙報健康狀況後,就開始等待Master建立InputSplit。

方法:每個Worker通過檢某個Znode節點是否存在,同時在此Znode上設定Watcher。若不存在,就通過BSPEvent的waitForever()方法釋放當前執行緒的鎖,陷入等待狀態。一直等到master建立該znode。此步驟位於BSPServiceWorker類中的startSuperStep方法中,等待程式碼如下:

2、Master呼叫createInputSplits()方法建立InputSplit。

在generateInputSplits()方法中,根據使用者設定的VertexInputFormat獲得InputSplits。程式碼如下:

其中minSplitCountHint為建立split的最小數目,其值如下:

minSplitCountHint = Workers數目 * NUM_INPUT_THREADS

NUM_INPUT_THREADS表示 每個Input split loading的執行緒數目,預設值為1 。 經查證,在TextVertexValueInputFormat抽象類中的getSplits()方法中的minSplitCountHint引數被忽略。使用者輸入的VertexInputFormat繼承TextVertexValueInputFormat抽象類。

如果得到的splits.size小於minSplitCountHint,那麼有些worker就沒被用上。

得到split資訊後,要把這些資訊寫到Zookeeper上,以便其他workers訪問。上面得到的split資訊如下:

[hdfs://giraphx:9000/user/root/SSSP/1.txt:0+66, hdfs://giraphx:9000/user/root/SSSP/2.txt:0+46]

遍歷splits List,為每個split建立一個Znode,值為split的資訊。如為split-0建立Znode,值為:hdfs://giraphx:9000/user/root/SSSP/1.txt:0+66

/_hadoopBsp/job_201404102333_0013/_vertexInputSplitDir/0

為split-1建立znode(如下),值為:hdfs://giraphx:9000/user/root/SSSP/2.txt:0+46

/_hadoopBsp/job_201404102333_0013/_vertexInputSplitDir/1

最後建立znode: /_hadoopBsp/job_201404102333_0013/_vertexInputSplitsAllReady 表示所有splits都建立好了。

3、Master根據splits建立Partitions。首先確定partition的數目。

BSPServiceMaster中的MasterGraphPartitioner<I.V,E,M>物件預設為HashMasterPartitioner。它的createInitialPartitionOwners()方法如下:

上面程式碼中是在工具類PartitionUtils計算Partition的數目,計算公式如下:

partitionCount=PARTITION_COUNT_MULTIPLIER * availableWorkerInfos.size() * availableWorkerInfos.size() ,其中PARTITION_COUNT_MULTIPLIER表示Multiplier for the current workers squared,預設值為1 。

可見,partitionCount值為4(122)。建立的partitionOwnerList資訊如下:

[(id=0,cur=Worker(hostname=giraphx, MRtaskID=1, port=30001),prev=null,ckpt_file=null),

(id=1,cur=Worker(hostname=giraphx, MRtaskID=2, port=30002),prev=null,ckpt_file=null),

(id=2,cur=Worker(hostname=giraphx, MRtaskID=1, port=30001),prev=null,ckpt_file=null),

(id=3,cur=Worker(hostname=giraphx, MRtaskID=2, port=30002),prev=null,ckpt_file=null)]

4、Master建立Znode:/_hadoopBsp/job_201404102333_0013/_applicationAttemptsDir/0/_superstepDir/-1/_partitionExchangeDir,用於後面的exchange partition。

5、Master最後在assignPartitionOwners()方法中

把masterinfo,chosenWorkerInfoList,partitionOwners等資訊寫入Znode中(作為Znode的data),該Znode的路徑為: /_hadoopBsp/job_201404102333_0013/_applicationAttemptsDir/0/_superstepDir/-1/_addressesAndPartitions 。

Master呼叫barrierOnWorkerList()方法開始等待各個Worker完成資料載入。呼叫關係如下:

barrierOnWorkerList中建立znode,path=/_hadoopBsp/job_201404102333_0013/_vertexInputSplitDoneDir 。然後檢查該znode的子節點數目是否等於workers的數目,若不等於,則執行緒陷入等待狀態。後面某個worker完成資料載入後,會建立子node(如 /_hadoopBsp/job_201404102333_0013/_vertexInputSplitDoneDir/giraphx_1)來啟用該執行緒繼續判斷。

6、當Master建立第5步的znode後,會啟用worker。

每個worker從znode上讀出data,data包含masterInfo,WorkerInfoList和partitionOwnerList,然後各個worker開始載入資料。

把partitionOwnerList複製給BSPServiceWorker類中的workerGraphPartitioner(預設為HashWorkerPartitioner型別)物件的partitionOwnerList變數,後續每個頂點把根據vertexID通過workerGraphPartitioner物件獲取其對應的partitionOwner。

每個Worker從znode: /_hadoopBsp/job_201404102333_0013/_vertexInputSplitDir獲取子節點,得到inputSplitPathList,內容如下:

[/_hadoopBsp/job_201404102333_0013/_vertexInputSplitDir/1,

/_hadoopBsp/job_201404102333_0013/_vertexInputSplitDir/0]

然後每個Worker建立N個InputsCallable執行緒讀取資料。N=Min(NUM_INPUT_THREADS,maxInputSplitThread),其中NUM_INPUT_THREADS預設值為1,maxInputSplitThread=(InputSplitSize-1/maxWorkers +1

那麼,預設每個worker就是建立一個執行緒來載入資料。

在InputSplitsHandler類中的reserveInputSplit()方法中,每個worker都是遍歷inputSplitPathList,通過建立znode來保留(標識要處理)的split。程式碼及註釋如下:

當用reserveInputSplit()方法獲取某個znode後,loadSplitsCallable類的loadInputSplit方法就開始通過該znode獲取其HDFS的路徑資訊,然後讀入資料、重分佈資料。

VertexInputSplitsCallable類的readInputSplit()方法如下:

7、每個worker載入完資料後,呼叫waitForOtherWorkers()方法等待其他workers都處理完split。

策略如下,每個worker在/_hadoopBsp/job_201404102333_0013/_vertexInputSplitDoneDir目錄下建立子節點,後面追加自己的worker資訊,如worker1、worker2建立的子節點分別如下:

/_hadoopBsp/job_201404102333_0013/_vertexInputSplitDoneDir/giraphx_1

/_hadoopBsp/job_201404102333_0013/_vertexInputSplitDoneDir/giraphx_2

建立完後,然後等待master建立/_hadoopBsp/job_201404102333_0013/_vertexInputSplitsAllDone。

8、從第5步驟可知,若master發現/_hadoopBsp/job_201404102333_0013/_vertexInputSplitDoneDir下的子節點數目等於workers的總數目,就會在coordinateInputSplits()方法中建立

_hadoopBsp/job_201404102333_0013/_vertexInputSplitsAllDone,告訴每個worker,所有的worker都處理完了split。

9、最後就是就行全域性同步。

master建立znode,path=/_hadoopBsp/job_201404102333_0013/_applicationAttemptsDir/0/_superstepDir/-1/_workerFinishedDir ,然後再呼叫barrierOnWorkerList方法檢查該znode的子節點數目是否等於workers的數目,若不等於,則執行緒陷入等待狀態。等待worker建立子節點來啟用該執行緒繼續判斷。

每個worker獲取自身的Partition Stats,進入finishSuperStep方法中,等待所有的Request都被處理完;把自身的Aggregator資訊傳送給master;建立子節點,如/_hadoopBsp/job_201404102333_0013/_applicationAttemptsDir/0/_superstepDir/-1/_workerFinishedDir/giraphx_1,data為該worker的partitionStatsList和workerSentMessages統計量;

最後呼叫waitForOtherWorkers()方法等待master建立/_hadoopBsp/job_201404102333_0013/_applicationAttemptsDir/0/_superstepDir/-1/_superstepFinished 節點。

master發現/_hadoopBsp/job_201404102333_0013/_applicationAttemptsDir/0/_superstepDir/-1/_workerFinishedDir的子節點數目等於workers數目後,根據/_hadoopBsp/job_201404102333_0013/_applicationAttemptsDir/0/_superstepDir/-1/_workerFinishedDir子節點上的data收集每個worker傳送的aggregator資訊,彙總為globalStats。

Master若發現全域性資訊中(1)所有頂點都voteHalt且沒有訊息傳遞,或(2)達到最大迭代次數 時,設定 globalStats.setHaltComputation(true)。告訴works結束迭代。

master建立/_hadoopBsp/job_201404102333_0013/_applicationAttemptsDir/0/_superstepDir/-1/_superstepFinished 節點,data為globalStats。告訴所有workers當前超級步結束。

每個Worker檢測到master建立/_hadoopBsp/job_201404102333_0013/_applicationAttemptsDir/0/_superstepDir/-1/_superstepFinished 節點後,讀出該znode的資料,即全域性的統計資訊。然後決定是否繼續下一次迭代。

10、同步之後開始下一個超級步。

11、master和workers同步過程總結。

(1)master建立znode A,然後檢測A的子節點數目是否等於workers數目,不等於就陷入等待。某個worker建立一個子節點後,就會喚醒master進行檢測一次。

(2)每個worker進行自己的工作,完成後,建立A的子節點A1。然後等待master建立znode B。

(3)若master檢測到A的子節點數目等於workers的數目時,建立Znode B

(4)master建立B 節點後,會啟用各個worker。同步結束,各個worker就可以開始下一個超步。

本質是通過znode B來進行