1. 程式人生 > >Hadoop與Spark演算法分析(四)——PageRank演算法

Hadoop與Spark演算法分析(四)——PageRank演算法

PageRank是用於解決網頁重要性排序的關鍵技術之一,其基於網頁之間連結關係構建一個有向圖結構,實現各個網頁級別的劃分。一個網頁的PageRank值(後面簡稱PR值),取決於其他網頁對該網頁的貢獻和,以公式形式表示為這裡寫圖片描述,其中U表示所有網頁指向網頁b的網頁集合,L(a)表示網頁a的出度,d表示使用者瀏覽一個網頁的隨機概率,用於解決網頁關係間的陷阱問題。根據公式遞迴計算,各網頁的PR值將最終趨於穩定。可以發現,該演算法的執行實質是一個概率矩陣的迭代乘法運算。

1. 實驗準備

由於Hadoop與Spark對於PageRank演算法的實現過程不同,這裡分別對Hadoop與Spark演算法輸入檔案進行說明。
對於Hadoop輸入檔案,每行的資料資訊包含網頁ID、網頁初始PR值1.0以及該網頁連結的其他網頁ID,以製表符隔開,如

A   1   B,C
B   1   C
C   1   A,D
D   1   B,E
E   1   A

對於Spark輸入檔案,以網頁ID以及該網頁連結的每一個網頁ID,作為單獨一行儲存,如

A   B
A   C
B   C
C   A
C   D
D   B
D   E
E   A

2. Hadoop實現

為了完成後續的迭代計算,map過程需要將連結關係圖和對其他網頁的貢獻值分別傳遞給reduce端。
reduce過程根據key將最終計算的PR值與連結關係圖合併輸出,用於下次迭代的map端。
測試以10次為收斂標準迭代進行,具體程式碼實現如下:

package org.hadoop.test;

import
java.io.IOException; import java.util.StringTokenizer; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import
org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class PageRank { private static final double d = 0.85; public static class PRMapper extends Mapper<Object, Text, Text, Text>{ Text link_key = new Text(); Text pr_value = new Text(); Text id_key = new Text(); Text link_value = new Text(); public void map(Object key, Text value, Context context) throws IOException, InterruptedException{ StringTokenizer line = new StringTokenizer(value.toString()); String id = line.nextToken(); double pr = Double.parseDouble(line.nextToken()); String elements = line.nextToken(); //為連結的網頁組作標記 link_value.set("@"+elements); String[] links = elements.split(","); int count = links.length; for (String link : links){ String prValue = String.valueOf(pr/count); link_key.set(link); pr_value.set(prValue); //傳遞所貢獻的pr值 context.write(link_key, pr_value); } id_key.set(id); //傳遞拓撲圖 context.write(id_key, link_value); } } public static class PRReducer extends Reducer<Text, Text, Text, Text>{ Text result = new Text(); public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException{ double rank = 0; String pages = ""; for (Text value : values){ String tmp = value.toString(); if (tmp.startsWith("@")){ pages = "\t"+tmp.substring(tmp.indexOf("@")+1); continue; } rank += Double.parseDouble(tmp); } rank = 1-d+d*rank; result.set(rank+pages); context.write(key, result); } } public static void main(String[] args) throws Exception{ if (args.length != 2){ System.err.println("Usage: <in> <out>"); System.exit(2); } Configuration conf = new Configuration(); Job job1 = new Job(conf, "PageRank_tmp"); job1.setJarByClass(PageRank.class); job1.setMapperClass(PRMapper.class); job1.setReducerClass(PRReducer.class); job1.setOutputKeyClass(Text.class); job1.setOutputValueClass(Text.class); FileInputFormat.addInputPath(job1, new Path(args[0])); FileOutputFormat.setOutputPath(job1, new Path(args[1]+"_tmp/output1")); job1.waitForCompletion(true); for (int i=0;i<8;i++){ String inpath = args[1]+"_tmp/output"+String.valueOf(i+1); String outpath = args[1]+"_tmp/output"+String.valueOf(i+2); Job job2 = new Job(conf, "PageRank_tmp"); job2.setJarByClass(PageRank.class); job2.setMapperClass(PRMapper.class); job2.setReducerClass(PRReducer.class); job2.setOutputKeyClass(Text.class); job2.setOutputValueClass(Text.class); FileInputFormat.addInputPath(job2, new Path(inpath)); FileOutputFormat.setOutputPath(job2, new Path(outpath)); job2.waitForCompletion(true); } String inpath = args[1]+"_tmp/output9"; String outpath = args[1]; Job job3 = new Job(conf, "PageRank"); job3.setJarByClass(PageRank.class); job3.setMapperClass(PRMapper.class); job3.setReducerClass(PRReducer.class); job3.setOutputKeyClass(Text.class); job3.setOutputValueClass(Text.class); FileInputFormat.addInputPath(job3, new Path(inpath)); FileOutputFormat.setOutputPath(job3, new Path(outpath)); job3.waitForCompletion(true); } }

3. Spark實現

Spark實現過程將該網頁的連結網頁RDD與PR值RDD合併為一個RDD執行運算元操作,並將連結關係圖緩存於記憶體,便於後續迭代計算。
具體程式碼實現如下:

import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created by rose on 16-4-26.
  */
object PageRank {
  def main(args:Array[String]): Unit ={
    if(args.length != 2){
      println("Usage: <in> <out>")
      return
    }

    val conf = new SparkConf().setAppName("PageRank")
    val sc = new SparkContext(conf)
    val lines = sc.textFile(args(0))
    val links = lines.map(line=>{
      val parts = line.split("\\s")
      (parts(0), parts(1))
    }).distinct().groupByKey().cache()
    var ranks = links.mapValues(v => 1.00)

    for(i <- 1 to 10){
      val contrib = links.join(ranks).values.flatMap {
        case (urls, rank) => {
          val size = urls.size
          urls.map(url => (url, rank/size))
        }
      }
      ranks = contrib.reduceByKey(_+_).mapValues(0.15+0.85*_)
    }
    ranks.saveAsTextFile(args(1))
  }
}

4. 執行過程

1)上傳本地檔案到HDFS目錄下
在HDFS上建立輸入資料夾

$hadoop fs -mkdir -p pageRank/input

上傳本地測試檔案到叢集的input目錄下

$hadoop fs -put ~/file* pageRank/input

檢視叢集檔案目錄

$hadoop fs -ls pageRank/input

2)執行程式
將矩陣乘法演算法程式PageRank打包為字尾名為jar的壓縮檔案PageRank.jar,進入到壓縮檔案所在資料夾(這裡以一個file輸入檔案和一個output輸出資料夾為例說明)。
Hadoop程式執行如下命令執行

$hadoop jar ~/hadoop/PageRank.jar org.hadoop.test.PageRank pageRank/input/file pageRank/hadoop/output

Spark程式執行如下命令執行

$spark-submit --master yanr-client --class PageRank ~/spark/PageRank.jar hdfs://master:9000/pageRank/input/file hdfs://master:9000/pageRank/spark/output

3)檢視執行結果
檢視Hadoop執行結果

$hadoop fs -ls pageRank/hadoop/output

檢視Spark執行結果

$hadoop fs -ls pageRank/spark/output

5. 測試對比

這裡寫圖片描述

如圖所示為PageRank測試對比圖,Hadoop計算需要將每次迭代計算結果儲存在HDFS中,以供下次迭代作為輸入資料進行計算,而Spark只需要每次迭代計算上一次快取在記憶體中的結果資料即可。對比Hadoop平臺,Spark對於迭代式運算有著顯著的優勢,隨著測試集複雜度的上升,優勢更加明顯。