
Hadoop 2.6: An Item-Based Recommender System with MapReduce

I. The Item-Based Recommender System

1. Cosine similarity



For example, if two vectors are identical, the angle between them is 0° and cos = 1, the highest possible similarity.

If the two vectors are orthogonal, the angle between them is 90° and cos = 0, the lowest similarity.
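Written out, the similarity used throughout this post is the standard cosine of the angle between two items' score vectors x and y:

$$\cos(x, y) = \frac{x \cdot y}{\lVert x \rVert \, \lVert y \rVert} = \frac{\sum_{u} x_u\, y_u}{\sqrt{\sum_{u} x_u^{2}}\,\sqrt{\sum_{u} y_u^{2}}}$$

where u ranges over users, x_u is the score user u gave to item x, and a missing score counts as 0.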

2. Item-based collaborative filtering

Idea: recommend to a user items that are similar to the items they liked in the past.

Steps (one MapReduce job per step, as implemented below):

1. Build the item-user score matrix from the raw user actions.
2. Compute the cosine similarity between every pair of items to get the similarity matrix.
3. Transpose the score matrix.
4. Multiply the similarity matrix by the transposed score matrix to get a recommendation list.
5. Remove from each list the items the user has already acted on.

II. Input

Upload the useraction.txt file to the /input directory on HDFS:

hadoop fs -put useraction.txt /input

(if the directory does not exist yet, create it first)

hadoop fs -mkdir /input

A,1,1
C,3,5
B,2,3
B,5,3
B,6,5
A,2,10
C,3,10
C,4,5
C,1,5
A,1,1
A,6,5
A,4,3

Each line is one user action: the first column is the user ID, the second the item ID, and the third the score of that action.

III. Implementation

Step 1:

Compute the user-item score matrix from the list of user actions.
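For example, item 1 appears three times in the sample input: user A scores it 1 twice and user C scores it 5 once, so the aggregated row for item 1 becomes 1	A_2,C_5 (A: 1 + 1 = 2, C: 5).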

package hadoop2;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;


public class Step1 {
	
	
	/***
	 * input:  /input/useraction.txt
	 * userID,itemID,score
	 * 	A,1,1
		C,3,5
		B,2,3
		B,5,3
		B,6,5
		A,2,10
		C,3,10
		C,4,5
		C,1,5
		A,1,1
		A,6,5
		A,4,3		
	 * output:
	 * (itemID,userID_score)
	 *	("1","A_1")
		("3","C_5")
		("2","B_3")
		("5","B_3")
		("6","B_5")
		("2","A_10")
		("3","C_10")
		("4","C_5")
		("1","C_5")
		("1","A_1")
		("6","A_5")
		("4","A_3")
	 * 
	 * In other words, the map step turns each (userID, itemID, score) line into (itemID, "userID_score").
	 * @author chenjie
	 *
	 */
	public static class Mapper1 extends Mapper<LongWritable,Text,Text,Text>
	{
		private Text outKey = new Text();
		private Text outValue = new Text();
		
		@Override
		protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
				throws IOException, InterruptedException {
			//System.out.println("map,key=" + key + ",value=" + value.toString());
			String values[] = value.toString().split(",");
			String userID = values[0];
			String itemID = values[1];
			String score  = values[2];
			outKey.set(itemID);
			outValue.set(userID + "_" + score);
			context.write(outKey, outValue);
			System.out.println("(\"" + itemID + "\",\"" + userID + "_" + score + "\")");
		}
	}
	
	/***
	 * input:
	 * itemID [userID_score...]
	 * 	("1",["A_1","C_5","A_1"])
		("2",["A_10","B_3"])
		("3",["C_10","C_5"])
		("4",["A_3","C_5"])
		("5",["B_3"])
		("6",["A_5","B_5"])
		
		output:
		itemID [userID_sumScore...]
		1	A_2,C_5
		2	A_10,B_3
		3	C_15
		4	A_3,C_5
		5	B_3
		6	A_5,B_5
		
		The reduce step sums, for each item, the scores that belong to the same user.
		For example, in ("1",["A_1","C_5","A_1"]) user A scored item 1 twice: 1 + 1 = 2,
		so (item 1, user A, total 2) is stored in the map as (1, "A_2").
		Likewise (item 1, user C, total 5) is stored as (1, "C_5").
		...
		All entries for item 1 are then written out as   key: 1	value: A_2,C_5
		and likewise for item 2:                         key: 2	value: A_10,B_3
		...
	 * @author chenjie
	 *
	 */
	public static class Reducer1 extends Reducer<Text,Text,Text,Text>
	{
		private Text outKey = new Text();
		private Text outValue = new Text();
		
		@Override
		protected void reduce(Text key, Iterable<Text> values, Context context)
				throws IOException, InterruptedException {
			String itemID = key.toString();
			StringBuilder log = new StringBuilder();
			log.append("(\"" + itemID + "\",[");
			Map<String,Integer> map = new HashMap<String,Integer>();
			for(Text value : values)
			{
				log.append("\"" + value + "\",");
				String userID = value.toString().split("_")[0];
				String score = value.toString().split("_")[1];
				if(map.get(userID) == null)
				{
					map.put(userID, Integer.valueOf(score));
				}
				else
				{
					Integer preScore = map.get(userID);
					map.put(userID, preScore + Integer.valueOf(score));
				}
			}
			if(log.toString().endsWith(","))
				log.deleteCharAt(log.length()-1);
			log.append("])");
			System.out.println(log);
			StringBuilder sb = new StringBuilder();
			for(Map.Entry<String, Integer> entry : map.entrySet())
			{
				String userID = entry.getKey();
				String score = String.valueOf(entry.getValue());
				sb.append(userID + "_" + score + ",");
			}
			String line = null;
			if(sb.toString().endsWith(","))
			{
				line = sb.substring(0, sb.length()-1);
			}
			outKey.set(itemID);
			outValue.set(line);
			context.write(outKey, outValue);
		}
		
	}
	
	private static final String INPATH = "/input/useraction.txt";	// input path
	private static final String OUTPATH = "/output/tuijian1";	// output path
	private static final String HDFS = "hdfs://pc1:9000";	// HDFS namenode URI
	
	public int run() throws IOException, ClassNotFoundException, InterruptedException {
		 Configuration conf = new Configuration();
		 conf.set("fs.defaultFS",HDFS);
		    //String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
		    //String[] otherArgs = {"hdfs://pc1:9000/input/chenjie.txt","hdfs://pc1:9000/output/out4"};
		    String[] otherArgs = {INPATH,OUTPATH};
		    // configure the input and output HDFS paths here
		    if (otherArgs.length != 2) {
		      System.err.println("Usage: wordcount <in> <out>");
		      System.exit(2);
		    }
		    //conf.set("fs.defaultFS",HDFS);
		   // JobConf conf1 = new JobConf(WordCount.class);
		    @SuppressWarnings("deprecation")
			Job job = new Job(conf, "step1");//Job(Configuration conf, String jobName) 設定job名稱和
		    job.setJarByClass(Step1.class);
		    job.setMapperClass(Mapper1.class); //為job設定Mapper類 
		    //job.setCombinerClass(IntSumReducer.class); //為job設定Combiner類  
		    job.setReducerClass(Reducer1.class); //為job設定Reduce類 

		    job.setMapOutputKeyClass(Text.class);  
		    job.setMapOutputValueClass(Text.class); 

		    job.setOutputKeyClass(Text.class);	// output key type
		    job.setOutputValueClass(Text.class);	// output value type

		    //TODO
		    job.setOutputFormatClass(TextOutputFormat.class);
		    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));	// set the input path

		    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));	// set the output path
		   
		    FileSystem fs = FileSystem.get(conf);
			Path outPath = new Path(OUTPATH);
			if(fs.exists(outPath))
			{
				fs.delete(outPath, true);
			}
		    
		
		    return job.waitForCompletion(true) ? 1 : -1;
		
		
	}
	
	public static void main(String[] args)
	{
		try {
			new Step1().run();
		} catch (ClassNotFoundException | IOException | InterruptedException e) {
			e.printStackTrace();
		}
	}
	
}


Step 2:

Compute the cosine similarity between every pair of rows, producing the item-item similarity matrix.
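As a sanity check against the sample data: item 1's vector is (A: 2, C: 5) and item 2's is (A: 10, B: 3); only the A component overlaps, so

$$\cos(1,2) = \frac{2 \times 10}{\sqrt{2^2 + 5^2}\,\sqrt{10^2 + 3^2}} = \frac{20}{\sqrt{29}\,\sqrt{109}} \approx 0.36,$$

which matches the 2_0.36 entry in the output of this step.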

package hadoop2;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;


/***
 * 
 * Compute the similarity between every pair of rows to form the similarity matrix.
 * 
 * input
 * itemID [userID_sumScore...]
		1	A_2,C_5
		2	A_10,B_3
		3	C_15
		4	A_3,C_5
		5	B_3
		6	A_5,B_5
	The input is also placed in the distributed cache, and every input row is compared against every cached row.
	output:
	itemID	[itemID_cos...]
	1	1_1.00,2_0.36,3_0.93,4_0.99,6_0.26
	2	1_0.36,2_1.00,4_0.49,5_0.29,6_0.88
	3	4_0.86,3_1.00,1_0.93
	4	1_0.99,4_1.00,6_0.36,3_0.86,2_0.49
	5	2_0.29,5_1.00,6_0.71
	6	1_0.26,5_0.71,6_1.00,2_0.88,4_0.36
 * @author chenjie
 *
 */
public class Step2 {
	
	/***
	 * input:
	 * itemID [userID_sumScore...]
		1	A_2,C_5
		2	A_10,B_3
		3	C_15
		4	A_3,C_5
		5	B_3
		6	A_5,B_5
		cache : = input
		output:
		1	1_1.00
		1	2_0.36
		1	3_0.93
		1	4_0.99
		1	6_0.26
		2	1_0.36
		2	2_1.00
		2	4_0.49
		2	5_0.29
		2	6_0.88
		3	1_0.93
		3	3_1.00
		3	4_0.86
		4	1_0.99
		4	2_0.49
		4	3_0.86
		4	4_1.00
		4	6_0.36
		5	2_0.29
		5	5_1.00
		5	6_0.71
		6	1_0.26
		6	2_0.88
		6	4_0.36
		6	5_0.71
		6	6_1.00
	 * @author chenjie
	 *
	 */
	public static class Mapper2 extends Mapper<LongWritable,Text,Text,Text>
	{
		private Text outKey = new Text();
		private Text outValue = new Text();
		private List<String> cacheList = new ArrayList<String>();
		
		private DecimalFormat df = new DecimalFormat("0.00");
		
		/***
		 * Load the cached file into memory: each line becomes one string in a list of all rows.
		 */
		@Override
		protected void setup(Context context)
				throws IOException, InterruptedException {
			FileReader fr = new FileReader("itemUserScore1");
			BufferedReader br = new BufferedReader(fr);
			String line = null;
			while((line = br.readLine()) != null)
			{
				cacheList.add(line);
			}
			fr.close();
			br.close();
		}
		
		/***
		 * 	Example input for one map call:
		 * 	value     : 1	A_2,C_5
		 * 	cacheList : 2	A_10,B_3
		 */
		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			System.out.println("map,key=" + key + ",value=" + value.toString());
			String[] rowAndline = value.toString().split("\t");
			//get the row number (the item ID)
			//rowAndline : 1	A_2,C_5
			String row_matrix1 = rowAndline[0];
			//row_matrix1 : 1
			String[] column_value_array_matrix1 = rowAndline[1].split(",");
			//split out the columns
			//rowAndline[1] : A_2,C_5
			//column_value_array_matrix1 : [A_2,C_5]
			
			//|x|=sqrt(x1^2+x2^2+...)
			double denominator1 = 0;
			//the norm of vector 1
			for(String colunm : column_value_array_matrix1)//for each component of vector 1
			{
				String score = colunm.split("_")[1];
				denominator1 +=  Double.valueOf(score) * Double.valueOf(score);
				//square each component and add it to the norm
			}
			denominator1 = Math.sqrt(denominator1);//take the square root to get the norm
			
			
			for(String line : cacheList)// take line "2	A_10,B_3" as an example
			{
				String[] rowAndline2 = line.toString().split("\t");
				//rowAndline2 : 2	A_10,B_3
				String row_matrix2 = rowAndline2[0];
				//row_matrix2 : 2
				String[] column_value_array_matrix2 = rowAndline2[1].split(",");
				//column_value_array_matrix2 : [A_10,B_3]
				
				double denominator2 = 0;//the norm of vector 2
				for(String colunm : column_value_array_matrix2)
				{
					String score = colunm.split("_")[1];
					denominator2 +=  Double.valueOf(score) * Double.valueOf(score);
				}
				denominator2 = Math.sqrt(denominator2);
				
				
				int numerator = 0;
				//accumulates the dot product
				for(String column_value_matrix1 : column_value_array_matrix1)//for each component of vector 1: A_2,C_5
				{
					String column_maxtrix1 = column_value_matrix1.split("_")[0];
					//the user ID
					String value_matrix1 = column_value_matrix1.split("_")[1];
					//the score
					
					for(String column_value_matrix2 : column_value_array_matrix2)//for each component of vector 2: A_10,B_3
					{
						String column_maxtrix2 = column_value_matrix2.split("_")[0];
						//the user ID
						String value_matrix2 = column_value_matrix2.split("_")[1];
						//the score
						
						//only components with the same label are multiplied
						if(column_maxtrix2.equals(column_maxtrix1))//this is why the labels are kept: only equal labels mean the same position in both vectors
						{
							numerator += Integer.valueOf(value_matrix1) * Integer.valueOf(value_matrix2);
							//numerator += 2×10
						}
					}
				}
				
				double cos = numerator / (denominator1 * denominator2);
				//the cosine
				if(cos == 0)
					continue;
				outKey.set(row_matrix1);//key: the row number of the left-hand matrix (this item's ID)
				outValue.set(row_matrix2 + "_" + df.format(cos));//value: the other item's ID and the similarity at that position
				context.write(outKey, outValue);
				System.out.println(outKey + "\t" + outValue);
			}
		}
	}
	
	/***
	 * input:
	 *  ("1",["1_1.00","2_0.36","3_0.93","4_0.99","6_0.26"])
		("2",["1_0.36","2_1.00","4_0.49","5_0.29","6_0.88"])
		("3",["4_0.86","3_1.00","1_0.93"])
		("4",["1_0.99","4_1.00","6_0.36","3_0.86","2_0.49"])
		("5",["2_0.29","5_1.00","6_0.71"])
		("6",["1_0.26","5_0.71","6_1.00","2_0.88","4_0.36"])
		
		output:
		1	1_1.00,2_0.36,3_0.93,4_0.99,6_0.26
		2	1_0.36,2_1.00,4_0.49,5_0.29,6_0.88
		3	4_0.86,3_1.00,1_0.93
		4	1_0.99,4_1.00,6_0.36,3_0.86,2_0.49
		5	2_0.29,5_1.00,6_0.71
		6	1_0.26,5_0.71,6_1.00,2_0.88,4_0.36

		That is, the values for each key are simply joined together,
		giving the final similarity matrix.
	 * 
	 * @author chenjie
	 *
	 */
	public static class Reducer2 extends Reducer<Text,Text,Text,Text>
	{
		private Text outKey = new Text();
		private Text outValue = new Text();
		
	
		@Override
		protected void reduce(Text key, Iterable<Text> values, Context context)
				throws IOException, InterruptedException {
			//System.out.println(ReduceUtils.getReduceInpt(key, values));
			//note: the values iterable can only be traversed once
			StringBuilder sb = new StringBuilder();
			for(Text text : values)
			{
				sb.append(text + ",");
			}
			String line = "";
			if(sb.toString().endsWith(","))
			{
				line = sb.substring(0,sb.length()-1);
			}
			outKey.set(key);
			outValue.set(line);
			context.write(outKey, outValue);
		}
		
	}
	
	
	//private static final String INPATH = "/input/itemUserScore1.txt";
	private static final String INPATH = "/output/tuijian1/part-r-00000";
	private static final String OUTPATH = "/output/tuijian2";
	
	//private static final String CACHE = "/input/itemUserScore1.txt";
	private static final String CACHE = "/output/tuijian1/part-r-00000";
	private static final String HDFS = "hdfs://pc1:9000";
	
	public int run() throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
		 Configuration conf = new Configuration();
		 conf.set("fs.defaultFS",HDFS);
		    //String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
		    //String[] otherArgs = {"hdfs://pc1:9000/input/chenjie.txt","hdfs://pc1:9000/output/out4"};
		    String[] otherArgs = {INPATH,OUTPATH};
		    // configure the input and output HDFS paths here
		    if (otherArgs.length != 2) {
		      System.err.println("Usage: wordcount <in> <out>");
		      System.exit(2);
		    }
		    //conf.set("fs.defaultFS",HDFS);
		   // JobConf conf1 = new JobConf(WordCount.class);
		    @SuppressWarnings("deprecation")
			Job job = new Job(conf, "step2");//Job(Configuration conf, String jobName) 設定job名稱和
		    job.setJarByClass(Step2.class);
		    job.setMapperClass(Mapper2.class); //為job設定Mapper類 
		    //job.setCombinerClass(IntSumReducer.class); //為job設定Combiner類  
		    job.setReducerClass(Reducer2.class); //為job設定Reduce類 

		    job.addCacheArchive(new URI(CACHE + "#itemUserScore1"));
		    
		    job.setMapOutputKeyClass(Text.class);  
		    job.setMapOutputValueClass(Text.class); 

		    job.setOutputKeyClass(Text.class);	// output key type
		    job.setOutputValueClass(Text.class);	// output value type

		    //TODO
		    //job.setOutputFormatClass(SequenceFileOutputFormat.class);
		    job.setOutputFormatClass(TextOutputFormat.class);
		    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));	// set the input path

		    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));	// set the output path
		  
		    FileSystem fs = FileSystem.get(conf);
			Path outPath = new Path(OUTPATH);
			if(fs.exists(outPath))
			{
				fs.delete(outPath, true);
			}
		    
			return job.waitForCompletion(true) ? 1 : -1;
		    
		    
		
		
	}
	
	public static void main(String[] args)
	{
		try {
			new Step2().run();
		} catch (ClassNotFoundException | IOException | InterruptedException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} catch (URISyntaxException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}
}

Step 3: transpose the score matrix (see my earlier posts for why the transpose is needed and how it is done).
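As a standalone illustration (a small hypothetical program, not part of the MapReduce job), the sketch below applies the same per-row transformation that Mapper3 performs, to one row of the Step 1 output:

public class TransposeSketch {
	public static void main(String[] args) {
		String row = "1\tA_2,C_5";	// one row of Step 1 output: itemID \t userID_score,...
		String[] idAndCols = row.split("\t");
		String itemID = idAndCols[0];
		for (String col : idAndCols[1].split(",")) {
			String userID = col.split("_")[0];
			String score = col.split("_")[1];
			// the user ID becomes the key, "itemID_score" the value
			System.out.println(userID + "\t" + itemID + "_" + score);	// prints: A	1_2   and   C	1_5
		}
	}
}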

package hadoop2;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/**
 * Matrix multiplication is done in two parts:
 * 1. transpose the score matrix
 * 2. multiply the similarity matrix by the (transposed score matrix)
 * This class performs part 1: the transpose.
 * 
 * input:
 * 	1	A_2,C_5
	2	A_10,B_3
	3	C_15
	4	A_3,C_5
	5	B_3
	6	A_5,B_5
	output:
	A	6_5,4_3,2_10,1_2
	B	6_5,5_3,2_3
	C	4_5,3_15,1_5
 * @author chenjie
 *
 */
public class Step3 {
	public static class Mapper3 extends Mapper<LongWritable,Text,Text,Text>
	{
		private Text outKey = new Text();
		private Text outValue = new Text();
		
		
		//For each input line; take the first line of the example matrix below as an illustration
		//key : 1
		//value : "1	1_0,2_3,3_-1,4_2,5_-3"
		@Override
		protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
				throws IOException, InterruptedException {
			String[] rowAndline = value.toString().split("\t");
			//rowAndline : {"1","1_0,2_3,3_-1,4_2,5_-3"}
			String row = rowAndline[0];
			//row "1"
			String[] lines = rowAndline[1].split(",");
			//rowAndline[1] : "1_0,2_3,3_-1,4_2,5_-3"
			//lines : {"1_0","2_3","3_-1","4_2","5_-3"}
			for(String line : lines)//for each column; take the first one, line "1_0", as an example
			{
				String colunm = line.split("_")[0];
				//colunm : 1
				String valueStr = line.split("_")[1];
				//valueStr : 0 
				outKey.set(colunm);
				//the column becomes the row
				outValue.set(row + "_" + valueStr);
				//the row becomes the column
				context.write(outKey, outValue);
				// emits (1,"1_0")
			}
			//when the loop ends, for {"1_0","2_3","3_-1","4_2","5_-3"}
			//it has emitted (1,"1_0") row 1, column 1 = 0    (2,"1_3") row 2, column 1 = 3		(3,"1_-1") (4,"1_2") (5,"1_-3")
			/*
			target transposed matrix:
			0	1	1	-2
			3	3	1	2
			-1	5	4	-1
			2	-2	-1	1
			-3	-1	2	2
			*/
			//which is exactly the first column of the transposed matrix
		}
		/*
			All map calls together produce:
			 ("1","1_0")	("2","1_3") 	("3","1_-1")	("4","1_2")		("5","1_-3")
			("1","2_1")	("2","2_3") 	("3","2_5")	    ("4","2_-2")	("5","2_-1")
			("1","3_0")	("2","3_1")	    ("3","3_4")		("4","3_-1")	("5","3_2")
			("1","4_-2")  ("2","4_2")	    ("3","4_-1")	("4","4_1")		("5","4_2")
		*/

	}
	

	/*
		The reduce task merges all the key-value pairs produced by map into the rows of the transposed matrix.
		Values that share the same key are grouped into one collection.
		For example:
		when key is "1",
		values : {"3_0","1_0","4_-2","2_1"}
		Note: this is why the column label is kept; the order of values is not necessarily the original column order.
	*/
	
	public static class Reducer3 extends Reducer<Text,Text,Text,Text>
	{
		private Text outKey = new Text();
		private Text outValue = new Text();
		
		@Override
		protected void reduce(Text key, Iterable<Text> values, Context context)
				throws IOException, InterruptedException {
			
			StringBuilder sb = new StringBuilder();
			for(Text text : values)
			{
				sb.append(text + ",");
			}
			//sb : "3_0,1_0,4_-2,2_1,"
			//note the trailing comma
			String line = "";
			if(sb.toString().endsWith(","))
			{
				line = sb.substring(0,sb.length()-1);
			}
			//strip the trailing comma
			//line : "3_0,1_0,4_-2,2_1"
			outKey.set(key);
			outValue.set(line);
			//("1","3_0,1_0,4_-2,2_1")
			context.write(outKey, outValue);
		}
		
	}
	
	private static final String INPATH = "hdfs://pc1:9000/output/tuijian1/part-r-00000";	// input path
	private static final String OUTPATH = "hdfs://pc1:9000/output/tuijian3";	// output path
	private static final String HDFS = "hdfs://pc1:9000";	// HDFS namenode URI
	
	public int run() throws IOException, ClassNotFoundException, InterruptedException {
		 Configuration conf = new Configuration();
		 conf.set("fs.defaultFS",HDFS);
		    //String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
		    //String[] otherArgs = {"hdfs://pc1:9000/input/chenjie.txt","hdfs://pc1:9000/output/out4"};
		    String[] otherArgs = {INPATH,OUTPATH};
		    // configure the input and output HDFS paths here
		    if (otherArgs.length != 2) {
		      System.err.println("Usage: wordcount <in> <out>");
		      System.exit(2);
		    }
		    //conf.set("fs.defaultFS",HDFS);
		   // JobConf conf1 = new JobConf(WordCount.class);
		    Job job = new Job(conf, "step3");//Job(Configuration conf, String jobName) 設定job名稱和
		    job.setJarByClass(Step3.class);
		    job.setMapperClass(Mapper3.class); //為job設定Mapper類 
		    //job.setCombinerClass(IntSumReducer.class); //為job設定Combiner類  
		    job.setReducerClass(Reducer3.class); //為job設定Reduce類 

		    job.setMapOutputKeyClass(Text.class);  
		    job.setMapOutputValueClass(Text.class); 

		    job.setOutputKeyClass(Text.class);	// output key type
		    job.setOutputValueClass(Text.class);	// output value type

		    job.setOutputFormatClass(TextOutputFormat.class);
		    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));	// set the input path

		    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));	// set the output path
		    FileSystem fs = FileSystem.get(conf);
			Path outPath = new Path(OUTPATH);
			if(fs.exists(outPath))
			{
				fs.delete(outPath, true);
			}
		    
			return job.waitForCompletion(true) ? 1 : -1;
		
		
		
	}
	
	public static void main(String[] args)
	{
		try {
			new Step3().run();
		} catch (ClassNotFoundException | IOException | InterruptedException e) {
			e.printStackTrace();
		}
	}
	
}

Step 4: similarity matrix × transposed score matrix = recommendation list (see my earlier post on matrix multiplication in MapReduce).
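In formula form, the score this step produces for user u and item i is the similarity-weighted sum of u's existing scores, which is what the example in the class comment below works out for user A and item 1:

$$\mathrm{rec}(u,i) = \sum_{j} \cos(i,j)\cdot \mathrm{score}(u,j), \qquad \mathrm{rec}(A,1) = 1.00\cdot 2 + 0.36\cdot 10 + 0.99\cdot 3 + 0.26\cdot 5 = 9.87$$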

package hadoop2;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/**
 * Matrix multiplication is done in two parts:
 * 1. transpose the score matrix
 * 2. multiply the similarity matrix by the (transposed score matrix)
 * This class performs part 2: the multiplication.
 * input:
 	1	1_1.00,2_0.36,3_0.93,4_0.99,6_0.26
	2	1_0.36,2_1.00,4_0.49,5_0.29,6_0.88
	3	4_0.86,3_1.00,1_0.93
	4	1_0.99,4_1.00,6_0.36,3_0.86,2_0.49
	5	2_0.29,5_1.00,6_0.71
	6	1_0.26,5_0.71,6_1.00,2_0.88,4_0.36
 * 
 * cache:
 * 	A	6_5,4_3,2_10,1_2
	B	6_5,5_3,2_3
	C	4_5,3_15,1_5
	
	output:
	1	A_9.87,B_2.38,C_23.90
	2	A_16.59,B_8.27,C_4.25
	3	C_23.95,A_4.44
	4	B_3.27,C_22.85,A_11.68
	5	A_6.45,B_7.42
	6	C_3.10,A_15.40,B_9.77
	
	For example, in map:
	1	1_1.00,2_0.36,3_0.93,4_0.99,6_0.26
	×
	A	6_5,4_3,2_10,1_2
	=
	1.00*2+0.36*10+0.99*3+0.26*5
	=9.87
	which produces (1,"A_9.87")
	reduce then merges everything into the recommendation list
 * @author chenjie
 *
 */
public class Step4 {
	public static class Mapper4 extends Mapper<LongWritable,Text,Text,Text>
	{
		private Text outKey = new Text();
		private Text outValue = new Text();
		private List<String> cacheList = new ArrayList<String>();
		
		private DecimalFormat df = new DecimalFormat("0.00");
		
		/***
		 * Cache the file holding the right-hand matrix (the transposed score matrix) in memory; each line becomes one string in a list of all rows.
		 */
		@Override
		protected void setup(Context context)
				throws IOException, InterruptedException {
			FileReader fr = new FileReader("myfile");
			BufferedReader br = new BufferedReader(fr);
			String line = null;
			while((line = br.readLine()) != null)
			{
				cacheList.add(line);
				System.out.println("----------------------cache line :" + line);
			}
			fr.close();
			br.close();
		}
		
		
		/*	Logical form of the left-hand matrix:
		 * 1	2	-2	0
		 * 3	3	4	-3
		 * -2	0	2	3
		 * 5	3	-1	2
		 * -4	2	0	2
		 * Physical (stored) form of the left-hand matrix:
		 * 1	1_1,2_2,3_-2,4_0
		 * 2	1_3,2_3,3_4,4_-3
		 * 3	1_-2,2_0,3_2,4_3
		 * 4	1_5,2_3,3_-1,4_2
		 * 5	1_-4,2_2,3_0,4_2
		 * 
		 * Physical form of the right-hand matrix (already transposed):
		 *  1	3_0,1_0,4_-2,2_1
			2	3_1,4_2,2_3,1_3
			3	4_-1,1_-1,3_4,2_5
			4	1_2,3_-1,4_1,2_-2
			5	4_2,3_2,1_-3,2_-1
			
			key: "1"
			value: "1	1_1,2_2,3_-2,4_0"
		 * */
		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException {
			System.out.println("-------------------map,key=" + key + "value=" + value);
			String[] rowAndline = value.toString().split("\t");
			//get the row number
			//rowAndline : {"1","1_1,2_2,3_-2,4_0"}
			String row_matrix1 = rowAndline[0];
			//row_matrix1 :"1"
			String[] column_value_array_matrix1 = rowAndline[1].split(",");
			//split out the columns
			//rowAndline[1] : "1_1,2_2,3_-2,4_0"
			//column_value_array_matrix1 : {"1_1","2_2","3_-2","4_0"}
			for(String line : cacheList)// take line "3	4_-1,1_-1,3_4,2_5" as an example
			{
				String[] rowAndline2 = line.toString().split("\t");
				//rowAndline2 : {"3","4_-1,1_-1,3_4,2_5"}
				String row_matrix2 = rowAndline2[0];
				//row number of this line in the transposed matrix (the column number of the original right-hand matrix)
				String[] column_value_array_matrix2 = rowAndline2[1].split(",");
				//rowAndline2[1] : "4_-1,1_-1,3_4,2_5"
				//column_value_array_matrix2 : {"4_-1","1,-1","3_4","2_5"}
				double result = 0;
				//accumulates the product sum for this (row, column) position
				for(String column_value_matrix1 : column_value_array_matrix1)//for each column (component) of the left-hand row: "1_1","2_2","3_-2","4_0"
				{
					String column_maxtrix1 = column_value_matrix1.split("_")[0];
					//the column number
					String value_matrix1 = column_value_matrix1.split("_")[1];
					//the value in that column
					
					for(String column_value_matrix2 : column_value_array_matrix2)//for each column (component) of the right-hand row: "4_-1","1_-1","3_4","2_5"
					{
						String column_maxtrix2 = column_value_matrix2.split("_")[0];
						//the column number
						String value_matrix2 = column_value_matrix2.split("_")[1];
						//the value in that column
						
						if(column_maxtrix2.equals(column_maxtrix1))//this is why the column labels are kept: only equal labels mean the same position
						{
							result += Double.valueOf(value_matrix1) * Double.valueOf(value_matrix2);
							//result += 1 * (-1)
							//result += 2 * 5
							//result += -2 * 4
							//result += 0 * (-1)
						}
					}
				}
				if(result == 0)
					continue;
				
				outKey.set(row_matrix1);//key: the row number of the left-hand matrix
				outValue.set(row_matrix2 + "_" +df.format(result));//value: the row number of the right-hand transposed matrix (i.e. the column of the original matrix) and the value at that position
				context.write(outKey, outValue);
				//("1","3_1") 
			}
			//("1","2_7")("1,"3_1")("1","2_4")("1","4_0")("1","5_9")
			//("2","1_9")...
			//....
		}
	}
	
	
	public static class Reducer4 extends Reducer<Text,Text,Text,Text>
	{
		private Text outKey = new Text();
		private Text outValue = new Text();
		
		/**
		 * Combine the key-value pairs produced by map into the physical form of the result matrix.
		 * ("1","2_7")("1,"3_1")("1","2_4")("1","4_0")("1","5_9")
		 * ("2","1_9")...
		 * ...
		 * Elements with the same key, e.g. ("1","2_7")("1,"3_1")("1","2_4")("1","4_0")("1","5_9"),
		 * are grouped together as
		 * key : "1"
		 * values : {"2_7","3_1","2_4","4_0","5_9"}
		 *
		 */
		@Override
		protected void reduce(Text key, Iterable<Text> values, Context context)
				throws IOException, InterruptedException {
			
			StringBuilder sb = new StringBuilder();
			for(Text text : values)
			{
				sb.append(text + ",");
			}
			// sb : "2_7,3_1,2_4,4_0,5_9,"
			String line = "";
			if(sb.toString().endsWith(","))
			{
				line = sb.substring(0,sb.length()-1);
			}
			//line :"2_7,3_1,2_4,4_0,5_9"
			outKey.set(key);
			outValue.set(line);
			context.write(outKey, outValue);
			// ("1","2_7,3_1,2_4,4_0,5_9")
		}
		
	}
	
	
	private static final String INPATH = "hdfs://pc1:9000/output/tuijian2/part-r-00000";
	private static final String OUTPATH = "hdfs://pc1:9000/output/tuijian4";
	
	private static final String CACHE = "hdfs://pc1:9000/output/tuijian3/part-r-00000";
	private static final String HDFS = "hdfs://pc1:9000";
	
	public int run() throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
		 Configuration conf = new Configuration();
		 conf.set("fs.defaultFS",HDFS);
		    //String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
		    //String[] otherArgs = {"hdfs://pc1:9000/input/chenjie.txt","hdfs://pc1:9000/output/out4"};
		    String[] otherArgs = {INPATH,OUTPATH};
		    // configure the input and output HDFS paths here
		    if (otherArgs.length != 2) {
		      System.err.println("Usage: wordcount <in> <out>");
		      System.exit(2);
		    }
		    //conf.set("fs.defaultFS",HDFS);
		   // JobConf conf1 = new JobConf(WordCount.class);
		    Job job = new Job(conf, "step4");//Job(Configuration conf, String jobName) 設定job名稱和
		    job.setJarByClass(Step4.class);
		    job.setMapperClass(Mapper4.class); //為job設定Mapper類 
		    //job.setCombinerClass(IntSumReducer.class); //為job設定Combiner類  
		    job.setReducerClass(Reducer4.class); //為job設定Reduce類 

		    job.addCacheArchive(new URI(CACHE + "#myfile"));
		    
		    job.setMapOutputKeyClass(Text.class);  
		    job.setMapOutputValueClass(Text.class); 

		    job.setOutputKeyClass(Text.class);	// output key type
		    job.setOutputValueClass(Text.class);	// output value type

		    job.setOutputFormatClass(TextOutputFormat.class);
		    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));	// set the input path

		    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));	// set the output path
		    
		    FileSystem fs = FileSystem.get(conf);
			Path outPath = new Path(OUTPATH);
			if(fs.exists(outPath))
			{
				fs.delete(outPath, true);
			}
			return job.waitForCompletion(true) ? 1 : -1;
		
		
		
	}
	
	public static void main(String[] args)
	{
		try {
			new Step4().run();
		} catch (ClassNotFoundException | IOException | InterruptedException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} catch (URISyntaxException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}
}

Step 5: remove items the user has already acted on from the recommendation list
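Before the full job, here is the filtering idea in isolation (a standalone sketch using sample rows from this post, not the Mapper5 code): collect the users who already acted on an item, then drop their entries from that item's recommendation row.

import java.util.HashSet;
import java.util.Set;

public class FilterSketch {
	public static void main(String[] args) {
		String recommendRow = "A_9.87,B_2.38,C_23.90";	// recommendation row for item 1
		String actionRow = "A_2,C_5";			// action record for item 1
		Set<String> actedUsers = new HashSet<>();
		for (String us : actionRow.split(",")) {
			actedUsers.add(us.split("_")[0]);	// users who already scored item 1: A, C
		}
		for (String us : recommendRow.split(",")) {
			String user = us.split("_")[0];
			if (!actedUsers.contains(user)) {
				// only B survives; the user becomes the key, "itemID_score" the value
				System.out.println(user + "\t1_" + us.split("_")[1]);	// prints: B	1_2.38
			}
		}
	}
}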

package hadoop2;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.Reducer.Context;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;


/***
 * Remove from the recommendation list the items a user has already acted on; for example, if user A has already bought an iphone7, iphone7 is removed from A's list.
 * input: the recommendation list produced by Step 4
 *	1	A_9.87,B_2.38,C_23.90
	2	A_16.59,B_8.27,C_4.25
	3	C_23.95,A_4.44
	4	B_3.27,C_22.85,A_11.68
	5	A_6.45,B_7.42
	6	C_3.10,A_15.40,B_9.77
 * cache: the action records (the score matrix from Step 1)
 *  1	A_2,C_5
	2	A_10,B_3
	3	C_15
	4	A_3,C_5
	5	B_3
	6	A_5,B_5
	
	map:
	For example,
	recommendation row for item 1 : 1	A_9.87,B_2.38,C_23.90
	action record for item 1      : 1	A_2,C_5
	Since A already gave item 1 a score of 2 and C already gave it 5,
	A and C should be removed from item 1's recommendation row,
	leaving only B.
	Because the final recommendations are made per user, the user becomes the key and "itemID_recommendation score" the value:
	(B,1_2.38)
	
	reduce:
	merge all items recommended to the same user into one output line
	
	output:
	A	5_6.45,3_4.44
	B	4_3.27,1_2.38
	C	6_3.10,2_4.25
 * @author chenjie
 *
 */
public class Step5 {

	public static class Mapper5  extends Mapper<LongWritable,Text,Text,Text>
	{
		private Text outKey = new Text();
		private Text outValue = new Text();
		private List<String> cacheList = new ArrayList<String>();
		
		private DecimalFormat df = new DecimalFormat("0.00");
		
		/***
		 * Cache the action-record file in memory; each line becomes one string in a list of all rows.
		 */
		@Override
		protected void setup(Context context)
				throws IOException, InterruptedException {
			FileReader fr = new FileReader("itemUserScore3");
			BufferedReader br = new BufferedReader(fr);
			String line = null;
			while((line = br.readLine()) != null)
			{
				cacheList.add(line);
				System.out.println("----------------------cache line :" + line);
			}
			fr.close();
			br.close();
		}
		
		
		/**
		 * Example input for one map call:
		 * 	recommendation row for item 1 : 1	A_9.87,B_2.38,C_23.90
		 * 	action record for item 1      : 1	A_2,C_5
		 */
		@Override
		protected void map(LongWritable key, Text value, Context context)
				throws IOException, InterruptedException 
		{
			System.out.println("-------------------map,key=" + key + "value=" + value);
			String item_matrix1 = value.toString().split("\t")[0];
			//item ID of this recommendation row: 1
			String[] user_score_array_matrix1 = value.toString().split("\t")[1].split(",");
			//recommendation row: A_9.87,B_2.38,C_23.90
			
			for(String line : cacheList)//the list of action records
			{
				String item_matrix2 = line.toString().split("\t")[0];
				//item ID of this action record: 1
				
				String[] user_score_array_matrix2 = line.toString().split("\t")[1].split(",");
				//action record: A_2,C_5
				
				if(item_matrix1.equals(item_matrix2))//only rows for the same item are compared
				{
					for(String user_score : user_score_array_matrix1)//for each user in the recommendation row: A_9.87,B_2.38,C_23.90
					{
						boolean flag = false;//flag: has this user already acted on the item?
						String user_matrix1 = user_score.split("_")[0];
						//user ID
						String score_matrix1 = user_score.split("_")[1];
						//recommendation score
						
						for(String user_score2 : user_score_array_matrix2)//for each entry in the action record: A_2,C_5
						{
							String user_matrix2 = user_score2.split("_")[0];
							//user ID
							if(user_matrix1.equals(user_matrix2))//equal IDs, e.g. A_9.87 and A_2, mean user A has already acted on this item
							{
								flag = true;
							}
						}
						if(flag == false)//if the user has not acted on this item
						{
							outKey.set(user_matrix1);//the user ID becomes the key
							outValue.set(item_matrix1 + "_" +score_matrix1 );//"itemID_recommendation score" becomes the value
							context.write(outKey, outValue);//write to the result
						}
					}
				}
			}
		}
	}
	
	
	
	public static class Reducer5 extends Reducer<Text,Text,Text,Text>
	{
		private Text outKey = new Text();
		private Text outValue = new Text();
		
		/**
		 * Combine the key-value pairs produced by map into the final output lines.
		 * ("1","2_7")("1,"3_1")("1","2_4")("1","4_0")("1","5_9")
		 * ("2","1_9")...
		 * ...
		 * Elements with the same key, e.g. ("1","2_7")("1,"3_1")("1","2_4")("1","4_0")("1","5_9"),
		 * are grouped together as
		 * key : "1"
		 * values : {"2_7","3_1","2_4","4_0","5_9"}
		 *
		 */
		@Override
		protected void reduce(Text key, Iterable<Text> values, Context context)
				throws IOException, InterruptedException {
			
			StringBuilder sb = new StringBuilder();
			for(Text text : values)
			{
				sb.append(text + ",");
			}
			// sb : "2_7,3_1,2_4,4_0,5_9,"
			String line = "";
			if(sb.toString().endsWith(","))
			{
				line = sb.substring(0,sb.length()-1);
			}
			//line :"2_7,3_1,2_4,4_0,5_9"
			outKey.set(key);
			outValue.set(line);
			context.write(outKey, outValue);
			// ("1","2_7,3_1,2_4,4_0,5_9")
		}
	}
	
	
	private static final String INPATH = "hdfs://pc1:9000/output/tuijian4/part-r-00000";
	private static final String OUTPATH = "hdfs://pc1:9000/output/tuijian5";
	
	private static final String CACHE = "hdfs://pc1:9000/output/tuijian1/part-r-00000";
	private static final String HDFS = "hdfs://pc1:9000";
	
	public int run() throws IOException, ClassNotFoundException, InterruptedException, URISyntaxException {
		 Configuration conf = new Configuration();
		 conf.set("fs.defaultFS",HDFS);
		    //String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
		    //String[] otherArgs = {"hdfs://pc1:9000/input/chenjie.txt","hdfs://pc1:9000/output/out4"};
		    String[] otherArgs = {INPATH,OUTPATH};
		    // configure the input and output HDFS paths here
		    if (otherArgs.length != 2) {
		      System.err.println("Usage: wordcount <in> <out>");
		      System.exit(2);
		    }
		    //conf.set("fs.defaultFS",HDFS);
		   // JobConf conf1 = new JobConf(WordCount.class);
		    Job job = new Job(conf, "step4");//Job(Configuration conf, String jobName) 設定job名稱和
		    job.setJarByClass(Step5.class);
		    job.setMapperClass(Mapper5.class); //為job設定Mapper類 
		    //job.setCombinerClass(IntSumReducer.class); //為job設定Combiner類  
		    job.setReducerClass(Reducer5.class); //為job設定Reduce類 

		    job.addCacheArchive(new URI(CACHE + "#itemUserScore3"));
		    
		    job.setMapOutputKeyClass(Text.class);  
		    job.setMapOutputValueClass(Text.class); 

		    job.setOutputKeyClass(Text.class);	// output key type
		    job.setOutputValueClass(Text.class);	// output value type

		    job.setOutputFormatClass(TextOutputFormat.class);
		    FileInputFormat.addInputPath(job, new Path(otherArgs[0]));	// set the input path

		    FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));	// set the output path
		    
		    FileSystem fs = FileSystem.get(conf);
			Path outPath = new Path(OUTPATH);
			if(fs.exists(outPath))
			{
				fs.delete(outPath, true);
			}
			return job.waitForCompletion(true) ? 1 : -1;
		
		
		
	}
	
	public static void main(String[] args)
	{
		try {
			new Step5().run();
		} catch (ClassNotFoundException | IOException | InterruptedException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		} catch (URISyntaxException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
	}

}

6. The driver program

Each step can be run on its own so that its output can be inspected, or all five steps can be chained and run in sequence, as JobRunner does below.

package hadoop2;

import java.io.IOException;
import java.net.URISyntaxException;

public class JobRunner {

	public static void main(String[] args) {
		int result1 = -1;
		int result2 = -1;
		int result3 = -1;
		int result4 = -1;
		int result5 = -1;
		try {
			result1 = new Step1().run();
		}
		catch (Exception e) {
			result1 = -1;
		}
		if(result1 == 1)
		{
			System.out.println("Step1 run success");
			try {
				result2 = new Step2().run();
			} catch (ClassNotFoundException | IOException | InterruptedException | URISyntaxException e) {
				result2 = -1;
			}
		}
		else
		{
			System.out.println("Step1 run failed");
		}
		
		if(result2 == 1)
		{
			System.out.println("Step2 run success");
			try {
				result3 = new Step3().run();
			} catch (Exception e) {
				result3 = -1;
			}
		}
		else
		{
			System.out.println("Step2 run failed");
		}
		
		
		if(result3 == 1)
		{
			System.out.println("Step3 run success");
			try {
				result4 = new Step4().run();
			} catch (Exception e) {
				result4 = -1;
			}
		}
		else
		{
			System.out.println("Step3 run failed");
		}
		
		if(result4 == 1)
		{
			System.out.println("Step4 run success");
			try {
				result5 = new Step5().run();
			} catch (Exception e) {
				result5 = -1;
			}
		}
		else
		{
			System.out.println("Step4 run failed");
		}
		
		if(result5 == 1)
		{
			System.out.println("Step5 run success");
			System.out.println("job finished ");
		}
		else
		{
			System.out.println("Step5 run failed");
		}
		
	}

}

7. Output of each step
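For reference, with the sample data above the final recommendation list that Step 5 writes to /output/tuijian5 should be:

A	5_6.45,3_4.44
B	4_3.27,1_2.38
C	6_3.10,2_4.25

i.e. items 5 and 3 are recommended to user A, items 4 and 1 to user B, and items 6 and 2 to user C.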