1. 程式人生 > >Giraph源代碼分析(九)—— Aggregators 原理解析

Giraph源代碼分析(九)—— Aggregators 原理解析

需要 which 詳細 當前 推斷 part waiting ted class

HamaWhite 原創。轉載請註明出處!歡迎大家增加Giraph 技術交流群228591158

Giraph中Aggregator的基本使用方法請參考官方文檔:http://giraph.apache.org/aggregators.html 。本文重點在解析Giraph怎樣實現Aggregators後文用圖示的方法描寫敘述了Aggregator的運行過程。

基本原理:在每一個超級步中,每一個Worker計算本地的聚集值。

超級步計算完畢後,把本地的聚集值發送給Master匯總。在MasterCompute()運行後,把全局的聚集值回發給全部的Workers。

缺點:當某個應用(或算法)使用了多個聚集器(Aggregators),Master要完畢全部聚集器的計算。由於Master要接受、處理、發送大量的數據,不管是在計算方面還是網絡通信層次,都會導致Master成為系統瓶頸。

改進:採用分片聚集 (sharded aggregators) . 在每一個超級步的最後。每一個聚集器被派發給一個Worker。該Worker接受和聚集其它Workers發送給該聚集器的值。

然後Workers把自己的全部的聚集器發送給Master。這樣Master就無需運行不論什麽聚集,僅僅是接收每一個聚集器的終於值。在MasterCompute.compute運行後,Master不是直接把全部的聚集器發送給全部的Workers,而是發送給聚集器所屬的Worker。然後每一個Worker再把其上的聚集器發送給全部的Workers.

首先給出Master <-- > Worker間, Worker <--> Worker間通信協議,在每一個類中的doRequest(ServerData serverData)方法中會解析並存儲收到的消息。
1). org.apache.giraph.comm.requests.SendWorkerAggregatorsRequest 類 . Worker --> Worker Owner
功能:每一個worker把當前超步的局部 aggregated values 發送到該Aggregator的擁有者。
2). org.apache.giraph.comm.requests.SendAggregatorsToMasterRequest 類. Worker Owner--> Master
功能:每一個Worker把自己所擁有的Aggregator的終於 aggregated values 發送給 master。
3). org.apache.giraph.comm.requests.SendAggregatorsToOwnerRequest 類. Master --> Worker Owner.
功能:master把終於的 aggregated values 或aggregators 發送給該Aggregator的擁有者。
4). org.apache.giraph.comm.requests.SendAggregatorsToWorkerRequest 類。 Worker Owner--> Worker
功能: 發送終於的 aggregated values 到 其它workers。發送者為該Aggregator的擁有者。接受者為除發送者之外的全部workers。

技術分享

技術分享

Aggregator分類和 註冊

Giraph中把Aggregator分為兩類:regular aggregators和persistent aggregators。

regular aggregators的值在每一個超級步開始會被重置為初始值,然而persistent aggregators的值在整個應用(算法)中一直保持。

舉例來說。若LongSumAggregator在每一個頂點的compute()方法中加1。假設使用regular aggregators,在每一個超級步中就能夠讀取前一個超級步的參與計算的頂點總數;假設使用persistent aggregators,就能夠獲取前面全部超級步中參與計算的頂點總和。

在使用aggregator之前,必需要在mastes上Registering aggregators。做法:繼承org.apache.giraph.master.DefaultMasterCompute類,重寫 void initalize() 方法。

在該方法中註冊aggregators。語法例如以下:

registerAggregator(aggregatorName, aggregatorClass)
registerPersistentAggregator(aggregatorName, aggregatorClass)

說明:MasterCompute.initalize()方法僅僅在第 INPUT_SUPERSTEP (-1) 超級步中運行一次。詳細在 BSPServiceMaster.runMasterCompute(long superstep)方法中。在MasterCompute.compute()方法中,能夠使用下述方法讀取或改動聚集器的值。

getAggregatedValue(aggregatorName) //獲取前一個超級步的聚集器值
setAggregatedValue(aggregatorName, aggregatedValue) //改動聚集器的值

