1. 程式人生 > >graphx中Pregel函式詳解

graphx中Pregel函式詳解

1、PregelAPI

圖本質上是一種遞迴的資料結構,其頂點的屬性值依賴於其鄰接頂點,而其鄰接頂點屬性又依賴於其鄰接頂點,許多重要的圖演算法通過迭代計算每個頂點的屬性直到到達定點條件,這些迭代的圖演算法被抽象成一系列圖並行操作。

2、Pregel的計算模型

主要分為三個函式:
1、vertexProgram函式

2、sendMessage函式

3、messageCombiner函式

在進行了解之前,先對相關知識進行粗略的瞭解:

知識點1:在第一次迭代的時候,所有的頂點都會接收到initialMsg訊息,在次輪迭代的時候,如果頂點沒有接收到訊息,verteProgram就不會被呼叫。

知識點2:對相關引數的瞭解(詳細看黑體部分)

VD:頂點的資料型別。

ED:邊的資料型別

A:Pregel message的型別。

graph:輸入的圖

initialMsg:在第一次迭代的時候頂點收到的訊息。

maxIterations:迭代的次數

vprog:使用者定義的頂點程式執行在每一個頂點中,負責接收進來的資訊,和計算新的頂點值。在第一次迭代的時候,所有的頂點程式將會被預設的defaultMessage呼叫,在次輪迭代中,頂點程式只有接收到message才會被呼叫。

sendMsg:使用者提供的函式,應用於邊緣頂點在當前迭代中接收message

mergeMsg:

使用者提供定義的函式,將兩個型別為A的message合併為一個型別為A的message。(thisfunction must be commutative and associative and ideally the size of A shouldnot increase)

其中用到的Graph類的API

mapReduceTriplets():計算每個節點的相鄰的邊緣和頂點的值,使用者定義的mapFunc函式會在圖的每一條邊呼叫,產生0或者多個message傳送到這條邊兩個頂點其中一個當中,reduceFunc函式用來合併map階段的輸出到每個節點。

3、例項

以下通過spark1.0.1上的最短路徑來舉個例子.

原始碼的路徑為graphx包的lib資料夾內。

以下為計算最短路徑的基本的圖資訊(圖為有向圖)。



主要函式:

SPMap:定義一個Map[VertexId,Int]型別的Map函式,別名為SPMap,函式的屬性Key為VertexId型別,其實也就是Scala中的Long型別,它在圖中的別名是VertexId,還有Int型別的路徑的長度。

makeMap函式:用來初始化圖的屬性資訊。

incrementMap函式:主要用於將自身的屬性值(即源頂點屬性值)中路徑的長度加1,然後和目標定點的屬性值比較,下面會詳細描述。

addMaps函式:比較源頂點屬性和傳送資訊過來頂點的屬性取最小值。

下面是ShortestPaths.scala的run函式:


run函式傳入的引數為:已經構造好的圖graph和landmarks

landmarks:是我們要求最短路徑的頂點的集合。

初始化節點屬性:

程式的第一步初始化圖的節點的屬性,為了方便舉例,假設我給定的landmarks的集合為{1}。

注意:mapVertices函式是圖的基本常用函式之一,它作用於graph中的每一個頂點。

在Graph類中,它是這麼定義的:


程式的API上的定義為:通過map函式轉換圖中每個節點的屬性值,VD2代表的是新的資料型別。(嗯,我英文不是很好,就大概這個意思哈)。

根據程式的意思,初始化圖,將landmarks中的頂點初始化為Map(1-> 0),即自身到自身的距離為0,其餘的頂點屬性初始化為Map()。

接下來定義一個initMessage它的值為Map(),作用是在Pregel第一次執行的時候,所有圖中的頂點都會接收到initMessage。

在接下來定義了一個vertexProgramProgram函式和sendMessage函式。

在這裡,vertexProgram函式呼叫了addMaps函式,通過程式顯而易見的是:兩個訊息來的時候,取它們當中路徑的最小值。其實在下面也就是相當於messageCombiner函式。

SendMessage函式的原理是:

1、通過incrementMap函式把源頂點的距離屬性加1得到新的屬性值newAttr。

2、新的屬性newAttr和原來的屬性比較取最小值,如果新的屬性是最小的,則通過Iterator傳送該資訊到目標頂點的函式。該資訊的結構為(dstId,newAttr),否則不傳送。

最後把以上資訊傳遞給Pregel去執行。

執行的流程簡圖如下:

1、初始化圖的屬性值


2、呼叫sendMessage函式:

呼叫sendMessage函式,包含出度的頂點才能傳送訊息。

首先第一步,假設從頂點1開始:

步驟和上面說的一樣,頂點1的距離屬性值加1即從(1,0)變為(1,1),和頂點2的屬性值比較(具體看程式碼吧),得出頂點1的屬性值最小,滿足傳送的條件,傳送訊息Iterator(2,(1,1))。

頂點4、5類似,頂點2、4、5由於他們之間的屬性值為Map(),所以不滿足傳送條件,頂點3沒有出度,所以不傳送訊息。

點1->2:(2,(1,1))

點1->4:(4,(1,1))

點2->3:empty

….

 

3、呼叫vertexProgram函式:

在看API文件或者程式碼可以知道,vertexProgram在第一次在初始化的時候,會在所有頂點上執行,之後,只有接收到訊息的頂點才會執行vertexProgram,所以接下來容易可以知道,只有頂點2,4,5執行程式,並改變他們自身的屬性值。


4、然後類似的重複步驟2、3直到圖中的message為0,或者 滿足我們給定的迭代次數,

嗯,怎麼知道會有迭代次數呢?往下看

接下來就簡略的看看Pregel的程式碼,看看他是怎麼執行的,由於我還在初步的學習階段,僅供參考,如果有好的理解可以交流交流。

從傳入的引數可以知道,我們可以通過maxIterations指定迭代次數,mergeMsg函式也就是剛才所說的addMaps函式。

 

接下來就是主要的實現邏輯:


看了第一句,通過呼叫graph的mapVertices函式就初始化所有圖的屬性資訊,然後呼叫mapReduceTriplets函式,它返回一個VertexRDD[A]型別的RDD,mapReduceTriplets也是常用的函式之一。

注意:由於mapReduceTriplets裡面的程式碼個人覺得過於複雜,看了很久沒看懂,如果有興趣希望可以交流一下。

接下來就是我個人的假設階段了:

通過註釋可以看出

message應該就是接收到訊息的頂點,那activeMessages就是頂點的數量了。

接下來通過迭代

通過圖的所有頂點和接收到訊息的頂點進行內連線,然後執行頂點的vertexProgram函式,即剛才我們所說的只有接受到訊息的頂點才會執行vertexProgram函式。得到新的newVerts集合。

圖和newVerts進行outerJoinVertices把newVerts的新資訊update到圖中。

然後繼續傳送新的訊息。

判斷activeMessages 和 指定的迭代次數

繼續迭代直到activeMessages為零和滿足設定的迭代次數值為止。