1. 程式人生 > >用GraphX分析伴生網絡(二)

用GraphX分析伴生網絡(二)

math 需要 期望 在一起 pregel 測試 個數 maps shuf

8. 過濾噪聲邊

在當前的伴生關系中,邊的權重是基於一對概念同時出現在一篇論文中的頻率來計算的。這種簡單的權重機制的問題在於:它並沒有對一對概念同時出現的原因加以區分,有時一對概念同時出現是由於它們具有某種值得我們關註的語義關系,但有時一對概念同時出現只是因為都頻繁地出現在所有文檔中,同時出現只是碰巧而已。我們需要使用一種新的權重機制,在給定概念在數據中的總體頻繁度的情況下,它需要考慮給定的兩個概念對於一個文檔的“意義”或是“新穎度”。我們將使用皮爾遜卡方測試(Pearson’s chi-squared test)來嚴格計算這種“意義”,也就是說,我們要測試一個概念的出現與其他概念的出現是否是獨立的。

對任何概念對 A 和 B,我們可以建立一個 2x2 的相關表,它包含了這兩個概念同時出現在 MEDLINE 文檔中的次數:

技術分享圖片

該表中 YY、YN、NY和NN 分別代表概念A和B在文檔中 出現/沒出現 的原始次數。T 是文檔總數,YY + NY 為 B 出現在 T 中的總數,YN + NN 為 B 不出現在 T 中的總數,YB + NB為T。YY + YN 為A 出現在T 中的總數,NY + NN 為 A 不出現在 T 中的總數,YA + NA 為T。

卡方測試時,我們可以把 YY、YN、NY和 NN 看成某個未知分布的觀測,可以根據這些值計算卡方統計量:

技術分享圖片

如果樣本實際上是獨立的,那麽我們期望該統計量服從適當自由度的卡方分布。假定 r 和 c 是待比較的兩個隨機變量的基數,則自由度為 (r-1)(c-1)=1。卡方統計量大則表明隨機變量相互獨立的可能性小,因此兩個概念同時出現是有意義的。更具體的講,自由度為 1 的卡方分布的 CDF(累積分布函數)給出一個 p-value,它是我們拒絕變量是獨立的這個被擇假設的置信水平。下面我們使用GraphX來計算伴生圖中每個概念對的卡方統計量。

1. 處理 EdgeTriplet

求卡方統計量時最簡單的部分就是計算T,也就是需要考慮的文檔的總個數,可以直接用 count方法獲取:

val T = medline.count()

計算每個概念在多少篇文檔中出現也相對簡單,直接計算每個概念出現的次數即可:

scala> val topicCountsRdd = topics.map( x => (hashId(x), 1)).reduceByKey(_+_)

topicCounts: org.apache.spark.rdd.RDD[(Long, Int)] = ShuffledRDD[150] at reduceByKey at <console>:37

有了這個表示主題詞出現次數的 VertexRDD,就可以將它作為頂點集合,再加上以後的 edges RDD 來創建一個新圖:

val topicCountGraph = Graph(topicCountsRdd, topicGraph.edges)

計算卡方統計量,需要組合頂點數據(比如每個概念在一個文檔中出現的次數)和邊數據(比如兩個概念同時出現在一個文檔中的次數)。為了支持這種計算,GraphX 提供了一個數據結構EdgeTriplet[VD, ED],該數據結構將頂點和邊的屬性連同兩個頂點的 ID 一起包裝進一個對象。給定topicCountGraph 上的一個 EdgeTriplet,就能算出卡方統計量。首先定義計算卡方統計量的方法:

def chiSq(YY: Int, YB: Int, YA: Int, T: Long): Double = {

val NB = T - YB

val NA = T - YA

val YN = YA - YY

val NY = YB - YY

val NN = T - NY - YN - YY

val inner = (YY * NN - YN * NY) - T / 2.0

T * math.pow(inner, 2) / (YA * NA * YB * NB)

}

然後可以用該方法通過 mapTriplets 算子轉換邊的值。mapTriplets 算子返回一個新圖,這個圖的邊的屬性就是每個伴生對的卡方統計量。於是我們就可以大概知道該統計量在所有邊上的分布情況:

val chiSquaredGraph = topicCountGraph.mapTriplets( triplet => {

chiSq(triplet.attr, triplet.srcAttr, triplet.dstAttr, T)

})

chiSquaredGraph.edges.map(x => x.attr).stats()

res10: org.apache.spark.util.StatCounter = (count: 213745, mean: 877.957047, stdev: 5094.935102, max: 198668.408387, min: 0.000000)

計算完卡方統計量,我們想用它去過濾那些沒有意義的伴生概念對。從邊的分布可以看出,數據中卡方統計量的範圍很大,所以應該過濾掉更多的噪聲邊。對一個 2x2 的相關性表,如果變量沒有相關性,我們期望卡方指標的值服從自由度為 1 的卡方分布。通過查閱自由度為 1 的卡方分布表,可以看到兩個隨機變量 99.999% 不相關的閾值為 10.83,也就是說:如果卡方統計量在大於 10.83 時,兩個隨機變量有 99.999% 的概率相關。