MasterCompute.compute()總是在Vertex.compute()前運行。 因為第 INPUT_SUPERSTEP ( -1)個超級步進行的是數據的載入和重分布過程,不計算Vertex.compute()。第0個超級步Vertex.compute()又是在MasterCompute.compute()方法後運行。故對第 -1 、 0個超級步MasterCompute.compute()方法中獲得的聚集器值均為其初始值。從第1個超級步開始。MasterCompute.compute()方法才獲得了全部Vertex.compute()在第0個超級步聚集的值。

1. 從第0個超級步開始。BspServiceMaster調用MasterAggregatorHandler類的finishSuperStep(MasterClient masterClient) 方法把聚集器派發給Worker。聚集器的value為上一個超級步的全局聚集值(final aggregated values)。第一次為初始值。先給出MasterAggregatorHandler的類繼承關系。例如以下:

技術分享

finishSuperStep(MasterClient masterClient) 方法核心內容例如以下:

  /**
   * Finalize aggregators for current superstep and share them with workers
   */
  public void finishSuperstep(MasterClient masterClient) {
    for (AggregatorWrapper<Writable> aggregator : aggregatorMap.values()) {
      if (aggregator.isChanged()) {
        // if master compute changed the value, use the one he chose
        aggregator.setPreviousAggregatedValue(
            aggregator.getCurrentAggregatedValue());
        // reset aggregator for the next superstep
        aggregator.resetCurrentAggregator();
      }
    }
    
    /**
     * 把聚集器發送給所屬的Worker。發送內容:
     * 1). Name of the aggregator
     * 2). Class of the aggregator
     * 3). Value of the aggretator
     */
    try {
      for (Map.Entry<String, AggregatorWrapper<Writable>> entry :
          aggregatorMap.entrySet()) {
        masterClient.sendAggregator(entry.getKey(),
            entry.getValue().getAggregatorClass(),
            entry.getValue().getPreviousAggregatedValue());
      }
      masterClient.finishSendingAggregatedValues();
    } catch (IOException e) {
      throw new IllegalStateException("finishSuperstep: " +
          "IOException occurred while sending aggregators", e);
    }
  }
問題1:怎樣確定aggregator的Worker Owner ?
答:依據aggregator的Name來確定它所屬的Worker。計算方法例如以下:
/**
 * 依據aggregatorName和全部的workers列表來計算aggregator所屬的Worker
 * 參數aggregatorName:Name of the aggregator
 * 參數workers: Workers的list列表
 * 返回值:Worker which owns the aggregator
 */
public static WorkerInfo getOwner(String aggregatorName,List<WorkerInfo> workers) {
    //用aggregatorName的HashCode()值模以 Workers的總數目
    int index = Math.abs(aggregatorName.hashCode() % workers.size());
    return workers.get(index);  //返回aggregator所屬的Worker
}
問題2:Worker 怎樣推斷自身是否接收完自己所擁有的aggregators?
答:Master給某個Worker發送aggregators時。同一時候發送到該Worker的aggregators數目。使用的 SendAggregatorsToOwnerRequest類對消息進行封裝和解析。

2. Worker接受Master發送的Aggregator,Worker把接收到的聚集體值發送給其它全部Workers,然後每一個Workers就會得到上一個超級步的全局聚集值。
由前文知道,每一個Worker都有一個ServerData對象,ServerData類中關於Aggregator的兩個成員變量例如以下:

// 保存Worker在當前超步擁有的aggregators
private final OwnerAggregatorServerData ownerAggregator;
// 保存前一個超步的aggregators
private final AllAggregatorServerData allAggregatorData;
能夠看到,ownerAggregatorData用來存儲在當前超步Master發送給Worker的聚集器,allAggregatorData用來保存上一個超級步全局的聚集值。ownerAggregatorData和allAggregatorData值的初始化在SendAggregatorsToOwnerRequest 類中的doRequest(ServerData serverData)方法中,例如以下:

