1. 程式人生 > >Giraph原始碼分析(八)—— 統計每個SuperStep中參與計算的頂點數目

Giraph原始碼分析(八)—— 統計每個SuperStep中參與計算的頂點數目

作者|白松

目的:科研中,需要分析在每次迭代過程中參與計算的頂點數目,來進一步優化系統。比如,在SSSP的compute()方法最後一行,都會把當前頂點voteToHalt,即變為InActive狀態。所以每次迭代完成後,所有頂點都是InActive狀態。在大同步後,收到訊息的頂點會被啟用,變為Active狀態,然後呼叫頂點的compute()方法。本文的目的就是統計每次迭代過程中,參與計算的頂點數目。下面附上SSSP的compute()方法:

@Override
  public void compute(Iterable messages) {
    if (getSuperstep() == 0) {
      setValue(new DoubleWritable(Double.MAX_VALUE));
    }
    double minDist = isSource() ? 0d : Double.MAX_VALUE;
    for (DoubleWritable message : messages) {
      minDist = Math.min(minDist, message.get());
    }
    if (minDist < getValue().get()) {
      setValue(new DoubleWritable(minDist));
      for (Edge edge : getEdges()) {
        double distance = minDist + edge.getValue().get();
        sendMessage(edge.getTargetVertexId(), new DoubleWritable(distance));
      }
    }
	//把頂點置為InActive狀態
    voteToHalt();
  }

附:giraph中演算法的終止條件是:沒有活躍頂點且worker間沒有訊息傳遞。

hama-0.6.0中演算法的終止條件只是:判斷是否有活躍頂點。不是真正的pregel思想,半成品。

修改過程如下:

  1. org.apache.giraph.partition. PartitionStats 類

新增變數和方法,用來統計每個Partition在每個超步中參與計算的頂點數目。新增的變數和方法如下:

/** computed vertices in this partition */
private long computedVertexCount=0;
 
/**
* Increment the computed vertex count by one.
*/
public void incrComputedVertexCount() {
    ++ computedVertexCount;
}
 
/**
 * @return the computedVertexCount
 */
public long getComputedVertexCount() {
	return computedVertexCount;
}

修改readFields()和write()方法,每個方法追加最後一句。當每個Partition計算完成後,會把自己的computedVertexCount傳送給Master,Mater再讀取彙總。

@Override
public void readFields(DataInput input) throws IOException {
    partitionId = input.readInt();
    vertexCount = input.readLong();
    finishedVertexCount = input.readLong();
    edgeCount = input.readLong();
    messagesSentCount = input.readLong();
    //新增下條語句
    computedVertexCount=input.readLong();
}
 
@Override
public void write(DataOutput output) throws IOException {
    output.writeInt(partitionId);
    output.writeLong(vertexCount);
    output.writeLong(finishedVertexCount);
    output.writeLong(edgeCount);
    output.writeLong(messagesSentCount);
    //新增下條語句
    output.writeLong(computedVertexCount);
}
  1. org.apache.giraph.graph. GlobalStats 類

    新增變數和方法,用來統計每個超步中參與計算的頂點總數目,包含每個Worker上的所有Partitions。

 /** computed vertices in this partition 
  *  Add by BaiSong 
  */
  private long computedVertexCount=0;
	 /**
	 * @return the computedVertexCount
	 */
	public long getComputedVertexCount() {
		return computedVertexCount;
	}

修改addPartitionStats(PartitionStats partitionStats)方法,增加統計computedVertexCount功能。

/**
  * Add the stats of a partition to the global stats.
  *
  * @param partitionStats Partition stats to be added.
  */
  public void addPartitionStats(PartitionStats partitionStats) {
    this.vertexCount += partitionStats.getVertexCount();
    this.finishedVertexCount += partitionStats.getFinishedVertexCount();
    this.edgeCount += partitionStats.getEdgeCount();
    //Add by BaiSong,新增下條語句
    this.computedVertexCount+=partitionStats.getComputedVertexCount();
 }

當然為了Debug方便,也可以修改該類的toString()方法(可選),修改後的如下:

public String toString() {
		return "(vtx=" + vertexCount + ", computedVertexCount="
				+ computedVertexCount + ",finVtx=" + finishedVertexCount
				+ ",edges=" + edgeCount + ",msgCount=" + messageCount
				+ ",haltComputation=" + haltComputation + ")";
	}
  1. org.apache.giraph.graph. ComputeCallable<I,V,E,M>

新增統計功能。在computePartition()方法中,新增下面一句。

