1. 程式人生 > >GraphX實現N度關系

GraphX實現N度關系

att targe else 傳播 ndt 準備 合成 -a dsm

背景

本文給出了一個簡單的計算圖中每一個點的N度關系點集合的算法,也就是N跳關系。

之前通過官方文檔學習和理解了一下GraphX的計算接口。

N度關系

目標:
在N輪裏。找到某一個點的N度關系的點集合。

實現思路:
1. 準備好邊數據集。即”1 3”, “4, 1” 這種點關系。

使用GraphLoader 的接口load成Graph
2. 初始化每一個Vertice的屬性為空Map
3. 使用aggregateMessagesVerticeIDtotalRounds傳播出度點上,出度點把收集到的信息合成一個大Map
4. 更新後的Vertice

與原圖進行”Join”,更新圖中的變化過的點屬性
5. 反復步驟3和4,最後輸出更新了N輪之後的有關系的Vertice

spark-shell下可運行的代碼:

import org.apache.spark._
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD

val friendsGraph = GraphLoader.edgeListFile(sc, "data/friends.txt")
val totalRounds: Int = 3 // total N round
var targetVerticeID: Long = 6
// target vertice // round one var roundGraph = friendsGraph.mapVertices((id, vd) => Map()) var roundVertices = roundGraph.aggregateMessages[Map[Long, Integer]]( ctx => { if (targetVerticeID == ctx.srcId) { // only the edge has target vertice should send msg ctx.sendToDst(Map(ctx.srcId -> totalRounds)) } }, _ ++ _ ) for
(i <- 2 to totalRounds) { val thisRoundGraph = roundGraph.outerJoinVertices(roundVertices){ (vid, data, opt) => opt.getOrElse(Map[Long, Integer]()) } roundVertices = thisRoundGraph.aggregateMessages[Map[Long, Integer]]( ctx => { val iterator = ctx.srcAttr.iterator while (iterator.hasNext) { val (k, v) = iterator.next if (v > 1) { val newV = v - 1 ctx.sendToDst(Map(k -> newV)) ctx.srcAttr.updated(k, newV) } else { // do output and remove this entry } } }, (newAttr, oldAttr) => { if (oldAttr.contains(newAttr.head._1)) { // optimization to reduce msg oldAttr.updated(newAttr.head._1, 1) // stop sending this ever } else { oldAttr ++ newAttr } } ) } val result = roundVertices.map(_._1).collect

數據和輸出

2 1
4 1
1 2
6 3
7 3
7 6
6 7
3 7
4 3
1 6
6 1
Array(6, 1, 3, 7)

總結

實現的比較naive。還有很多能夠優化的地方。

全文完 :)

GraphX實現N度關系