public void doRequest(ServerData serverData) {
    DataInput input = getDataInput();
    AllAggregatorServerData aggregatorData = serverData.getAllAggregatorData();
    try {
      //收到的Aggregators數目。在CountingOutputStream類中有計數器counter,
      //每向輸出流中加入一個聚集器對象,計數加1. 發送時,在flush方法中把該值插入到輸出流最前面。

int numAggregators = input.readInt(); for (int i = 0; i < numAggregators; i++) { String aggregatorName = input.readUTF(); String aggregatorClassName = input.readUTF(); if (aggregatorName.equals(AggregatorUtils.SPECIAL_COUNT_AGGREGATOR)) { LongWritable count = new LongWritable(0); //Master發送給該Worker的requests總數目. count.readFields(input); aggregatorData.receivedRequestCountFromMaster(count.get(), getSenderTaskId()); } else { Class<Aggregator<Writable>> aggregatorClass = AggregatorUtils.getAggregatorClass(aggregatorClassName); aggregatorData.registerAggregatorClass(aggregatorName, aggregatorClass); Writable aggregatorValue = aggregatorData.createAggregatorInitialValue(aggregatorName); aggregatorValue.readFields(input); //把收到的上一次全局聚集的值賦值給allAggregatorData aggregatorData.setAggregatorValue(aggregatorName, aggregatorValue); //ownerAggregatorData僅僅接受聚集器 serverData.getOwnerAggregatorData().registerAggregator( aggregatorName, aggregatorClass); } } } catch (IOException e) { throw new IllegalStateException("doRequest: " + "IOException occurred while processing request", e); } //接受一個 request,計數減1。同一時候把收到的Data加入到allAggregatorServerData的List<byte[]> masterData中 aggregatorData.receivedRequestFromMaster(getData()); }

每一個Worker在開始計算前。會調用BspServiceWorker類的prepareSuperStep()方法來進行聚集器值的派發和接受其它Workers發送的聚集器值。調用關系例如以下:

技術分享

BspServiceWorker類的prepareSuperStep()方法例如以下:

@Override
public void prepareSuperstep() {
   if (getSuperstep() != INPUT_SUPERSTEP) {
     /*
      * aggregatorHandler為WorkerAggregatorHandler類型,
      * 可參考上文中MasterAggregatorHandler的類繼承關系.
      * workerAggregatorRequestProcessor聲明為WorkerAggregatorRequestProcessor(接口)
      * 類型,實際為NettyWorkerAggregatorRequestProcessor的實例。
      * 用於Worker間發送聚集器的值。

*/ aggregatorHandler.prepareSuperstep(workerAggregatorRequestProcessor); } }


WorkerAggregatorHandler類的prepareSuperstep( WorkerAggregatorRequestProcessor requestProcessor)方法例如以下:

public void prepareSuperstep(WorkerAggregatorRequestProcessor requestProcessor) {
    AllAggregatorServerData allAggregatorData =
        serviceWorker.getServerData().getAllAggregatorData();
    /**
     * 等待直到Master發送給該Worker的聚集器都已接受完,
     * 返回值為Master發送給該Worker的全部Data(聚集器)
     */
    Iterable<byte[]> dataToDistribute =
        allAggregatorData.getDataFromMasterWhenReady(
            serviceWorker.getMasterInfo());
  
    // 把從Master收到的Data(聚集器)發送給其它全部Workers
    requestProcessor.distributeAggregators(dataToDistribute);

    // 等待直到接受完其它Workers發送給該Workers的聚集器
    allAggregatorData.fillNextSuperstepMapsWhenReady(
        getOtherWorkerIdsSet(), previousAggregatedValueMap,
        currentAggregatorMap);
    // 僅僅是清空allAggregatorServerData的List<byte[]> masterData對象
    // 為下一個超級步接受Master發送的聚集器做準備
    allAggregatorData.reset();
}
以下詳述Worker怎樣判定已接收全然部Master發送的全部Request ? 主要目的在於描寫敘述分布式環境下線程間怎樣協作。

在AllAggregatorServerData類中定義了TaskIdsPermitBarrier類型的變量masterBarrier,用來推斷是否接收完Master發送的Request. TaskIdsPermitBarrier類中主要使用wait()、notifyAll()等方法來控制。當獲得的aggregatorName等於AggregatorUtils.SPECIAL_COUNT_AGGREGATOR時,會調用requirePermits(long permits,int taskId)來添加接收的arrivedTaskIds和須要等待的request數目waitingOnPermits. 接受一個Request

  /**
   * Require more permits. This will increase the number of times permits
   * were required. Doesn't wait for permits to become available.
   *
   * @param permits Number of permits to require
   * @param taskId Task id which required permits
   */
  public synchronized void requirePermits(long permits, int taskId) {
    arrivedTaskIds.add(taskId);
    waitingOnPermits += permits;
    notifyAll();
  }

技術分享 接受一個Request後,會調用releaseOnePermit()方法把waitingOnPermits減1。 技術分享

3. 在Vertex.compute()方法中。每一個Worker聚集自身的值。

計算完畢後。調用WorkerAggregatorHandler類的finishSuperstep( WorkerAggregatorRequestProcessor requestProcessor)方法,把本地的聚集器的值給句聚集器的aggregatorName發送給該aggregator所屬的Worker. Aggregator的屬主Worker接受其它全部Workers發送的本地聚集值進行匯總,匯總完畢後發送給Master,供下一次超級步的MasterCompute.compute()方法使用。

finishSuperstep方法例如以下:

 /**
   * Send aggregators to their owners and in the end to the master
   *
   * @param requestProcessor Request processor for aggregators
   */
  public void finishSuperstep(
      WorkerAggregatorRequestProcessor requestProcessor) {
    OwnerAggregatorServerData ownerAggregatorData =
        serviceWorker.getServerData().getOwnerAggregatorData();
    // First send partial aggregated values to their owners and determine
    // which aggregators belong to this worker
    for (Map.Entry<String, Aggregator<Writable>> entry :
        currentAggregatorMap.entrySet()) {
        boolean sent = requestProcessor.sendAggregatedValue(entry.getKey(),
            entry.getValue().getAggregatedValue());
        if (!sent) {
          // If it's my aggregator, add it directly
          ownerAggregatorData.aggregate(entry.getKey(),
              entry.getValue().getAggregatedValue());
        }
    }
    // Flush
    requestProcessor.flush();
    // Wait to receive partial aggregated values from all other workers
    Iterable<Map.Entry<String, Writable>> myAggregators =
        ownerAggregatorData.getMyAggregatorValuesWhenReady(
            getOtherWorkerIdsSet());

    // Send final aggregated values to master
    AggregatedValueOutputStream aggregatorOutput =
        new AggregatedValueOutputStream();
    for (Map.Entry<String, Writable> entry : myAggregators) {
        int currentSize = aggregatorOutput.addAggregator(entry.getKey(),
            entry.getValue());
        if (currentSize > maxBytesPerAggregatorRequest) {
          requestProcessor.sendAggregatedValuesToMaster(
              aggregatorOutput.flush());
        }   
    }
    requestProcessor.sendAggregatedValuesToMaster(aggregatorOutput.flush());
    // Wait for master to receive aggregated values before proceeding
    serviceWorker.getWorkerClient().waitAllRequests();
    ownerAggregatorData.reset();
  }
調用關系例如以下:

技術分享

4. 大同步後,Master調用MasterAggregatorHandler類的prepareSusperStep(masterClient)方法。收集聚集器的值。方法內容例如以下:

  public void prepareSuperstep(MasterClient masterClient) {

    // 收集上次超級步的聚集值,為master compute 做準備
    for (AggregatorWrapper<Writable> aggregator : aggregatorMap.values()) {
	// 假設是 Persistent Aggregator,則累加
	if (aggregator.isPersistent()) {
        aggregator.aggregateCurrent(aggregator.getPreviousAggregatedValue());
      }
      aggregator.setPreviousAggregatedValue(
          aggregator.getCurrentAggregatedValue());
      aggregator.resetCurrentAggregator();
      progressable.progress();
    }
  }
然後調用MasterCompute.compute()方法(可能會改動聚集器的值),在該方法內若依據聚集器的值調用了MasterCompute類的haltCompute()方法來終止MaterCompute,則表明要結束整個Job。那麽Master就會通知全部Workers要結束整個作業;在該方法內若沒有調用MasterCompute類的haltCompute()方法。則回到步驟1繼續進行叠代。

說明:Job叠代結束條件有三,滿足其一即可:
1) 達到最大叠代次數
2) 沒有活躍頂點且沒有消息在傳遞
3) 終止MasterCompute計算

總結:為解決在多個Aggregator條件下,Master成為系統瓶頸的問題。採取了把全部Aggregator派發給某一部分Workers。由這些Workers完畢全局的聚集值的計算與發送,Master僅僅須要與這些Workers進行簡單數據通信就可以,大大減少了Master的工作量。

附加:以下用圖示方法說明上述運行過程。

實驗條件:
1). 一個Master,四個Worker
2). 兩個Aggregators,記為A1和A2。