if (!vertex.isHalted()) {
        context.progress();
        TimerContext computeOneTimerContext = computeOneTimer.time();
        try {
            vertex.compute(messages);
	    //新增下面一句,當頂點呼叫完compute()方法後,就把該Partition的computedVertexCount加1
            partitionStats.incrComputedVertexCount();
        } finally {
           computeOneTimerContext.stop();
        }
……
  1. 新增Counters統計,和我的部落格Giraph原始碼分析(七)—— 新增訊息統計功能 類似,此處不再詳述。新增的類為:org.apache.giraph.counters.GiraphComputedVertex,下面附上該類的原始碼:
package org.apache.giraph.counters;
 
import java.util.Iterator;
import java.util.Map;
 
import org.apache.hadoop.mapreduce.Mapper.Context;
import com.google.common.collect.Maps;
 
/**
 * Hadoop Counters in group "Giraph Messages" for counting every superstep
 * message count.
 */
 
public class GiraphComputedVertex extends HadoopCountersBase {
	/** Counter group name for the giraph Messages */
	public static final String GROUP_NAME = "Giraph Computed Vertex";
 
	/** Singleton instance for everyone to use */
	private static GiraphComputedVertex INSTANCE;
 
	/** superstep time in msec */
	private final Map superstepVertexCount;
 
	private GiraphComputedVertex(Context context) {
		super(context, GROUP_NAME);
		superstepVertexCount = Maps.newHashMap();
	}
 
	/**
	 * Instantiate with Hadoop Context.
	 * 
	 * @param context
	 *            Hadoop Context to use.
	 */
	public static void init(Context context) {
		INSTANCE = new GiraphComputedVertex(context);
	}
 
	/**
	 * Get singleton instance.
	 * 
	 * @return singleton GiraphTimers instance.
	 */
	public static GiraphComputedVertex getInstance() {
		return INSTANCE;
	}
 
	/**
	 * Get counter for superstep messages
	 * 
	 * @param superstep
	 * @return
	 */
	public GiraphHadoopCounter getSuperstepVertexCount(long superstep) {
		GiraphHadoopCounter counter = superstepVertexCount.get(superstep);
		if (counter == null) {
			String counterPrefix = "Superstep: " + superstep+" ";
			counter = getCounter(counterPrefix);
			superstepVertexCount.put(superstep, counter);
		}
		return counter;
	}
 
	@Override
	public Iterator iterator() {
		return superstepVertexCount.values().iterator();
	}
}
  1. 實驗結果,執行程式後。會在終端輸出每次迭代參與計算的頂點總數目。 測試SSSP(SimpleShortestPathsVertex類),輸入圖中共有9個頂點和12條邊。輸出結果如下:

上圖測試中,共有6次迭代。紅色框中,顯示出了每次迭代過沖參與計算的頂點數目,依次是:9,4,4,3,4,0

解釋:在第0個超步,每個頂點都是活躍的,所有共有9個頂點參與計算。在第5個超步,共有0個頂點參與計算,那麼就不會向外傳送訊息,加上每個頂點都是不活躍的,所以演算法迭代終止。

【閱讀更多文章請訪問

相關推薦

Giraph原始碼分析—— 統計每個SuperStep參與計算頂點數目

作者|白松 目的:科研中,需要分析在每次迭代過程中參與計算的頂點數目,來進一步優化系統。比如,在SSSP的compute()方法最

mochiweb原始碼分析

前面說到了mochiweb_request:ok/2這個函式,接下來說下里面的response/3這個函式 Length是讀寫裝置讀寫資料的位元組數長度, Response是呼叫start_response_length/2函式 由註釋可以看出start_response_l

Glide原始碼分析,Glide的自定義模組擴充套件與實踐

Generated API Glide v4使用註解處理器(Annotation Processor)來生成出一個API,在Application模組中可使用該流式API一次性呼叫到RequestBuilder,RequestOptions和整合庫中所有的選項

Giraph原始碼分析—啟動Master/Worker服務

作者 | 白松 注:本文為原創,引用轉載需與數瀾聯絡。 1、org.apache.giraph.bsp.CentralizedSe

Giraph原始碼分析—— Master 如何檢查Worker啟動成功

#### 本文的目的 說明Giraph如何藉助ZooKeeper來實現Master與Workers間的同步(不太確定)。 #

Giraph原始碼分析—— 訊息通訊

由前文知道每個BSPServiceWorker有一個WorkerServer物件,WorkerServer物件裡面又有Server

Giraph 原始碼分析—— 載入資料+同步總結

作者|白松 關於Giraph 共有九個章節,本文第五個章節。 環境:在單機上(機器名:giraphx)啟動了2個workers。

Tomcat原始碼分析 ----- HTTP請求處理過程

終於進行到Connector的分析階段了,這也是Tomcat裡面最複雜的一塊功能了。Connector中文名為聯結器,既然是聯結器,它肯定會連線某些東西,連線些什麼呢? Connector用於接受請求並將請求封裝成Request和Response,然後交給Container進行處理,Containe

mybatis 原始碼分析ResultSetHandler 詳解

本篇部落格就是 myabtis 系列的最後一篇了,還剩 ResultSetHandler 沒有分析;作為整個 mybatis 最複雜最繁瑣的部分,我不打算按步驟一次詳解,因為裡面的主要內容就是圍繞 resultMap 按層次結構依次解析的,其中運用最多的就是反射,所以我這裡將圍繞延遲載入重點分析,另外本文使用

Netty原始碼分析 ----- write過程 原始碼分析

上一篇文章主要講了netty的read過程,本文主要分析一下write和writeAndFlush。 主要內容 本文分以下幾個部分闡述一個java物件最後是如何轉變成位元組流,寫到socket緩衝區中去的 pipeline中的標準連結串列結構 java物件編碼過程 write:寫佇列 flus

Spring 源碼分析--容器的功能擴展

use abs 提取 ext troy sha 根據 idc owb 經過前面幾篇的分析,相信大家對Spring中容器功能有了簡單的了解,在前面的章節中我們一直以BeanFactory接口以及它的默認實現類XmlBeanFactory為例進行分析。但是,Spring

Spring源碼分析AbstractBeanDefinition屬性

strac code 出現 shm main candidate 靜態變量 col aof 摘要:本文結合《Spring源碼深度解析》來分析Spring 5.0.6版本的源代碼。若有描述錯誤之處,歡迎指正。 在上一篇中已經完成了XML文檔到GenericBeanDe

Android ADB 原始碼分析

前言 之前分析的兩篇文章 Android Adb 原始碼分析(一) 嵌入式Linux:Android root破解原理(二)   寫完之後,都沒有寫到相關的實現程式碼,這篇文章寫下ADB的通訊流程的一些細節 看這篇文章之前,請先閱讀 Linux的SOCKET

Mybatis 原始碼分析2—— 引數處理

Mybatis對引數的處理是值得推敲的,不然在使用的過程中對發生的一系列錯誤直接懵逼了。 以前遇到引數繫結相關的錯誤我就是直接給加@param註解,也稀裡糊塗地解決了,但是後來遇到了一些問題推翻了我的假設:單個引數不需要使用 @param 。由此產生了一個疑問,Mybatis到底是怎

Mybatis 原始碼分析9—— 事物管理

Mybatis 提供了事物的頂層介面: public interface Transaction { /** * Retrieve inner database connection * @return DataBase connection * @throw

Mybatis 原始碼分析8—— 一二級快取

一級快取 其實關於 Mybatis 的一級快取是比較抽象的,並沒有什麼特別的配置,都是在程式碼中體現出來的。 當呼叫 Configuration 的 newExecutor 方法來建立 executor: public Executor newExecutor(Transac

Mybatis原始碼分析7—— 結果集處理

解析封裝 ResultMap 是和結果集相關的東西,最初在解析 XML 的時候,於 parseStatementNode 方法中,針對每一個 select 節點進行解析,轉換為 MappedStatement(類似 Spring 的 bean 配置和 BeanDefinition 的

Mybatis原始碼分析6—— 從JDBC看Mybatis的設計

Java資料庫連線,(Java Database Connectivity,簡稱JDBC)是Java語言中用來規範客戶端程式如何來訪問資料庫的應用程式介面,提供了諸如查詢和更新資料庫中資料的方法。 六步流程: 載入驅動(5.x驅動包不需要這步了) 建立

Mybatis原始碼分析5—— 外掛的原理

MyBatis 允許你在已對映語句執行過程中的某一點進行攔截呼叫。 預設情況下,可以使用外掛來攔截的方法呼叫包括: Executor (update, query, flushStatements, commit, rollback, getTransaction, cl

Mybatis原始碼分析4—— Mapper的建立和獲取

Mybatis我們一般都是和Spring一起使用的,它們是怎麼融合到一起的,又各自發揮了什麼作用? 就拿這個Mapper來說,我們定義了一個介面,聲明瞭一個方法,然後對應的xml寫了這個sql語句, 它怎麼就執行成功了?這傢伙是怎麼實現的,帶著這個好奇心,我一步步跟蹤,慢慢揭開了它的