1. 程式人生 > >案例4-使用hadoop-mapreduce進行PageRank計算

案例4-使用hadoop-mapreduce進行PageRank計算

什麼是pagerank

  1. PageRank是Google專有的演算法,用於衡量特定網頁相對於搜尋引擎索引中的其他網頁而言的重要程度。
  2. 是Google創始人拉里·佩奇和謝爾蓋·布林於1997年創造的
  3. PageRank實現了將連結價值概念作為排名因素。

計算環境

  • Hadoop-2.5.2
  • 四臺主機
  • 兩臺NN的HA
  • 兩臺RM的HA
  • 離線計算框架MapReduce

 

演算法原理(1)

  • 入鏈 ====投票

                   PageRank讓連結來“投票“,到一個頁面的超連結相當於對該頁投一票

  • 入鏈數量

                  如果一個頁面節點接收到的其他網頁指向的入鏈數量越多,那麼這個頁面越重要

  • 入鏈質量

                     指向頁面A的入鏈質量不同,質量高的頁面會通過連結向其他頁面傳遞更多的權重。所以越是質量高的頁面指向頁面A,則頁面A越重要

 

網路上各個頁面的連結圖

假設網路上有ABCD四個網頁,A指向B和D,B指向C,C指向A和B,D指向B和C。

如上圖所示。

 

演算法原理(2)

  • 初始值
    • 每個頁面設定相同的PR值
    • Google的pagerank演算法給每個頁面的PR初始值為1。
  • 迭代遞迴計算(收斂)
    • Google不斷的重複計算每個頁面的PageRank。那麼經過不斷的重複計算,這些頁面的PR值會趨向於穩定,也就是收斂的狀態。
    • 在具體企業應用中怎麼樣確定收斂標準
      • 1、每個頁面的PR值和上一次計算的PR相等
      • 2、設定一個差值指標(0.0001)。當所有頁面和上一次計算的PR差值平均小於該標準時,則收斂
      • 3、設定一個百分比(99%),當99%的頁面和上一次計算的PR相等

在本例子中我們使用第二種方法來作為收斂標準。

 

