1. 程式人生 > spark primer 計算每個頂點之間的最短路徑

spark primer 計算每個頂點之間的最短路徑

這裡寫圖片描述
edge.txt (邊資料)
1 2 2
1 3 5
1 4 1
2 3 2
3 4 2
4 5 3
5 1 2
vertex.txt(頂點資料)
1 2
1 3
1 4
2 3
3 4
4 6
5 1

package main.scala.com.spark.demo.com.spark.graphx


import java.io.PrintWriter
import java.io.File
import org.apache.spark.graphx.{VertexRDD, Edge, Graph}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.immutable.HashSet

/**
 * Created by root on 15-10-18.
 *
 * Runs a single-source shortest-path computation (GraphX Pregel) once for
 * every source vertex in a fixed set, so that the shortest distance between
 * each pair of vertices is produced.
 *
 * Input, read from HDFS:
 *  - `edge.txt`   : one edge per line, `srcId dstId weight` (all parsed as Long)
 *  - `vertex.txt` : one vertex per line, `id attribute` (id parsed as Long,
 *                   attribute kept as a String)
 *
 * Output, per source id `sid`: the resulting `(vertexId, distance)` pairs are
 * printed to stdout and saved under `.../data01/short1<sid>.txt`.
 */
object MultiPointshortestPathFinal {

  /**
   * Program entry point.
   *
   * @param args command-line arguments; not used
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("shortestpath").setMaster("local")
    val sc = new SparkContext(conf)

    // Always release the SparkContext, even when a job fails half-way.
    try {
      val edgeFile: RDD[String] = sc.textFile("hdfs://127.0.0.1:9000/data01/edge.txt")
      val vertexFile: RDD[String] = sc.textFile("hdfs://127.0.0.1:9000/data01/vertex.txt")

      // Edges: blank lines are skipped and any run of whitespace separates
      // fields, so a trailing newline or a tab does not crash the parse.
      val edge = edgeFile.map(_.trim).filter(_.nonEmpty).map { line =>
        val fields = line.split("\\s+")
        Edge(fields(0).toLong, fields(1).toLong, fields(2).toLong)
      }

      // Vertices: (id, attribute) pairs, parsed with the same tolerance.
      val vertex = vertexFile.map(_.trim).filter(_.nonEmpty).map { line =>
        val fields = line.split("\\s+")
        (fields(0).toLong, fields(1))
      }

      // "" is the default attribute for vertices that only appear in edges.
      val graph = Graph(vertex, edge, "").persist()
      // println(graph.edges.collect.mkString("\n"))

      // The vertex listing does not change between runs; collect it once
      // instead of launching the same Spark job on every iteration.
      val vertexDump = vertex.collect.mkString("\n")

      // Source vertices to start from. An ordered, immutable sequence makes
      // the order of the runs (and of the console output) deterministic.
      val vset: Seq[Long] = Seq(1L, 2L, 3L, 4L, 5L)

      // Compute shortest paths with Pregel, once per source vertex.
      for (sid <- vset) {
        // Initialize the graph such that all vertices except the source
        // have distance infinity; the source itself has distance 0.
        val initialGraph = graph.mapVertices((id, _) =>
          if (id == sid) 0.0 else Double.PositiveInfinity)

        val sssp = initialGraph.pregel(Double.PositiveInfinity)(
          // Vertex program: dist is the current vertex attribute (Double),
          // newDist is the incoming message (Double); keep the smaller one.
          (id, dist, newDist) => math.min(dist, newDist),
          // Send message: emit (destination id, candidate distance) only
          // when going through this edge improves the destination.
          triplet => {
            if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
              Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
            } else {
              Iterator.empty
            }
          },
          // Merge message: combine messages bound for the same vertex,
          // similar to a combiner in Hadoop.
          (a, b) => math.min(a, b)
        )

        println(sssp.vertices.collect.mkString("\n"))
        // NOTE(review): saveAsTextFile is expected to fail when the target
        // path already exists - confirm, and clear old output before re-running.
        sssp.vertices.saveAsTextFile("hdfs://127.0.0.1:9000/data01/short1" + sid + ".txt")
        println("vertexInfo")
        println(vertexDump)
        println("sid==>>" + sid)
      }
    } finally {
      sc.stop()
    }
  }
}
// NOTE(review): duplicated, truncated copy of the listing above (scrape residue), kept as a comment; it continues on the next line: val edgeFile:RDD[String] = sc.textFile("hdfs://127.0.0.1:9000/data01/edge.txt") val vertexFile:RDD[String] = sc.textFile("hdfs://127.0.0.1:9000/data01/vertex.txt") //edge val edge = edgeFile.map { e => val fields = e.split(" ") Edge(fields(0).toLong,fields(1).toLong,fields(2).toLong) } //vertex val vertex = vertexFile.map{e=> val fields = e.split(" ") (fields(0).toLong,fields(1)) } val graph = Graph(vertex,edge,"").persist() // println(graph.edges.collect.mkString("\n")) //compute shortest paths with pregel // Initialize the graph such that all vertices except the root have distance infinity.
//初始化各節點到原點的距離 val vset = scala.collection.mutable.Set(1,2,3,4,5); // var verticesArr = VertexRDD[10] for(sid <- vset){ val sourceId = sid val initialGraph = graph.mapVertices((id, _) => if (id == sourceId) 0.0 else Double.PositiveInfinity) //begin val sssp = initialGraph.pregel(Double.PositiveInfinity)( // Vertex Program,節點處理訊息的函式,dist為原節點屬性(Double),newDist為訊息型別(Double) (id, dist, newDist) => math.min(dist, newDist), // Send Message,傳送訊息函式,返回結果為(目標節點id,訊息(即最短距離)) triplet => { if (triplet.srcAttr + triplet.attr < triplet.dstAttr) { Iterator((triplet.dstId, triplet.srcAttr + triplet.attr)) } else { Iterator.empty } }, //Merge Message,對訊息進行合併的操作,類似於Hadoop中的combiner