技術分享圖片

根據原書,我們仍使用 19.5 作為閾值,這樣過濾後圖中就只剩下那些置信度非常高的有意義的伴生關系。我們將在圖上利用 subgraph 方法進行過濾,這個方法接受 EdgeTriplet 的一個布爾函數,用以判斷子圖應該包含哪些邊:

> val interesting = chiSquaredGraph.subgraph( triplet => triplet.attr > 19.5)

interesting: org.apache.spark.graphx.Graph[Int,Double] = org.apache.spark.graphx.impl.GraphImpl@18484c41

> interesting.edges.count

res16: Long = 140575

采用以上規則,我們過濾掉了原始伴生關系圖中約三分之一以上的邊。該規則沒有將更多的邊過濾掉,這不是壞事,因為我們預期圖中大多數伴生概念是語義相關,才因此同時出現的次數較多,而不是碰巧。

2. 去掉噪聲邊子圖

我們在過濾後的子圖上運行連通性算法:

> val interestingComponentCounts = sortedConnectedComponents(interesting.connectedComponents())

> interestingComponentCounts.size

res19: Int = 878

可以發現,在過濾掉接近三分之一左右的邊後,連通組件總數並未發生改變。

> interestingComponentCounts.take(10).foreach(println)

(-9222594773437155629,13610)

(-6100368176168802285,5)

(-1043572360995334911,4)

(-8641732605581146616,3)

(-8082131391550700575,3)

(-5453294881507568143,3)

(-6561074051356379043,3)

(-2349070454956926968,3)

(-8186497770675508345,3)

(-858008184178714577,2)

且最大連通組件也並沒有被瓦解。這說明圖的連通結構對噪聲邊過濾有較好的魯棒性。檢查一下過濾後的度分布:

> val interestingDegrees = interesting.degrees.cache()

> interestingDegrees.map(_._2).stats()

res22: org.apache.spark.util.StatCounter = (count: 13721, mean: 20.490489, stdev: 29.864223, max: 863.000000, min: 1.000000)

原始分布為:(count: 13721, mean: 31.155892, stdev: 65.497591, max: 2596.000000, min: 1.000000)

可以看到,過濾後平均值變小了。但是更為人註意的是,最大度數下降了非常多,過濾前為 2596,過濾後為863。我們看一下過濾之後概念和度的關系:

scala> topNamesAndDegrees(interestingDegrees, topicGraph).foreach(println)

(Research,863)

(Disease,637)

(Pharmacology,509)

(Neoplasms,453)

(Toxicology,381)

(Metabolism,321)

(Drug Therapy,304)

(Blood,302)

(Public Policy,279)

(Social Change,277)

從結果來看,這次卡方過濾準則還比較理想:它在清楚對應普遍概念的邊的同時,保留了代表概念之間有意義並且有值得註意的關系的那些邊。我們可以繼續用不同的卡方過濾準則進行試驗,並且觀察它們對圖的連通性和度分布的影響。如果能找到卡方分布的某個值,並使用它作為過濾準則時,圖中大型連通圖組件開始瓦解,這種嘗試會是很有意義的。

9. 小世界網絡

現實生活中的圖具有如下兩個“小世界”屬性:

  1. 網絡中的節點的度普遍不高,但是他們與其他節點形成相對稠密的簇。也就是說,一個節點的鄰接節點大部分也是相連的
  2. 雖然圖中大部分節點的度不高,且形成相對稠密的簇,但是只需要經過少數幾條邊就可能從一個網絡節點快速到達另一節點

對上述兩個屬性,小世界的提出者 “Watts“ 和 “Strogatz” 分別定義了一個指標。通過計算指標並和理想隨機圖的指標進行比較,我們可以測試概念網絡是否具有小世界的屬性。

1. cliques和聚類系數

如果每個頂點都存在一條邊與其他任何節點都相連,那這個圖就是個完全圖。給定一個圖,可能有多個子圖是完全圖,我們可以將這些子圖稱為派系(cliques)。如果途中存在這種許多大型的 cliques,表示這個圖具有某種局部稠密結構,而真實的小世界網絡也具有這種局部稠密結構。

不幸的是,在給定的圖中尋找cliques 是非常困難的。判斷一個圖是否有給定大小的cliques

是一個NP-complete問題,也就是說,即使在一個小圖裏找cliques也是一個計算復雜度非常高的問題。

計算科學家提出了一些簡單的指標,用於給出一個圖裏局部稠密性的情況,而不需要做大量的計算,在一個給定的途中找出所有 cliques。其中一個指標就是 triangle count(三角計數)。

三角形是一個完成圖,頂點V的三角計數就是包含該頂點的三角形個數。三角計數度量了V有多少個鄰接點是相互連接的。Watts 和 Strogatz 定義了一個新的指標,稱為局部聚類系數,它是一個頂點的實際三角計數與其鄰接點可能的三角級數的比率。對無向圖來說,有 k 個鄰接點和 t 個三角計數的頂點,其局部聚類系數C為:

