跟我一起學Spark之——資料分割槽
前言
控制資料分佈以獲得最少的網路傳輸可以極大地提升整體效能。
如果給定RDD只需要被掃描一次(例如大小表join中的小表),我們完全沒有必要對其預先進行分割槽處理,只有當資料集多次在諸如連線這種基於鍵的操作中使用時(大表),分割槽才有幫助。
儘管Spark沒有給出顯示控制每個鍵具體落在哪一個工作節點上的方法,但是Spark可以確保同一組的鍵出現在同一個節點上
(例1:使用hash分割槽將一個RDD分成了100個分割槽,此時鍵的雜湊值對100取模的結果相同的記錄會被放在一個節點上。)
(例2:使用範圍分割槽,將鍵在一定範圍區間內的記錄都放在一個節點上。)
1.獲取RDD的分割槽方式
注意:如果要在後續操作中使用partitioned,那就應該在定義partitioned時,在第三行輸入的最後加上persist()。
原因:如果不呼叫persist(),後續的RDD操作會對partitioned整個譜系重新求值,這會導致pairs一遍又一遍地進行雜湊分割槽操作。
import org.apache.spark import org.apache.spark.{SparkConf, SparkContext} object rdd_fqfs { def main(args: Array[String]): Unit = { val conf = new SparkConf() .setMaster("local[*]") .setAppName("RDD-Join") val sc = SparkContext.getOrCreate(conf) val pairs =sc.parallelize(List((1,1),(2,2),(3,3))) val partitioned = pairs.partitionBy(new spark.HashPartitioner(8)).persist() println(partitioned.partitioner) } }
2.從分割槽中獲益的操作
就Spark1.0而言,能從分割槽中獲益的操作有:cogroup()、groupwith()、join()、leftOuterJoin()、rightOuterJoin()、groupByKey()、reduceByKey()、combinerByKey()、lookup()
如果兩個RDD使用同樣的分割槽方式。並且他們還快取在同樣的機器上(比如一個RDD是通過mapValues()從另一個RDD中創建出來的,這兩個RDD就會擁有相同的鍵和分割槽方式),或者其中一個RDD還沒有倍計算出來,那麼跨節點的資料混洗就不會發生了。
3.影響分割槽方式的操作
Spark內部知道各操作會如何影響分割槽方式,並將會對資料進行分割槽的操作的結果RDD自動設定為對應的分割槽器。
列出所有會為生成的結果RDD設好分割槽方式的操作:cogroup()、groupwith()、join()、leftOuterJoin()、rightOuterJoin()、groupByKey()、reduceByKey()、combinerByKey()、partitionByKey()、sort()、
[mapValues()、flatMapValues()、filter()][這三個-如果父RDD有分割槽方式的話]
對於二元操作,如果其中的一個父RDD已經設定過分割槽方式,那麼結果就會採用那種分割槽方式;如果兩個父RDD都設定過分割槽方式,結果RDD採用第一個父RDD的分割槽方式。(簡單說:一個隨父,兩個隨前父)
4.PageRank
PageRank是一種從RDD分割槽中獲益的執行多次連線的迭代演算法。
可用於對網頁排序,也可以用於排序科技文章或社交網路中有影響的使用者。
計算步驟:
(1)將每個頁面的排序值初始化為1.0
(2)在每次迭代中,對頁面p,向其每個相鄰頁面(有直接連結的頁面)傳送一個值為rank(p)/numNeighbors(p)的貢獻值
(3)將每個頁面的排序值設為0.15+0.85*contributionsReceived
最後兩步會重複幾個迴圈,在此過程中,演算法會逐漸收斂於每個頁面的實際PageRank值,在實際操作中,收斂需要10個迭代
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
object pagerank_test2 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
.setMaster("local[*]")
.setAppName("RDD-Join")
val sc = SparkContext.getOrCreate(conf)
//假設相鄰頁面列表以Spark objectFile的形式儲存
val links = sc.objectFile[(String,Seq[String])]("links")
.partitionBy(new HashPartitioner(100))
.persist()
//將每個頁面的排序值初始化為1.0;由於使用mapValues,生成的RDD的分割槽方式會和“links”的一樣
var ranks = links.mapValues(v=>1.0)
//執行10輪迭代
for(i <-0 until 10){
val contributions = links.join(ranks).flatMap{
case (pageId,(links,rank)) =>
links.map(dest=>(dest,rank/links.size))
}
ranks = contributions.reduceByKey((x,y) => x+y).mapValues(v => 0.15+0.85*v)
}
//寫出最終排名
ranks.saveAsTextFile("ranks")
}
}
import org.apache.log4j.{Level,Logger}
import org.apache.spark.graphx.{Graph, GraphLoader}
import org.apache.spark.{SparkConf, SparkContext}
object pagrank_test3 {
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("PageRankTest").setMaster("local[2]")
val sc = new SparkContext(conf)
val graph = GraphLoader.edgeListFile(sc,"E:\\Users\\11046\\IdeaProjects\\SparkFly\\followers.txt")
val ranks = graph.pageRank(0.001).vertices
val users = sc.textFile("E:\\Users\\11046\\IdeaProjects\\SparkFly\\users.txt").
map {
line =>
val fields = line.split(",")
(fields(0).toLong, fields(1))
}
val ranksByUsername = users.join(ranks).map {
case(id , (username,rank)) => (username,rank)
}
println(ranksByUsername.collect().mkString("\n"))
}
}
感謝:
https://blog.csdn.net/a1234h/article/details/77962536《Scala 中 var 和 val 的區別》
https://blog.csdn.net/mmake1994/article/details/79966145《Spark之GraphX案例-PageRank演算法與分析》
https://www.jianshu.com/p/27d23bc29914《Spark實現PageRank演算法》
5.自定義分割槽方式