1. 程式人生 > >SparkGraphX加權最短路徑演算法實現

SparkGraphX加權最短路徑演算法實現

版本:Spark 1.6 
該版本自帶的最短路徑演算法shortestPaths沒辦法自定義權重(預設每條邊的權重都一樣),不符合現實生活,比如在地圖中計算兩個位置的最短路線,要考慮線路的長度,線路的擁堵情況等多方面,所以寫了個加權最短路徑演算法加入距離屬性

演算法說明: 
加權最短路徑演算法計算所有的頂點到特定點的距離: 
1.初始化開始頂點的距離為0,所有其他節點距離設定為無窮大 
2. 將當前頂點設為初始頂點 
3. 所有與當前頂點相鄰的頂點,將距離設定為當前頂點的距離加上將當前頂點連線到該其他頂點的邊的長度之和的較小值。 
4.將當前頂點標記為已訪問 
5.迭代停止條件, 如果沒有未訪問的頂點 
6.迭代第三步 
程式碼:

 def dijkstra[VD](g:Graph[VD,Double],origin:VertexId)={
    var g2=g.mapVertices((vid,vd)=> (false,if(vid == origin) 0 else Double.MaxValue)) //修改點的屬性,如果是特殊頂點屬性為0,其他為無窮,false標誌著是否被訪問過
    //每次迴圈都會訪問一個點並標記為true,共迴圈vertices.count-1次(特定節點除外)
    for(i <- 1L to g.vertices.count-1){
      val currentVertexId
= g2.vertices.filter(!_._2._1) .fold((0L,(false,Double.MaxValue)))((a,b)=> if(a._2._2 < b._2._2) a else b)._1 //迭代每一個頂點,取出所有頂點的屬性最小值(路徑長度),currentVertexId,是屬性最小值的頂點ID val newDistances=g2.aggregateMessages[Double]( ctx=> if (ctx.srcId == currentVertexId)//
找到srcId是最小屬性的那條關係 ctx.sendToDst(ctx.srcAttr._2+ctx.attr), //把最小路徑和邊的路徑合在一起發給dstId (a,b)=>math.min(a,b))
//收到訊息取最小 g2=g2.outerJoinVertices(newDistances)((vid,vd,newSum)=> (vd._1 || vid ==currentVertexId,math.min(vd._2,newSum.getOrElse(Double.MaxValue)))) //這裡走過的點這設為true,把每個點的路徑長度屬性更新 } //g2裡面存的都是每個頂點到指定頂點的最小距離 g.outerJoinVertices(g2.vertices)((vid,vd,dist)=> (vd,dist.getOrElse((false,Double.MaxValue))._2)) }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21

準備資料 
vertices 
(1L, “A”), 
(2L, “B”), 
(3L, “C”), 
(4L, “D”), 
(5L, “E”), 
(6L, “F”), 
(7L, “G”) 
edges(邊屬性是兩點間距離) 
Edge(1L, 2L, 7.0), 
Edge(1L, 4L, 5.0), 
Edge(2L, 3L, 8.0), 
Edge(2L, 4L, 9.0), 
Edge(2L, 5L, 7.0), 
Edge(3L, 5L, 5.0), 
Edge(4L, 5L, 15.0), 
Edge(4L, 6L, 6.0), 
Edge(5L, 6L, 8.0), 
Edge(5L, 7L, 9.0), 
Edge(6L, 7L, 11.0) 
用Gephi繪圖(後面附贈graph物件生成.gexf檔案方法): 
這裡寫圖片描述 
步驟: 
1.把邊和頂點資料載入生成graph物件 
2.呼叫dijkstra方法傳入圖物件和要算的頂點(本例用的頂點”1L”) 
例: 
dijkstra(myGraph, 1L) 
3.取出頂點,最終的長度在頂點屬性的第二個引數 
.vertices.map(_._2).collect 
結果為每個點到A(“1L”)的最短距離 
這裡寫圖片描述 
大家可以根據上面的gephi繪圖檢視結果(最短距離)是否正確

最短路徑演算法只需在頂點屬性中加一個list記錄路徑就好,剩下其他的和之前的一樣 
程式碼:

def dijkstra[VD](g:Graph[VD,Double], origin:VertexId) = {
    var g2 = g.mapVertices(
      (vid,vd) => (false, if (vid == origin) 0 else Double.MaxValue,
        List[VertexId]()))//新加入了一個欄位用來記錄路線
    for (i <- 1L to g.vertices.count-1) {
      val currentVertexId =
        g2.vertices.filter(!_._2._1)
          .fold((0L,(false,Double.MaxValue,List[VertexId]())))((a,b) =>
            if (a._2._2 < b._2._2) a else b)
          ._1 //找到最短路徑的節點
      val newDistances = g2.aggregateMessages[(Double,List[VertexId])](
        ctx => if (ctx.srcId == currentVertexId)
          ctx.sendToDst((ctx.srcAttr._2 + ctx.attr,
            ctx.srcAttr._3 :+ ctx.srcId)), //這裡把頂點id加入list
        (a,b) => if (a._1 < b._1) a else b)
      g2 = g2.outerJoinVertices(newDistances)((vid, vd, newSum) => {
        val newSumVal =
          newSum.getOrElse((Double.MaxValue,List[VertexId]()))
        (vd._1 || vid == currentVertexId,
          math.min(vd._2, newSumVal._1),
          if (vd._2 < newSumVal._1) vd._3 else newSumVal._2)})
    }
    g.outerJoinVertices(g2.vertices)((vid, vd, dist) =>
      (vd, dist.getOrElse((false,Double.MaxValue,List[VertexId]()))
        .productIterator.toList.tail))
  }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26

最終結果 
這裡寫圖片描述 
list中記載的是每個頂點到達A(“1L”)的最短路徑

.gexf檔案生成方式

def toGexf[VD,ED](g:Graph[VD,ED])=
    "<?xml version=\"1.0\" encoding=\"UTF-8\"?>\n" +"<gexf xmlns=\"http://www.gexf.net/1.2draft\" version=\"1.2\">\n" +" <graph mode=\"static\" defaultedgetype=\"directed\">\n" +" <nodes>\n" +g.vertices.map(v => " <node id=\"" + v._1 + "\" label=\"" +v._2 + "\" />\n").collect.mkString +" </nodes>\n" +" <edges>\n" +g.edges.map(e => " <edge source=\"" + e.srcId +"\" target=\"" + e.dstId + "\" label=\"" + e.attr +"\" />\n").collect.mkString +" </edges>\n" +" </graph>\n" +"</gexf>"

val pw=new java.io.PrintWriter("myGraph.gexf")
pw.write(toGexf(toGexf(myGraph)))
pw.close
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

.gexf可用gephi直接使用