1. Master把Aggregators發送給Workers,收到Aggregator的Worker就作為該Aggregator的Owner。

下圖中Master把A1發送給Worker1,A2發送給Worker3.那麽Worker1就作為A1的Owner,Worker3就是A2的Owner。該步驟在MasterAggregatorHandler類的finishSuperStep(MasterClient masterClient) 方法中完畢,使用的是SendAggregatorsToOwnerRequest 通信協議。註:每一個Owner Worker 可能有多個聚集器。

技術分享

圖1 Master分發Aggregator

2. Workers接受Master發送的Aggregator,然後把Aggregator發送給其它Workers。

Worker1要把A1分別發送給Worker2、Worker3和Worker4;Worker3要把A2分別發送給Worker1、Worker2和Worker4。該步驟在WorkerAggregatorHandler類的prepareSuperstep( WorkerAggregatorRequestProcessor requestProcessor)方法中完畢,使用的是SendAggregatorsToMasterRequest 通信協議。此步驟完畢後,每一個Worker上都有了聚集器A1和A2(詳細為上一個超步的全局終於聚集值)。

技術分享

3. 每一個Worker調用Vertex.compute()方法開始計算,收集本地的Aggregator聚集值。對聚集體A1來說,Worker1Worker2Worker3Worker4的本地聚集值依次記為:A11 A12 A13A14。對聚集器A2來說,Worker1Worker2Worker3Worker4的本地聚集值依次記為:A21 A22 A23A24。計算完畢後,每一個Worker就要把本地的聚集值發送給聚集器的Owner,聚集器的Owner在接收的時候會合並聚集。