技術分享圖片

現在我們用 GraphX 來計算過濾後的概念圖的每個節點的局部聚類系數。GraphX 有個內置方法 triangleCount,它返回一個 Graph 對象,其中 VertexRDD 包含了每個頂點的三角計數。

> val triCountGraph = interesting.triangleCount()

> triCountGraph.vertices.map(_._2).stats

res27: org.apache.spark.util.StatCounter = (count: 14548, mean: 74.660159, stdev: 295.327094, max: 11023.000000, min: 0.000000)

要計算局部聚類系數,我們需要通過每個頂點可能的三角計數對該頂點的三角形計數進行歸一。每個頂點可能的三角計數可以從 degrees RDD 計算得出:

> val maxTrisGraph = interesting.degrees.mapValues( d => d * (d-1) / 2.0)

現在我們可以直接計算局部聚類系數(註意除 0 問題):

val clusterCoefGraph = triCountGraph.vertices.innerJoin(maxTrisGraph) { (vertexId, triCount, maxTris) => {

if (maxTris == 0) 0 else triCount / maxTris

}

}

然後對所有頂點局部聚類系數取平均值,就得到網絡平均聚類系數:

> clusterCoefGraph.map(_._2).sum() / interesting.vertices.count()

res29: Double = 0.3062462560518856

2. Pregel計算平均路徑長度

小世界網絡的第二個屬性就是任何兩個節點之間的最短路徑是較短的。這裏我們會計算過濾之後的概念圖中的大型連通組件節點的平均路徑長度。

計算圖中頂點之間的路徑長度是一個叠代過程,和我們之前尋找連通組件的叠代過程類似:

  1. 每個階段,每個頂點將保留它所接觸過的頂點列表並記錄到這些頂點的距離。
  2. 接著每個頂點都向其鄰接點查詢它對應的節點列表,如果發現該列表中有新的頂點,就用新節點更新自己的節點列表
  3. 查詢鄰接點並更新自己節點列表的過程一直繼續下去,直到所有節點都沒有發現有新節點需要添加為止

這個在大規模分布式圖上運行的以頂點為中心的叠代式並行計算方法,是基於谷歌的 Pregel。Pregel基於一個稱為“批量同步並行”(BSP,Bulk-Synchronous Parallel)的分布式計算模型。BSP程序將並行處理階段分成兩個步驟:計算和通信。

在計算環節,圖中每個頂點檢查自己的內部狀態並決定是否向圖中其他節點發送消息。

在通信環節,Pregel框架負責將計算環節得到的消息路由到相應的頂點,目標頂點處理接收到的消息後更新自己的內部狀態,並可能在下一個計算環節中產生新消息。計算和通信的過程會一直繼續下去,直到圖中所有頂點都一致投票同意停止運行,這時整個過程就結束了。

BSP 是最早的並行編程模型之一,它具有良好的通用性而且具有容錯性,因此設計 BSP 系統時捕捉並保持任何計算階段的系統狀態是可能的。有了這些狀態後,如果某臺機器發生故障,就可以從其他機器上復制出發生故障的機器的狀態,整個計算就可以回滾到故障發生前的狀態,這樣計算過程就可以繼續下去。

GraphX 提供了表達BSP運算的內置pregel運算符,這個算子是以圖為基礎的。接下來我們會使用這個運算符來實現對一個圖的平均路徑長度的計算,這是一個叠代式的圖並行計算,包括:

  1. 分析出每個頂點需要記錄的狀態
  2. 實現一個函數,需要考慮當前的狀態並根據兩個相連頂點決定下一階段要發送哪些消息
  3. 實現一個函數,匯總來自不同頂點的所有消息,然後將函數的結果傳遞給頂點以便更新其狀態

使用pregel實現分布式算法時需要確定三個問題。第一,要確定何種數據結構表示每個頂點狀態和頂點之間傳遞的消息。對我們要解決的平均路徑長度問題,我們希望每個頂點都有一個查詢表,這個查詢表包含當前頂點所知道的頂點的ID和它到這些頂點的距離。我們將為每個頂點建立一個Map[VertexId, Int] 並把這些信息存儲在其中。類似的,發送給每個頂點的消息也應該有一個查詢表,該表包含頂點ID和距離。這個距離是根據鄰接點傳遞過來的信息計算出來的,同樣可以用Map[VertexId, Int] 來表示這些信息。

確定了頂點狀態和消息內容的數據結構後,我們可以實現兩個函數。第一個函數是 mergeMaps,用於將新消息中的信息合並到頂點狀態之中。對我們討論的問題來說,頂點狀態和消息都是 Map[VertexId, Int] 類型的,因此需要把兩個 map 中的內容合並在一起並將每個 VertexId 關聯到兩個 map 中該 VertexId 對應兩個條目的最小值。

用GraphX分析伴生網絡(二)