1. 程式人生 > >MapReduce部分練習使用API程式設計示例之PageRank

MapReduce部分練習使用API程式設計示例之PageRank

package com.sxt.hadoop.mr.pagerank;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class RunJob {

	public static enum Mycounter {
		my
	}

	public static void main(String[] args) {
		
		Configuration conf = new Configuration(true);
		conf.set("mapreduce.app-submission.corss-paltform", "true");
		//如果分散式執行,必須打jar包
		//且,client在叢集外非hadoop jar 這種方式啟動,client中必須配置jar的位置
		conf.set("mapreduce.framework.name", "local");
		//這個配置,只屬於,切換分散式到本地單程序模擬執行的配置
		//這種方式不是分散式,所以不用打jar包
		
		
		double d = 0.00001;
		int i = 0;
		while (true) {
			i++;
			try {
				conf.setInt("runCount", i);
				
				FileSystem fs = FileSystem.get(conf);
				Job job = Job.getInstance(conf);				
				job.setJarByClass(RunJob.class);
				job.setJobName("pr" + i);
				job.setMapperClass(PageRankMapper.class);
				job.setReducerClass(PageRankReducer.class);
				job.setMapOutputKeyClass(Text.class);
				job.setMapOutputValueClass(Text.class);
//				job.setJar("/ooxx/jar");
				//使用了新的輸入格式化類
				job.setInputFormatClass(KeyValueTextInputFormat.class);
				
				
				Path inputPath = new Path("/data/pagerank/input/");
				
				if (i > 1) {
					inputPath = new Path("/data/pagerank/output/pr" + (i - 1));
				}
				FileInputFormat.addInputPath(job, inputPath);

				Path outpath = new Path("/data/pagerank/output/pr" + i);
				if (fs.exists(outpath)) {
					fs.delete(outpath, true);
				}
				FileOutputFormat.setOutputPath(job, outpath);

				boolean f = job.waitForCompletion(true);
				if (f) {
					System.out.println("success.");
					long sum = job.getCounters().findCounter(Mycounter.my).getValue();
					
					System.out.println(sum);
					double avgd = sum / 4000.0;
					if (avgd < d) {
						break;
					}
				}
			} catch (Exception e) {
				e.printStackTrace();
			}
		}
	}

	static class PageRankMapper extends Mapper<Text, Text, Text, Text> {
		protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
			
			int runCount = context.getConfiguration().getInt("runCount", 1);
			
			//A	   B D
			
			
			
			//A	   B D 0.3
			
			
			//K:A
			//V:B D
			//K:A
			//V:0.3 B D
			String page = key.toString();
			Node node = null;
			if (runCount == 1) {
				node = Node.fromMR("1.0" , value.toString());
			} else {
				node = Node.fromMR(value.toString());
			}
			// A   1.0 B D  傳遞老的pr值和對應的頁面關係
			context.write(new Text(page), new Text(node.toString()));
			
			if (node.containsAdjacentNodes()) {
				//票面值的計算
				double outValue = node.getPageRank() / node.getAdjacentNodeNames().length;
				for (int i = 0; i < node.getAdjacentNodeNames().length; i++) {
					String outPage = node.getAdjacentNodeNames()[i];
					// B:0.5
					// D:0.5    頁面A投給誰,誰作為key,val是票面值,票面值為:A的pr值除以超連結數量
					context.write(new Text(outPage), new Text(outValue + ""));
				}
			}
		}
	}

	static class PageRankReducer extends Reducer<Text, Text, Text, Text> {
		protected void reduce(Text key, Iterable<Text> iterable, Context context)
				throws IOException, InterruptedException {
			
			//相同的key為一組
			//key:頁面名稱比如B 
			//包含兩類資料
			//B:1.0 C  //頁面對應關係及老的pr值
			
			//B:0.5		//投票值
			//B:0.5
			
			
			double sum = 0.0;
			
			Node sourceNode = null;
			for (Text i : iterable) {
				Node node = Node.fromMR(i.toString());
				if (node.containsAdjacentNodes()) {
					sourceNode = node;
				} else {
					sum = sum + node.getPageRank();
				}
			}

			// 4為頁面總數
			double newPR = (0.15 / 4.0) + (0.85 * sum);
			System.out.println("*********** new pageRank value is " + newPR);

			// 把新的pr值和計算之前的pr比較
			double d = newPR - sourceNode.getPageRank();

			int j = (int) (d * 1000.0);
			j = Math.abs(j);
			System.out.println(j + "___________");
			context.getCounter(Mycounter.my).increment(j);

			sourceNode.setPageRank(newPR);
			context.write(key, new Text(sourceNode.toString()));
			//A  B D 0.8
		}
	}
}
package com.sxt.hadoop.mr.pagerank;

import java.io.IOException;
import java.util.Arrays;

import org.apache.commons.lang.StringUtils;

public class Node {

	private double pageRank = 1.0;
	private String[] adjacentNodeNames;

	public static final char fieldSeparator = '\t';

	public double getPageRank() {
		return pageRank;
	}

	public Node setPageRank(double pageRank) {
		this.pageRank = pageRank;
		return this;
	}

	public String[] getAdjacentNodeNames() {
		return adjacentNodeNames;
	}

	public Node setAdjacentNodeNames(String[] adjacentNodeNames) {
		this.adjacentNodeNames = adjacentNodeNames;
		return this;
	}

	public boolean containsAdjacentNodes() {
		return adjacentNodeNames != null && adjacentNodeNames.length > 0;
	}

	@Override
	public String toString() {
		StringBuilder sb = new StringBuilder();
		sb.append(pageRank);

		if (getAdjacentNodeNames() != null) {
			sb.append(fieldSeparator).append(
					StringUtils.join(getAdjacentNodeNames(), fieldSeparator));
		}
		return sb.toString();
	}

	// value =1.0 B D
	public static Node fromMR(String value) throws IOException {
		String[] parts = StringUtils.splitPreserveAllTokens(value,
				fieldSeparator);
		if (parts.length < 1) {
			throw new IOException("Expected 1 or more parts but received "
					+ parts.length);
		}
		Node node = new Node().setPageRank(Double.valueOf(parts[0]));
		if (parts.length > 1) {
			node.setAdjacentNodeNames(Arrays
					.copyOfRange(parts, 1, parts.length));
		}
		return node;
	}
	public static Node fromMR(String v1,String v2) throws IOException {
		return fromMR(v1+fieldSeparator+v2);
		//1.0	B D
	}
}