演算法原理(3)

  • 修正PageRank計算公式
    •  由於存在一些出鏈為0,也就是那些不連結任何其他網頁的網, 也稱為孤立網頁,使得很多網頁能被訪問到。因此需要對 PageRank公式進行修正,即在簡單公式的基礎上增加了阻尼係數(damping factorq q一般取值q=0.85
  • 完整PageRank計算公式

N表示網頁的個數

Pj表示某個網頁,PageRank(Pj)表示當前網頁的PR值,L(Pj)表示網頁的超連結個數。

執行結果:

執行了29次完成:

ABCD四個網頁的pageRank值如圖所示。

程式碼:

package com.jeff.mr.pagerank;


import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class RunJob {
	
	public static enum Mycounter{
		my
	}

	public static void main(String[] args) {
		Configuration config =new Configuration();
		config.set("fs.defaultFS", "hdfs://node4:8020");
		config.set("yarn.resourcemanager.hostname", "node4");
		double d =0.001;
		int i=0;
		while(true){
			i++;
			try {
				config.setInt("runCount", i);
				FileSystem fs =FileSystem.get(config);
				Job job =Job.getInstance(config);
				job.setJarByClass(RunJob.class);
				job.setJobName("pr"+i);
				job.setMapperClass(PageRankMapper.class);
				job.setReducerClass(PageRankReducer.class);
				job.setMapOutputKeyClass(Text.class);
				job.setMapOutputValueClass(Text.class);
				job.setInputFormatClass(KeyValueTextInputFormat.class);
				Path inputPath =new Path("/usr/input/pagerank.txt");
				if(i>1){
					inputPath =new Path("/usr/output/pr"+(i-1));
				}
				FileInputFormat.addInputPath(job, inputPath);
				
				Path outpath =new Path("/usr/output/pr"+i);
				if(fs.exists(outpath)){
					fs.delete(outpath, true);
				}
				FileOutputFormat.setOutputPath(job, outpath);
				
				boolean f= job.waitForCompletion(true);
				if(f){
					System.out.println("success.");
					long sum= job.getCounters().findCounter(Mycounter.my).getValue();
					System.out.println(sum);
					//計算所有網頁Node的PR差值之和除以網頁個數,再縮小1000倍
					double avgd= sum/4000.0;
					//比較結果和收斂標準
					if(avgd < d){
						break;
					}
				}
			} catch (Exception e) {
				e.printStackTrace();
			}
		}
		
	}
	
	/**
	 * key為輸入檔案的每一行的第一個字串即網頁Node
	 * value為輸入檔案的每一行除第一個字串外的其他字串,即key對應網頁所有的超連結
	 * @author jeffSheng
	 * 2018年9月27日
	 */
	static class PageRankMapper extends Mapper<Text, Text, Text, Text>{
		protected void map(Text key, Text value,Context context) throws IOException, InterruptedException {
			//設定runCount即執行次數,沒有設定則預設為1
			int runCount= context.getConfiguration().getInt("runCount", 1);
			//當前網頁
			String page = key.toString();
			//當前網頁對應的Node物件
			Node node = null;
			//第一次執行則當前網頁Node的pageRank為1.0,符合谷歌的規定
			if(runCount==1){
				node = Node.fromMR("1.0" + "\t" + value.toString());
			}else{
				node = Node.fromMR(value.toString());
			}
			//輸出資料key為當前網頁Page,比如A網頁。value為當前網頁對應的Node比如:1.0	B	D
			context.write(new Text(page), new Text(node.toString()));//A:1.0	B	D
			//如果node包含超連結,則說明是不是孤立網頁
			if(node.containsAdjacentNodes()){
				//根據公式計算當前網頁超連結的PR平均值outValue
				double outValue = node.getPageRank() / node.getAdjacentNodeNames().length;
				//迭代每一個超連結對應指向的網頁Node,並輸出其PR值
				for (int i = 0; i < node.getAdjacentNodeNames().length; i++) {
					String  outPage = node.getAdjacentNodeNames()[i];
					context.write(new Text(outPage), new Text(outValue+""));//B:0.5  D:0.5
				}
			}
		}
	}
	
	/**
	 * 累加所有網頁的PR差值
	 * reduceTask:
	 * 		輸入資料Key為網頁page比如A即sourceNode,Value為對應的PR值,或者是包含PR值和超連結的字串
	 * @author jeffSheng
	 * 2018年9月27日
	 */
	static class PageRankReducer extends Reducer<Text, Text, Text, Text>{
		protected void reduce(Text key, Iterable<Text> arg1, Context context) throws IOException, InterruptedException {
			double sum =0.0;
			Node sourceNode = null;
			//迭代每一組的page對應的PR值,並轉化為網頁Node,這個Node可能只是PR值,或者是包含PR和Node的超連結
			for(Text i : arg1){
				Node node = Node.fromMR(i.toString());
				//原來的那個Node(最初包含超連結的網頁Page所擁有的PR+超連結)的Node,比如A:1.0	 B	D
				if(node.containsAdjacentNodes()){
					sourceNode = node;//A:1.0	 B	D
				}else{
					//當前sourceNode作為某網頁的子超連結節點Node的PageRank累加,比如A:0.5
					sum = sum + node.getPageRank();//A:0.5
				}
			}
			//當前sourceNode根據公式計算出來的PageRank
			double newPR=(0.15/4)+(0.85*sum);
			System.out.println("*********** new pageRank value is "+newPR);
			
			//把新的pr值和計算之前的pr比較
			double d= newPR -sourceNode.getPageRank();
			//把pr之差放大1000倍,取絕對值後累加所有網頁的PR差值
			int j=(int)( d*1000.0);
			j=Math.abs(j);
			System.out.println(j+"___________");
			context.getCounter(Mycounter.my).increment(j);;
			//重新給sourceNode賦值pageRank
			sourceNode.setPageRank(newPR);
			context.write(key, new Text(sourceNode.toString()));
		}
	}
}

 

package com.jeff.mr.pagerank;

import java.io.IOException;
import java.util.Arrays;

import org.apache.commons.lang.StringUtils;

public class Node {

	 private double pageRank=1.0;
	  private String[] adjacentNodeNames;

	  public static final char fieldSeparator = '\t';

	  public double getPageRank() {
	    return pageRank;
	  }

	  public Node setPageRank(double pageRank) {
	    this.pageRank = pageRank;
	    return this;
	  }

	  public String[] getAdjacentNodeNames() {
	    return adjacentNodeNames;
	  }

	  public Node setAdjacentNodeNames(String[] adjacentNodeNames) {
	    this.adjacentNodeNames = adjacentNodeNames;
	    return this;
	  }

	  public boolean containsAdjacentNodes() {
	    return adjacentNodeNames != null && adjacentNodeNames.length>0;
	  }

	  @Override
	  public String toString() {
	    StringBuilder sb = new StringBuilder();
	    sb.append(pageRank);

	    if (getAdjacentNodeNames() != null) {
	      sb.append(fieldSeparator)
	          .append(StringUtils
	              .join(getAdjacentNodeNames(), fieldSeparator));
	    }
	    return sb.toString();
	  }

	  //value =1.0	B	D
	  public static Node fromMR(String value) throws IOException {
		//將字串按照分隔符拆分成陣列
	    String[] parts = StringUtils.splitPreserveAllTokens(value, fieldSeparator);
	    if (parts.length < 1) {
	      throw new IOException(
	          "Expected 1 or more parts but received " + parts.length);
	    }
	    //建立網頁物件即Node,陣列的第一個元素作為網頁Node的pageRank值
	    Node node = new Node().setPageRank(Double.valueOf(parts[0]));
	    //陣列個數大於1,表示當前的字串表示的是含有超連結的網頁Node,設定除第一位外的其他Node作為當前網頁的超連結陣列
	    if (parts.length > 1) {
	      node.setAdjacentNodeNames(Arrays.copyOfRange(parts, 1,parts.length));
	    }
	    return node;
	  }
}