1. 程式人生 > >跟我一起學Spark之——資料分割槽

跟我一起學Spark之——資料分割槽

前言

        控制資料分佈以獲得最少的網路傳輸可以極大地提升整體效能。

        如果給定RDD只需要被掃描一次(例如大小表join中的小表),我們完全沒有必要對其預先進行分割槽處理,只有當資料集多次在諸如連線這種基於鍵的操作中使用時(大表),分割槽才有幫助。

        儘管Spark沒有給出顯示控制每個鍵具體落在哪一個工作節點上的方法,但是Spark可以確保同一組的鍵出現在同一個節點上

例1:使用hash分割槽將一個RDD分成了100個分割槽,此時鍵的雜湊值對100取模的結果相同的記錄會被放在一個節點上。)

例2:使用範圍分割槽,將鍵在一定範圍區間內的記錄都放在一個節點上。)

 1.獲取RDD的分割槽方式

注意:如果要在後續操作中使用partitioned,那就應該在定義partitioned時,在第三行輸入的最後加上persist()。

原因:如果不呼叫persist(),後續的RDD操作會對partitioned整個譜系重新求值,這會導致pairs一遍又一遍地進行雜湊分割槽操作。

import org.apache.spark
import org.apache.spark.{SparkConf, SparkContext}

object rdd_fqfs {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("RDD-Join")
    val sc = SparkContext.getOrCreate(conf)
    val pairs =sc.parallelize(List((1,1),(2,2),(3,3)))
    val partitioned = pairs.partitionBy(new spark.HashPartitioner(8)).persist()
    println(partitioned.partitioner)
  }
}

2.從分割槽中獲益的操作

        就Spark1.0而言,能從分割槽中獲益的操作有:cogroup()、groupwith()、join()、leftOuterJoin()、rightOuterJoin()、groupByKey()、reduceByKey()、combinerByKey()、lookup()

        如果兩個RDD使用同樣的分割槽方式。並且他們還快取在同樣的機器上(比如一個RDD是通過mapValues()從另一個RDD中創建出來的,這兩個RDD就會擁有相同的鍵和分割槽方式),或者其中一個RDD還沒有倍計算出來,那麼跨節點的資料混洗就不會發生了。

3.影響分割槽方式的操作

        Spark內部知道各操作會如何影響分割槽方式,並將會對資料進行分割槽的操作的結果RDD自動設定為對應的分割槽器。

        列出所有會為生成的結果RDD設好分割槽方式的操作:cogroup()、groupwith()、join()、leftOuterJoin()、rightOuterJoin()、groupByKey()、reduceByKey()、combinerByKey()、partitionByKey()、sort()、

[mapValues()、flatMapValues()、filter()][這三個-如果父RDD有分割槽方式的話]

        對於二元操作,如果其中的一個父RDD已經設定過分割槽方式,那麼結果就會採用那種分割槽方式;如果兩個父RDD都設定過分割槽方式,結果RDD採用第一個父RDD的分割槽方式。(簡單說:一個隨父,兩個隨前父

4.PageRank

        PageRank是一種從RDD分割槽中獲益的執行多次連線的迭代演算法。

        可用於對網頁排序,也可以用於排序科技文章社交網路中有影響的使用者

        計算步驟:

        (1)將每個頁面的排序值初始化為1.0

        (2)在每次迭代中,對頁面p,向其每個相鄰頁面(有直接連結的頁面)傳送一個值為rank(p)/numNeighbors(p)的貢獻值

        (3)將每個頁面的排序值設為0.15+0.85*contributionsReceived

最後兩步會重複幾個迴圈,在此過程中,演算法會逐漸收斂於每個頁面的實際PageRank值,在實際操作中,收斂需要10個迭代

import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
object pagerank_test2 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("RDD-Join")
    val sc = SparkContext.getOrCreate(conf)
    //假設相鄰頁面列表以Spark objectFile的形式儲存
    val links = sc.objectFile[(String,Seq[String])]("links")
      .partitionBy(new HashPartitioner(100))
      .persist()
    //將每個頁面的排序值初始化為1.0;由於使用mapValues,生成的RDD的分割槽方式會和“links”的一樣
    var ranks = links.mapValues(v=>1.0)
    //執行10輪迭代
    for(i <-0 until 10){
      val contributions = links.join(ranks).flatMap{
        case (pageId,(links,rank)) =>
          links.map(dest=>(dest,rank/links.size))
      }
      ranks = contributions.reduceByKey((x,y) => x+y).mapValues(v => 0.15+0.85*v)
    }
    //寫出最終排名
    ranks.saveAsTextFile("ranks")
  }
}
import org.apache.log4j.{Level,Logger}
import org.apache.spark.graphx.{Graph, GraphLoader}
import org.apache.spark.{SparkConf, SparkContext}
object pagrank_test3 {
    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
    def main(args: Array[String]) {
      val conf = new SparkConf().setAppName("PageRankTest").setMaster("local[2]")
      val sc = new SparkContext(conf)
      val graph = GraphLoader.edgeListFile(sc,"E:\\Users\\11046\\IdeaProjects\\SparkFly\\followers.txt")
      val ranks = graph.pageRank(0.001).vertices
      val users = sc.textFile("E:\\Users\\11046\\IdeaProjects\\SparkFly\\users.txt").
        map {
          line =>
            val fields = line.split(",")
            (fields(0).toLong, fields(1))
        }
      val ranksByUsername = users.join(ranks).map {
        case(id , (username,rank)) => (username,rank)
      }
      println(ranksByUsername.collect().mkString("\n"))
    }
  }

 感謝:

https://blog.csdn.net/a1234h/article/details/77962536《Scala 中 var 和 val 的區別》

https://blog.csdn.net/mmake1994/article/details/79966145《Spark之GraphX案例-PageRank演算法與分析》

https://www.jianshu.com/p/27d23bc29914《Spark實現PageRank演算法》

5.自定義分割槽方式