那麽A11 A12 A13A14要發送給Worker1進行全局聚集得到A1’A21 A22 A23A24要發送給Worker3進行全局聚集得到A2’。計算公式例如以下:

技術分享

此部分採用的是SendWorkerAggregatorsRequest通信協議。Worker1和Worker3要把匯總的A1和A2的新值:A1’ 和A2’發送給Master,供下一次超級步的MasterCompute.compute()方法使用採用的是SendAggregatorsToMasterRequest通信協議。

此部分在WorkerAggregatorHandler類的finishSuperstep( WorkerAggregatorRequestProcessor requestProcessor)方法中完畢。步驟例如以下圖所看到的:

技術分享

4. Master收到Worker1發送的A1’ 和Woker3發送的A2’後,此步驟在MasterAggregatorHandler類的prepareSusperStep(masterClient)方法中完畢。然後調用MasterCompute.compute()方法,此方法可能會改動聚集器的值,如得到A1’’和A2’’。在masterCompute.compute()方法內若依據聚集器的值調用了MasterCompute類的haltCompute()方法來終止MaterCompute,則表明要結束整個Job。那麽Master就會通知全部Workers要結束整個作業;在該方法內若沒有調用MasterCompute類的haltCompute()方法。則回到步驟1繼續進行叠代,繼續把A1’’發送給Worker1。A2’’發送給Worker3。

技術分享

完。

本人原創,轉載請註明出處!

歡迎大家增加Giraph 技術交流群228591158

Giraph源代碼分析(九)—— Aggregators 原理解析