1. 程式人生 > >Spark元件之GraphX學習7--隨機圖生成和reduce最大或最小出度/入度/度

Spark元件之GraphX學習7--隨機圖生成和reduce最大或最小出度/入度/度

1解釋

通過自定義函式

reduce最大或最小出度/入度/度

2.程式碼:

/**
 * @author xubo
 * ref http://spark.apache.org/docs/1.5.2/graphx-programming-guide.html
 * time 20160503
 */

package org.apache.spark.graphx.learning

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.graphx.Graph
import org.apache.spark.graphx.VertexId
import org.apache.spark.graphx.VertexRDD
import org.apache.spark.graphx.util.GraphGenerators

object GraphGeneratorsAndMaxMin {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("GraphOperatorsStructuralMask").setMaster("local[4]")
    // Assume the SparkContext has already been constructed
    val sc = new SparkContext(conf)

    // Import random graph generation library
    // Create a graph with "age" as the vertex property.  Here we use a random graph for simplicity.
    val graph: Graph[Double, Int] =
      GraphGenerators.logNormalGraph(sc, numVertices = 5).mapVertices((id, _) => id.toDouble)
    // Compute the number of older followers and their total age

    println("Graph:");
    println("sc.defaultParallelism:" + sc.defaultParallelism);
    println("vertices:");
    graph.vertices.collect.foreach(println(_))
    println("edges:");
    graph.edges.collect.foreach(println(_))
    println("count:" + graph.edges.count);
    println("\ndegrees");
    graph.degrees.foreach(println)
    println("\ninDegrees");
    graph.inDegrees.foreach(println)
    println("\noutDegrees");
    graph.outDegrees.foreach(println)

    // Define a reduce operation to compute the highest degree vertex
    def max(a: (VertexId, Int), b: (VertexId, Int)): (VertexId, Int) = {
      if (a._2 > b._2) a else b
    }

    // Define a reduce operation to compute the highest degree vertex
    def min(a: (VertexId, Int), b: (VertexId, Int)): (VertexId, Int) = {
      if (a._2 < b._2) a else b
    }

    // Compute the max degrees
    val maxInDegree: (VertexId, Int) = graph.inDegrees.reduce(max)
    val maxOutDegree: (VertexId, Int) = graph.outDegrees.reduce(max)
    val maxDegrees: (VertexId, Int) = graph.degrees.reduce(max)

    println("\nmax:");
    println("maxDegree:" + (graph.degrees.reduce(max)));
    println("maxInDegree:" + graph.inDegrees.reduce(max));
    println("maxoutDegree:" + graph.outDegrees.reduce(max));
     println("\nmin:");
    println("minDegree:" + (graph.degrees.reduce(min)));
    println("minInDegree:" + graph.inDegrees.reduce(min));
    println("minoutDegree:" + graph.outDegrees.reduce(min));
    println("end");

  }
}


3.結果:

Graph:
sc.defaultParallelism:4
vertices:
(4,4.0)
(0,0.0)
(1,1.0)
(2,2.0)
(3,3.0)
edges:
Edge(0,2,1)
Edge(0,3,1)
Edge(0,3,1)
Edge(0,3,1)
Edge(1,0,1)
Edge(1,0,1)
Edge(1,2,1)
Edge(2,0,1)
Edge(2,2,1)
Edge(2,4,1)
Edge(3,1,1)
Edge(3,1,1)
Edge(3,2,1)
Edge(4,0,1)
Edge(4,4,1)
count:15

degrees
(2,7)
(1,5)
(3,6)
(4,4)
(0,8)

inDegrees
(1,2)
(4,2)
(0,4)
(3,3)
(2,4)

outDegrees
(3,3)
(1,3)
(2,3)
(4,2)
(0,4)

max:
maxDegree:(0,8)
maxInDegree:(0,4)
maxoutDegree:(0,4)

min:
maxDegree:(4,4)
maxInDegree:(1,2)
maxoutDegree:(4,2)
end



參考

【1】 http://spark.apache.org/docs/1.5.2/graphx-programming-guide.html